Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause/Resume Upload for Transfer Manager with Java S3Client #4908

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonS3-a20b910.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Amazon S3",
"contributor": "",
"type": "feature",
"description": "Add support for pause/resume upload for TransferManager with Java-based S3Client that has multipart enabled"
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public static void setUpForAllIntegTests() throws Exception {
Log.initLoggingToStdout(Log.LogLevel.Warn);
System.setProperty("aws.crt.debugnative", "true");
s3 = s3ClientBuilder().build();
// TODO - enable multipart once TransferListener fixed for MultipartClient
s3Async = s3AsyncClientBuilder().build();
s3CrtAsync = S3CrtAsyncClient.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package software.amazon.awssdk.transfer.s3;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;

Expand All @@ -26,13 +25,17 @@
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.waiters.AsyncWaiter;
import software.amazon.awssdk.core.waiters.Waiter;
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
import software.amazon.awssdk.services.s3.model.ListPartsResponse;
import software.amazon.awssdk.services.s3.model.NoSuchUploadException;
Expand All @@ -48,17 +51,25 @@ public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3Integra
private static final String BUCKET = temporaryBucketName(S3TransferManagerUploadPauseResumeIntegrationTest.class);
private static final String KEY = "key";
// 24 * MB is chosen to make sure we have data written in the file already upon pausing.
private static final long OBJ_SIZE = 24 * MB;
private static final long LARGE_OBJ_SIZE = 24 * MB;
private static final long SMALL_OBJ_SIZE = 2 * MB;
private static File largeFile;
private static File smallFile;
private static ScheduledExecutorService executorService;

// TODO - switch to tmJava from TestBase once TransferListener fixed for MultipartClient
protected static S3TransferManager tmJavaMpu;

@BeforeAll
public static void setup() throws Exception {
createBucket(BUCKET);
largeFile = new RandomTempFile(OBJ_SIZE);
smallFile = new RandomTempFile(2 * MB);
largeFile = new RandomTempFile(LARGE_OBJ_SIZE);
smallFile = new RandomTempFile(SMALL_OBJ_SIZE);
executorService = Executors.newScheduledThreadPool(3);

// TODO - switch to tmJava from TestBase once TransferListener fixed for MultipartClient
S3AsyncClient s3AsyncMpu = s3AsyncClientBuilder().multipartEnabled(true).build();
tmJavaMpu = S3TransferManager.builder().s3Client(s3AsyncMpu).build();
}

@AfterAll
Expand All @@ -69,30 +80,42 @@ public static void cleanup() {
executorService.shutdown();
}

@Test
void pause_singlePart_shouldResume() {
private static Stream<Arguments> transferManagers() {
return Stream.of(
Arguments.of(tmJavaMpu, tmJavaMpu),
Arguments.of(tmCrt, tmCrt),
Arguments.of(tmCrt, tmJavaMpu),
Arguments.of(tmJavaMpu, tmCrt)
);
}

@ParameterizedTest
@MethodSource("transferManagers")
void pause_singlePart_shouldResume(S3TransferManager uploadTm, S3TransferManager resumeTm) {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(smallFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = uploadTm.uploadFile(request);
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);

validateEmptyResumeToken(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(SMALL_OBJ_SIZE);
}

@Test
void pause_fileNotChanged_shouldResume() {
@ParameterizedTest
@MethodSource("transferManagers")
void pause_fileNotChanged_shouldResume(S3TransferManager uploadTm, S3TransferManager resumeTm) throws Exception {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.addTransferListener(LoggingTransferListener.create())
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = uploadTm.uploadFile(request);
waitUntilMultipartUploadExists();
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);
Expand All @@ -103,33 +126,37 @@ void pause_fileNotChanged_shouldResume() {

verifyMultipartUploadIdExists(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(LARGE_OBJ_SIZE);
}

@Test
void pauseImmediately_resume_shouldStartFromBeginning() {
@ParameterizedTest
@MethodSource("transferManagers")
void pauseImmediately_resume_shouldStartFromBeginning(S3TransferManager uploadTm, S3TransferManager resumeTm) {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = uploadTm.uploadFile(request);
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);

validateEmptyResumeToken(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(LARGE_OBJ_SIZE);
}

@Test
void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
@ParameterizedTest
@MethodSource("transferManagers")
void pause_fileChanged_resumeShouldStartFromBeginning(S3TransferManager uploadTm, S3TransferManager resumeTm) throws Exception {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = uploadTm.uploadFile(request);
waitUntilMultipartUploadExists();
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);
Expand All @@ -139,13 +166,18 @@ void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
assertThat(resumableFileUpload.totalParts()).isNotEmpty();
verifyMultipartUploadIdExists(resumableFileUpload);

byte[] bytes = "helloworld".getBytes(StandardCharsets.UTF_8);
Files.write(largeFile.toPath(), bytes);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
verifyMultipartUploadIdNotExist(resumableFileUpload);
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(bytes.length);
byte[] originalBytes = Files.readAllBytes(largeFile.toPath());
try {
byte[] bytes = "helloworld".getBytes(StandardCharsets.UTF_8);
Files.write(largeFile.toPath(), bytes);

FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
verifyMultipartUploadIdNotExist(resumableFileUpload);
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(bytes.length);
} finally {
Files.write(largeFile.toPath(), originalBytes);
}
}

private void verifyMultipartUploadIdExists(ResumableFileUpload resumableFileUpload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn;
import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
Expand All @@ -31,7 +30,6 @@
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
Expand All @@ -51,6 +49,7 @@
@SdkInternalApi
class CrtS3TransferManager extends DelegatingS3TransferManager {
private static final Logger log = Logger.loggerFor(S3TransferManager.class);
private static final PauseResumeHelper PAUSE_RESUME_HELPER = new PauseResumeHelper();
private final S3AsyncClient s3AsyncClient;

CrtS3TransferManager(TransferManagerConfiguration transferConfiguration, S3AsyncClient s3AsyncClient,
Expand Down Expand Up @@ -99,67 +98,15 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
return new CrtFileUpload(returnFuture, progressUpdater.progress(), observable, uploadFileRequest);
}

private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload, boolean fileModified,
boolean noResumeToken) {
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
if (fileModified) {
log.debug(() -> String.format("The file (%s) has been modified since "
+ "the last pause. " +
"The SDK will upload the requested object in bucket"
+ " (%s) with key (%s) from "
+ "the "
+ "beginning.",
uploadFileRequest.source(),
putObjectRequest.bucket(),
putObjectRequest.key()));
resumableFileUpload.multipartUploadId()
.ifPresent(id -> {
log.debug(() -> "Aborting previous upload with multipartUploadId: " + id);
s3AsyncClient.abortMultipartUpload(
AbortMultipartUploadRequest.builder()
.bucket(putObjectRequest.bucket())
.key(putObjectRequest.key())
.uploadId(id)
.build())
.exceptionally(t -> {
log.warn(() -> String.format("Failed to abort previous multipart upload "
+ "(id: %s)"
+ ". You may need to call "
+ "S3AsyncClient#abortMultiPartUpload to "
+ "free all storage consumed by"
+ " all parts. ",
id), t);
return null;
});
});
}

if (noResumeToken) {
log.debug(() -> String.format("No resume token is found. " +
"The SDK will upload the requested object in bucket"
+ " (%s) with key (%s) from "
+ "the beginning.",
putObjectRequest.bucket(),
putObjectRequest.key()));
}


return uploadFile(uploadFileRequest);
}

@Override
public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
Validate.paramNotNull(resumableFileUpload, "resumableFileUpload");

boolean fileModified = !fileNotModified(resumableFileUpload.fileLength(),
resumableFileUpload.fileLastModified(),
resumableFileUpload.uploadFileRequest().source());

boolean noResumeToken = !hasResumeToken(resumableFileUpload);
boolean fileModified = PAUSE_RESUME_HELPER.fileModified(resumableFileUpload, s3AsyncClient);
boolean noResumeToken = !PAUSE_RESUME_HELPER.hasResumeToken(resumableFileUpload);

if (fileModified || noResumeToken) {
return uploadFromBeginning(resumableFileUpload, fileModified, noResumeToken);
return uploadFile(resumableFileUpload.uploadFileRequest());
}

return doResumeUpload(resumableFileUpload);
Expand Down Expand Up @@ -188,10 +135,6 @@ private static ResumeToken crtResumeToken(ResumableFileUpload resumableFileUploa
.withUploadId(resumableFileUpload.multipartUploadId().orElse(null)));
}

private boolean hasResumeToken(ResumableFileUpload resumableFileUpload) {
return resumableFileUpload.totalParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
}

private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,
Consumer<SdkHttpExecutionAttributes.Builder> builderMutation) {
SdkHttpExecutionAttributes modifiedAttributes =
Expand Down
Loading