Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause/Resume Upload for Transfer Manager with Java S3Client #4886

Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
745e402
Pause/Resume Upload for Transfer Manager with Java S3Client
davidh44 Feb 2, 2024
65069fb
Fix checkstyle and equalsVerifier
davidh44 Feb 3, 2024
3dfa1c2
temporarily disable failing test
davidh44 Feb 3, 2024
2fa0d9e
AWS Glue Update: Introduce Catalog Encryption Role within Glue Data C…
Feb 5, 2024
4d17804
Amazon WorkSpaces Update: Added definitions of various WorkSpace states
Feb 5, 2024
fd010e4
Release 2.23.18. Updated CHANGELOG.md, README.md and all pom.xml.
Feb 5, 2024
069a00c
Merge pull request #2890 from aws/staging/96e1b8b2-3a37-418d-b480-f27…
aws-sdk-java-automation Feb 5, 2024
9aeba32
update aws-sdk-java pom to add imds and dyanmodb-enhanced (#4890)
zoewangg Feb 5, 2024
0fedea3
Performance improvement for sigv4 signing. (#4891)
millems Feb 5, 2024
1089532
Merge remote-tracking branch 'public/release'
Feb 5, 2024
fce9bf9
Update to next snapshot version: 2.23.19-SNAPSHOT
Feb 5, 2024
1d99321
Delete CloudSearchv2IntegrationTest (#4888)
joviegas Feb 5, 2024
6afc377
Fix tag deletion command (#4892)
dagnir Feb 5, 2024
007ee65
including S3 Access Grants Plugin as part of Java SDK Bundle (#4881)
shiva958 Feb 6, 2024
7a3589c
Address comments
davidh44 Feb 6, 2024
3212620
Archive old changelog entries (< 2.23.0) (#4873)
dagnir Feb 6, 2024
f323822
Changing indentation of config files to 4 spaces (#4889)
cenedhryn Feb 6, 2024
d10ddf6
Amazon EC2 Container Service Update: This release is a documentation …
Feb 6, 2024
cd207db
AWS WAFV2 Update: You can now delete an API key that you've created f…
Feb 6, 2024
ca9b69b
Amazon OpenSearch Service Update: This release adds clear visibility …
Feb 6, 2024
f92e9f3
AWS AppSync Update: Support for environment variables in AppSync Grap…
Feb 6, 2024
4275763
Amazon CloudWatch Logs Update: This release adds a new field, logGrou…
Feb 6, 2024
b020266
Amazon Elasticsearch Service Update: This release adds clear visibili…
Feb 6, 2024
9597393
Release 2.23.19. Updated CHANGELOG.md, README.md and all pom.xml.
Feb 6, 2024
9ab7d51
Merge pull request #2891 from aws/staging/6f43c303-a28b-4966-b004-d38…
aws-sdk-java-automation Feb 6, 2024
2336fed
Update to next snapshot version: 2.23.20-SNAPSHOT
Feb 6, 2024
ccdd184
ifPresent check for PauseObservable
davidh44 Feb 7, 2024
cc06b46
Bump CRT version and expose setting memory limits for S3 calls (#4885)
cenedhryn Feb 7, 2024
bab7f24
Activating SRA for this service (#4896)
cenedhryn Feb 7, 2024
4580e26
Fix request cancellation logic in the AWS CRT Sync HTTP client (#4887)
zoewangg Feb 7, 2024
6a83122
AWS DataSync Update: AWS DataSync now supports manifests for specifyi…
Feb 7, 2024
7ab1028
Amazon Redshift Update: LisRecommendations API to fetch Amazon Redshi…
Feb 7, 2024
e16376e
Amazon Lex Model Building V2 Update: This release introduces a new bo…
Feb 7, 2024
9d8d4cf
Updated endpoints.json and partitions.json.
Feb 7, 2024
d182576
Release 2.23.20. Updated CHANGELOG.md, README.md and all pom.xml.
Feb 7, 2024
0c2d4eb
Merge pull request #2892 from aws/staging/1c0c8ade-7bcb-4158-8160-496…
aws-sdk-java-automation Feb 7, 2024
4a81f88
Update to next snapshot version: 2.23.21-SNAPSHOT
Feb 7, 2024
a5b277b
Address comments
davidh44 Feb 7, 2024
4bc0a09
Wrap subscriber with PausibleUpload
davidh44 Feb 7, 2024
a963bdf
add changelog
davidh44 Feb 8, 2024
02e807c
Amazon QuickSight Update: General Interactions for Visuals; Waterfall…
Feb 8, 2024
ee01442
AWS CodePipeline Update: Add ability to execute pipelines with new pa…
Feb 8, 2024
ab6113b
Amazon WorkSpaces Update: This release introduces User-Decoupling fea…
Feb 8, 2024
4ef752f
Release 2.23.21. Updated CHANGELOG.md, README.md and all pom.xml.
Feb 8, 2024
1548dd5
Merge pull request #2893 from aws/staging/e358ef45-7969-4222-a98d-7fc…
aws-sdk-java-automation Feb 8, 2024
f3da726
Update to next snapshot version: 2.23.22-SNAPSHOT
Feb 8, 2024
2f2e6e1
S3 should configure signer properties (#4856)
sugmanue Feb 8, 2024
1849320
Merge Enable Crc32 change into Pause-Resume Branch (#4904) (#4905)
davidh44 Feb 8, 2024
207a10e
integ test upload resume with different TMs
davidh44 Feb 8, 2024
824d6ad
Rename to PausableUpload
davidh44 Feb 8, 2024
6411744
Enable CRC32 for PUT for MultipartS3AsyncClient (#4898)
davidh44 Feb 8, 2024
fcfc6a5
Revert "Merge Enable Crc32 change into Pause-Resume Branch (#4904) (#…
davidh44 Feb 9, 2024
6bc4130
Merge commit '6411744f8e3e9c3b11a1383921ad4b8e4f822aee' into hdavidh/…
davidh44 Feb 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void setUpForAllIntegTests() throws Exception {
Log.initLoggingToStdout(Log.LogLevel.Warn);
System.setProperty("aws.crt.debugnative", "true");
s3 = s3ClientBuilder().build();
s3Async = s3AsyncClientBuilder().build();
s3Async = s3AsyncClientBuilder().multipartEnabled(true).build();
davidh44 marked this conversation as resolved.
Show resolved Hide resolved
s3CrtAsync = S3CrtAsyncClient.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.region(DEFAULT_REGION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package software.amazon.awssdk.transfer.s3;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;

Expand All @@ -26,9 +25,12 @@
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.waiters.AsyncWaiter;
import software.amazon.awssdk.core.waiters.Waiter;
Expand Down Expand Up @@ -69,30 +71,42 @@ public static void cleanup() {
executorService.shutdown();
}

@Test
void pause_singlePart_shouldResume() {
enum S3ClientType {
CRT, JAVA
}

private static Stream<Arguments> s3ClientType() {
return Stream.of(Arguments.of(S3ClientType.CRT), Arguments.of(S3ClientType.JAVA));
}

@ParameterizedTest
@MethodSource("s3ClientType")
void pause_singlePart_shouldResume(S3ClientType s3ClientType) {
S3TransferManager transferManager = s3ClientType == S3ClientType.CRT ? tmCrt : tmJava;
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(smallFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = transferManager.uploadFile(request);
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);

validateEmptyResumeToken(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = transferManager.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
}

@Test
void pause_fileNotChanged_shouldResume() {
@ParameterizedTest
@MethodSource("s3ClientType")
void pause_fileNotChanged_shouldResume(S3ClientType s3ClientType) {
S3TransferManager transferManager = s3ClientType == S3ClientType.CRT ? tmCrt : tmJava;
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.addTransferListener(LoggingTransferListener.create())
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = transferManager.uploadFile(request);
waitUntilMultipartUploadExists();
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);
Expand All @@ -103,33 +117,43 @@ void pause_fileNotChanged_shouldResume() {

verifyMultipartUploadIdExists(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = transferManager.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
}

@Test
void pauseImmediately_resume_shouldStartFromBeginning() {
@ParameterizedTest
@MethodSource("s3ClientType")
void pauseImmediately_resume_shouldStartFromBeginning(S3ClientType s3ClientType) {
S3TransferManager transferManager = s3ClientType == S3ClientType.CRT ? tmCrt : tmJava;
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = transferManager.uploadFile(request);
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);

validateEmptyResumeToken(resumableFileUpload);
if (s3ClientType == S3ClientType.CRT) {
validateEmptyResumeToken(resumableFileUpload);
} else {
// join() is called on CreateMultipartUpload so resume token will be returned
assertThat(resumableFileUpload.transferredParts().isPresent()).isTrue();
assertThat(resumableFileUpload.transferredParts().getAsLong()).isZero();
}

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = transferManager.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
}

@Test
void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
@ParameterizedTest
@MethodSource("s3ClientType")
void pause_fileChanged_resumeShouldStartFromBeginning(S3ClientType s3ClientType) throws Exception {
S3TransferManager transferManager = s3ClientType == S3ClientType.CRT ? tmCrt : tmJava;
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = transferManager.uploadFile(request);
waitUntilMultipartUploadExists();
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);
Expand All @@ -139,13 +163,16 @@ void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
assertThat(resumableFileUpload.totalParts()).isNotEmpty();
verifyMultipartUploadIdExists(resumableFileUpload);

byte[] originalBytes = Files.readAllBytes(largeFile.toPath());
byte[] bytes = "helloworld".getBytes(StandardCharsets.UTF_8);
Files.write(largeFile.toPath(), bytes);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = transferManager.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
verifyMultipartUploadIdNotExist(resumableFileUpload);
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(bytes.length);

Files.write(largeFile.toPath(), originalBytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unintentional change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to write back the original largeFile , since we overwrote it with "helloworld" to test file changed,
we are doing parameterized tests so this will run twice, one for each type of TM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do it in afterEach?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we only overwrite it in this test, so dont need to write back for other tests

}

private void verifyMultipartUploadIdExists(ResumableFileUpload resumableFileUpload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,31 @@

package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.services.s3.internal.multipart.UploadObjectHelper.PAUSE_OBSERVABLE;
import static software.amazon.awssdk.services.s3.internal.multipart.UploadObjectHelper.RESUME_TOKEN;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;
import static software.amazon.awssdk.transfer.s3.internal.utils.ResumableRequestConverter.toDownloadFileRequestAndTransformer;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.FileTransformerConfiguration;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.multipart.PauseObservable;
import software.amazon.awssdk.services.s3.internal.multipart.S3ResumeToken;
import software.amazon.awssdk.services.s3.internal.resource.S3AccessPointResource;
import software.amazon.awssdk.services.s3.internal.resource.S3ArnConverter;
import software.amazon.awssdk.services.s3.internal.resource.S3Resource;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
Expand All @@ -49,6 +56,7 @@
import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload;
import software.amazon.awssdk.transfer.s3.internal.progress.ResumeTransferProgress;
import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater;
import software.amazon.awssdk.transfer.s3.internal.utils.FileUtils;
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
import software.amazon.awssdk.transfer.s3.model.CompletedDownload;
import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload;
Expand All @@ -65,6 +73,7 @@
import software.amazon.awssdk.transfer.s3.model.FileDownload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
Expand Down Expand Up @@ -156,7 +165,10 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
.chunkSizeInBytes(DEFAULT_FILE_UPLOAD_CHUNK_SIZE)
.build();

PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
PauseObservable pauseObservable = new PauseObservable();
Consumer<AwsRequestOverrideConfiguration.Builder> attachPauseObservable =
b -> b.putExecutionAttribute(PAUSE_OBSERVABLE, pauseObservable);
PutObjectRequest putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachPauseObservable);

CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();

Expand All @@ -182,8 +194,109 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
} catch (Throwable throwable) {
returnFuture.completeExceptionally(throwable);
}
return new DefaultFileUpload(returnFuture, progressUpdater.progress(), pauseObservable, uploadFileRequest);
}

@Override
public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
Validate.paramNotNull(resumableFileUpload, "resumableFileUpload");

boolean fileModified = !FileUtils.fileNotModified(resumableFileUpload.fileLength(),
davidh44 marked this conversation as resolved.
Show resolved Hide resolved
resumableFileUpload.fileLastModified(),
resumableFileUpload.uploadFileRequest().source());

boolean noResumeToken = !hasResumeToken(resumableFileUpload);

if (fileModified || noResumeToken) {
return uploadFromBeginning(resumableFileUpload, fileModified);
}

return doResumeUpload(resumableFileUpload);
}

private FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
davidh44 marked this conversation as resolved.
Show resolved Hide resolved
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
S3ResumeToken s3ResumeToken = s3ResumeToken(resumableFileUpload);

Consumer<AwsRequestOverrideConfiguration.Builder> attachResumeToken =
b -> b.putExecutionAttribute(RESUME_TOKEN, s3ResumeToken);

PutObjectRequest modifiedPutObjectRequest = attachSdkAttribute(putObjectRequest, attachResumeToken);

return uploadFile(uploadFileRequest.toBuilder()
.putObjectRequest(modifiedPutObjectRequest)
.build());
}

private static S3ResumeToken s3ResumeToken(ResumableFileUpload resumableFileUpload) {
return new S3ResumeToken(resumableFileUpload.multipartUploadId().get(),
davidh44 marked this conversation as resolved.
Show resolved Hide resolved
resumableFileUpload.partSizeInBytes().getAsLong(),
resumableFileUpload.totalParts().getAsLong(),
resumableFileUpload.transferredParts().getAsLong());
}

private boolean hasResumeToken(ResumableFileUpload resumableFileUpload) {
return resumableFileUpload.totalParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
}

private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,
Consumer<AwsRequestOverrideConfiguration.Builder> builderMutation) {
AwsRequestOverrideConfiguration modifiedRequestOverrideConfig =
putObjectRequest.overrideConfiguration()
.map(o -> o.toBuilder().applyMutation(builderMutation).build())
.orElseGet(() -> AwsRequestOverrideConfiguration.builder()
.applyMutation(builderMutation)
.build());

return putObjectRequest.toBuilder()
.overrideConfiguration(modifiedRequestOverrideConfig)
.build();
}

private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload, boolean fileModified) {
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
davidh44 marked this conversation as resolved.
Show resolved Hide resolved
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
if (fileModified) {
log.debug(() -> String.format("The file (%s) has been modified since "
+ "the last pause. " +
"The SDK will upload the requested object in bucket"
+ " (%s) with key (%s) from "
+ "the "
+ "beginning.",
uploadFileRequest.source(),
putObjectRequest.bucket(),
putObjectRequest.key()));
resumableFileUpload.multipartUploadId()
.ifPresent(id -> {
log.debug(() -> "Aborting previous upload with multipartUploadId: " + id);
s3AsyncClient.abortMultipartUpload(
AbortMultipartUploadRequest.builder()
.bucket(putObjectRequest.bucket())
.key(putObjectRequest.key())
.uploadId(id)
.build())
.exceptionally(t -> {
log.warn(() -> String.format("Failed to abort previous multipart upload "
+ "(id: %s)"
+ ". You may need to call "
+ "S3AsyncClient#abortMultiPartUpload to "
+ "free all storage consumed by"
+ " all parts. ",
id), t);
return null;
});
});
}

log.debug(() -> String.format("No resume token is found. " +
"The SDK will upload the requested object in bucket"
+ " (%s) with key (%s) from "
+ "the beginning.",
putObjectRequest.bucket(),
putObjectRequest.key()));

return new DefaultFileUpload(returnFuture, progressUpdater.progress(), uploadFileRequest);
return uploadFile(uploadFileRequest);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public int hashCode() {

@Override
public String toString() {
return ToString.builder("DefaultFileUpload")
return ToString.builder("CrtFileUpload")
.add("completionFuture", completionFuture)
.add("progress", progress)
.add("request", request)
Expand Down
Loading
Loading