Skip to content

Commit

Permalink
Add s3 express suppport for AWS CRT-based S3 Client (#4723)
Browse files Browse the repository at this point in the history
* Integrate with CRT S3 express support

* Add integ tests

* Fix test
  • Loading branch information
zoewangg authored Nov 29, 2023
1 parent 0340939 commit 2306005
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS CRT-based S3 client",
"contributor": "",
"description": "Add S3 express support for the AWS CRT-based S3 client."
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
<rxjava.version>2.2.21</rxjava.version>
<commons-codec.verion>1.15</commons-codec.verion>
<jmh.version>1.29</jmh.version>
<awscrt.version>0.28.0</awscrt.version>
<awscrt.version>0.28.10</awscrt.version>

<!--Test dependencies -->
<junit5.version>5.10.0</junit5.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class S3ExpressIntegrationTest extends S3ExpressIntegrationTestBase {
private static final String AZ = "use1-az4";
private static S3Client s3;
private static S3AsyncClient s3Async;
private static S3AsyncClient s3CrtAsync;
private static String testBucket;

@BeforeAll
Expand All @@ -104,6 +107,7 @@ static void setup() {
.build();
s3Async = s3AsyncClientBuilder(TEST_REGION).overrideConfiguration(o -> o.addExecutionInterceptor(capturingInterceptor))
.build();
s3CrtAsync = s3CrtAsyncClientBuilder(TEST_REGION).build();
testBucket = getS3ExpressBucketNameForAz(AZ);
createBucketS3Express(s3, testBucket, AZ);
}
Expand All @@ -113,25 +117,31 @@ static void teardown() {
deleteBucketAndAllContents(s3, testBucket);
s3.close();
s3Async.close();
s3CrtAsync.close();
}

@BeforeEach
void reset() {
capturingInterceptor.reset();
}

@Test
public void putCopyGetDeleteAsync() {
s3Async.putObject(r -> r.bucket(testBucket).key(KEY), AsyncRequestBody.fromString(CONTENTS)).join();
s3Async.headObject(r -> r.bucket(testBucket).key(KEY)).join();
private static Stream<S3AsyncClient> asyncClients() {
return Stream.of(s3Async, s3CrtAsync);
}

@ParameterizedTest(autoCloseArguments = false)
@MethodSource("asyncClients")
public void putCopyGetDeleteAsync(S3AsyncClient s3AsyncClient) {
s3AsyncClient.putObject(r -> r.bucket(testBucket).key(KEY), AsyncRequestBody.fromString(CONTENTS)).join();
s3AsyncClient.headObject(r -> r.bucket(testBucket).key(KEY)).join();

s3.copyObject(r -> r.sourceBucket(testBucket).sourceKey(KEY).destinationBucket(testBucket).destinationKey(COPY_DESTINATION_KEY));
s3Async.headObject(r -> r.bucket(testBucket).key(COPY_DESTINATION_KEY)).join();
s3AsyncClient.headObject(r -> r.bucket(testBucket).key(COPY_DESTINATION_KEY)).join();

String result = s3Async.getObject(r -> r.bucket(testBucket).key(KEY), AsyncResponseTransformer.toBytes()).join().asUtf8String();
String result = s3AsyncClient.getObject(r -> r.bucket(testBucket).key(KEY), AsyncResponseTransformer.toBytes()).join().asUtf8String();
assertThat(result).isEqualTo(CONTENTS);

s3Async.deleteObject(r -> r.bucket(testBucket).key(KEY)).join();
s3AsyncClient.deleteObject(r -> r.bucket(testBucket).key(KEY)).join();
}

@Test
Expand All @@ -148,16 +158,17 @@ public void putCopyGetDeleteSync() {
s3.deleteObject(r -> r.bucket(testBucket).key(KEY));
}

@Test
public void uploadMultiplePartAsync() {
String uploadId = s3Async.createMultipartUpload(b -> b.bucket(testBucket).key(KEY)).join().uploadId();
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("asyncClients")
public void uploadMultiplePartAsync(S3AsyncClient s3AsyncClient) {
String uploadId = s3AsyncClient.createMultipartUpload(b -> b.bucket(testBucket).key(KEY)).join().uploadId();

UploadPartRequest uploadPartRequest = UploadPartRequest.builder().bucket(testBucket).key(KEY)
.uploadId(uploadId)
.partNumber(1)
.build();

UploadPartResponse response = s3Async.uploadPart(uploadPartRequest, AsyncRequestBody.fromString(CONTENTS)).join();
UploadPartResponse response = s3AsyncClient.uploadPart(uploadPartRequest, AsyncRequestBody.fromString(CONTENTS)).join();

List<CompletedPart> completedParts = new ArrayList<>();
completedParts.add(CompletedPart.builder().eTag(response.eTag()).partNumber(1).build());
Expand All @@ -168,7 +179,7 @@ public void uploadMultiplePartAsync() {
.uploadId(uploadId)
.multipartUpload(completedUploadParts)
.build();
CompleteMultipartUploadResponse completeMultipartUploadResponse = s3Async.completeMultipartUpload(completeRequest).join();
CompleteMultipartUploadResponse completeMultipartUploadResponse = s3AsyncClient.completeMultipartUpload(completeRequest).join();
assertThat(completeMultipartUploadResponse).isNotNull();

ResponseBytes<GetObjectResponse> objectAsBytes = s3.getObject(b -> b.bucket(testBucket).key(KEY), ResponseTransformer.toBytes());
Expand All @@ -183,7 +194,7 @@ public void s3Express_nonObjectTransferApis_Sync(SyncTestCase tc) {
}

@MethodSource("asyncTestCases")
@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
public void s3Express_nonObjectTransferApis_Async(AsyncTestCase tc) {
runAndVerify(tc);
}
Expand Down Expand Up @@ -351,30 +362,36 @@ private static List<SyncTestCase> syncTestCases() {
}

private static List<AsyncTestCase> asyncTestCases() {
return Stream.concat(asyncTestCasesPerClient(s3Async).stream(), asyncTestCasesPerClient(s3CrtAsync).stream()).collect(Collectors.toList());
}

private static List<AsyncTestCase> asyncTestCasesPerClient(S3AsyncClient s3Async) {
// getSimpleName is not "simple", but it's fine to be used for testing
String simpleName = s3Async.getClass().getSimpleName();
return Arrays.asList(
//control plane APIs
new AsyncTestCase("ListDirectoryBuckets", () -> {
new AsyncTestCase("ListDirectoryBuckets-" + simpleName, () -> {
ListDirectoryBucketsRequest request = ListDirectoryBucketsRequest.builder().build();
return s3Async.listDirectoryBuckets(request);
}, Expect.builder().build()),
new AsyncTestCase("PutBucketPolicy", () -> {
new AsyncTestCase("PutBucketPolicy-" + simpleName, () -> {
PutBucketPolicyRequest request = PutBucketPolicyRequest.builder().bucket(testBucket).policy("fake").build();
return s3Async.putBucketPolicy(request);
}, Expect.builder().error("Policies must be valid JSON").build()),
new AsyncTestCase("GetBucketPolicy", () -> {
new AsyncTestCase("GetBucketPolicy-" + simpleName, () -> {
GetBucketPolicyRequest request = GetBucketPolicyRequest.builder().bucket(testBucket).build();
return s3Async.getBucketPolicy(request);
}, Expect.builder().error("The bucket policy does not exist").build()),
new AsyncTestCase("DeleteBucketPolicy", () -> {
new AsyncTestCase("DeleteBucketPolicy-" + simpleName, () -> {
DeleteBucketPolicyRequest request = DeleteBucketPolicyRequest.builder().bucket(testBucket).build();
return s3Async.deleteBucketPolicy(request);
}, Expect.builder().build()),
//data plane APIs
new AsyncTestCase("ListObjectsV2", () -> {
new AsyncTestCase("ListObjectsV2-" + simpleName, () -> {
ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(testBucket).build();
return s3Async.listObjectsV2(request);
}, Expect.builder().build()),
new AsyncTestCase("DeleteObjects", () -> {
new AsyncTestCase("DeleteObjects-" + simpleName, () -> {
DeleteObjectsRequest request = DeleteObjectsRequest.builder()
.bucket(testBucket)
.delete(Delete.builder()
Expand All @@ -385,7 +402,7 @@ private static List<AsyncTestCase> asyncTestCases() {
.build();
return s3Async.deleteObjects(request);
}, Expect.builder().build()),
new AsyncTestCase("HeadBucket", () -> {
new AsyncTestCase("HeadBucket-" + simpleName, () -> {
HeadBucketRequest request = HeadBucketRequest.builder().bucket(testBucket).build();
return s3Async.headBucket(request);
}, Expect.builder().build())
Expand Down Expand Up @@ -422,8 +439,6 @@ protected static void runAndVerify(SyncTestCase testCase) {
}
List<String> contentSha256Value = req.headers().get("x-amz-content-sha256");
assertThat(contentSha256Value).isNotNull().hasSize(1).isEqualTo(Collections.singletonList("UNSIGNED-PAYLOAD"));

req.headers().keySet().forEach(k -> System.out.println(k + " " + req.headers().get(k)));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.BucketInfo;
import software.amazon.awssdk.services.s3.model.BucketType;
import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration;
Expand Down Expand Up @@ -63,6 +64,13 @@ protected static S3AsyncClientBuilder s3AsyncClientBuilder(Region region) {

}

protected static S3CrtAsyncClientBuilder s3CrtAsyncClientBuilder(Region region) {
return S3AsyncClient.crtBuilder()
.region(region)
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN);

}

protected static void createBucketS3Express(S3Client client, String bucketName, String az) {
try {
LocationInfo location = LocationInfo.builder().name(az).type(LocationType.AVAILABILITY_ZONE).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crt;

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest;
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest;
import software.amazon.awssdk.http.auth.spi.signer.HttpSigner;
import software.amazon.awssdk.http.auth.spi.signer.SignRequest;
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.identity.spi.IdentityProviders;
import software.amazon.awssdk.identity.spi.ResolveIdentityRequest;
import software.amazon.awssdk.services.s3.s3express.S3ExpressAuthScheme;
import software.amazon.awssdk.services.s3.s3express.S3ExpressSessionCredentials;

/**
* An implementation of {@link S3ExpressAuthScheme} that returns a noop {@link IdentityProvider}.
*/
@SdkInternalApi
public final class CrtS3ExpressNoOpAuthScheme implements S3ExpressAuthScheme {
@Override
public String schemeId() {
return S3ExpressAuthScheme.SCHEME_ID;
}

@Override
public IdentityProvider<S3ExpressSessionCredentials> identityProvider(IdentityProviders providers) {
return NoOpIdentityProvider.INSTANCE;
}

@Override
public HttpSigner<S3ExpressSessionCredentials> signer() {
return NoOpSigner.INSTANCE;
}

private static final class NoOpIdentityProvider implements IdentityProvider<S3ExpressSessionCredentials> {
private static final NoOpIdentityProvider INSTANCE = new NoOpIdentityProvider();

@Override
public Class<S3ExpressSessionCredentials> identityType() {
return S3ExpressSessionCredentials.class;
}

@Override
public CompletableFuture<? extends S3ExpressSessionCredentials> resolveIdentity(ResolveIdentityRequest request) {
return CompletableFuture.completedFuture(null);
}
}

private static final class NoOpSigner implements HttpSigner<S3ExpressSessionCredentials> {
private static final NoOpSigner INSTANCE = new NoOpSigner();

@Override
public SignedRequest sign(SignRequest<? extends S3ExpressSessionCredentials> request) {
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<AsyncSignedRequest> signAsync(AsyncSignRequest<? extends S3ExpressSessionCredentials> request) {
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@

package software.amazon.awssdk.services.s3.internal.crt;

import static software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME;
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.AUTH_SCHEMES;
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_NAME;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.USE_S3_EXPRESS_AUTH;
import static software.amazon.awssdk.services.s3.internal.crt.S3NativeClientConfiguration.DEFAULT_PART_SIZE_IN_BYTES;

import java.net.URI;
Expand Down Expand Up @@ -58,6 +61,7 @@
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
import software.amazon.awssdk.services.s3.internal.multipart.CopyObjectHelper;
import software.amazon.awssdk.services.s3.internal.s3express.S3ExpressUtils;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand Down Expand Up @@ -129,6 +133,7 @@ private static S3AsyncClient initializeS3AsyncClient(DefaultS3CrtClientBuilder b
.accelerate(builder.accelerate)
.forcePathStyle(builder.forcePathStyle)
.crossRegionAccessEnabled(builder.crossRegionAccessEnabled)
.putAuthScheme(new CrtS3ExpressNoOpAuthScheme())
.httpClientBuilder(initializeS3CrtAsyncHttpClient(builder))
.build();
}
Expand Down Expand Up @@ -309,6 +314,8 @@ public void afterMarshalling(Context.AfterMarshalling context,
.put(SIGNING_REGION, executionAttributes.getAttribute(AwsSignerExecutionAttribute.SIGNING_REGION))
.put(S3InternalSdkHttpExecutionAttribute.OBJECT_FILE_PATH,
executionAttributes.getAttribute(OBJECT_FILE_PATH))
.put(USE_S3_EXPRESS_AUTH, S3ExpressUtils.useS3ExpressAuthScheme(executionAttributes))
.put(SIGNING_NAME, executionAttributes.getAttribute(SERVICE_SIGNING_NAME))
.build();

// For putObject and getObject, we rely on CRT to perform checksum validation
Expand Down
Loading

0 comments on commit 2306005

Please sign in to comment.