diff --git a/.changes/next-release/feature-AWSCRTbasedS3Client-5e7651e.json b/.changes/next-release/feature-AWSCRTbasedS3Client-5e7651e.json new file mode 100644 index 000000000000..d9e96bb08ad8 --- /dev/null +++ b/.changes/next-release/feature-AWSCRTbasedS3Client-5e7651e.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "AWS CRT-based S3 Client", + "contributor": "", + "description": "Allow users to configure future completion executor on the AWS CRT-based S3 client via `S3CrtAsyncClientBuilder#futureCompletionExecutor`. See [#4879](https://github.com/aws/aws-sdk-java-v2/issues/4879)" +} diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java index bd9558e1f87b..8c9c51da5a36 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java @@ -17,6 +17,9 @@ import java.net.URI; import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -281,6 +284,30 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer + * The configured {@link Executor} will be invoked by the async HTTP client's I/O threads (e.g., EventLoops), which must be + * reserved for non-blocking behavior. Blocking an I/O thread can cause severe performance degradation, including across + * multiple clients, as clients are configured, by default, to share a single I/O thread pool (e.g., EventLoopGroup). + *

+ * You should typically only want to customize the future-completion {@link Executor} for a few possible reasons: + *

    + *
  1. You want more fine-grained control over the {@link ThreadPoolExecutor} used, such as configuring the pool size + * or sharing a single pool between multiple clients. + *
  2. You want to add instrumentation (i.e., metrics) around how the {@link Executor} is used. + *
  3. You know, for certain, that all of your {@link CompletableFuture} usage is strictly non-blocking, and you wish to + * remove the minor overhead incurred by using a separate thread. In this case, you can use + * {@code Runnable::run} to execute the future-completion directly from within the I/O thread. + *
+ * + * @param futureCompletionExecutor the executor + * @return an instance of this builder. + */ + S3CrtAsyncClientBuilder futureCompletionExecutor(Executor futureCompletionExecutor); + + @Override S3AsyncClient build(); } \ No newline at end of file diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java index 2a8ad361bfb7..2c27e1c90176 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.services.s3.internal.crt; import static software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME; +import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR; import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.AUTH_SCHEMES; import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM; @@ -30,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.annotations.SdkTestInternalApi; import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute; @@ -56,6 +58,7 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Configuration; import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration; @@ -120,22 +123,30 @@ private static S3AsyncClient initializeS3AsyncClient(DefaultS3CrtClientBuilder b builder.executionInterceptors.forEach(overrideConfigurationBuilder::addExecutionInterceptor); } - return S3AsyncClient.builder() - // Disable checksum for streaming operations, it is handled in CRT. Checksumming for non-streaming - // operations is still handled in HttpChecksumStage - .serviceConfiguration(S3Configuration.builder() - .checksumValidationEnabled(false) - .build()) - .region(builder.region) - .endpointOverride(builder.endpointOverride) - .credentialsProvider(builder.credentialsProvider) - .overrideConfiguration(overrideConfigurationBuilder.build()) - .accelerate(builder.accelerate) - .forcePathStyle(builder.forcePathStyle) - .crossRegionAccessEnabled(builder.crossRegionAccessEnabled) - .putAuthScheme(new CrtS3ExpressNoOpAuthScheme()) - .httpClientBuilder(initializeS3CrtAsyncHttpClient(builder)) - .build(); + S3AsyncClientBuilder s3AsyncClientBuilder = + S3AsyncClient.builder() + // Disable checksum for streaming operations, it is handled in + // CRT. Checksumming for non-streaming + // operations is still handled in HttpChecksumStage + .serviceConfiguration(S3Configuration.builder() + .checksumValidationEnabled(false) + .build()) + .region(builder.region) + .endpointOverride(builder.endpointOverride) + .credentialsProvider(builder.credentialsProvider) + .overrideConfiguration(overrideConfigurationBuilder.build()) + .accelerate(builder.accelerate) + .forcePathStyle(builder.forcePathStyle) + .crossRegionAccessEnabled(builder.crossRegionAccessEnabled) + .putAuthScheme(new CrtS3ExpressNoOpAuthScheme()) + .httpClientBuilder(initializeS3CrtAsyncHttpClient(builder)); + + + if (builder.futureCompletionExecutor != null) { + s3AsyncClientBuilder.asyncConfiguration(b -> b.advancedOption(FUTURE_COMPLETION_EXECUTOR, + builder.futureCompletionExecutor)); + } + return s3AsyncClientBuilder.build(); } private static S3CrtAsyncHttpClient.Builder initializeS3CrtAsyncHttpClient(DefaultS3CrtClientBuilder builder) { @@ -186,6 +197,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB private S3CrtRetryConfiguration retryConfiguration; private boolean crossRegionAccessEnabled; private Long thresholdInBytes; + private Executor futureCompletionExecutor; @Override public S3CrtAsyncClientBuilder credentialsProvider( @@ -281,6 +293,12 @@ public S3CrtAsyncClientBuilder thresholdInBytes(Long thresholdInBytes) { return this; } + @Override + public S3CrtAsyncClientBuilder futureCompletionExecutor(Executor futureCompletionExecutor) { + this.futureCompletionExecutor = futureCompletionExecutor; + return this; + } + @Override public S3CrtAsyncClient build() { return new DefaultS3CrtAsyncClient(this); diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java index 610eb508eca7..2f863bb42c18 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java @@ -18,16 +18,23 @@ import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.any; import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl; +import static com.github.tomakehurst.wiremock.client.WireMock.head; import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; import com.github.tomakehurst.wiremock.junit5.WireMockTest; import java.net.URI; +import java.util.concurrent.Executor; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.crt.CrtResource; @@ -42,7 +49,21 @@ @WireMockTest public class S3CrtClientWiremockTest { + private static final String LOCATION = "http://Example-Bucket.s3.amazonaws.com/Example-Object"; + private static final String BUCKET = "Example-Bucket"; + private static final String KEY = "Example-Object"; + private static final String E_TAG = "\"3858f62230ac3c915f300c664312c11f-9\""; + private static final String XML_RESPONSE_BODY = String.format( + "\n" + + "\n" + + "%s\n" + + "%s\n" + + "%s\n" + + "%s\n" + + "", LOCATION, BUCKET, KEY, E_TAG); private S3AsyncClient s3AsyncClient; + private S3AsyncClient clientWithCustomExecutor; + private SpyableExecutor mockExecutor; @BeforeAll public static void setUpBeforeAll() { @@ -68,27 +89,43 @@ public void tearDown() { @Test public void completeMultipartUpload_completeResponse() { - String location = "http://Example-Bucket.s3.amazonaws.com/Example-Object"; - String bucket = "Example-Bucket"; - String key = "Example-Object"; - String eTag = "\"3858f62230ac3c915f300c664312c11f-9\""; - String xmlResponseBody = String.format( - "\n" - + "\n" - + "%s\n" - + "%s\n" - + "%s\n" - + "%s\n" - + "", location, bucket, key, eTag); - - stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(xmlResponseBody))); + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(XML_RESPONSE_BODY))); CompleteMultipartUploadResponse response = s3AsyncClient.completeMultipartUpload( - r -> r.bucket(bucket).key(key).uploadId("upload-id")).join(); + r -> r.bucket(BUCKET).key(KEY).uploadId("upload-id")).join(); - assertThat(response.location()).isEqualTo(location); - assertThat(response.bucket()).isEqualTo(bucket); - assertThat(response.key()).isEqualTo(key); - assertThat(response.eTag()).isEqualTo(eTag); + assertThat(response.location()).isEqualTo(LOCATION); + assertThat(response.bucket()).isEqualTo(BUCKET); + assertThat(response.key()).isEqualTo(KEY); + assertThat(response.eTag()).isEqualTo(E_TAG); + } + + @Test + void overrideResponseCompletionExecutor_shouldCompleteWithCustomExecutor(WireMockRuntimeInfo wiremock) { + + mockExecutor = Mockito.spy(new SpyableExecutor()); + + try (S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder() + .region(Region.US_EAST_1) + .endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort())) + .futureCompletionExecutor(mockExecutor) + .credentialsProvider( + StaticCredentialsProvider.create(AwsBasicCredentials.create("key", + "secret"))) + .build()) { + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(XML_RESPONSE_BODY))); + + CompleteMultipartUploadResponse response = s3AsyncClient.completeMultipartUpload( + r -> r.bucket(BUCKET).key(KEY).uploadId("upload-id")).join(); + + verify(mockExecutor).execute(any(Runnable.class)); + } + } + + private static class SpyableExecutor implements Executor { + @Override + public void execute(Runnable command) { + command.run(); + } } }