Skip to content

Commit

Permalink
Refactor S3 Transfer Manager uploadDirectory to limit the number of c…
Browse files Browse the repository at this point in the history
…oncurrent upload file requests.
  • Loading branch information
zoewangg committed Mar 20, 2024
1 parent f79332b commit 77b88f5
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 17 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-687fcee.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Set a limit on the number of concurrent upload file requests for upload directory. This fixes the OOM issue that could surface when users try to upload a directory that has millions of small files. See [#5023](https://github.com/aws/aws-sdk-java-v2/issues/5023)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.awssdk.utils.async.EventListeningSubscriber;
import software.amazon.awssdk.utils.async.FilteringSubscriber;
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
import software.amazon.awssdk.utils.async.IterablePublisher;
import software.amazon.awssdk.utils.async.LimitingSubscriber;
import software.amazon.awssdk.utils.async.SequentialSubscriber;
import software.amazon.awssdk.utils.internal.MappingSubscriber;
Expand All @@ -51,6 +52,10 @@ static <T> SdkPublisher<T> adapt(Publisher<T> toAdapt) {
return toAdapt::subscribe;
}

static <T> SdkPublisher<T> fromIterable(Iterable<T> iterable) {
return adapt(new IterablePublisher<>(iterable));
}

/**
* Filters published events to just those that are instances of the given class. This changes the type of
* publisher to the type specified in the {@link Class}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;

import java.io.IOException;
Expand Down Expand Up @@ -110,7 +110,7 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request,
failedFileDownloads),
allOfFutures,
DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY);
DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);
listObjectsHelper.listS3ObjectsRecursively(request)
.filter(downloadDirectoryRequest.filter())
.subscribe(asyncBufferingSubscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class TransferConfigurationOption<T> extends AttributeMap.Key<T> {

public static final String DEFAULT_DELIMITER = "/";
public static final String DEFAULT_PREFIX = "";
public static final int DEFAULT_DOWNLOAD_DIRECTORY_MAX_CONCURRENCY = 100;
public static final int DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY = 100;

private static final int DEFAULT_UPLOAD_DIRECTORY_MAX_DEPTH = Integer.MAX_VALUE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DELIMITER;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY;
import static software.amazon.awssdk.transfer.s3.internal.TransferConfigurationOption.DEFAULT_PREFIX;

import java.io.IOException;
Expand All @@ -32,6 +33,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
Expand All @@ -47,6 +49,7 @@
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.StringUtils;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.IterablePublisher;

/**
* An internal helper class that traverses the file tree and send the upload request
Expand Down Expand Up @@ -90,23 +93,26 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur
validateDirectory(uploadDirectoryRequest);

Collection<FailedFileUpload> failedFileUploads = new ConcurrentLinkedQueue<>();
List<CompletableFuture<CompletedFileUpload>> futures;

try (Stream<Path> entries = listFiles(directory, uploadDirectoryRequest)) {
futures = entries.map(path -> {
CompletableFuture<CompletedFileUpload> future = uploadSingleFile(uploadDirectoryRequest,
failedFileUploads, path);
Stream<Path> stream = listFiles(directory, uploadDirectoryRequest);

// Forward cancellation of the return future to all individual futures.
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
return future;
}).collect(Collectors.toList());
}
SdkPublisher<Path> iterablePublisher = SdkPublisher.fromIterable(() -> stream.iterator())
.doAfterOnCancel(() -> stream.close())
.doAfterOnError(t -> stream.close())
.doAfterOnComplete(() -> stream.close());

CompletableFuture<Void> allOfFutures = new CompletableFuture<>();

AsyncBufferingSubscriber<Path> bufferingSubscriber =
new AsyncBufferingSubscriber<>(path -> uploadSingleFile(uploadDirectoryRequest, failedFileUploads, path),
allOfFutures, DEFAULT_DIRECTORY_TRANSFER_MAX_CONCURRENCY);

iterablePublisher.subscribe(bufferingSubscriber);
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build()));
allOfFutures.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build()));
}

private void validateDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.utils.async;


import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.internal.async.EmptySubscription;

/**
* Converts an {@link Iterable} to a {@link Publisher}.
*/
@SdkProtectedApi
public class IterablePublisher<T> implements Publisher<T> {

private final Iterator<T> iterator;
private Subscriber<? super T> subscriber;
private AtomicBoolean isSendingData = new AtomicBoolean(false);
private final AtomicLong outstandingDemand = new AtomicLong();

// public IterablePublisher(Iterator<T> iterable) {
// Validate.notNull(iterable, "iterable");
// this.iterator = iterable.iterator();
// }

public IterablePublisher(Iterable<T> iterable) {
Validate.notNull(iterable, "iterable");
this.iterator = iterable.iterator();
}

@Override
public void subscribe(Subscriber<? super T> s) {
if (subscriber != null) {
s.onSubscribe(new NoOpSubscription(s));
s.onError(new IllegalArgumentException("Only one subscription may be active at a time."));
}

this.subscriber = s;

if (!iterator.hasNext()) {
subscriber.onSubscribe(new EmptySubscription(s));
return;
}

subscriber.onSubscribe(new IteratorSubscription());
}

private class IteratorSubscription implements Subscription {
private volatile boolean done;

@Override
public void request(long newDemand) {
if (newDemand <= 0) {
subscriber.onError(new IllegalArgumentException("demand is not positive"));
}

outstandingDemand.updateAndGet(current -> {
if (Long.MAX_VALUE - current < newDemand) {
return Long.MAX_VALUE;
}

return current + newDemand;
});
sendData();
}

private void sendData() {
do {
if (!isSendingData.compareAndSet(false, true)) {
return;
}
try {
doSendData();
} finally {
isSendingData.set(false);
}
} while (shouldSendMoreData());
}

private boolean shouldSendMoreData() {
if (done) {
return false;
}

if (!iterator.hasNext()) {
done = true;
subscriber.onComplete();
return false;
}

return outstandingDemand.get() > 0;
}

private void doSendData() {
while (shouldSendMoreData()) {
outstandingDemand.decrementAndGet();
T next = iterator.next();
if (next == null) {
done = true;
subscriber.onError(new IllegalArgumentException("Iterable returned null"));
} else {
subscriber.onNext(next);
}
}
}

@Override
public void cancel() {
done = true;
subscriber = null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.utils.async;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;

/** An implementation of {@link Subscription} that does nothing.
* <p>
* Useful in situations where a {@link Publisher} needs to
* signal {@code exceptionOccurred} or {@code onComplete} immediately after
* {@code subscribe()} but but it needs to signal{@code onSubscription} first.
*/
@SdkProtectedApi
public final class NoOpSubscription implements Subscription {
private final Subscriber<?> subscriber;

public NoOpSubscription(Subscriber<?> subscriber) {
this.subscriber = subscriber;
}

@Override
public void request(long n) {
if (n < 1) {
subscriber.onError(new IllegalArgumentException("Demand must be positive!"));
}
}

@Override
public void cancel() {

}
}

0 comments on commit 77b88f5

Please sign in to comment.