Skip to content

Commit

Permalink
Expose futureCompletionExecutor on S3 CRT client (#4880)
Browse files Browse the repository at this point in the history
* Expose futureCompletionExecutor on S3 CRT client

* Adderss feedback and fix build
  • Loading branch information
zoewangg authored Feb 2, 2024
1 parent 0ab7f75 commit 19e6862
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS CRT-based S3 Client",
"contributor": "",
"description": "Allow users to configure future completion executor on the AWS CRT-based S3 client via `S3CrtAsyncClientBuilder#futureCompletionExecutor`. See [#4879](https://github.com/aws/aws-sdk-java-v2/issues/4879)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import java.net.URI;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand Down Expand Up @@ -281,6 +284,30 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer<S3CrtRetryConfigurat
*/
S3CrtAsyncClientBuilder thresholdInBytes(Long thresholdInBytes);

/**
* Configure the {@link Executor} that should be used to complete the {@link CompletableFuture} that is returned by the async
* service client. By default, this is a dedicated, per-client {@link ThreadPoolExecutor} that is managed by the SDK.
* <p>
* The configured {@link Executor} will be invoked by the async HTTP client's I/O threads (e.g., EventLoops), which must be
* reserved for non-blocking behavior. Blocking an I/O thread can cause severe performance degradation, including across
* multiple clients, as clients are configured, by default, to share a single I/O thread pool (e.g., EventLoopGroup).
* <p>
* You should typically only want to customize the future-completion {@link Executor} for a few possible reasons:
* <ol>
* <li>You want more fine-grained control over the {@link ThreadPoolExecutor} used, such as configuring the pool size
* or sharing a single pool between multiple clients.
* <li>You want to add instrumentation (i.e., metrics) around how the {@link Executor} is used.
* <li>You know, for certain, that all of your {@link CompletableFuture} usage is strictly non-blocking, and you wish to
* remove the minor overhead incurred by using a separate thread. In this case, you can use
* {@code Runnable::run} to execute the future-completion directly from within the I/O thread.
* </ol>
*
* @param futureCompletionExecutor the executor
* @return an instance of this builder.
*/
S3CrtAsyncClientBuilder futureCompletionExecutor(Executor futureCompletionExecutor);


@Override
S3AsyncClient build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.services.s3.internal.crt;

import static software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME;
import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR;
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.AUTH_SCHEMES;
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
Expand All @@ -30,6 +31,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute;
Expand All @@ -56,6 +58,7 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
Expand Down Expand Up @@ -120,22 +123,30 @@ private static S3AsyncClient initializeS3AsyncClient(DefaultS3CrtClientBuilder b
builder.executionInterceptors.forEach(overrideConfigurationBuilder::addExecutionInterceptor);
}

return S3AsyncClient.builder()
// Disable checksum for streaming operations, it is handled in CRT. Checksumming for non-streaming
// operations is still handled in HttpChecksumStage
.serviceConfiguration(S3Configuration.builder()
.checksumValidationEnabled(false)
.build())
.region(builder.region)
.endpointOverride(builder.endpointOverride)
.credentialsProvider(builder.credentialsProvider)
.overrideConfiguration(overrideConfigurationBuilder.build())
.accelerate(builder.accelerate)
.forcePathStyle(builder.forcePathStyle)
.crossRegionAccessEnabled(builder.crossRegionAccessEnabled)
.putAuthScheme(new CrtS3ExpressNoOpAuthScheme())
.httpClientBuilder(initializeS3CrtAsyncHttpClient(builder))
.build();
S3AsyncClientBuilder s3AsyncClientBuilder =
S3AsyncClient.builder()
// Disable checksum for streaming operations, it is handled in
// CRT. Checksumming for non-streaming
// operations is still handled in HttpChecksumStage
.serviceConfiguration(S3Configuration.builder()
.checksumValidationEnabled(false)
.build())
.region(builder.region)
.endpointOverride(builder.endpointOverride)
.credentialsProvider(builder.credentialsProvider)
.overrideConfiguration(overrideConfigurationBuilder.build())
.accelerate(builder.accelerate)
.forcePathStyle(builder.forcePathStyle)
.crossRegionAccessEnabled(builder.crossRegionAccessEnabled)
.putAuthScheme(new CrtS3ExpressNoOpAuthScheme())
.httpClientBuilder(initializeS3CrtAsyncHttpClient(builder));


if (builder.futureCompletionExecutor != null) {
s3AsyncClientBuilder.asyncConfiguration(b -> b.advancedOption(FUTURE_COMPLETION_EXECUTOR,
builder.futureCompletionExecutor));
}
return s3AsyncClientBuilder.build();
}

private static S3CrtAsyncHttpClient.Builder initializeS3CrtAsyncHttpClient(DefaultS3CrtClientBuilder builder) {
Expand Down Expand Up @@ -186,6 +197,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB
private S3CrtRetryConfiguration retryConfiguration;
private boolean crossRegionAccessEnabled;
private Long thresholdInBytes;
private Executor futureCompletionExecutor;

@Override
public S3CrtAsyncClientBuilder credentialsProvider(
Expand Down Expand Up @@ -281,6 +293,12 @@ public S3CrtAsyncClientBuilder thresholdInBytes(Long thresholdInBytes) {
return this;
}

@Override
public S3CrtAsyncClientBuilder futureCompletionExecutor(Executor futureCompletionExecutor) {
this.futureCompletionExecutor = futureCompletionExecutor;
return this;
}

@Override
public S3CrtAsyncClient build() {
return new DefaultS3CrtAsyncClient(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.any;
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
import static com.github.tomakehurst.wiremock.client.WireMock.head;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import java.net.URI;
import java.util.concurrent.Executor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.crt.CrtResource;
Expand All @@ -42,7 +49,21 @@
@WireMockTest
public class S3CrtClientWiremockTest {

private static final String LOCATION = "http://Example-Bucket.s3.amazonaws.com/Example-Object";
private static final String BUCKET = "Example-Bucket";
private static final String KEY = "Example-Object";
private static final String E_TAG = "\"3858f62230ac3c915f300c664312c11f-9\"";
private static final String XML_RESPONSE_BODY = String.format(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n"
+ "<Location>%s</Location>\n"
+ "<Bucket>%s</Bucket>\n"
+ "<Key>%s</Key>\n"
+ "<ETag>%s</ETag>\n"
+ "</CompleteMultipartUploadResult>", LOCATION, BUCKET, KEY, E_TAG);
private S3AsyncClient s3AsyncClient;
private S3AsyncClient clientWithCustomExecutor;
private SpyableExecutor mockExecutor;

@BeforeAll
public static void setUpBeforeAll() {
Expand All @@ -68,27 +89,43 @@ public void tearDown() {

@Test
public void completeMultipartUpload_completeResponse() {
String location = "http://Example-Bucket.s3.amazonaws.com/Example-Object";
String bucket = "Example-Bucket";
String key = "Example-Object";
String eTag = "\"3858f62230ac3c915f300c664312c11f-9\"";
String xmlResponseBody = String.format(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n"
+ "<Location>%s</Location>\n"
+ "<Bucket>%s</Bucket>\n"
+ "<Key>%s</Key>\n"
+ "<ETag>%s</ETag>\n"
+ "</CompleteMultipartUploadResult>", location, bucket, key, eTag);

stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(xmlResponseBody)));
stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(XML_RESPONSE_BODY)));

CompleteMultipartUploadResponse response = s3AsyncClient.completeMultipartUpload(
r -> r.bucket(bucket).key(key).uploadId("upload-id")).join();
r -> r.bucket(BUCKET).key(KEY).uploadId("upload-id")).join();

assertThat(response.location()).isEqualTo(location);
assertThat(response.bucket()).isEqualTo(bucket);
assertThat(response.key()).isEqualTo(key);
assertThat(response.eTag()).isEqualTo(eTag);
assertThat(response.location()).isEqualTo(LOCATION);
assertThat(response.bucket()).isEqualTo(BUCKET);
assertThat(response.key()).isEqualTo(KEY);
assertThat(response.eTag()).isEqualTo(E_TAG);
}

@Test
void overrideResponseCompletionExecutor_shouldCompleteWithCustomExecutor(WireMockRuntimeInfo wiremock) {

mockExecutor = Mockito.spy(new SpyableExecutor());

try (S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder()
.region(Region.US_EAST_1)
.endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort()))
.futureCompletionExecutor(mockExecutor)
.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create("key",
"secret")))
.build()) {
stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(XML_RESPONSE_BODY)));

CompleteMultipartUploadResponse response = s3AsyncClient.completeMultipartUpload(
r -> r.bucket(BUCKET).key(KEY).uploadId("upload-id")).join();

verify(mockExecutor).execute(any(Runnable.class));
}
}

private static class SpyableExecutor implements Executor {
@Override
public void execute(Runnable command) {
command.run();
}
}
}

0 comments on commit 19e6862

Please sign in to comment.