Skip to content

Commit

Permalink
Bump CRT version and ensure crt resources are closed (#4782)
Browse files Browse the repository at this point in the history
  • Loading branch information
zoewangg authored Dec 15, 2023
1 parent 2015471 commit 769e98b
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 13 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSCRTbasedS3Client-d7f038c.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT-based S3 Client",
"contributor": "",
"description": "Make sure all CRT resources are closed when the AWS CRT-based S3 client is closed."
}
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-693fec2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "Bump `aws-crt` version to `0.29.1`."
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
<rxjava.version>2.2.21</rxjava.version>
<commons-codec.verion>1.15</commons-codec.verion>
<jmh.version>1.29</jmh.version>
<awscrt.version>0.28.11</awscrt.version>
<awscrt.version>0.29.1</awscrt.version>

<!--Test dependencies -->
<junit5.version>5.10.0</junit5.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
import software.amazon.awssdk.services.s3.model.CSVInput;
Expand All @@ -54,16 +55,19 @@ public class SelectObjectContentIntegrationTest extends S3IntegrationTestBase {
+ "C,D";
private static final String QUERY = "select s._1 from S3Object s";

private static S3AsyncClient s3CrtClient;

@BeforeAll
public static void setup() throws Exception {
S3IntegrationTestBase.setUp();
s3.createBucket(r -> r.bucket(BUCKET_NAME));
s3.waiter().waitUntilBucketExists(r -> r.bucket(BUCKET_NAME));
s3.putObject(r -> r.bucket(BUCKET_NAME).key(KEY), RequestBody.fromString(CSV_CONTENTS));
s3CrtClient = crtClientBuilder().build();
}

private static Stream<S3AsyncClient> s3AsyncClients() {
return Stream.of(crtClientBuilder().build(), s3AsyncClientBuilder().build());
return Stream.of(s3CrtClient, s3Async);
}

@AfterAll
Expand All @@ -73,10 +77,11 @@ public static void teardown() {
} finally {
s3AsyncClients().forEach(SdkAutoCloseable::close);
s3.close();
CrtResource.waitForNoResources();
}
}

@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("s3AsyncClients")
public void selectObjectContent_onResponseInvokedWithResponse(S3AsyncClient client) {
TestHandler handler = new TestHandler();
Expand All @@ -85,7 +90,7 @@ public void selectObjectContent_onResponseInvokedWithResponse(S3AsyncClient clie
assertThat(handler.response).isNotNull();
}

@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("s3AsyncClients")
public void selectObjectContent_recordsEventUnmarshalledCorrectly(S3AsyncClient client) {
TestHandler handler = new TestHandler();
Expand All @@ -99,7 +104,7 @@ public void selectObjectContent_recordsEventUnmarshalledCorrectly(S3AsyncClient
assertThat(recordsEvent.payload().asUtf8String()).contains("A\nC");
}

@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("s3AsyncClients")
public void selectObjectContent_invalidQuery_unmarshallsErrorResponse(S3AsyncClient client) {
TestHandler handler = new TestHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
checksumConfig(httpChecksum, requestType, s3NativeClientConfiguration.checksumValidationEnabled());
URI endpoint = getEndpoint(uri);

AwsSigningConfig defaultS3SigningConfig = awsSigningConfig(signingRegion, httpExecutionAttributes);
AwsSigningConfig signingConfig = awsSigningConfig(signingRegion, httpExecutionAttributes);

S3MetaRequestOptions requestOptions = new S3MetaRequestOptions()
.withHttpRequest(httpRequest)
Expand All @@ -157,7 +157,7 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
.withResponseHandler(responseHandler)
.withResumeToken(resumeToken)
.withRequestFilePath(requestFilePath)
.withSigningConfig(defaultS3SigningConfig);
.withSigningConfig(signingConfig);

S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions);
S3MetaRequestPauseObservable observable =
Expand All @@ -168,7 +168,7 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
if (observable != null) {
observable.subscribe(s3MetaRequest);
}
addCancelCallback(executeFuture, s3MetaRequest, responseHandler);
closeResourceCallback(executeFuture, s3MetaRequest, responseHandler, signingConfig);

return executeFuture;
}
Expand Down Expand Up @@ -214,14 +214,19 @@ private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequ
return S3MetaRequestOptions.MetaRequestType.DEFAULT;
}

private static void addCancelCallback(CompletableFuture<Void> executeFuture,
S3MetaRequest s3MetaRequest,
S3CrtResponseHandlerAdapter responseHandler) {
private static void closeResourceCallback(CompletableFuture<Void> executeFuture,
S3MetaRequest s3MetaRequest,
S3CrtResponseHandlerAdapter responseHandler,
AwsSigningConfig signingConfig) {
executeFuture.whenComplete((r, t) -> {
if (executeFuture.isCancelled()) {
log.debug(() -> "The request is cancelled, cancelling meta request");
responseHandler.cancelRequest();
s3MetaRequest.cancel();
signingConfig.close();
} else {
s3MetaRequest.close();
signingConfig.close();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ public class S3NativeClientConfiguration implements SdkAutoCloseable {
private final URI endpointOverride;
private final boolean checksumValidationEnabled;
private final Long readBufferSizeInBytes;

private final TlsContext tlsContext;

private final TlsContextOptions clientTlsContextOptions;
private final HttpProxyOptions proxyOptions;
private final Duration connectionTimeout;
private final HttpMonitoringOptions httpMonitoringOptions;
Expand All @@ -70,7 +71,7 @@ public S3NativeClientConfiguration(Builder builder) {
this.signingRegion = builder.signingRegion == null ? DefaultAwsRegionProviderChain.builder().build().getRegion().id() :
builder.signingRegion;
this.clientBootstrap = new ClientBootstrap(null, null);
TlsContextOptions clientTlsContextOptions =
clientTlsContextOptions =
TlsContextOptions.createDefaultClient()
.withCipherPreference(TlsCipherPreference.TLS_CIPHER_SYSTEM_DEFAULT);

Expand Down Expand Up @@ -199,6 +200,7 @@ public Long readBufferSizeInBytes() {
@Override
public void close() {
clientBootstrap.close();
clientTlsContextOptions.close();
tlsContext.close();
credentialProviderAdapter.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crt;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.any;
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static org.assertj.core.api.Assertions.assertThat;

import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import java.net.URI;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.Log;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;

/**
* Tests to make sure all CRT resources are cleaned up
*/
@WireMockTest
public class S3CrtClientWiremockTest {

private S3AsyncClient s3AsyncClient;

@BeforeAll
public static void setUpBeforeAll() {
System.setProperty("aws.crt.debugnative", "true");
Log.initLoggingToStdout(Log.LogLevel.Warn);
}

@BeforeEach
public void setup(WireMockRuntimeInfo wiremock) {
s3AsyncClient = S3AsyncClient.crtBuilder()
.region(Region.US_EAST_1)
.endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort()))
.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret")))
.build();
}

@AfterEach
public void tearDown() {
s3AsyncClient.close();
CrtResource.waitForNoResources();
}

@Test
public void completeMultipartUpload_completeResponse() {
String location = "http://Example-Bucket.s3.amazonaws.com/Example-Object";
String bucket = "Example-Bucket";
String key = "Example-Object";
String eTag = "\"3858f62230ac3c915f300c664312c11f-9\"";
String xmlResponseBody = String.format(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n"
+ "<Location>%s</Location>\n"
+ "<Bucket>%s</Bucket>\n"
+ "<Key>%s</Key>\n"
+ "<ETag>%s</ETag>\n"
+ "</CompleteMultipartUploadResult>", location, bucket, key, eTag);

stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(xmlResponseBody)));

CompleteMultipartUploadResponse response = s3AsyncClient.completeMultipartUpload(
r -> r.bucket(bucket).key(key).uploadId("upload-id")).join();

assertThat(response.location()).isEqualTo(location);
assertThat(response.bucket()).isEqualTo(bucket);
assertThat(response.key()).isEqualTo(key);
assertThat(response.eTag()).isEqualTo(eTag);
}
}
18 changes: 18 additions & 0 deletions services/s3/src/test/resources/jetty-logging.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/apache2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
#

# Set up logging implementation
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=OFF

0 comments on commit 769e98b

Please sign in to comment.