From 6dbb81d17042d098f1e80e9bcef51af3fce36cc4 Mon Sep 17 00:00:00 2001 From: Olivier Lepage-Applin Date: Thu, 25 Sep 2025 14:03:35 -0400 Subject: [PATCH 1/3] merge branch olapplin/large-object-resume --- ...FileAsyncResponseTransformerPublisher.java | 6 +- ...artDownloadPauseResumeIntegrationTest.java | 19 ++++ .../utils/ResumableRequestConverter.java | 10 +- ...ParallelMultipartDownloaderSubscriber.java | 99 ++++++++++++++++--- ...ipartDownloaderSubscriberWiremockTest.java | 7 +- 5 files changed, 117 insertions(+), 24 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisher.java index 29208894cf31..04cef3cd76bb 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisher.java @@ -136,10 +136,10 @@ private void handleError(Throwable e) { } private AsyncResponseTransformer getDelegateTransformer(Long startAt) { - if (transformerCount.get() == 0) { + if (transformerCount.get() == 0 && + initialConfig.fileWriteOption() != FileTransformerConfiguration.FileWriteOption.WRITE_TO_POSITION) { // On the first request we need to maintain the same config so - // that the file is actually created on disk if it doesn't exist (for example, if CREATE_NEW or - // CREATE_OR_REPLACE_EXISTING is used) + // that the file is actually created on disk if it doesn't exist (for CREATE_NEW or CREATE_OR_REPLACE_EXISTING) return AsyncResponseTransformer.toFile(path, initialConfig); } switch (initialConfig.fileWriteOption()) { diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerMultipartDownloadPauseResumeIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerMultipartDownloadPauseResumeIntegrationTest.java index dcbace143b5f..e81de03b9f90 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerMultipartDownloadPauseResumeIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerMultipartDownloadPauseResumeIntegrationTest.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; +import static software.amazon.awssdk.transfer.s3.SizeConstant.KB; import static software.amazon.awssdk.transfer.s3.SizeConstant.MB; import java.io.File; @@ -82,6 +83,24 @@ void pauseAndResume_shouldResumeDownload() { assertThat(path.toFile()).hasSameBinaryContentAs(sourceFile); } + @Test + void pauseAndResume_beforeFirstPartCompletes_shouldResumeDownload() { + Path path = RandomTempFile.randomUncreatedFile().toPath(); + DownloadFileRequest request = DownloadFileRequest.builder() + .getObjectRequest(b -> b.bucket(BUCKET).key(KEY)) + .destination(path) + .build(); + FileDownload download = tmJava.downloadFile(request); + + // stop before we complete first part, so only wait for an amount of bytes much lower than 1 part, 1 KiB should do it + waitUntilAmountTransferred(download, KB); + ResumableFileDownload resumableFileDownload = download.pause(); + FileDownload resumed = tmJava.resumeDownloadFile(resumableFileDownload); + resumed.completionFuture().join(); + assertThat(path.toFile()).hasSameBinaryContentAs(sourceFile); + } + + @Test void pauseAndResume_whenAlreadyComplete_shouldHandleGracefully() { Path path = RandomTempFile.randomUncreatedFile().toPath(); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/utils/ResumableRequestConverter.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/utils/ResumableRequestConverter.java index 7e55a3f22560..7a38d55e70d7 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/utils/ResumableRequestConverter.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/utils/ResumableRequestConverter.java @@ -83,22 +83,18 @@ private ResumableRequestConverter() { } if (hasRemainingParts(getObjectRequest)) { - log.debug(() -> "The paused download was performed with part GET, now resuming download of remaining parts"); - Long positionToWriteFrom = - MultipartDownloadUtils.multipartDownloadResumeContext(originalDownloadRequest.getObjectRequest()) - .map(MultipartDownloadResumeContext::bytesToLastCompletedParts) - .orElse(0L); + log.info(() -> "The paused download was performed with part GET, now resuming download of remaining parts"); AsyncResponseTransformer responseTransformer = AsyncResponseTransformer.toFile(originalDownloadRequest.destination(), FileTransformerConfiguration.builder() .fileWriteOption(WRITE_TO_POSITION) - .position(positionToWriteFrom) + .position(0L) .failureBehavior(LEAVE) .build()); return Pair.of(originalDownloadRequest, responseTransformer); } - log.debug(() -> "The paused download was performed with range GET, now resuming download of remaining bytes."); + log.info(() -> "The paused download was performed with range GET, now resuming download of remaining bytes."); newDownloadFileRequest = resumedDownloadFileRequest(resumableFileDownload, originalDownloadRequest, getObjectRequest, diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriber.java index 4c4423e72a5d..75803576a123 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriber.java @@ -15,8 +15,12 @@ package software.amazon.awssdk.services.s3.internal.multipart; +import java.util.Collections; +import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Queue; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -30,6 +34,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.utils.CompletableFutureUtils; +import software.amazon.awssdk.utils.ContentRangeParser; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Pair; @@ -66,7 +71,7 @@ public class ParallelMultipartDownloaderSubscriber * The total number of completed parts. A part is considered complete once the completable future associated with its request * completes successfully. */ - private final AtomicInteger completedParts = new AtomicInteger(); + private final AtomicInteger completedParts; /** * The future returned to the user when calling @@ -80,7 +85,7 @@ public class ParallelMultipartDownloaderSubscriber * The {@link GetObjectResponse} to be returned in the completed future to the user. It corresponds to the response of first * part GetObject */ - private GetObjectResponse getObjectResponse; + private volatile GetObjectResponse getObjectResponse; /** * The subscription received from the publisher this subscriber subscribes to. @@ -135,12 +140,17 @@ public class ParallelMultipartDownloaderSubscriber private final AtomicInteger partNumber = new AtomicInteger(0); /** - * Tracks if one of the parts requests future completed exceptionally. If this occurs, it means all retries were - * attempted for that part, but it still failed. This is a failure state, the error should be reported back to the user - * and any more request should be ignored. + * Tracks if one of the parts requests future completed exceptionally. If this occurs, it means all retries were attempted for + * that part, but it still failed. This is a failure state, the error should be reported back to the user and any more request + * should be ignored. */ private final AtomicBoolean isCompletedExceptionally = new AtomicBoolean(false); + /** + * When resuming a paused download, indicates which parts were already completed before pausing. + */ + private final Set initialCompletedParts; + public ParallelMultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjectRequest, CompletableFuture resultFuture, @@ -149,6 +159,36 @@ public ParallelMultipartDownloaderSubscriber(S3AsyncClient s3, this.getObjectRequest = getObjectRequest; this.resultFuture = resultFuture; this.maxInFlightParts = maxInFlightParts; + this.initialCompletedParts = initialCompletedParts(getObjectRequest); + this.completedParts = new AtomicInteger(initialCompletedParts.size()); + + if (resumingDownload()) { + int totalPartsFromInitialRequest = MultipartDownloadUtils.multipartDownloadResumeContext(getObjectRequest) + .map(MultipartDownloadResumeContext::totalParts) + .orElse(0); + if (totalPartsFromInitialRequest > 0) { + totalPartsFuture.complete(totalPartsFromInitialRequest); + } + getObjectResponse = MultipartDownloadUtils.multipartDownloadResumeContext(getObjectRequest) + .map(MultipartDownloadResumeContext::response) + .orElse(null); + } + } + + private static Set initialCompletedParts(GetObjectRequest getObjectRequest) { + return Collections.unmodifiableSet( + MultipartDownloadUtils.multipartDownloadResumeContext(getObjectRequest) + .map(MultipartDownloadResumeContext::completedParts) + .>map(HashSet::new) + .orElse(Collections.emptySet()) + ); + } + + private boolean resumingDownload() { + Optional hasAlreadyCompletedParts = + MultipartDownloadUtils.multipartDownloadResumeContext(getObjectRequest) + .map(ctx -> !ctx.completedParts().isEmpty()); + return hasAlreadyCompletedParts.orElse(false); } @Override @@ -176,19 +216,18 @@ public void onNext(AsyncResponseTransformer processingRequests(asyncResponseTransformer, currentPartNum, totalParts)); } } private void processingRequests(AsyncResponseTransformer asyncResponseTransformer, - int currentPartNum, Integer totalParts) { + int currentPartNum, int totalParts) { if (currentPartNum > totalParts) { // Do not process requests above total parts. @@ -203,6 +242,7 @@ private void processingRequests(AsyncResponseTransformer ctx.addCompletedPart(currentPartNumber)); if (completedParts.get() >= totalParts) { if (completedParts.get() > totalParts) { resultFuture.completeExceptionally(new IllegalStateException("Total parts exceeded")); } else { + updateResumeContextForCompletion(res); resultFuture.complete(getObjectResponse); } @@ -254,6 +297,14 @@ private void sendNextRequest(AsyncResponseTransformer MultipartDownloadUtils + .multipartDownloadResumeContext(getObjectRequest) + .ifPresent(ctx -> + ctx.addToBytesToLastCompletedParts(total))); + } + private void sendFirstRequest(AsyncResponseTransformer asyncResponseTransformer) { log.debug(() -> "Sending first request"); GetObjectRequest request = nextRequest(1); @@ -282,6 +333,13 @@ private void sendFirstRequest(AsyncResponseTransformer { + ctx.addCompletedPart(1); + ctx.response(res); + ctx.totalParts(res.partsCount()); + }); + synchronized (subscriptionLock) { subscription.request(1); } @@ -312,7 +370,7 @@ private void setInitialPartCountAndEtag(GetObjectResponse response) { private void handlePartError(Throwable e, int part) { isCompletedExceptionally.set(true); - log.debug(() -> "Error on part " + part, e); + log.debug(() -> "Error on part " + part, e); resultFuture.completeExceptionally(e); inFlightRequests.values().forEach(future -> future.cancel(true)); } @@ -334,9 +392,12 @@ private void processPendingTransformers(int totalParts) { private void doProcessPendingTransformers(int totalParts) { while (shouldProcessPendingTransformers()) { - Pair> transformer = - pendingTransformers.poll(); - sendNextRequest(transformer.right(), transformer.left(), totalParts); + Pair> pair = pendingTransformers.poll(); + Integer part = pair.left(); + AsyncResponseTransformer transformer = pair.right(); + if (part <= totalParts) { + sendNextRequest(transformer, part, totalParts); + } } } @@ -372,4 +433,18 @@ private GetObjectRequest nextRequest(int nextPartToGet) { }); } + private int nextPart() { + if (initialCompletedParts.isEmpty()) { + return partNumber.incrementAndGet(); + } + + synchronized (initialCompletedParts) { + int part = partNumber.incrementAndGet(); + while (initialCompletedParts.contains(part)) { + part = partNumber.incrementAndGet(); + } + return part; + } + } + } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriberWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriberWiremockTest.java index 54abbee78c64..1a74e32d04fe 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriberWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriberWiremockTest.java @@ -79,8 +79,11 @@ void tearDown() throws Exception { } @ParameterizedTest - @ValueSource(ints = {2, 3, 4, 5, 6, 7, 8, 9, 10, 49}) - void happyPath_multipartDownload_partsLessThanMaxInFlight(int numParts) throws Exception { + @ValueSource(ints = {2, 3, 4, 5, 6, 7, 8, 9, 10, 49, // less than maxInFlightParts + 50, // == maxInFlightParts + 51, 100, 101 // more than maxInFlightParts + }) + void happyPath_multipartDownload(int numParts) throws Exception { int partSize = 1024; byte[] expectedBody = utils.stubAllParts(testBucket, testKey, numParts, partSize); From 691feedd95cd288cfd3d7a2caf732ddccbedf726 Mon Sep 17 00:00:00 2001 From: Olivier Lepage-Applin Date: Thu, 25 Sep 2025 14:09:50 -0400 Subject: [PATCH 2/3] remove info logs --- .../transfer/s3/internal/utils/ResumableRequestConverter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/utils/ResumableRequestConverter.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/utils/ResumableRequestConverter.java index 7a38d55e70d7..a61da15e7a5d 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/utils/ResumableRequestConverter.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/utils/ResumableRequestConverter.java @@ -83,7 +83,7 @@ private ResumableRequestConverter() { } if (hasRemainingParts(getObjectRequest)) { - log.info(() -> "The paused download was performed with part GET, now resuming download of remaining parts"); + log.debug(() -> "The paused download was performed with part GET, now resuming download of remaining parts"); AsyncResponseTransformer responseTransformer = AsyncResponseTransformer.toFile(originalDownloadRequest.destination(), FileTransformerConfiguration.builder() @@ -94,7 +94,7 @@ private ResumableRequestConverter() { return Pair.of(originalDownloadRequest, responseTransformer); } - log.info(() -> "The paused download was performed with range GET, now resuming download of remaining bytes."); + log.debug(() -> "The paused download was performed with range GET, now resuming download of remaining bytes."); newDownloadFileRequest = resumedDownloadFileRequest(resumableFileDownload, originalDownloadRequest, getObjectRequest, From 66d560d8bbe86598daa2422d1e32ca5537182dc7 Mon Sep 17 00:00:00 2001 From: Olivier Lepage-Applin Date: Thu, 25 Sep 2025 16:56:56 -0400 Subject: [PATCH 3/3] do not wrap responseTransformer for multipart for range-get --- .../awssdk/transfer/s3/internal/GenericS3TransferManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java index f65cabec1ee3..695550480af4 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java @@ -409,7 +409,7 @@ private TransferProgressUpdater doDownloadFile( TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null); try { progressUpdater.transferInitiated(); - responseTransformer = isS3ClientMultipartEnabled() + responseTransformer = isS3ClientMultipartEnabled() && downloadRequest.getObjectRequest().range() == null ? progressUpdater.wrapForNonSerialFileDownload( responseTransformer, downloadRequest.getObjectRequest()) : progressUpdater.wrapResponseTransformer(responseTransformer);