Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of MultipartDownloaderSubscriber #4931

Merged
merged 19 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ default SplitAsyncResponseTransformer<ResponseT, ResultT> split(long bufferSize)
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer = SplittingTransformer
.<ResponseT, ResultT>builder()
.upstreamResponseTransformer(this)
.maximumBufferSize(bufferSize)
.maximumBufferSizeInBytes(bufferSize)
.returnFuture(future)
.build();
return SplitAsyncResponseTransformer.<ResponseT, ResultT>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
/**
* The buffer size used to buffer the content received from the downstream subscriber
*/
private final long maximumBufferSize;
private final long maximumBufferInBytes;

/**
* This publisher is used to send the bytes received from the downstream subscriber's transformers to a
Expand All @@ -107,11 +107,11 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
private final AtomicBoolean emitting = new AtomicBoolean(false);

private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer,
Long bufferSize,
Long maximumBufferInBytes,
CompletableFuture<ResultT> returnFuture) {
this.upstreamResponseTransformer = Validate.paramNotNull(upstreamResponseTransformer, "asyncRequestBody");
this.returnFuture = Validate.paramNotNull(returnFuture, "returnFuture");
this.maximumBufferSize = Validate.notNull(bufferSize, "bufferSize");
this.maximumBufferInBytes = Validate.notNull(maximumBufferInBytes, "bufferSize");
L-Applin marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down Expand Up @@ -226,10 +226,11 @@ public void onResponse(ResponseT response) {
public void onStream(SdkPublisher<ByteBuffer> publisher) {
if (onStreamCalled.compareAndSet(false, true)) {
log.trace(() -> "calling onStream on the upstream transformer");
upstreamResponseTransformer.onStream(
upstreamSubscriber ->
publisherToUpstream.subscribe(new DelegatingBufferingSubscriber(maximumBufferSize, upstreamSubscriber))
);
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe(
DelegatingBufferingSubscriber.builder()
.maximumBufferInBytes(maximumBufferInBytes)
.delegate(upstreamSubscriber)
.build()));
}
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response, publisherToUpstream));
}
Expand All @@ -248,14 +249,14 @@ static class IndividualPartSubscriber<T> implements Subscriber<ByteBuffer> {

private final CompletableFuture<T> future;
private final T response;
private final SimplePublisher<ByteBuffer> bodyPartPublisher;
private final SimplePublisher<ByteBuffer> publisherToUpstream;
private Subscription subscription;

IndividualPartSubscriber(CompletableFuture<T> future, T response,
SimplePublisher<ByteBuffer> bodyPartPublisher) {
this.future = future;
this.response = response;
this.bodyPartPublisher = bodyPartPublisher;
this.publisherToUpstream = bodyPartPublisher;
}

@Override
Expand All @@ -274,7 +275,7 @@ public void onNext(ByteBuffer byteBuffer) {
if (byteBuffer == null) {
throw new NullPointerException("onNext must not be called with null byteBuffer");
}
bodyPartPublisher.send(byteBuffer).whenComplete((r, t) -> {
publisherToUpstream.send(byteBuffer).whenComplete((r, t) -> {
if (t != null) {
handleError(t);
return;
Expand All @@ -285,7 +286,6 @@ public void onNext(ByteBuffer byteBuffer) {

@Override
public void onError(Throwable t) {
bodyPartPublisher.error(t);
handleError(t);
}

Expand All @@ -295,18 +295,18 @@ public void onComplete() {
}

private void handleError(Throwable t) {
publisherToUpstream.error(t);
future.completeExceptionally(t);
}
}


public static <ResponseT, ResultT> Builder<ResponseT, ResultT> builder() {
return new Builder<>();
}

public static final class Builder<ResponseT, ResultT> {

private long maximumBufferSize;
private Long maximumBufferSize;
private CompletableFuture<ResultT> returnFuture;
private AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer;

Expand Down Expand Up @@ -336,7 +336,7 @@ public Builder<ResponseT, ResultT> upstreamResponseTransformer(
* @param maximumBufferSize the amount of data buffered and the size of the chunk of data
* @return an instance of this builder
*/
public Builder<ResponseT, ResultT> maximumBufferSize(long maximumBufferSize) {
public Builder<ResponseT, ResultT> maximumBufferSizeInBytes(Long maximumBufferSize) {
this.maximumBufferSize = maximumBufferSize;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Publisher<AsyncResponseTransformer<Object, Object>> createPublisher(long
SplittingTransformer<Object, ResponseBytes<Object>> transformer =
SplittingTransformer.<Object, ResponseBytes<Object>>builder()
.upstreamResponseTransformer(upstreamTransformer)
.maximumBufferSize(64 * 1024)
.maximumBufferSizeInBytes(64 * 1024L)
.returnFuture(future)
.build();
return SdkPublisher.adapt(transformer).limit(Math.toIntExact(l));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void whenSubscriberCancelSubscription_AllDataSentToTransformer() {
SplittingTransformer<TestResultObject, Object> split =
SplittingTransformer.<TestResultObject, Object>builder()
.upstreamResponseTransformer(upstreamTestTransformer)
.maximumBufferSize(1024 * 1024 * 32)
.maximumBufferSizeInBytes(1024 * 1024 * 32L)
.returnFuture(future)
.build();
split.subscribe(new CancelAfterNTestSubscriber(
Expand All @@ -59,7 +59,7 @@ void whenSubscriberFailsAttempt_UpstreamTransformerCompletesExceptionally() {
SplittingTransformer<TestResultObject, Object> split =
SplittingTransformer.<TestResultObject, Object>builder()
.upstreamResponseTransformer(upstreamTestTransformer)
.maximumBufferSize(1024 * 1024 * 32)
.maximumBufferSizeInBytes(1024 * 1024 * 32L)
.returnFuture(future)
.build();
split.subscribe(new FailAfterNTestSubscriber(2));
Expand All @@ -68,7 +68,7 @@ void whenSubscriberFailsAttempt_UpstreamTransformerCompletesExceptionally() {

@Test
void whenDataExceedsBufferSize_UpstreamShouldReceiveAllData() {
int evenBufferSize = 16 * 1024;
Long evenBufferSize = 16 * 1024L;

// We send 9 split body of 7kb with a buffer size of 16kb. This is to test when uneven body size is used compared to
// the buffer size, this test use a body size which does not evenly divides with the buffer size.
Expand All @@ -79,7 +79,7 @@ void whenDataExceedsBufferSize_UpstreamShouldReceiveAllData() {
SplittingTransformer<TestResultObject, Object> split =
SplittingTransformer.<TestResultObject, Object>builder()
.upstreamResponseTransformer(upstreamTestTransformer)
.maximumBufferSize(evenBufferSize)
.maximumBufferSizeInBytes(evenBufferSize)
.returnFuture(future)
.build();
split.subscribe(new CancelAfterNTestSubscriber(
Expand All @@ -106,7 +106,7 @@ void whenRequestingMany_allDemandGetsFulfilled() {
SplittingTransformer<TestResultObject, Object> split =
SplittingTransformer.<TestResultObject, Object>builder()
.upstreamResponseTransformer(upstreamTestTransformer)
.maximumBufferSize(1024 * 1024 * 32)
.maximumBufferSizeInBytes(1024 * 1024 * 32L)
.returnFuture(future)
.build();
split.subscribe(new RequestingTestSubscriber(4));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,86 +27,115 @@
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.utils.Logger;

// [WIP]
// Still work in progress, currently only used to help manual testing, please ignore
/**
* A subscriber implementation that will download all individual parts for a multipart get-object request. It receives the
* individual {@link AsyncResponseTransformer} which will be used to perform the individual part requests.
* This is a 'one-shot' class, it should <em>NOT</em> be reused for more than one multipart download
L-Applin marked this conversation as resolved.
Show resolved Hide resolved
*/
@SdkInternalApi
public class MultipartDownloaderSubscriber implements Subscriber<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>> {
private static final Logger log = Logger.loggerFor(MultipartDownloaderSubscriber.class);

/**
* The s3 client used to make the individual part requests
*/
private final S3AsyncClient s3;

/**
* The GetObjectRequest that was provided when calling s3.getObject(...). It is copied for each individual request, and the
* copy has the partNumber field updated as more parts are downloaded.
*/
private final GetObjectRequest getObjectRequest;

private AtomicBoolean totalPartKnown = new AtomicBoolean(false);
private AtomicInteger totalParts = new AtomicInteger();
private AtomicInteger completed = new AtomicInteger(0);
private AtomicInteger currentPart = new AtomicInteger(0);
/**
* Set to true when we know the total part number of the object to get.
*/
private final AtomicBoolean totalPartKnown = new AtomicBoolean(false);

/**
* Once total part is know, this value indicates the total number of parts of the object to get.
*/
private final AtomicInteger totalParts = new AtomicInteger();
L-Applin marked this conversation as resolved.
Show resolved Hide resolved

/**
* The total number of completed parts. A part is considered complete once the completable future associated with its request
* completes successfully.
*/
private final AtomicInteger completedParts = new AtomicInteger(0);

/**
* The subscription received from the publisher this subscriber subscribes to.
*/
private Subscription subscription;

private CompletableFuture<GetObjectResponse> responseFuture;
private GetObjectResponse returnResponse;
/**
* This future will be completed once this subscriber reaches a terminal state, failed or successfully, and will be
* completed accordingly.
*/
private CompletableFuture<Void> future = new CompletableFuture<>();
L-Applin marked this conversation as resolved.
Show resolved Hide resolved

public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjectRequest) {
this.s3 = s3;
this.getObjectRequest = getObjectRequest;
this.responseFuture = new CompletableFuture<>();
}

@Override
public void onSubscribe(Subscription s) {
if (this.subscription == null) {
this.subscription = s;
if (this.subscription != null) {
s.cancel();
return;
}
s.request(1);
this.subscription = s;
this.subscription.request(1);
}

@Override
public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
int part = currentPart.incrementAndGet();
if (totalPartKnown.get()) {
log.info(() -> String.format("total part: %s, current part: %s", totalParts.get(), part));
} else {
log.info(() -> String.format("part %s", part));
if (asyncResponseTransformer == null) {
throw new NullPointerException("onNext must not be called with null asyncResponseTransformer");
}
if (totalPartKnown.get() && part > totalParts.get()) {
log.info(() -> "no more parts available, stopping");

int nextPartToGet = completedParts.get() + 1;
if (totalPartKnown.get() && nextPartToGet > totalParts.get()) {
L-Applin marked this conversation as resolved.
Show resolved Hide resolved
subscription.cancel();
return;
}
L-Applin marked this conversation as resolved.
Show resolved Hide resolved
GetObjectRequest actualRequest = this.getObjectRequest.copy(req -> req.partNumber(part));

GetObjectRequest actualRequest = getObjectRequest.copy(req -> req.partNumber(nextPartToGet));
CompletableFuture<GetObjectResponse> future = s3.getObject(actualRequest, asyncResponseTransformer);
future.whenComplete((response, e) -> {
if (e != null) {
responseFuture.completeExceptionally(e);
future.whenComplete((response, error) -> {
if (error != null) {
onError(error);
return;
}
completed.incrementAndGet();
returnResponse = response;
log.info(() -> String.format("received '%s'", response.contentRange()));
int totalComplete = completedParts.incrementAndGet();
log.info(() -> String.format("completed part: %s", totalComplete));

Integer partCount = response.partsCount();
if (totalPartKnown.compareAndSet(false, true)) {
if (partCount != null && totalPartKnown.compareAndSet(false, true)) {
log.info(() -> String.format("total parts: %s", partCount));
totalParts.set(partCount);
totalPartKnown.set(true);
L-Applin marked this conversation as resolved.
Show resolved Hide resolved
}
log.info(() -> String.format("total parts: %s", partCount));
if (totalParts.get() > 1) {
if (partCount != null && requiresMultipart()) {
subscription.request(1);
L-Applin marked this conversation as resolved.
Show resolved Hide resolved
} else {
subscription.cancel();
}
});
L-Applin marked this conversation as resolved.
Show resolved Hide resolved
}

private boolean requiresMultipart() {
return totalParts.get() > 1;
L-Applin marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onError(Throwable t) {
responseFuture.completeExceptionally(t);
future.completeExceptionally(t);
}

@Override
public void onComplete() {
responseFuture.complete(returnResponse);
future.complete(null);
}

public CompletableFuture<GetObjectResponse> future() {
return responseFuture;
}

}
Loading
Loading