diff --git a/.changes/next-release/bugfix-S3TransferManager-687fcee.json b/.changes/next-release/bugfix-S3TransferManager-687fcee.json new file mode 100644 index 000000000000..b53542d7f080 --- /dev/null +++ b/.changes/next-release/bugfix-S3TransferManager-687fcee.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "S3 Transfer Manager", + "contributor": "", + "description": "Set a limit on the number of concurrent upload file requests for upload directory. This fixes the OOM issue that could surface when users try to upload a directory that has millions of small files. See [#5023](https://github.com/aws/aws-sdk-java-v2/issues/5023)." +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java index cbf0e7e4ef38..5343cb7e87f6 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java @@ -16,7 +16,7 @@ package software.amazon.awssdk.transfer.s3.internal; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER; -import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY; +import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX; import java.io.IOException; @@ -110,7 +110,7 @@ private void doDownloadDirectory(CompletableFuture r new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request, failedFileDownloads), allOfFutures, - DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY); + DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY); listObjectsHelper.listS3ObjectsRecursively(request) .filter(downloadDirectoryRequest.filter()) .subscribe(asyncBufferingSubscriber); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferConfigurationOption.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferConfigurationOption.java index d96677d71d5d..33d124eb7c13 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferConfigurationOption.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferConfigurationOption.java @@ -37,7 +37,7 @@ public final class TransferConfigurationOption extends AttributeMap.Key { public static final String DEFAULT_DELIMITER = "/"; public static final String DEFAULT_PREFIX = ""; - public static final int DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY = 100; + public static final int DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY = 100; private static final int DEFAULT_UPLOAD_DIRECTORY_MAX_DEPTH = Integer.MAX_VALUE; diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java index 9408da5595fd..d92e0b3b6450 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.transfer.s3.internal; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER; +import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY; import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX; import java.io.IOException; @@ -24,14 +25,13 @@ import java.nio.file.LinkOption; import java.nio.file.Path; import java.util.Collection; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.Stream; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.transfer.s3.S3TransferManager; @@ -90,23 +90,26 @@ private void doUploadDirectory(CompletableFuture retur validateDirectory(uploadDirectoryRequest); Collection failedFileUploads = new ConcurrentLinkedQueue<>(); - List> futures; - try (Stream entries = listFiles(directory, uploadDirectoryRequest)) { - futures = entries.map(path -> { - CompletableFuture future = uploadSingleFile(uploadDirectoryRequest, - failedFileUploads, path); + Stream stream = listFiles(directory, uploadDirectoryRequest); - // Forward cancellation of the return future to all individual futures. - CompletableFutureUtils.forwardExceptionTo(returnFuture, future); - return future; - }).collect(Collectors.toList()); - } + SdkPublisher iterablePublisher = SdkPublisher.fromIterable(() -> stream.iterator()) + .doAfterOnCancel(() -> stream.close()) + .doAfterOnError(t -> stream.close()) + .doAfterOnComplete(() -> stream.close()); + + CompletableFuture allOfFutures = new CompletableFuture<>(); + + AsyncBufferingSubscriber bufferingSubscriber = + new AsyncBufferingSubscriber<>(path -> uploadSingleFile(uploadDirectoryRequest, failedFileUploads, path), + allOfFutures, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY); + + iterablePublisher.subscribe(bufferingSubscriber); + CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder() - .failedTransfers(failedFileUploads) - .build())); + allOfFutures.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder() + .failedTransfers(failedFileUploads) + .build())); } private void validateDirectory(UploadDirectoryRequest uploadDirectoryRequest) {