Skip to content

Commit 8756570

Browse files
committed
Fix PR comments, added an interface and abstract class to denote Upload and Download operations in a wya in which BytesReadTrackingPublisher can be reused for upload and download
1 parent 55a9bd8 commit 8756570

21 files changed

+174
-233
lines changed

core/imds/src/main/java/software/amazon/awssdk/imds/internal/AsyncHttpRequestHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ private static <T> CompletableFuture<T> sendAsync(SdkAsyncHttpClient client,
6161
SdkHttpFullRequest request,
6262
HttpResponseHandler<T> handler,
6363
CompletableFuture<?> parentFuture) {
64-
SdkHttpContentPublisher requestContentPublisher = new SimpleHttpContentPublisher(request);
64+
SdkHttpContentPublisher requestContentPublisher = new SimpleHttpContentPublisher(request, null);
6565
TransformingAsyncResponseHandler<T> responseHandler =
6666
new AsyncResponseHandler<>(handler, Function.identity(), new ExecutionAttributes());
6767
CompletableFuture<T> responseHandlerFuture = responseHandler.prepare();

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ public <OutputT> CompletableFuture<OutputT> execute(
210210
.wrappedWith(AsyncApiCallAttemptMetricCollectionStage::new)
211211
.wrappedWith((deps, wrapped) -> new AsyncRetryableStage<>(responseHandler, deps, wrapped))
212212
.then(async(() -> new UnwrapResponseContainer<>()))
213-
.then(() -> new AfterExecutionProgressReportingStage<>())
213+
.then(async(() -> new AfterExecutionProgressReportingStage<>()))
214214
.then(async(() -> new AfterExecutionInterceptorsStage<>()))
215215
.wrappedWith(AsyncExecutionFailureExceptionReportingStage::new)
216216
.wrappedWith(AsyncApiCallTimeoutTrackingStage::new)

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/async/SimpleHttpContentPublisher.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.reactivestreams.Subscriber;
2323
import org.reactivestreams.Subscription;
2424
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;
2526
import software.amazon.awssdk.http.SdkHttpFullRequest;
2627
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
2728
import software.amazon.awssdk.utils.IoUtils;
@@ -35,11 +36,13 @@ public final class SimpleHttpContentPublisher implements SdkHttpContentPublisher
3536

3637
private final byte[] content;
3738
private final int length;
39+
private final ProgressUpdater progressUpdater;
3840

39-
public SimpleHttpContentPublisher(SdkHttpFullRequest request) {
41+
public SimpleHttpContentPublisher(SdkHttpFullRequest request, ProgressUpdater progressUpdater) {
4042
this.content = request.contentStreamProvider().map(p -> invokeSafely(() -> IoUtils.toByteArray(p.newStream())))
4143
.orElseGet(() -> new byte[0]);
4244
this.length = content.length;
45+
this.progressUpdater = progressUpdater;
4346
}
4447

4548
@Override
@@ -52,7 +55,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
5255
s.onSubscribe(new SubscriptionImpl(s));
5356
}
5457

55-
private class SubscriptionImpl implements Subscription {
58+
private final class SubscriptionImpl implements Subscription {
5659
private boolean running = true;
5760
private final Subscriber<? super ByteBuffer> s;
5861

@@ -68,6 +71,11 @@ public void request(long n) {
6871
s.onError(new IllegalArgumentException("Demand must be positive"));
6972
} else {
7073
s.onNext(ByteBuffer.wrap(content));
74+
75+
if (progressUpdater != null) {
76+
progressUpdater.incrementBytesSent(length);
77+
}
78+
7179
s.onComplete();
7280
}
7381
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AfterExecutionProgressReportingStage.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
public class AfterExecutionProgressReportingStage<OutputT> implements RequestPipeline<OutputT, OutputT> {
2626
@Override
2727
public OutputT execute(OutputT input, RequestExecutionContext context) throws Exception {
28-
ProgressListenerUtils.updateProgressListenersWithSuccessResponse((SdkResponse) input, context);
28+
if (input instanceof SdkResponse) {
29+
ProgressListenerUtils.updateProgressListenersWithSuccessResponse((SdkResponse) input, context);
30+
}
31+
2932
return input;
3033
}
3134

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,11 @@
4747
import software.amazon.awssdk.core.internal.http.timers.TimeoutTracker;
4848
import software.amazon.awssdk.core.internal.http.timers.TimerUtils;
4949
import software.amazon.awssdk.core.internal.metrics.BytesReadTrackingPublisher;
50+
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;
51+
import software.amazon.awssdk.core.internal.util.DownloadProgressUpdaterInvocation;
5052
import software.amazon.awssdk.core.internal.util.MetricUtils;
5153
import software.amazon.awssdk.core.internal.util.ProgressListenerUtils;
54+
import software.amazon.awssdk.core.internal.util.UploadProgressUpdaterInvocation;
5255
import software.amazon.awssdk.core.metrics.CoreMetric;
5356
import software.amazon.awssdk.http.SdkHttpFullRequest;
5457
import software.amazon.awssdk.http.SdkHttpMethod;
@@ -130,22 +133,16 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
130133

131134
CompletableFuture<Response<OutputT>> responseHandlerFuture = responseHandler.prepare();
132135

136+
ProgressUpdater progressUpdater = ProgressListenerUtils.getProgressUpdaterIfAttached(context);
137+
133138
SdkHttpContentPublisher requestProvider = context.requestProvider() == null
134-
? new SimpleHttpContentPublisher(request)
135-
: new SdkHttpContentPublisherAdapter(context.requestProvider());
139+
? new SimpleHttpContentPublisher(request, progressUpdater)
140+
: new SdkHttpContentPublisherAdapter(context.requestProvider(), progressUpdater);
136141
// Set content length if it hasn't been set already.
137142
SdkHttpFullRequest requestWithContentLength = getRequestWithContentLength(request, requestProvider);
138143

139144
MetricCollector httpMetricCollector = MetricUtils.createHttpMetricsCollector(context);
140145

141-
//If Progress Listening is enabled, wrap around BytesSentTrackingPublisher to track progress on bytes sent
142-
if (context.progressUpdater().isPresent()) {
143-
boolean shouldSetContentLength = shouldSetContentLength(request, requestProvider);
144-
requestProvider = ProgressListenerUtils.updateProgressListenersWithRequestStatus(context.progressUpdater().get(),
145-
requestProvider,
146-
shouldSetContentLength);
147-
}
148-
149146
AsyncExecuteRequest.Builder executeRequestBuilder = AsyncExecuteRequest.builder()
150147
.request(requestWithContentLength)
151148
.requestContentPublisher(requestProvider)
@@ -273,9 +270,11 @@ private void completeResponseFuture(CompletableFuture<Response<OutputT>> respons
273270
private static final class SdkHttpContentPublisherAdapter implements SdkHttpContentPublisher {
274271

275272
private final AsyncRequestBody asyncRequestBody;
273+
private final ProgressUpdater progressUpdater;
276274

277-
private SdkHttpContentPublisherAdapter(AsyncRequestBody asyncRequestBody) {
275+
private SdkHttpContentPublisherAdapter(AsyncRequestBody asyncRequestBody, ProgressUpdater progressUpdater) {
278276
this.asyncRequestBody = asyncRequestBody;
277+
this.progressUpdater = progressUpdater;
279278
}
280279

281280
@Override
@@ -285,7 +284,12 @@ public Optional<Long> contentLength() {
285284

286285
@Override
287286
public void subscribe(Subscriber<? super ByteBuffer> s) {
288-
asyncRequestBody.subscribe(s);
287+
if (progressUpdater != null) {
288+
Publisher<ByteBuffer> readTrackingPublisher = new BytesReadTrackingPublisher(asyncRequestBody, new AtomicLong(0L), new UploadProgressUpdaterInvocation(progressUpdater));
289+
readTrackingPublisher.subscribe(s);
290+
} else {
291+
asyncRequestBody.subscribe(s);
292+
}
289293
}
290294
}
291295

@@ -323,9 +327,11 @@ public void onStream(Publisher<ByteBuffer> stream) {
323327
AtomicLong bytesReadCounter = context.executionAttributes()
324328
.getAttribute(SdkInternalExecutionAttribute.RESPONSE_BYTES_READ);
325329

326-
BytesReadTrackingPublisher bytesReadTrackingPublisher = new BytesReadTrackingPublisher(stream,
327-
bytesReadCounter,
328-
context.progressUpdater());
330+
ProgressUpdater progressUpdater = context.progressUpdater().isPresent() ?
331+
context.progressUpdater().get() : null;
332+
Publisher<ByteBuffer> bytesReadTrackingPublisher = new BytesReadTrackingPublisher(stream,
333+
bytesReadCounter,
334+
new DownloadProgressUpdaterInvocation(progressUpdater));
329335

330336
super.onStream(bytesReadTrackingPublisher);
331337
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeHttpRequestStage.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public Pair<SdkHttpFullRequest, SdkHttpFullResponse> execute(SdkHttpFullRequest
5555
RequestExecutionContext context) throws Exception {
5656
InterruptMonitor.checkInterrupted();
5757
HttpExecuteResponse executeResponse = executeHttpRequest(request, context);
58+
5859
// TODO: Plumb through ExecuteResponse instead
5960
SdkHttpFullResponse httpResponse = (SdkHttpFullResponse) executeResponse.httpResponse();
6061
return Pair.of(request, httpResponse.toBuilder().content(executeResponse.responseBody().orElse(null)).build());

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/UnwrapResponseContainer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import software.amazon.awssdk.annotations.SdkInternalApi;
1919
import software.amazon.awssdk.core.Response;
20-
import software.amazon.awssdk.core.SdkResponse;
2120
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
2221
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
2322

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/metrics/BytesReadTrackingPublisher.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616
package software.amazon.awssdk.core.internal.metrics;
1717

1818
import java.nio.ByteBuffer;
19-
import java.util.Optional;
2019
import java.util.concurrent.atomic.AtomicLong;
2120
import org.reactivestreams.Publisher;
2221
import org.reactivestreams.Subscriber;
2322
import org.reactivestreams.Subscription;
2423
import software.amazon.awssdk.annotations.SdkInternalApi;
25-
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;
24+
import software.amazon.awssdk.core.internal.util.ProgressUpdaterInvoker;
2625

2726
/**
2827
* Publisher that tracks how many bytes are published from the wrapped publisher to the downstream subscriber.
@@ -31,20 +30,18 @@
3130
public final class BytesReadTrackingPublisher implements Publisher<ByteBuffer> {
3231
private final Publisher<ByteBuffer> upstream;
3332
private final AtomicLong bytesRead;
34-
private ProgressUpdater progressUpdater;
33+
private final ProgressUpdaterInvoker progressUpdaterInvoker;
3534

3635
public BytesReadTrackingPublisher(Publisher<ByteBuffer> upstream, AtomicLong bytesRead,
37-
Optional<ProgressUpdater> progressUpdater) {
36+
ProgressUpdaterInvoker progressUpdaterInvoker) {
3837
this.upstream = upstream;
3938
this.bytesRead = bytesRead;
40-
progressUpdater.ifPresent(value -> {
41-
this.progressUpdater = value;
42-
});
39+
this.progressUpdaterInvoker = progressUpdaterInvoker;
4340
}
4441

4542
@Override
4643
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
47-
upstream.subscribe(new BytesReadTracker(subscriber, bytesRead, progressUpdater));
44+
upstream.subscribe(new BytesReadTracker(subscriber, bytesRead, progressUpdaterInvoker));
4845
}
4946

5047
public long bytesRead() {
@@ -54,29 +51,30 @@ public long bytesRead() {
5451
private static final class BytesReadTracker implements Subscriber<ByteBuffer> {
5552
private final Subscriber<? super ByteBuffer> downstream;
5653
private final AtomicLong bytesRead;
57-
private final ProgressUpdater progressUpdater;
54+
private final ProgressUpdaterInvoker progressUpdaterInvoker;
5855

5956
private BytesReadTracker(Subscriber<? super ByteBuffer> downstream,
60-
AtomicLong bytesRead, ProgressUpdater progressUpdater) {
57+
AtomicLong bytesRead, ProgressUpdaterInvoker progressUpdaterInvoker) {
6158
this.downstream = downstream;
6259
this.bytesRead = bytesRead;
63-
this.progressUpdater = progressUpdater;
60+
this.progressUpdaterInvoker = progressUpdaterInvoker;
6461
}
6562

6663
@Override
6764
public void onSubscribe(Subscription subscription) {
6865
downstream.onSubscribe(subscription);
69-
if (progressUpdater != null) {
70-
progressUpdater.resetBytesReceived();
66+
if (progressUpdaterInvoker != null) {
67+
progressUpdaterInvoker.resetBytes();
7168
}
7269
}
7370

7471
@Override
7572
public void onNext(ByteBuffer byteBuffer) {
76-
bytesRead.addAndGet(byteBuffer.remaining());
73+
long byteBufferSize = byteBuffer.remaining();
74+
bytesRead.addAndGet(byteBufferSize);
7775
downstream.onNext(byteBuffer);
78-
if (progressUpdater != null) {
79-
progressUpdater.incrementBytesReceived(bytesRead.get());
76+
if (progressUpdaterInvoker != null) {
77+
progressUpdaterInvoker.updateBytesTransferred(byteBufferSize);
8078
}
8179
}
8280

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/metrics/BytesSentTrackingPublisher.java

Lines changed: 0 additions & 102 deletions
This file was deleted.

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/progress/ProgressListenerContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.core.internal.progress;
1717

18+
import java.util.Optional;
1819
import software.amazon.awssdk.annotations.Immutable;
1920
import software.amazon.awssdk.annotations.SdkInternalApi;
2021
import software.amazon.awssdk.core.SdkRequest;
@@ -89,8 +90,8 @@ public SdkHttpResponse httpResponse() {
8990
}
9091

9192
@Override
92-
public SdkResponse response() {
93-
return response;
93+
public Optional<SdkResponse> response() {
94+
return Optional.of(response);
9495
}
9596

9697
@Override

0 commit comments

Comments
 (0)