Skip to content

Commit

Permalink
Refactor S3 Transfer Manager uploadDirectory to limit the number of c…
Browse files Browse the repository at this point in the history
…oncurrent upload file requests. (#5031)
  • Loading branch information
zoewangg committed Mar 26, 2024
1 parent 97388aa commit 9be4570
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 19 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-687fcee.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Set a limit on the number of concurrent upload file requests for upload directory. This fixes the OOM issue that could surface when users try to upload a directory that has millions of small files. See [#5023](https://github.com/aws/aws-sdk-java-v2/issues/5023)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;

import java.io.IOException;
Expand Down Expand Up @@ -110,7 +110,7 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request,
failedFileDownloads),
allOfFutures,
DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY);
DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
listObjectsHelper.listS3ObjectsRecursively(request)
.filter(downloadDirectoryRequest.filter())
.subscribe(asyncBufferingSubscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {

public static final String DEFAULT_DELIMITER = "/";
public static final String DEFAULT_PREFIX = "";
public static final int DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY = 100;
public static final int DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY = 100;

private static final int DEFAULT_UPLOAD_DIRECTORY_MAX_DEPTH = Integer.MAX_VALUE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;

import java.io.IOException;
Expand All @@ -24,14 +25,13 @@
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
Expand Down Expand Up @@ -90,23 +90,26 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur
validateDirectory(uploadDirectoryRequest);

Collection<FailedFileUpload> failedFileUploads = new ConcurrentLinkedQueue<>();
List<CompletableFuture<CompletedFileUpload>> futures;

try (Stream<Path> entries = listFiles(directory, uploadDirectoryRequest)) {
futures = entries.map(path -> {
CompletableFuture<CompletedFileUpload> future = uploadSingleFile(uploadDirectoryRequest,
failedFileUploads, path);
Stream<Path> stream = listFiles(directory, uploadDirectoryRequest);

// Forward cancellation of the return future to all individual futures.
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
return future;
}).collect(Collectors.toList());
}
SdkPublisher<Path> iterablePublisher = SdkPublisher.fromIterable(() -> stream.iterator())
.doAfterOnCancel(() -> stream.close())
.doAfterOnError(t -> stream.close())
.doAfterOnComplete(() -> stream.close());

CompletableFuture<Void> allOfFutures = new CompletableFuture<>();

AsyncBufferingSubscriber<Path> bufferingSubscriber =
new AsyncBufferingSubscriber<>(path -> uploadSingleFile(uploadDirectoryRequest, failedFileUploads, path),
allOfFutures, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);

iterablePublisher.subscribe(bufferingSubscriber);
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build()));
allOfFutures.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build()));
}

private void validateDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
Expand Down

0 comments on commit 9be4570

Please sign in to comment.