Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor S3 Transfer Manager uploadDirectory to limit the number of c… #5031

Merged
merged 2 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-687fcee.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Set a limit on the number of concurrent upload file requests for upload directory. This fixes the OOM issue that could surface when users try to upload a directory that has millions of small files. See [#5023](https://github.com/aws/aws-sdk-java-v2/issues/5023)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;

import java.io.IOException;
Expand Down Expand Up @@ -110,7 +110,7 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request,
failedFileDownloads),
allOfFutures,
DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY);
DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
listObjectsHelper.listS3ObjectsRecursively(request)
.filter(downloadDirectoryRequest.filter())
.subscribe(asyncBufferingSubscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {

public static final String DEFAULT_DELIMITER = "/";
public static final String DEFAULT_PREFIX = "";
public static final int DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY = 100;
public static final int DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY = 100;

private static final int DEFAULT_UPLOAD_DIRECTORY_MAX_DEPTH = Integer.MAX_VALUE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;

import java.io.IOException;
Expand All @@ -24,14 +25,13 @@
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
Expand Down Expand Up @@ -90,23 +90,26 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur
validateDirectory(uploadDirectoryRequest);

Collection<FailedFileUpload> failedFileUploads = new ConcurrentLinkedQueue<>();
List<CompletableFuture<CompletedFileUpload>> futures;

try (Stream<Path> entries = listFiles(directory, uploadDirectoryRequest)) {
futures = entries.map(path -> {
CompletableFuture<CompletedFileUpload> future = uploadSingleFile(uploadDirectoryRequest,
failedFileUploads, path);
Stream<Path> stream = listFiles(directory, uploadDirectoryRequest);

// Forward cancellation of the return future to all individual futures.
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
return future;
}).collect(Collectors.toList());
}
SdkPublisher<Path> iterablePublisher = SdkPublisher.fromIterable(() -> stream.iterator())
.doAfterOnCancel(() -> stream.close())
.doAfterOnError(t -> stream.close())
.doAfterOnComplete(() -> stream.close());

CompletableFuture<Void> allOfFutures = new CompletableFuture<>();

AsyncBufferingSubscriber<Path> bufferingSubscriber =
new AsyncBufferingSubscriber<>(path -> uploadSingleFile(uploadDirectoryRequest, failedFileUploads, path),
allOfFutures, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);

iterablePublisher.subscribe(bufferingSubscriber);
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build()));
allOfFutures.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build()));
}

private void validateDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
Expand Down
Loading