Pause/Resume Upload for Transfer Manager with Java S3Client (#4908) (#4937)

* Pause/Resume Upload for Transfer Manager with Java S3Client

* Fix checkstyle and equalsVerifier

* temporarily disable failing test

* Address comments

* ifPresent check for PauseObservable

* Address comments

* Wrap subscriber with PausibleUpload

* add changelog

* integ test upload resume with different TMs

* Rename to PausableUpload

* Add unit tests

* Refactor KnownLengthUploadHelper

* Add unit tests

* Move PauseObservable and S3ResumeToken out of internal

* Refactor UploadWithKnownContentLengthHelper

* Address comments and update tests

* Fix import order

* Extract Subscriber and MpuRequestContext to separate classes

* Update Subscriber and add tests

* Create separate tmJava with multipartClient in integ test

* Address comments

* Remove TestInternalApis and refactor tests

* Address comment
davidh44 authored Feb 16, 2024
1 parent 189bbcf commit e970a6f
Showing 29 changed files with 1,792 additions and 349 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonS3-a20b910.json
@@ -0,0 +1,6 @@
{
"category": "Amazon S3",
"contributor": "",
"type": "feature",
"description": "Add support for pause/resume upload for TransferManager with Java-based S3Client that has multipart enabled"
}
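For context, here is a minimal end-to-end sketch of the flow this changelog entry describes, assuming a placeholder bucket, key, and file path and omitting region/credentials configuration; the calls mirror the ones exercised by the integration test changes below.

import java.nio.file.Path;
import java.nio.file.Paths;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

public class PauseResumeUploadSketch {
    public static void main(String[] args) {
        // Java-based S3 client with multipart enabled; previously pause/resume required the CRT-based client.
        S3AsyncClient s3AsyncMpu = S3AsyncClient.builder().multipartEnabled(true).build();
        S3TransferManager tm = S3TransferManager.builder().s3Client(s3AsyncMpu).build();

        Path file = Paths.get("path/to/large-file"); // placeholder path
        UploadFileRequest request = UploadFileRequest.builder()
                                                     .putObjectRequest(b -> b.bucket("my-bucket").key("key")) // placeholders
                                                     .source(file)
                                                     .build();

        FileUpload fileUpload = tm.uploadFile(request);

        // Pause the in-flight upload; the resume token records the multipart upload id,
        // total parts, and part size when they are available.
        ResumableFileUpload resumableFileUpload = fileUpload.pause();

        // Resume where the upload left off, or from the beginning if the file changed
        // or no resume token was captured (e.g. the upload was paused immediately).
        FileUpload resumedUpload = tm.resumeUploadFile(resumableFileUpload);
        resumedUpload.completionFuture().join();

        tm.close();
        s3AsyncMpu.close();
    }
}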
(shared integration test base)
@@ -68,6 +68,7 @@ public static void setUpForAllIntegTests() throws Exception {
Log.initLoggingToStdout(Log.LogLevel.Warn);
System.setProperty("aws.crt.debugnative", "true");
s3 = s3ClientBuilder().build();
// TODO - enable multipart once TransferListener fixed for MultipartClient
s3Async = s3AsyncClientBuilder().build();
s3CrtAsync = S3CrtAsyncClient.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
S3TransferManagerUploadPauseResumeIntegrationTest.java
@@ -16,7 +16,6 @@
package software.amazon.awssdk.transfer.s3;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;

@@ -26,13 +25,17 @@
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.waiters.AsyncWaiter;
import software.amazon.awssdk.core.waiters.Waiter;
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
import software.amazon.awssdk.services.s3.model.ListPartsResponse;
import software.amazon.awssdk.services.s3.model.NoSuchUploadException;
@@ -48,17 +51,25 @@ public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3Integra
private static final String BUCKET = temporaryBucketName(S3TransferManagerUploadPauseResumeIntegrationTest.class);
private static final String KEY = "key";
// 24 * MB is chosen to make sure we have data written in the file already upon pausing.
private static final long OBJ_SIZE = 24 * MB;
private static final long LARGE_OBJ_SIZE = 24 * MB;
private static final long SMALL_OBJ_SIZE = 2 * MB;
private static File largeFile;
private static File smallFile;
private static ScheduledExecutorService executorService;

// TODO - switch to tmJava from TestBase once TransferListener fixed for MultipartClient
protected static S3TransferManager tmJavaMpu;

@BeforeAll
public static void setup() throws Exception {
createBucket(BUCKET);
largeFile = new RandomTempFile(OBJ_SIZE);
smallFile = new RandomTempFile(2 * MB);
largeFile = new RandomTempFile(LARGE_OBJ_SIZE);
smallFile = new RandomTempFile(SMALL_OBJ_SIZE);
executorService = Executors.newScheduledThreadPool(3);

// TODO - switch to tmJava from TestBase once TransferListener fixed for MultipartClient
S3AsyncClient s3AsyncMpu = s3AsyncClientBuilder().multipartEnabled(true).build();
tmJavaMpu = S3TransferManager.builder().s3Client(s3AsyncMpu).build();
}

@AfterAll
@@ -69,30 +80,42 @@ public static void cleanup() {
executorService.shutdown();
}

@Test
void pause_singlePart_shouldResume() {
private static Stream<Arguments> transferManagers() {
return Stream.of(
Arguments.of(tmJavaMpu, tmJavaMpu),
Arguments.of(tmCrt, tmCrt),
Arguments.of(tmCrt, tmJavaMpu),
Arguments.of(tmJavaMpu, tmCrt)
);
}

@ParameterizedTest
@MethodSource("transferManagers")
void pause_singlePart_shouldResume(S3TransferManager uploadTm, S3TransferManager resumeTm) {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(smallFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = uploadTm.uploadFile(request);
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);

validateEmptyResumeToken(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(SMALL_OBJ_SIZE);
}

@Test
void pause_fileNotChanged_shouldResume() {
@ParameterizedTest
@MethodSource("transferManagers")
void pause_fileNotChanged_shouldResume(S3TransferManager uploadTm, S3TransferManager resumeTm) throws Exception {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.addTransferListener(LoggingTransferListener.create())
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = uploadTm.uploadFile(request);
waitUntilMultipartUploadExists();
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);
@@ -103,33 +126,37 @@ void pause_fileNotChanged_shouldResume() {

verifyMultipartUploadIdExists(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(LARGE_OBJ_SIZE);
}

@Test
void pauseImmediately_resume_shouldStartFromBeginning() {
@ParameterizedTest
@MethodSource("transferManagers")
void pauseImmediately_resume_shouldStartFromBeginning(S3TransferManager uploadTm, S3TransferManager resumeTm) {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = uploadTm.uploadFile(request);
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);

validateEmptyResumeToken(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(LARGE_OBJ_SIZE);
}

@Test
void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
@ParameterizedTest
@MethodSource("transferManagers")
void pause_fileChanged_resumeShouldStartFromBeginning(S3TransferManager uploadTm, S3TransferManager resumeTm) throws Exception {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = uploadTm.uploadFile(request);
waitUntilMultipartUploadExists();
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);
@@ -139,13 +166,18 @@ void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
assertThat(resumableFileUpload.totalParts()).isNotEmpty();
verifyMultipartUploadIdExists(resumableFileUpload);

byte[] bytes = "helloworld".getBytes(StandardCharsets.UTF_8);
Files.write(largeFile.toPath(), bytes);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
verifyMultipartUploadIdNotExist(resumableFileUpload);
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(bytes.length);
byte[] originalBytes = Files.readAllBytes(largeFile.toPath());
try {
byte[] bytes = "helloworld".getBytes(StandardCharsets.UTF_8);
Files.write(largeFile.toPath(), bytes);

FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
verifyMultipartUploadIdNotExist(resumableFileUpload);
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(bytes.length);
} finally {
Files.write(largeFile.toPath(), originalBytes);
}
}

private void verifyMultipartUploadIdExists(ResumableFileUpload resumableFileUpload) {
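For reference, the two transfer-manager flavors paired in the matrix above can be built roughly as follows; this is a sketch with region, credentials, and other builder options omitted. A token produced by pausing with one flavor can be handed to resumeUploadFile on the other, which is what the parameterized tests verify.

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;

public class TransferManagerFlavorsSketch {
    public static void main(String[] args) {
        // CRT-based client and transfer manager (the test base's tmCrt is built along these lines).
        S3AsyncClient crtClient = S3AsyncClient.crtBuilder().build();
        S3TransferManager tmCrt = S3TransferManager.builder().s3Client(crtClient).build();

        // Java-based multipart client and transfer manager, matching the test's tmJavaMpu.
        S3AsyncClient javaMpuClient = S3AsyncClient.builder().multipartEnabled(true).build();
        S3TransferManager tmJavaMpu = S3TransferManager.builder().s3Client(javaMpuClient).build();

        tmCrt.close();
        tmJavaMpu.close();
        crtClient.close();
        javaMpuClient.close();
    }
}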
CrtS3TransferManager.java
@@ -20,7 +20,6 @@
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn;
import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -31,7 +30,6 @@
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
@@ -51,6 +49,7 @@
@SdkInternalApi
class CrtS3TransferManager extends DelegatingS3TransferManager {
private static final Logger log = Logger.loggerFor(S3TransferManager.class);
private static final PauseResumeHelper PAUSE_RESUME_HELPER = new PauseResumeHelper();
private final S3AsyncClient s3AsyncClient;

CrtS3TransferManager(TransferManagerConfiguration transferConfiguration, S3AsyncClient s3AsyncClient,
@@ -99,67 +98,15 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
return new CrtFileUpload(returnFuture, progressUpdater.progress(), observable, uploadFileRequest);
}

private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload, boolean fileModified,
boolean noResumeToken) {
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
if (fileModified) {
log.debug(() -> String.format("The file (%s) has been modified since "
+ "the last pause. " +
"The SDK will upload the requested object in bucket"
+ " (%s) with key (%s) from "
+ "the "
+ "beginning.",
uploadFileRequest.source(),
putObjectRequest.bucket(),
putObjectRequest.key()));
resumableFileUpload.multipartUploadId()
.ifPresent(id -> {
log.debug(() -> "Aborting previous upload with multipartUploadId: " + id);
s3AsyncClient.abortMultipartUpload(
AbortMultipartUploadRequest.builder()
.bucket(putObjectRequest.bucket())
.key(putObjectRequest.key())
.uploadId(id)
.build())
.exceptionally(t -> {
log.warn(() -> String.format("Failed to abort previous multipart upload "
+ "(id: %s)"
+ ". You may need to call "
+ "S3AsyncClient#abortMultiPartUpload to "
+ "free all storage consumed by"
+ " all parts. ",
id), t);
return null;
});
});
}

if (noResumeToken) {
log.debug(() -> String.format("No resume token is found. " +
"The SDK will upload the requested object in bucket"
+ " (%s) with key (%s) from "
+ "the beginning.",
putObjectRequest.bucket(),
putObjectRequest.key()));
}


return uploadFile(uploadFileRequest);
}

@Override
public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
Validate.paramNotNull(resumableFileUpload, "resumableFileUpload");

boolean fileModified = !fileNotModified(resumableFileUpload.fileLength(),
resumableFileUpload.fileLastModified(),
resumableFileUpload.uploadFileRequest().source());

boolean noResumeToken = !hasResumeToken(resumableFileUpload);
boolean fileModified = PAUSE_RESUME_HELPER.fileModified(resumableFileUpload, s3AsyncClient);
boolean noResumeToken = !PAUSE_RESUME_HELPER.hasResumeToken(resumableFileUpload);

if (fileModified || noResumeToken) {
return uploadFromBeginning(resumableFileUpload, fileModified, noResumeToken);
return uploadFile(resumableFileUpload.uploadFileRequest());
}

return doResumeUpload(resumableFileUpload);
@@ -188,10 +135,6 @@ private static ResumeToken crtResumeToken(ResumableFileUpload resumableFileUploa
.withUploadId(resumableFileUpload.multipartUploadId().orElse(null)));
}

private boolean hasResumeToken(ResumableFileUpload resumableFileUpload) {
return resumableFileUpload.totalParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
}

private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,
Consumer<SdkHttpExecutionAttributes.Builder> builderMutation) {
SdkHttpExecutionAttributes modifiedAttributes =
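The resume-time checks that used to live inline in CrtS3TransferManager (shown removed above) are now delegated to a shared PauseResumeHelper, which is not visible in the loaded hunks. Below is a hypothetical reconstruction of what its two methods might look like, pieced together from the removed code and the new call sites; the class role and method signatures come from the call sites, everything else is an assumption. Centralizing these checks lets the CRT-based and Java-based multipart transfer managers share the same pause/resume semantics, which is what the cross-manager integration test exercises.

// Hypothetical reconstruction based on the inline logic the diff removes from
// CrtS3TransferManager; the SDK's actual PauseResumeHelper may differ.
import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

final class PauseResumeHelperSketch {

    // A resume token is usable only if both the total part count and the part size
    // were recorded at pause time.
    boolean hasResumeToken(ResumableFileUpload resumableFileUpload) {
        return resumableFileUpload.totalParts().isPresent()
               && resumableFileUpload.partSizeInBytes().isPresent();
    }

    // Returns true when the source file changed since the pause. The S3AsyncClient
    // parameter at the call site suggests the helper also aborts the stale multipart
    // upload in that case, as the removed inline code did, so orphaned parts do not
    // keep consuming storage.
    boolean fileModified(ResumableFileUpload resumableFileUpload, S3AsyncClient s3AsyncClient) {
        UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
        PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();

        boolean modified = !fileNotModified(resumableFileUpload.fileLength(),
                                            resumableFileUpload.fileLastModified(),
                                            uploadFileRequest.source());
        if (modified) {
            resumableFileUpload.multipartUploadId().ifPresent(
                id -> s3AsyncClient.abortMultipartUpload(AbortMultipartUploadRequest.builder()
                                                                                    .bucket(putObjectRequest.bucket())
                                                                                    .key(putObjectRequest.key())
                                                                                    .uploadId(id)
                                                                                    .build()));
        }
        return modified;
    }
}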