Skip to content

Commit

Permalink
Update after internal poll
Browse files Browse the repository at this point in the history
  • Loading branch information
joviegas committed Sep 5, 2024
1 parent 4d3f21c commit d2e59b9
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;

/**
* Configuration values for the BatchManager Implementation. All values are optional, and default values will be used if they
* are not specified.
* Configuration values for the BatchManager implementation used for controlling batch operations.
* All values are optional, and default values will be used if they are not specified.
*/
@SdkPublicApi
public final class BatchOverrideConfiguration implements ToCopyableBuilder<BatchOverrideConfiguration.Builder,
BatchOverrideConfiguration> {

private final Integer maxBatchSize;
private final Duration sendRequestFrequency;
private final Duration sendMessageFrequency;
private final Duration receiveMessageVisibilityTimeout;
private final Duration receiveMessageMinWaitTime;
private final Duration receiveMessageMinWaitDuration;
private final List<MessageSystemAttributeName> receiveMessageSystemAttributeNames;
private final List<String> receiveMessageAttributeNames;

Expand All @@ -50,72 +50,73 @@ private BatchOverrideConfiguration(Builder builder) {
this.maxBatchSize = Validate.isPositiveOrNull(builder.maxBatchSize,
"maxBatchSize");
Validate.isTrue(this.maxBatchSize == null || this.maxBatchSize <= 10,
"A batch can contain up to 10 messages.");
"The maxBatchSize must be less than or equal to 10. A batch can contain up to 10 messages.");

this.sendRequestFrequency = Validate.isPositiveOrNull(builder.sendRequestFrequency,
"sendRequestFrequency");
this.sendMessageFrequency = Validate.isPositiveOrNull(builder.sendMessageFrequency,
"sendMessageFrequency");
this.receiveMessageVisibilityTimeout = Validate.isPositiveOrNull(builder.receiveMessageVisibilityTimeout,
"receiveMessageVisibilityTimeout");

this.receiveMessageMinWaitTime = Validate.isPositiveOrNull(builder.receiveMessageMinWaitTime,
"receiveMessageMinWaitTime");

this.receiveMessageSystemAttributeNames = builder.receiveMessageSystemAttributeNames != null ?
Collections.unmodifiableList(
new ArrayList<>(builder.receiveMessageSystemAttributeNames)) :
Collections.emptyList();

this.receiveMessageAttributeNames = builder.receiveMessageAttributeNames != null ?
Collections.unmodifiableList(new ArrayList<>(builder.receiveMessageAttributeNames)) :
Collections.emptyList();
this.receiveMessageMinWaitDuration = Validate.isPositiveOrNull(builder.receiveMessageMinWaitDuration,
"receiveMessageMinWaitDuration");
this.receiveMessageSystemAttributeNames =
builder.receiveMessageSystemAttributeNames == null ? Collections.emptyList() :
Collections.unmodifiableList(builder.receiveMessageSystemAttributeNames);

this.receiveMessageAttributeNames =
builder.receiveMessageAttributeNames == null ? Collections.emptyList() :
Collections.unmodifiableList(builder.receiveMessageAttributeNames);
}

public static Builder builder() {
return new Builder();
}

/**
* @return the maximum number of items that are batched together in a single outbound request.
* A batch can contain up to a maximum of 10 messages.
* The default value is 10.
* @return the maximum number of items that can be batched together in a single outbound SQS request
* (e.g., for {@link SendMessageBatchRequest}, {@link ChangeMessageVisibilityBatchRequest}, or
* {@link DeleteMessageBatchRequest}). A batch can contain up to a maximum of 10 messages.
* The default value is 10.
*/
public Integer maxBatchSize() {
return maxBatchSize;
}

/**
* @return the maximum amount of time that an outgoing call waits to be batched with messages of the same type.
* The default value is 200 milliseconds.
* @return the maximum duration an outgoing call waits for additional messages of the same type before being sent.
* If the {@link #maxBatchSize()} is reached before this duration, the batch will be sent immediately.
* The default value is 200 milliseconds.
*/
public Duration sendRequestFrequency() {
return sendRequestFrequency;
public Duration sendMessageFrequency() {
return sendMessageFrequency;
}

/**
* @return the custom visibility timeout to use when retrieving messages from SQS.
* @return the custom visibility timeout to use when retrieving messages from SQS. If not set,
* the default visibility timeout configured on the SQS queue will be used.
*/
public Duration receiveMessageVisibilityTimeout() {
return receiveMessageVisibilityTimeout;
}

/**
* @return the minimum wait time for incoming receive message requests.
* @return the minimum wait time for incoming receive message requests. Without a non-zero minimum wait time,
* threads can waste CPU resources busy-waiting for messages. The default value is 50 milliseconds.
*/
public Duration receiveMessageMinWaitTime() {
return receiveMessageMinWaitTime;
public Duration receiveMessageMinWaitDuration() {
return receiveMessageMinWaitDuration;
}

/**
* @return the system attribute names specific to the {@link ReceiveMessageRequest}
* that will be requested via {@link ReceiveMessageRequest#messageSystemAttributeNames()}.
* @return the system attribute names to request for {@link ReceiveMessageRequest}. Requests with differing
* system attribute names will bypass the batch manager and make a direct call to SQS.
*/
public List<MessageSystemAttributeName> receiveMessageSystemAttributeNames() {
return receiveMessageSystemAttributeNames;
}

/**
* @return the message attribute names that are specific to receive calls
* and will be requested via {@link ReceiveMessageRequest#messageAttributeNames()}.
* @return the message attribute names to request for {@link ReceiveMessageRequest}. Requests with different
* message attribute names will bypass the batch manager and make a direct call to SQS.
*/
public List<String> receiveMessageAttributeNames() {
return receiveMessageAttributeNames;
Expand All @@ -126,21 +127,20 @@ public List<String> receiveMessageAttributeNames() {
public Builder toBuilder() {
return new Builder()
.maxBatchSize(maxBatchSize)
.sendRequestFrequency(sendRequestFrequency)
.sendMessageFrequency(sendMessageFrequency)
.receiveMessageVisibilityTimeout(receiveMessageVisibilityTimeout)
.receiveMessageMinWaitTime(receiveMessageMinWaitTime)
.receiveMessageMinWaitDuration(receiveMessageMinWaitDuration)
.receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames)
.receiveMessageAttributeNames(receiveMessageAttributeNames);
}


@Override
public String toString() {
return ToString.builder("BatchOverrideConfiguration")
.add("maxBatchSize", maxBatchSize)
.add("sendRequestFrequency", sendRequestFrequency)
.add("sendMessageFrequency", sendMessageFrequency)
.add("receiveMessageVisibilityTimeout", receiveMessageVisibilityTimeout)
.add("receiveMessageMinWaitTime", receiveMessageMinWaitTime)
.add("receiveMessageMinWaitDuration", receiveMessageMinWaitDuration)
.add("receiveMessageSystemAttributeNames", receiveMessageSystemAttributeNames)
.add("receiveMessageAttributeNames", receiveMessageAttributeNames)
.build();
Expand All @@ -160,20 +160,22 @@ public boolean equals(Object o) {
if (maxBatchSize != null ? !maxBatchSize.equals(that.maxBatchSize) : that.maxBatchSize != null) {
return false;
}
if (sendRequestFrequency != null ? !sendRequestFrequency.equals(that.sendRequestFrequency) :
that.sendRequestFrequency != null) {
if (sendMessageFrequency != null ? !sendMessageFrequency.equals(that.sendMessageFrequency) :
that.sendMessageFrequency != null) {
return false;
}
if (receiveMessageVisibilityTimeout != null ? !receiveMessageVisibilityTimeout.equals(that.receiveMessageVisibilityTimeout) :
if (receiveMessageVisibilityTimeout != null
? !receiveMessageVisibilityTimeout.equals(that.receiveMessageVisibilityTimeout) :
that.receiveMessageVisibilityTimeout != null) {
return false;
}
if (receiveMessageMinWaitTime != null ? !receiveMessageMinWaitTime.equals(that.receiveMessageMinWaitTime) :
that.receiveMessageMinWaitTime != null) {
if (receiveMessageMinWaitDuration != null ? !receiveMessageMinWaitDuration.equals(that.receiveMessageMinWaitDuration) :
that.receiveMessageMinWaitDuration != null) {
return false;
}
if (receiveMessageSystemAttributeNames != null ? !receiveMessageSystemAttributeNames.equals(that.receiveMessageSystemAttributeNames) :
that.receiveMessageSystemAttributeNames != null) {
if (receiveMessageSystemAttributeNames != null ?
!receiveMessageSystemAttributeNames.equals(that.receiveMessageSystemAttributeNames)
: that.receiveMessageSystemAttributeNames != null) {
return false;
}
return receiveMessageAttributeNames != null ? receiveMessageAttributeNames.equals(that.receiveMessageAttributeNames) :
Expand All @@ -183,9 +185,9 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
int result = maxBatchSize != null ? maxBatchSize.hashCode() : 0;
result = 31 * result + (sendRequestFrequency != null ? sendRequestFrequency.hashCode() : 0);
result = 31 * result + (sendMessageFrequency != null ? sendMessageFrequency.hashCode() : 0);
result = 31 * result + (receiveMessageVisibilityTimeout != null ? receiveMessageVisibilityTimeout.hashCode() : 0);
result = 31 * result + (receiveMessageMinWaitTime != null ? receiveMessageMinWaitTime.hashCode() : 0);
result = 31 * result + (receiveMessageMinWaitDuration != null ? receiveMessageMinWaitDuration.hashCode() : 0);
result = 31 * result + (receiveMessageSystemAttributeNames != null ? receiveMessageSystemAttributeNames.hashCode() : 0);
result = 31 * result + (receiveMessageAttributeNames != null ? receiveMessageAttributeNames.hashCode() : 0);
return result;
Expand All @@ -194,9 +196,9 @@ public int hashCode() {
public static final class Builder implements CopyableBuilder<Builder, BatchOverrideConfiguration> {

private Integer maxBatchSize = 10;
private Duration sendRequestFrequency = Duration.ofMillis(200);
private Duration sendMessageFrequency ;
private Duration receiveMessageVisibilityTimeout;
private Duration receiveMessageMinWaitTime = Duration.ofMillis(50);
private Duration receiveMessageMinWaitDuration ;
private List<MessageSystemAttributeName> receiveMessageSystemAttributeNames = Collections.emptyList();
private List<String> receiveMessageAttributeNames = Collections.emptyList();

Expand Down Expand Up @@ -224,15 +226,15 @@ public Builder maxBatchSize(Integer maxBatchSize) {
* requests before being sent. Outbound requests include SendMessageBatchRequest,
* ChangeMessageVisibilityBatchRequest, and DeleteMessageBatchRequest. If the maxBatchSize is reached
* before this duration, the batch will be sent immediately.
* Increasing the {@code sendRequestFrequency} gives more time for additional messages to be added to
* Increasing the {@code sendMessageFrequency} gives more time for additional messages to be added to
* the batch, which can reduce the number of requests and increase throughput. However, a higher
* frequency may also result in increased average message latency. The default value is 200 milliseconds.
*
* @param sendRequestFrequency The new value for the frequency at which outbound requests are sent.
* @param sendMessageFrequency The new value for the frequency at which outbound requests are sent.
* @return This Builder object for method chaining.
*/
public Builder sendRequestFrequency(Duration sendRequestFrequency) {
this.sendRequestFrequency = sendRequestFrequency;
public Builder sendMessageFrequency(Duration sendMessageFrequency) {
this.sendMessageFrequency = sendMessageFrequency;
return this;
}

Expand All @@ -257,11 +259,11 @@ public Builder receiveMessageVisibilityTimeout(Duration receiveMessageVisibility
* The call may return sooner than the configured `WaitTimeSeconds` if there are messages in the buffer.
* If no messages are available and the wait time expires, the call will return an empty message list.
*
* @param receiveMessageMinWaitTime The new minimum wait time value.
* @param receiveMessageMinWaitDuration The new minimum wait time value.
* @return This Builder object for method chaining.
*/
public Builder receiveMessageMinWaitTime(Duration receiveMessageMinWaitTime) {
this.receiveMessageMinWaitTime = receiveMessageMinWaitTime;
public Builder receiveMessageMinWaitDuration(Duration receiveMessageMinWaitDuration) {
this.receiveMessageMinWaitDuration = receiveMessageMinWaitDuration;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@

import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SUPPORTED_SQS_RECEIVE_MSG;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
Expand Down Expand Up @@ -60,7 +58,7 @@ public CompletableFuture<ReceiveMessageResponse> processRequest(ReceiveMessageRe
}
int numMessages = rq.maxNumberOfMessages() != null ? rq.maxNumberOfMessages() : MAX_SUPPORTED_SQS_RECEIVE_MSG;

return queueAttributesManager.getReceiveMessageTimeout(rq, config.minReceiveWaitTime()).thenCompose(waitTimeMs -> {
return queueAttributesManager.getReceiveMessageTimeout(rq, config.messageMinWaitDuration()).thenCompose(waitTimeMs -> {
CompletableFuture<ReceiveMessageResponse> receiveMessageFuture = new CompletableFuture<>();
receiveQueueBuffer.receiveMessage(receiveMessageFuture, numMessages);
CompletableFuture<ReceiveMessageResponse> timeoutFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,7 @@ public ReceiveMessageBatchManager(SqsAsyncClient sqsClient,
BatchOverrideConfiguration config) {
this.sqsClient = sqsClient;
this.executor = executor;
this.config = config != null
? ResponseBatchConfiguration.builder()
.minReceiveWaitTime(config.receiveMessageMinWaitTime())
.receiveMessageAttributeNames(config.receiveMessageAttributeNames())
.messageSystemAttributeNames(config.receiveMessageSystemAttributeNames())
.visibilityTimeout(config.receiveMessageVisibilityTimeout())
.build()
: ResponseBatchConfiguration.builder().build();
this.config = ResponseBatchConfiguration.builder(config).build();


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SUPPORTED_SQS_RECEIVE_MSG;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
Expand Down Expand Up @@ -124,7 +126,7 @@ private int determineDesiredBatches() {
int totalRequested = futures.stream()
.mapToInt(FutureRequestWrapper::getRequestedSize)
.sum();
int batchesNeededToFulfillFutures = (int) Math.ceil((float) totalRequested / 10);
int batchesNeededToFulfillFutures = (int) Math.ceil((float) totalRequested / MAX_SUPPORTED_SQS_RECEIVE_MSG);
desiredBatches = Math.min(batchesNeededToFulfillFutures, desiredBatches);
}

Expand Down
Loading

0 comments on commit d2e59b9

Please sign in to comment.