Skip to content

Commit 069ed9a

Browse files
zoewanggbenarnao
authored andcommitted
Allows users to configure subscribeTimeout for BlockingInputStreamAsyncRequestBody (aws#5020)
* allow user to configure subscribe timeout for input stream * Fix compilation error * Address feedback --------- Co-authored-by: Ben Arnao <benarnao@amazon.com>
1 parent 1a73d18 commit 069ed9a

File tree

4 files changed

+62
-8
lines changed

4 files changed

+62
-8
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS S3",
4+
"contributor": "benarnao",
5+
"description": "allow user to configure subscriber timeout for input stream"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,8 @@ static AsyncRequestBody fromInputStream(Consumer<AsyncRequestBodyFromInputStream
384384
/**
385385
* Creates a {@link BlockingInputStreamAsyncRequestBody} to use for writing an input stream to the downstream service.
386386
*
387+
* <p>By default, it will time out if streaming hasn't started within 10 seconds, and you can configure the timeout
388+
* via {@link BlockingInputStreamAsyncRequestBody#builder()}
387389
* <p><b>Example Usage</b>
388390
*
389391
* <p>
@@ -408,7 +410,9 @@ static AsyncRequestBody fromInputStream(Consumer<AsyncRequestBodyFromInputStream
408410
* }
409411
*/
410412
static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLength) {
411-
return new BlockingInputStreamAsyncRequestBody(contentLength);
413+
return BlockingInputStreamAsyncRequestBody.builder()
414+
.contentLength(contentLength)
415+
.build();
412416
}
413417

414418
/**

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBody.java

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import software.amazon.awssdk.core.exception.NonRetryableException;
2828
import software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream;
2929
import software.amazon.awssdk.core.internal.util.NoopSubscription;
30+
import software.amazon.awssdk.utils.Validate;
3031
import software.amazon.awssdk.utils.async.InputStreamConsumingPublisher;
3132

3233
/**
@@ -37,19 +38,25 @@
3738
*/
3839
@SdkPublicApi
3940
public final class BlockingInputStreamAsyncRequestBody implements AsyncRequestBody {
41+
private static final Duration DEFAULT_SUBSCRIBE_TIMEOUT = Duration.ofSeconds(10);
4042
private final InputStreamConsumingPublisher delegate = new InputStreamConsumingPublisher();
4143
private final CountDownLatch subscribedLatch = new CountDownLatch(1);
4244
private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);
4345
private final Long contentLength;
4446
private final Duration subscribeTimeout;
4547

46-
BlockingInputStreamAsyncRequestBody(Long contentLength) {
47-
this(contentLength, Duration.ofSeconds(10));
48+
BlockingInputStreamAsyncRequestBody(Builder builder) {
49+
this.contentLength = builder.contentLength;
50+
this.subscribeTimeout = Validate.isPositiveOrNull(builder.subscribeTimeout, "subscribeTimeout") != null ?
51+
builder.subscribeTimeout :
52+
DEFAULT_SUBSCRIBE_TIMEOUT;
4853
}
4954

50-
BlockingInputStreamAsyncRequestBody(Long contentLength, Duration subscribeTimeout) {
51-
this.contentLength = contentLength;
52-
this.subscribeTimeout = subscribeTimeout;
55+
/**
56+
* Creates a default builder for {@link BlockingInputStreamAsyncRequestBody}.
57+
*/
58+
public static Builder builder() {
59+
return new Builder();
5360
}
5461

5562
@Override
@@ -112,4 +119,41 @@ private void waitForSubscriptionIfNeeded() throws InterruptedException {
112119
+ "BEFORE invoking doBlockingWrite if your caller is single-threaded.");
113120
}
114121
}
122+
123+
public static final class Builder {
124+
private Duration subscribeTimeout;
125+
private Long contentLength;
126+
127+
private Builder() {
128+
}
129+
130+
/**
131+
* Defines how long it should wait for this AsyncRequestBody to be subscribed (to start streaming) before timing out.
132+
* By default, it's 10 seconds.
133+
*
134+
* <p>You may want to increase it if the request may not be executed right away.
135+
*
136+
* @param subscribeTimeout the timeout
137+
* @return Returns a reference to this object so that method calls can be chained together.
138+
*/
139+
public Builder subscribeTimeout(Duration subscribeTimeout) {
140+
this.subscribeTimeout = subscribeTimeout;
141+
return this;
142+
}
143+
144+
/**
145+
* The content length of the output stream.
146+
*
147+
* @param contentLength the content length
148+
* @return Returns a reference to this object so that method calls can be chained together.
149+
*/
150+
public Builder contentLength(Long contentLength) {
151+
this.contentLength = contentLength;
152+
return this;
153+
}
154+
155+
public BlockingInputStreamAsyncRequestBody build() {
156+
return new BlockingInputStreamAsyncRequestBody(this);
157+
}
158+
}
115159
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBodyTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ public void doBlockingWrite_waitsForSubscription() {
4747

4848
@Test
4949
@Timeout(10)
50-
public void doBlockingWrite_failsIfSubscriptionNeverComes() {
50+
public void doBlockingWrite_overrideSubscribeTimeout_failsIfSubscriptionNeverComes() {
5151
BlockingInputStreamAsyncRequestBody requestBody =
52-
new BlockingInputStreamAsyncRequestBody(0L, Duration.ofSeconds(1));
52+
BlockingInputStreamAsyncRequestBody.builder().contentLength(0L).subscribeTimeout(Duration.ofSeconds(1)).build();
5353
assertThatThrownBy(() -> requestBody.writeInputStream(new StringInputStream("")))
5454
.hasMessageContaining("The service request was not made");
5555
}

0 commit comments

Comments
 (0)