Skip to content

Commit

Permalink
Expose subscribeTimeout config for BlockingOutputStreamAsyncRequestBo…
Browse files Browse the repository at this point in the history
…dy (#5000)
  • Loading branch information
zoewangg authored Mar 12, 2024
1 parent 6aac375 commit 05fe6d2
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 10 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-0e79eba.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "Allow users to configure `subscribeTimeout` for BlockingOutputStreamAsyncRequestBody. See [#4893](https://github.com/aws/aws-sdk-java-v2/issues/4893)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,8 @@ static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLe
* <p>The caller is responsible for calling {@link OutputStream#close()} on the
* {@link BlockingOutputStreamAsyncRequestBody#outputStream()} when writing is complete.
*
* <p>By default, it will time out if streaming hasn't started within 10 seconds, and you can configure the timeout
* via {@link BlockingOutputStreamAsyncRequestBody#builder()}
* <p><b>Example Usage</b>
* <p>
* {@snippet :
Expand All @@ -440,9 +442,12 @@ static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLe
* // Wait for the service to respond.
* PutObjectResponse response = responseFuture.join();
* }
* @see BlockingOutputStreamAsyncRequestBody
*/
static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long contentLength) {
return new BlockingOutputStreamAsyncRequestBody(contentLength);
return BlockingOutputStreamAsyncRequestBody.builder()
.contentLength(contentLength)
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.CancellableOutputStream;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.OutputStreamPublisher;

/**
Expand All @@ -46,13 +47,11 @@ public final class BlockingOutputStreamAsyncRequestBody implements AsyncRequestB
private final Long contentLength;
private final Duration subscribeTimeout;

BlockingOutputStreamAsyncRequestBody(Long contentLength) {
this(contentLength, Duration.ofSeconds(10));
}

BlockingOutputStreamAsyncRequestBody(Long contentLength, Duration subscribeTimeout) {
this.contentLength = contentLength;
this.subscribeTimeout = subscribeTimeout;
private BlockingOutputStreamAsyncRequestBody(Builder builder) {
this.contentLength = builder.contentLength;
this.subscribeTimeout = Validate.isPositiveOrNull(builder.subscribeTimeout, "subscribeTimeout") != null ?
builder.subscribeTimeout :
Duration.ofSeconds(10);
}

/**
Expand All @@ -69,6 +68,13 @@ public CancellableOutputStream outputStream() {
return delegate;
}

/**
* Creates a default builder for {@link BlockingOutputStreamAsyncRequestBody}.
*/
public static Builder builder() {
return new Builder();
}

@Override
public Optional<Long> contentLength() {
return Optional.ofNullable(contentLength);
Expand Down Expand Up @@ -99,4 +105,41 @@ private void waitForSubscriptionIfNeeded() {
throw new RuntimeException("Interrupted while waiting for subscription.", e);
}
}

public static final class Builder {
private Duration subscribeTimeout;
private Long contentLength;

private Builder() {
}

/**
* Defines how long it should wait for this AsyncRequestBody to be subscribed (to start streaming) before timing out.
* By default, it's 10 seconds.
*
* <p>You may want to increase it if the request may not be executed right away.
*
* @param subscribeTimeout the timeout
* @return Returns a reference to this object so that method calls can be chained together.
*/
public Builder subscribeTimeout(Duration subscribeTimeout) {
this.subscribeTimeout = subscribeTimeout;
return this;
}

/**
* The content length of the output stream.
*
* @param contentLength the content length
* @return Returns a reference to this object so that method calls can be chained together.
*/
public Builder contentLength(Long contentLength) {
this.contentLength = contentLength;
return this;
}

public BlockingOutputStreamAsyncRequestBody build() {
return new BlockingOutputStreamAsyncRequestBody(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public void outputStream_waitsForSubscription() throws IOException {

@Test
@Timeout(10)
public void outputStream_failsIfSubscriptionNeverComes() {
public void outputStream_overrideSubscribeTimeout_failsIfSubscriptionNeverComes() {
BlockingOutputStreamAsyncRequestBody requestBody =
new BlockingOutputStreamAsyncRequestBody(0L, Duration.ofSeconds(1));
BlockingOutputStreamAsyncRequestBody.builder().contentLength(0L).subscribeTimeout(Duration.ofSeconds(1)).build();
assertThatThrownBy(requestBody::outputStream).hasMessageContaining("The service request was not made");
}

Expand Down

0 comments on commit 05fe6d2

Please sign in to comment.