From befb86c4338bfed8a37af5c3b55a28080687f08d Mon Sep 17 00:00:00 2001 From: John Viegas Date: Thu, 29 Aug 2024 18:25:04 -0700 Subject: [PATCH 1/6] Initial version --- .../BatchOverrideConfiguration.java | 405 ++++++------------ 1 file changed, 141 insertions(+), 264 deletions(-) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java index 7157fbb8601d..9eae6143216e 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java @@ -19,8 +19,11 @@ import java.util.Collections; import java.util.List; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.utils.ToString; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.builder.CopyableBuilder; @@ -34,32 +37,29 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder { - private final Integer maxBatchItems; - private final Integer maxBatchKeys; - private final Integer maxBufferSize; - private final Duration maxBatchOpenDuration; - private final Duration visibilityTimeout; - private final Duration longPollWaitTimeout; - private final Duration minReceiveWaitTime; - private final Integer maxDoneReceiveBatches; - private final List messageSystemAttributeNames; + private final Integer outboundBatchSizeLimit; + private final Duration outboundBatchWindowDuration; + private final Duration receiveMessageVisibilityTimeout; + private final Duration receiveMessageLongPollWaitDuration; + private final Duration receiveMessageMinWaitTime; + private final List receiveMessageSystemAttributeNames; private final List receiveMessageAttributeNames; - private final Boolean adaptivePrefetching; - private final Integer maxInflightReceiveBatches; + private BatchOverrideConfiguration(Builder builder) { - this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems"); - this.maxBatchKeys = Validate.isPositiveOrNull(builder.maxBatchKeys, "maxBatchKeys"); - this.maxBufferSize = Validate.isPositiveOrNull(builder.maxBufferSize, "maxBufferSize"); - this.maxBatchOpenDuration = Validate.isPositiveOrNull(builder.maxBatchOpenDuration, "maxBatchOpenDuration"); - this.visibilityTimeout = Validate.isPositiveOrNull(builder.visibilityTimeout, "visibilityTimeout"); - this.longPollWaitTimeout = Validate.isPositiveOrNull(builder.longPollWaitTimeout, "longPollWaitTimeout"); - this.minReceiveWaitTime = Validate.isPositiveOrNull(builder.minReceiveWaitTime, "minReceiveWaitTime"); - this.messageSystemAttributeNames = builder.messageSystemAttributeNames; + this.outboundBatchSizeLimit = Validate.isPositiveOrNull(builder.outboundBatchSizeLimit, + "outboundBatchSizeLimit"); + Validate.isTrue(this.outboundBatchSizeLimit <= 10, "A batch can contain up to 10 messages."); + this.outboundBatchWindowDuration = Validate.isPositiveOrNull(builder.outboundBatchWindowDuration, + "outboundBatchCollectionTimeDuration"); + this.receiveMessageVisibilityTimeout = Validate.isPositiveOrNull(builder.receiveMsgVisibilityTimeout, + "receiveMessageVisibilityTimeout"); + this.receiveMessageLongPollWaitDuration = Validate.isPositiveOrNull(builder.receiveMsgLongPollWaitTimeout, + "receiveMessageLongPollWaitTimeout"); + this.receiveMessageMinWaitTime = Validate.isPositiveOrNull(builder.receiveMessageMinWaitTime, + "receiveMessageMinWaitTime"); + this.receiveMessageSystemAttributeNames = builder.receiveMessageSystemAttributeNames; this.receiveMessageAttributeNames = builder.receiveMessageAttributeNames; - this.adaptivePrefetching = builder.adaptivePrefetching; - this.maxInflightReceiveBatches = builder.maxInflightReceiveBatches; - this.maxDoneReceiveBatches = builder.maxDoneReceiveBatches; } public static Builder builder() { @@ -67,118 +67,83 @@ public static Builder builder() { } /** - * @return the optional maximum number of messages that are batched together in a single request. + * @return the maximum number of items that are batched together in a single outbound request + * A batch can contain up to maximum of 10 messages. + * The default value is 10. */ - public Integer maxBatchItems() { - return maxBatchItems; + public Integer outboundBatchSizeLimit() { + return outboundBatchSizeLimit; } - /** - * @return the optional maximum number of batchKeys to keep track of. - */ - public Integer maxBatchKeys() { - return maxBatchKeys; - } /** - * @return the maximum number of items to allow to be buffered for each batchKey. + * @return the maximum amount of time that an outgoing call waits to be batched with messages of the same type. + * The default value is 200 milliseconds. */ - public Integer maxBufferSize() { - return maxBufferSize; - } - - public Integer maxDoneReceiveBatches() { - return maxDoneReceiveBatches; - } - - /** - * @return the optional maximum amount of time that an outgoing call waits to be batched with messages of the same type. - */ - public Duration maxBatchOpenDuration() { - return maxBatchOpenDuration; + public Duration outboundBatchCollectionDuration() { + return outboundBatchWindowDuration; } /** * @return the custom visibility timeout to use when retrieving messages from SQS. */ - public Duration visibilityTimeout() { - return visibilityTimeout; + public Duration receiveMessageVisibilityTimeout() { + return receiveMessageVisibilityTimeout; } /** - * @return the amount of time, the receive call will block on the server waiting for messages to arrive if the - * queue is empty when the receive call is first made. + * @return the amount of time the receive call will block on the server, waiting for messages to arrive if the + * queue is empty when the call is initially made. */ - public Duration longPollWaitTimeout() { - return longPollWaitTimeout; + public Duration receiveMessageLongPollWaitDuration() { + return receiveMessageLongPollWaitDuration; } /** * @return the minimum wait time for incoming receive message requests. */ - public Duration minReceiveWaitTime() { - return minReceiveWaitTime; + public Duration receiveMessageMinWaitTime() { + return receiveMessageMinWaitTime; } /** - * @return the message systemAttribute Name will request {@link ReceiveMessageRequest#messageSystemAttributeNames()}. + * @return the system attribute names specific to the {@link ReceiveMessageRequest} + * that will be requested via {@link ReceiveMessageRequest#messageSystemAttributeNames()}. */ - public List messageSystemAttributeName() { - return messageSystemAttributeNames; + public List receiveMessageSystemAttributeNames() { + return receiveMessageSystemAttributeNames; } /** - * @return the message attributes receive calls will request. + * @return the message attribute names that are specific to receive calls + * and will be requested via {@link ReceiveMessageRequest#messageAttributeNames()}. */ public List receiveMessageAttributeNames() { return receiveMessageAttributeNames; } - /** - * @return the behavior for prefetching with respect to the number of in-flight incoming receive requests. - */ - public Boolean adaptivePrefetching() { - return adaptivePrefetching; - } - - /** - * @return the maximum number of concurrent receive message batches. - */ - public Integer maxInflightReceiveBatches() { - return maxInflightReceiveBatches; - } @Override public Builder toBuilder() { - return new Builder().maxBatchItems(maxBatchItems) - .maxBatchKeys(maxBatchKeys) - .maxBufferSize(maxBufferSize) - .maxBatchOpenDuration(maxBatchOpenDuration) - .visibilityTimeout(visibilityTimeout) - .longPollWaitTimeout(longPollWaitTimeout) - .minReceiveWaitTime(minReceiveWaitTime) - .maxInflightReceiveBatches(maxInflightReceiveBatches) - .messageSystemAttributeName(messageSystemAttributeNames) - .receiveMessageAttributeNames(receiveMessageAttributeNames) - .adaptivePrefetching(adaptivePrefetching) - .maxDoneReceiveBatches(maxDoneReceiveBatches); + return new Builder().outboundBatchSizeLimit(outboundBatchSizeLimit) + .maxOutboundBatchCollectionDuration(outboundBatchWindowDuration) + .receiveMsgVisibilityTimeout(receiveMessageVisibilityTimeout) + .receiveMsgLongPollWaitTimeout(receiveMessageLongPollWaitDuration) + .receiveMessageMinWaitTime(receiveMessageMinWaitTime) + .receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames) + .receiveMessageAttributeNames(receiveMessageAttributeNames); } @Override public String toString() { return ToString.builder("BatchOverrideConfiguration") - .add("maxBatchItems", maxBatchItems) - .add("maxBatchKeys", maxBatchKeys) - .add("maxBufferSize", maxBufferSize) - .add("maxBatchOpenDuration", maxBatchOpenDuration) - .add("visibilityTimeout", visibilityTimeout) - .add("longPollWaitTimeout", longPollWaitTimeout) - .add("minReceiveWaitTime", minReceiveWaitTime) - .add("receiveAttributeNames", messageSystemAttributeNames) + .add("maxBatchItems", outboundBatchSizeLimit) + .add("maxOutboundBatchCollectionDuration", outboundBatchWindowDuration) + .add("visibilityTimeout", receiveMessageVisibilityTimeout) + .add("longPollWaitTimeout", receiveMessageLongPollWaitDuration) + .add("receiveMessageMinWaitTime", receiveMessageMinWaitTime) + .add("receiveAttributeNames", receiveMessageSystemAttributeNames) .add("receiveMessageAttributeNames", receiveMessageAttributeNames) - .add("adaptivePrefetching", adaptivePrefetching) - .add("maxInflightReceiveBatches", maxInflightReceiveBatches) - .add("maxDoneReceiveBatches", maxDoneReceiveBatches) .build(); } @@ -193,232 +158,158 @@ public boolean equals(Object o) { BatchOverrideConfiguration that = (BatchOverrideConfiguration) o; - if (maxBatchItems != null ? !maxBatchItems.equals(that.maxBatchItems) : that.maxBatchItems != null) { - return false; - } - if (maxBatchKeys != null ? !maxBatchKeys.equals(that.maxBatchKeys) : that.maxBatchKeys != null) { + if (outboundBatchSizeLimit != null ? !outboundBatchSizeLimit.equals(that.outboundBatchSizeLimit) : that.outboundBatchSizeLimit != null) { return false; } - if (maxBufferSize != null ? !maxBufferSize.equals(that.maxBufferSize) : that.maxBufferSize != null) { + if (outboundBatchWindowDuration != null ? !outboundBatchWindowDuration.equals(that.outboundBatchWindowDuration) : + that.outboundBatchWindowDuration != null) { return false; } - - if (maxBatchOpenDuration != null ? !maxBatchOpenDuration.equals(that.maxBatchOpenDuration) : - that.maxBatchOpenDuration != null) { - return false; - } - if (visibilityTimeout != null ? !visibilityTimeout.equals(that.visibilityTimeout) : - that.visibilityTimeout != null) { + if (receiveMessageVisibilityTimeout != null ? !receiveMessageVisibilityTimeout.equals(that.receiveMessageVisibilityTimeout) : + that.receiveMessageVisibilityTimeout != null) { return false; } - if (longPollWaitTimeout != null ? !longPollWaitTimeout.equals(that.longPollWaitTimeout) : - that.longPollWaitTimeout != null) { + if (receiveMessageLongPollWaitDuration != null ? !receiveMessageLongPollWaitDuration.equals(that.receiveMessageLongPollWaitDuration) : + that.receiveMessageLongPollWaitDuration != null) { return false; } - if (minReceiveWaitTime != null ? !minReceiveWaitTime.equals(that.minReceiveWaitTime) : - that.minReceiveWaitTime != null) { + if (receiveMessageMinWaitTime != null ? !receiveMessageMinWaitTime.equals(that.receiveMessageMinWaitTime) : + that.receiveMessageMinWaitTime != null) { return false; } - if (messageSystemAttributeNames != null ? !messageSystemAttributeNames.equals(that.messageSystemAttributeNames) : - that.messageSystemAttributeNames != null) { + if (receiveMessageSystemAttributeNames != null ? !receiveMessageSystemAttributeNames.equals(that.receiveMessageSystemAttributeNames) : + that.receiveMessageSystemAttributeNames != null) { return false; } - if (receiveMessageAttributeNames != null ? !receiveMessageAttributeNames.equals(that.receiveMessageAttributeNames) : - that.receiveMessageAttributeNames != null) { - return false; - } - if (adaptivePrefetching != null ? !adaptivePrefetching.equals(that.adaptivePrefetching) : - that.adaptivePrefetching != null) { - return false; - } - if (maxInflightReceiveBatches != null ? !maxInflightReceiveBatches.equals(that.maxInflightReceiveBatches) : - that.maxInflightReceiveBatches != null) { - return false; - } - return maxDoneReceiveBatches != null ? maxDoneReceiveBatches.equals(that.maxDoneReceiveBatches) : - that.maxDoneReceiveBatches == null; + return receiveMessageAttributeNames != null ? receiveMessageAttributeNames.equals(that.receiveMessageAttributeNames) : + that.receiveMessageAttributeNames == null; } @Override public int hashCode() { - int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0; - result = 31 * result + (maxBatchKeys != null ? maxBatchKeys.hashCode() : 0); - result = 31 * result + (maxBufferSize != null ? maxBufferSize.hashCode() : 0); - result = 31 * result + (maxBatchOpenDuration != null ? maxBatchOpenDuration.hashCode() : 0); - result = 31 * result + (visibilityTimeout != null ? visibilityTimeout.hashCode() : 0); - result = 31 * result + (longPollWaitTimeout != null ? longPollWaitTimeout.hashCode() : 0); - result = 31 * result + (minReceiveWaitTime != null ? minReceiveWaitTime.hashCode() : 0); - result = 31 * result + (messageSystemAttributeNames != null ? messageSystemAttributeNames.hashCode() : 0); + int result = outboundBatchSizeLimit != null ? outboundBatchSizeLimit.hashCode() : 0; + result = 31 * result + (outboundBatchWindowDuration != null ? outboundBatchWindowDuration.hashCode() : 0); + result = 31 * result + (receiveMessageVisibilityTimeout != null ? receiveMessageVisibilityTimeout.hashCode() : 0); + result = 31 * result + (receiveMessageLongPollWaitDuration != null ? receiveMessageLongPollWaitDuration.hashCode() : 0); + result = 31 * result + (receiveMessageMinWaitTime != null ? receiveMessageMinWaitTime.hashCode() : 0); + result = 31 * result + (receiveMessageSystemAttributeNames != null ? receiveMessageSystemAttributeNames.hashCode() : 0); result = 31 * result + (receiveMessageAttributeNames != null ? receiveMessageAttributeNames.hashCode() : 0); - result = 31 * result + (adaptivePrefetching != null ? adaptivePrefetching.hashCode() : 0); - result = 31 * result + (maxInflightReceiveBatches != null ? maxInflightReceiveBatches.hashCode() : 0); - result = 31 * result + (maxDoneReceiveBatches != null ? maxDoneReceiveBatches.hashCode() : 0); return result; } public static final class Builder implements CopyableBuilder { - private Integer maxBatchItems; - private Integer maxBatchKeys; - private Integer maxBufferSize; - private Duration maxBatchOpenDuration; - private Duration visibilityTimeout; - private Duration longPollWaitTimeout; - private Duration minReceiveWaitTime; - private Integer maxDoneReceiveBatches; - private Integer maxInflightReceiveBatches; - private List messageSystemAttributeNames = Collections.emptyList(); + private Integer outboundBatchSizeLimit; + private Duration outboundBatchWindowDuration; + private Duration receiveMsgVisibilityTimeout; + private Duration receiveMsgLongPollWaitTimeout; + private Duration receiveMessageMinWaitTime; + private List receiveMessageSystemAttributeNames = Collections.emptyList(); private List receiveMessageAttributeNames = Collections.emptyList(); - private Boolean adaptivePrefetching; private Builder() { } /** - * Define the maximum number of messages that are batched together in a single request. + * Specifies the maximum number of items that the buffered client will include in a single outbound batch request. + * Outbound requests include {@link SendMessageBatchRequest}, {@link ChangeMessageVisibilityBatchRequest}, + * and {@link DeleteMessageBatchRequest}. + * A batch can contain up to a maximum of 10 messages. The default value is 10. * - * @param maxBatchItems The new maxBatchItems value. - * @return This object for method chaining. + * @param outboundBatchSizeLimit The maximum number of items to be batched together in a single request. + * @return This Builder object for method chaining. */ - public Builder maxBatchItems(Integer maxBatchItems) { - this.maxBatchItems = maxBatchItems; + public Builder outboundBatchSizeLimit(Integer outboundBatchSizeLimit) { + this.outboundBatchSizeLimit = outboundBatchSizeLimit; return this; } /** - * Define the maximum number of batchKeys to keep track of. A batchKey determines which requests are batched together and - * is calculated by the client based on the information in a request. - *

- * Ex. SQS determines a batchKey based on a request's queueUrl in combination with its overrideConfiguration, so requests - * with the same queueUrl and overrideConfiguration will have the same batchKey and be batched together. + * Specifies the maximum duration that an outbound batch is held open for additional outbound requests + * before being sent. Outbound requests include {@link SendMessageBatchRequest}, + * {@link ChangeMessageVisibilityBatchRequest}, and {@link DeleteMessageBatchRequest}. If the + * {@link #outboundBatchSizeLimit} is reached before this duration, the batch will be sent immediately. + * The longer this duration, the more time messages have to be added to the batch, which can reduce the + * number of calls made and increase throughput, but it may also increase average message latency. + * The default value is 200 milliseconds. * - * @param maxBatchKeys the new maxBatchKeys value. - * @return This object for method chaining. + * @param maxOutboundBatchCollectionDuration The new maxOutboundBatchCollectionDuration value. + * @return This Builder object for method chaining. */ - public Builder maxBatchKeys(Integer maxBatchKeys) { - this.maxBatchKeys = maxBatchKeys; + public Builder maxOutboundBatchCollectionDuration(Duration maxOutboundBatchCollectionDuration) { + this.outboundBatchWindowDuration = maxOutboundBatchCollectionDuration; return this; } /** - * Define the maximum number of items to allow to be buffered for each batchKey. + * Defines the custom visibility timeout to use when retrieving messages from SQS. If set to a positive value, + * this timeout will override the default visibility timeout set on the SQS queue. If no value is set, + * then by default, the visibility timeout of the queue will be used. Only positive values are supported. * - * @param maxBufferSize the new maxBufferSize value. - * @return This object for method chaining. + * @param receiveMsgVisibilityTimeout The new visibilityTimeout value. + * @return This Builder object for method chaining. */ - public Builder maxBufferSize(Integer maxBufferSize) { - this.maxBufferSize = maxBufferSize; + public Builder receiveMsgVisibilityTimeout(Duration receiveMsgVisibilityTimeout) { + this.receiveMsgVisibilityTimeout = receiveMsgVisibilityTimeout; return this; } /** - * Define the maximum amount of time that an outgoing call waits for other requests before sending out a - * batch request. - * TODO : Decide if Ms needs to be added to the name in surface API review meeting - * @param maxBatchOpenDuration The new maxBatchOpenDuration value. - * @return This object for method chaining. - */ - public Builder maxBatchOpenDuration(Duration maxBatchOpenDuration) { - this.maxBatchOpenDuration = maxBatchOpenDuration; - return this; - } - - /** - * Define the custom visibility timeout to use when retrieving messages from SQS. If set to a value greater than zero, - * this timeout will override the default visibility timeout set on the SQS queue. Set it to -1 to use the default - * visibility timeout of the queue. Visibility timeout of 0 seconds is not supported. + * Specifies the amount of time the receive call will block on the server waiting for messages to arrive if the + * queue is empty when the receive call is first made. By default, this value is not set, meaning no long + * polling wait time on the server side. * - * @param visibilityTimeout The new visibilityTimeout value. - * @return This object for method chaining. + * @param receiveMsgLongPollWaitTimeout The new longPollWaitTimeout value. + * @return This Builder object for method chaining. */ - public Builder visibilityTimeout(Duration visibilityTimeout) { - this.visibilityTimeout = visibilityTimeout; + public Builder receiveMsgLongPollWaitTimeout(Duration receiveMsgLongPollWaitTimeout) { + this.receiveMsgLongPollWaitTimeout = receiveMsgLongPollWaitTimeout; return this; } /** - * Define the amount of time, the receive call will block on the server waiting for messages to arrive if the - * queue is empty when the receive call is first made. This setting has no effect if long polling is disabled. + * Configures the minimum wait time for incoming receive message requests. The default value is 50 milliseconds. + * Without a non-zero minimum wait time, threads can easily waste CPU time by busy-waiting against empty local buffers. + * Avoid setting this to 0 unless you are confident that threads will perform useful work between each call + * to receive messages. + * The call may return sooner than the configured `WaitTimeSeconds` if there are messages in the buffer. + * If no messages are available and the wait time expires, the call will return an empty message list. * - * @param longPollWaitTimeout The new longPollWaitTimeout value. - * @return This object for method chaining. - */ - public Builder longPollWaitTimeout(Duration longPollWaitTimeout) { - this.longPollWaitTimeout = longPollWaitTimeout; - return this; - } - - /** - * Define the minimum wait time for incoming receive message requests. Without a non-zero minimum wait time, threads can - * easily waste CPU time busy-waiting against empty local buffers. Avoid setting this to 0 unless you are confident - * threads will do useful work in-between each call to receive messages! - * - * @param minReceiveWaitTime The new minReceiveWaitTime value. - * @return This object for method chaining. - */ - public Builder minReceiveWaitTime(Duration minReceiveWaitTime) { - this.minReceiveWaitTime = minReceiveWaitTime; - return this; - } - - /** - * Define the maximum number of concurrent receive message batches. The greater this number, the faster the queue will be - * pulling messages from the SQS servers (at the expense of consuming more threads). * - * @param maxInflightReceiveBatches The new maxInflightReceiveBatches value. - * @return This object for method chaining. + * @param receiveMessageMinWaitTime The new minimum wait time value. + * @return This Builder object for method chaining. */ - public Builder maxInflightReceiveBatches(Integer maxInflightReceiveBatches) { - this.maxInflightReceiveBatches = maxInflightReceiveBatches; + public Builder receiveMessageMinWaitTime(Duration receiveMessageMinWaitTime) { + this.receiveMessageMinWaitTime = receiveMessageMinWaitTime; return this; } /** - * Define the maximum number of done receive batches. If more than that number of completed receive batches are waiting in - * the buffer, the querying for new messages will stop. The larger this number, the more messages the buffer queue will - * pre-fetch and keep in the buffer on the client side, and the faster receive requests will be satisfied. The visibility - * timeout of a pre-fetched message starts at the point of pre-fetch, which means that while the message is in the local - * buffer it is unavailable for other clients to process, and when this client retrieves it, part of the visibility - * timeout may have already expired. The number of messages prefetched will not exceed maxBatchSize * - * maxDoneReceiveBatches. - * - * @param maxDoneReceiveBatches The new maxDoneReceiveBatches value. - * @return This object for method chaining. - */ - public Builder maxDoneReceiveBatches(Integer maxDoneReceiveBatches) { - this.maxDoneReceiveBatches = maxDoneReceiveBatches; - return this; - } - - /** - * Defines the list of message system attribute names that should be requested in receive message calls. - * Only receive message requests that request the same set of message system attributes will be satisfied - * from the receive buffers. Refer to {@link ReceiveMessageRequest#messageSystemAttributeNames()}. - * - * Note that if requests to the BatchManager are sent with different messageSystemAttributeNames than what is - * configured, the request will bypass the BatchManager and will instead make a direct call to SQS. + * Defines the list of message system attribute names to request in receive message calls. + * If no `messageSystemAttributeNames` are set in the individual request, the ones configured here will be used. + *

+ * Requests with different `messageSystemAttributeNames` than those configured here will bypass the + * BatchManager and make a direct call to SQS. Only requests with matching attribute names will be + * batched and fulfilled from the receive buffers. * - * @param messageSystemAttributeNames The list of message system attribute names to be requested. + * @param messageSystemAttributeNames The list of message system attribute names to request. * If null, an empty list will be used. * @return This builder object for method chaining. */ - public Builder messageSystemAttributeName(List messageSystemAttributeNames) { - this.messageSystemAttributeNames = messageSystemAttributeNames != null ? - Collections.unmodifiableList(messageSystemAttributeNames) : - Collections.emptyList() ; + public Builder receiveMessageSystemAttributeNames(List messageSystemAttributeNames) { + this.receiveMessageSystemAttributeNames = messageSystemAttributeNames; return this; } /** - * Defines the message attributes that receive calls will request. Only receive message requests that request the same set - * of message attributes will be satisfied from the Receive buffer .Refer to - * {@link ReceiveMessageRequest#messageAttributeNames()}. + * Defines the list of message attribute names to request in receive message calls. + * If no `receiveMessageAttributeNames` are set in the individual requests, the ones configured here will be used. *

- * Note that if requests to the BatchManager are sent with different receiveMessageAttributeNames than what is configured, - * the request will bypass the BatchManager and will instead make a direct call to SQS. + * Requests with different `receiveMessageAttributeNames` than those configured here will bypass the batched and + * fulfilled from the receive buffers. * - * @param receiveMessageAttributeNames The list of message attributes to be requested. If null, an empty list will be - * used. + * @param receiveMessageAttributeNames The list of message attribute names to request. + * If null, an empty list will be used. * @return This builder object for method chaining. */ public Builder receiveMessageAttributeNames(List receiveMessageAttributeNames) { @@ -428,20 +319,6 @@ public Builder receiveMessageAttributeNames(List receiveMessageAttribute return this; } - /** - * Define the behavior for prefetching with respect to the number of in-flight incoming receive requests made to the - * client. The advantage of this is reducing the number of outgoing requests made to SQS when incoming requests are - * reduced: in particular, if all incoming requests stop no future requests to SQS will be made. The disadvantage is - * increased latency when incoming requests first start occurring. - * - * @param adaptivePrefetching The new adaptivePrefetching value. - * @return This object for method chaining. - */ - public Builder adaptivePrefetching(Boolean adaptivePrefetching) { - this.adaptivePrefetching = adaptivePrefetching; - return this; - } - public BatchOverrideConfiguration build() { return new BatchOverrideConfiguration(this); } From 36bd1a154388cd7e565c35f6ec077ba83ad8852b Mon Sep 17 00:00:00 2001 From: John Viegas Date: Tue, 3 Sep 2024 10:34:32 -0700 Subject: [PATCH 2/6] Intermediate changes --- .../BatchOverrideConfiguration.java | 114 ++++++++++-------- .../RequestBatchConfiguration.java | 18 +-- .../batchmanager/RequestBatchManager.java | 8 +- .../ResponseBatchConfiguration.java | 10 +- .../BatchOverrideConfigurationTest.java | 26 ++-- .../batchmanager/ReceiveBatchManagerTest.java | 6 +- .../ReceiveMessageBatchManagerTest.java | 7 +- .../ReceiveSqsMessageHelperTest.java | 12 +- .../batchmanager/RequestBatchManagerTest.java | 12 +- .../sqs/batchmanager/SampleBatchManager.java | 4 +- 10 files changed, 115 insertions(+), 102 deletions(-) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java index 9eae6143216e..29519f0c91b6 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.services.sqs.batchmanager; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import software.amazon.awssdk.annotations.SdkPublicApi; @@ -30,7 +31,7 @@ import software.amazon.awssdk.utils.builder.ToCopyableBuilder; /** - * Configuration values for the BatchManager Implementation. All values are optional, and the default values will be used if they + * Configuration values for the BatchManager Implementation. All values are optional, and default values will be used if they * are not specified. */ @SdkPublicApi @@ -49,17 +50,26 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder(builder.receiveMessageSystemAttributeNames)) : + Collections.emptyList(); + + this.receiveMessageAttributeNames = builder.receiveMessageAttributeNames != null ? + Collections.unmodifiableList(new ArrayList<>(builder.receiveMessageAttributeNames)) : + Collections.emptyList(); } public static Builder builder() { @@ -67,20 +77,19 @@ public static Builder builder() { } /** - * @return the maximum number of items that are batched together in a single outbound request - * A batch can contain up to maximum of 10 messages. - * The default value is 10. + * @return the maximum number of items that are batched together in a single outbound request. + * A batch can contain up to a maximum of 10 messages. + * The default value is 10. */ public Integer outboundBatchSizeLimit() { return outboundBatchSizeLimit; } - /** * @return the maximum amount of time that an outgoing call waits to be batched with messages of the same type. - * The default value is 200 milliseconds. + * The default value is 200 milliseconds. */ - public Duration outboundBatchCollectionDuration() { + public Duration outboundBatchWindowDuration() { return outboundBatchWindowDuration; } @@ -93,7 +102,7 @@ public Duration receiveMessageVisibilityTimeout() { /** * @return the amount of time the receive call will block on the server, waiting for messages to arrive if the - * queue is empty when the call is initially made. + * queue is empty when the call is initially made. */ public Duration receiveMessageLongPollWaitDuration() { return receiveMessageLongPollWaitDuration; @@ -108,7 +117,7 @@ public Duration receiveMessageMinWaitTime() { /** * @return the system attribute names specific to the {@link ReceiveMessageRequest} - * that will be requested via {@link ReceiveMessageRequest#messageSystemAttributeNames()}. + * that will be requested via {@link ReceiveMessageRequest#messageSystemAttributeNames()}. */ public List receiveMessageSystemAttributeNames() { return receiveMessageSystemAttributeNames; @@ -116,7 +125,7 @@ public List receiveMessageSystemAttributeNames() { /** * @return the message attribute names that are specific to receive calls - * and will be requested via {@link ReceiveMessageRequest#messageAttributeNames()}. + * and will be requested via {@link ReceiveMessageRequest#messageAttributeNames()}. */ public List receiveMessageAttributeNames() { return receiveMessageAttributeNames; @@ -125,24 +134,26 @@ public List receiveMessageAttributeNames() { @Override public Builder toBuilder() { - return new Builder().outboundBatchSizeLimit(outboundBatchSizeLimit) - .maxOutboundBatchCollectionDuration(outboundBatchWindowDuration) - .receiveMsgVisibilityTimeout(receiveMessageVisibilityTimeout) - .receiveMsgLongPollWaitTimeout(receiveMessageLongPollWaitDuration) - .receiveMessageMinWaitTime(receiveMessageMinWaitTime) - .receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames) - .receiveMessageAttributeNames(receiveMessageAttributeNames); + return new Builder() + .outboundBatchSizeLimit(outboundBatchSizeLimit) + .outboundBatchWindowDuration(outboundBatchWindowDuration) + .receiveMessageVisibilityTimeout(receiveMessageVisibilityTimeout) + .receiveMessageLongPollWaitDuration(receiveMessageLongPollWaitDuration) + .receiveMessageMinWaitTime(receiveMessageMinWaitTime) + .receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames) + .receiveMessageAttributeNames(receiveMessageAttributeNames); } + @Override public String toString() { return ToString.builder("BatchOverrideConfiguration") - .add("maxBatchItems", outboundBatchSizeLimit) - .add("maxOutboundBatchCollectionDuration", outboundBatchWindowDuration) - .add("visibilityTimeout", receiveMessageVisibilityTimeout) - .add("longPollWaitTimeout", receiveMessageLongPollWaitDuration) + .add("outboundBatchSizeLimit", outboundBatchSizeLimit) + .add("outboundBatchWindowDuration", outboundBatchWindowDuration) + .add("receiveMessageVisibilityTimeout", receiveMessageVisibilityTimeout) + .add("receiveMessageLongPollWaitDuration", receiveMessageLongPollWaitDuration) .add("receiveMessageMinWaitTime", receiveMessageMinWaitTime) - .add("receiveAttributeNames", receiveMessageSystemAttributeNames) + .add("receiveMessageSystemAttributeNames", receiveMessageSystemAttributeNames) .add("receiveMessageAttributeNames", receiveMessageAttributeNames) .build(); } @@ -199,14 +210,15 @@ public int hashCode() { public static final class Builder implements CopyableBuilder { - private Integer outboundBatchSizeLimit; - private Duration outboundBatchWindowDuration; - private Duration receiveMsgVisibilityTimeout; - private Duration receiveMsgLongPollWaitTimeout; - private Duration receiveMessageMinWaitTime; + private Integer outboundBatchSizeLimit = 10; + private Duration outboundBatchWindowDuration = Duration.ofMillis(200); + private Duration receiveMessageVisibilityTimeout; + private Duration receiveMessageLongPollWaitDuration; + private Duration receiveMessageMinWaitTime = Duration.ofMillis(50); private List receiveMessageSystemAttributeNames = Collections.emptyList(); private List receiveMessageAttributeNames = Collections.emptyList(); + private Builder() { } @@ -233,11 +245,11 @@ public Builder outboundBatchSizeLimit(Integer outboundBatchSizeLimit) { * number of calls made and increase throughput, but it may also increase average message latency. * The default value is 200 milliseconds. * - * @param maxOutboundBatchCollectionDuration The new maxOutboundBatchCollectionDuration value. + * @param outboundBatchWindowDuration The new outboundBatchWindowDuration value. * @return This Builder object for method chaining. */ - public Builder maxOutboundBatchCollectionDuration(Duration maxOutboundBatchCollectionDuration) { - this.outboundBatchWindowDuration = maxOutboundBatchCollectionDuration; + public Builder outboundBatchWindowDuration(Duration outboundBatchWindowDuration) { + this.outboundBatchWindowDuration = outboundBatchWindowDuration; return this; } @@ -246,11 +258,11 @@ public Builder maxOutboundBatchCollectionDuration(Duration maxOutboundBatchColle * this timeout will override the default visibility timeout set on the SQS queue. If no value is set, * then by default, the visibility timeout of the queue will be used. Only positive values are supported. * - * @param receiveMsgVisibilityTimeout The new visibilityTimeout value. + * @param receiveMessageVisibilityTimeout The new visibilityTimeout value. * @return This Builder object for method chaining. */ - public Builder receiveMsgVisibilityTimeout(Duration receiveMsgVisibilityTimeout) { - this.receiveMsgVisibilityTimeout = receiveMsgVisibilityTimeout; + public Builder receiveMessageVisibilityTimeout(Duration receiveMessageVisibilityTimeout) { + this.receiveMessageVisibilityTimeout = receiveMessageVisibilityTimeout; return this; } @@ -259,11 +271,11 @@ public Builder receiveMsgVisibilityTimeout(Duration receiveMsgVisibilityTimeout) * queue is empty when the receive call is first made. By default, this value is not set, meaning no long * polling wait time on the server side. * - * @param receiveMsgLongPollWaitTimeout The new longPollWaitTimeout value. + * @param receiveMessageLongPollWaitDuration The new longPollWaitTimeout value. * @return This Builder object for method chaining. */ - public Builder receiveMsgLongPollWaitTimeout(Duration receiveMsgLongPollWaitTimeout) { - this.receiveMsgLongPollWaitTimeout = receiveMsgLongPollWaitTimeout; + public Builder receiveMessageLongPollWaitDuration(Duration receiveMessageLongPollWaitDuration) { + this.receiveMessageLongPollWaitDuration = receiveMessageLongPollWaitDuration; return this; } @@ -275,7 +287,6 @@ public Builder receiveMsgLongPollWaitTimeout(Duration receiveMsgLongPollWaitTime * The call may return sooner than the configured `WaitTimeSeconds` if there are messages in the buffer. * If no messages are available and the wait time expires, the call will return an empty message list. * - * * @param receiveMessageMinWaitTime The new minimum wait time value. * @return This Builder object for method chaining. */ @@ -292,12 +303,14 @@ public Builder receiveMessageMinWaitTime(Duration receiveMessageMinWaitTime) { * BatchManager and make a direct call to SQS. Only requests with matching attribute names will be * batched and fulfilled from the receive buffers. * - * @param messageSystemAttributeNames The list of message system attribute names to request. + * @param receiveMessageSystemAttributeNames The list of message system attribute names to request. * If null, an empty list will be used. * @return This builder object for method chaining. */ - public Builder receiveMessageSystemAttributeNames(List messageSystemAttributeNames) { - this.receiveMessageSystemAttributeNames = messageSystemAttributeNames; + public Builder receiveMessageSystemAttributeNames(List receiveMessageSystemAttributeNames) { + this.receiveMessageSystemAttributeNames = receiveMessageSystemAttributeNames != null ? + new ArrayList<>(receiveMessageSystemAttributeNames) : + Collections.emptyList(); return this; } @@ -314,11 +327,16 @@ public Builder receiveMessageSystemAttributeNames(List receiveMessageAttributeNames) { this.receiveMessageAttributeNames = receiveMessageAttributeNames != null ? - Collections.unmodifiableList(receiveMessageAttributeNames) : + new ArrayList<>(receiveMessageAttributeNames) : Collections.emptyList(); return this; } + /** + * Builds a new {@link BatchOverrideConfiguration} object based on the values set in this builder. + * + * @return A new {@link BatchOverrideConfiguration} object. + */ public BatchOverrideConfiguration build() { return new BatchOverrideConfiguration(this); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java index dc407c4c84f3..6859febfcc54 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java @@ -31,7 +31,7 @@ public final class RequestBatchConfiguration { private final Integer maxBatchItems; private final Integer maxBatchKeys; private final Integer maxBufferSize; - private final Duration maxBatchOpenDuration; + private final Duration maxOutboundBatchCollectionDuration; private final Integer maxBatchBytesSize; private RequestBatchConfiguration(Builder builder) { @@ -39,7 +39,7 @@ private RequestBatchConfiguration(Builder builder) { this.maxBatchItems = builder.maxBatchItems != null ? builder.maxBatchItems : DEFAULT_MAX_BATCH_ITEMS; this.maxBatchKeys = builder.maxBatchKeys != null ? builder.maxBatchKeys : DEFAULT_MAX_BATCH_KEYS; this.maxBufferSize = builder.maxBufferSize != null ? builder.maxBufferSize : DEFAULT_MAX_BUFFER_SIZE; - this.maxBatchOpenDuration = builder.maxBatchOpenDuration != null ? builder.maxBatchOpenDuration : + this.maxOutboundBatchCollectionDuration = builder.maxOutboundBatchCollectionDuration != null ? builder.maxOutboundBatchCollectionDuration : DEFAULT_MAX_BATCH_OPEN_IN_MS; this.maxBatchBytesSize = builder.maxBatchBytesSize != null ? builder.maxBatchBytesSize : DEFAULT_MAX_BATCH_BYTES_SIZE; @@ -53,15 +53,15 @@ public static Builder builder(BatchOverrideConfiguration configuration) { if (configuration != null) { return new Builder() .maxBatchKeys(configuration.maxBatchKeys()) - .maxBatchItems(configuration.maxBatchItems()) - .maxBatchOpenDuration(configuration.maxBatchOpenDuration()) + .maxBatchItems(configuration.outboundBatchSizeLimit()) + .maxOutboundBatchCollectionDuration(configuration.outboundBatchCollectionDuration()) .maxBufferSize(configuration.maxBufferSize()); } return new Builder(); } - public Duration maxBatchOpenDuration() { - return maxBatchOpenDuration; + public Duration maxOutboundBatchCollectionDuration() { + return maxOutboundBatchCollectionDuration; } public int maxBatchItems() { @@ -85,7 +85,7 @@ public static final class Builder { private Integer maxBatchItems; private Integer maxBatchKeys; private Integer maxBufferSize; - private Duration maxBatchOpenDuration; + private Duration maxOutboundBatchCollectionDuration; private Integer maxBatchBytesSize; private Builder() { @@ -106,8 +106,8 @@ public Builder maxBufferSize(Integer maxBufferSize) { return this; } - public Builder maxBatchOpenDuration(Duration maxBatchOpenDuration) { - this.maxBatchOpenDuration = maxBatchOpenDuration; + public Builder maxOutboundBatchCollectionDuration(Duration maxOutboundBatchCollectionDuration) { + this.maxOutboundBatchCollectionDuration = maxOutboundBatchCollectionDuration; return this; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index dc25430613c1..eb0d7aede254 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -36,7 +36,7 @@ public abstract class RequestBatchManager { protected final RequestBatchConfiguration batchConfiguration ; private final int maxBatchItems; - private final Duration maxBatchOpenDuration; + private final Duration maxOutboundBatchCollectionDuration; private final BatchingMap requestsAndResponsesMaps; private final ScheduledExecutorService scheduledExecutor; @@ -48,7 +48,7 @@ protected RequestBatchManager(RequestBatchConfiguration overrideConfiguration, ScheduledExecutorService scheduledExecutor) { batchConfiguration = overrideConfiguration; this.maxBatchItems = batchConfiguration.maxBatchItems(); - this.maxBatchOpenDuration = batchConfiguration.maxBatchOpenDuration(); + this.maxOutboundBatchCollectionDuration = batchConfiguration.maxOutboundBatchCollectionDuration(); this.scheduledExecutor = Validate.notNull(scheduledExecutor, "Null scheduledExecutor"); pendingBatchResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); pendingResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -72,7 +72,7 @@ public CompletableFuture batchRequest(RequestT request) { // Add request and response to the map, scheduling a flush if necessary requestsAndResponsesMaps.put(batchKey, - () -> scheduleBufferFlush(batchKey, maxBatchOpenDuration.toMillis(), scheduledExecutor), + () -> scheduleBufferFlush(batchKey, maxOutboundBatchCollectionDuration.toMillis(), scheduledExecutor), request, response); @@ -100,7 +100,7 @@ private void manualFlushBuffer(String batchKey, Map> flushableRequests) { requestsAndResponsesMaps.cancelScheduledFlush(batchKey); flushBuffer(batchKey, flushableRequests); - requestsAndResponsesMaps.putScheduledFlush(batchKey, scheduleBufferFlush(batchKey, maxBatchOpenDuration.toMillis(), + requestsAndResponsesMaps.putScheduledFlush(batchKey, scheduleBufferFlush(batchKey, maxOutboundBatchCollectionDuration.toMillis(), scheduledExecutor)); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java index 16acb845f8ca..61e878589bf1 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java @@ -49,19 +49,19 @@ public final class ResponseBatchConfiguration { public ResponseBatchConfiguration(BatchOverrideConfiguration overrideConfiguration) { this.visibilityTimeout = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::visibilityTimeout) + .map(BatchOverrideConfiguration::receiveMessageVisibilityTimeout) .orElse(VISIBILITY_TIMEOUT_SECONDS_DEFAULT); this.longPollWaitTimeout = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::longPollWaitTimeout) + .map(BatchOverrideConfiguration::receiveMessageLongPollWaitDuration) .orElse(LONG_POLL_WAIT_TIMEOUT_DEFAULT); this.minReceiveWaitTime = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::minReceiveWaitTime) + .map(BatchOverrideConfiguration::receiveMessageMinWaitTime) .orElse(MIN_RECEIVE_WAIT_TIME_MS_DEFAULT); this.messageSystemAttributeValues = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::messageSystemAttributeName) + .map(BatchOverrideConfiguration::receiveMessageSystemAttributeNames) .filter(list -> !list.isEmpty()) .orElse(MESSAGE_SYSTEM_ATTRIBUTE_NAMES_DEFAULT); @@ -75,7 +75,7 @@ public ResponseBatchConfiguration(BatchOverrideConfiguration overrideConfigurati .orElse(ADAPTIVE_PREFETCHING_DEFAULT); this.maxBatchItems = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxBatchItems) + .map(BatchOverrideConfiguration::outboundBatchSizeLimit) .orElse(MAX_BATCH_ITEMS_DEFAULT); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java index 0b6492d520c9..cbcca6f7ad7f 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java @@ -66,7 +66,7 @@ private static Stream provideConfigurations() { @MethodSource("provideConfigurations") void testBatchOverrideConfiguration(Integer maxBatchItems, Integer maxBatchKeys, - Duration maxBatchOpenDuration, + Duration maxOutboundBatchCollectionDuration, Duration visibilityTimeout, Duration longPollWaitTimeout, Duration minReceiveWaitTime, @@ -77,9 +77,9 @@ void testBatchOverrideConfiguration(Integer maxBatchItems, Integer maxDoneReceiveBatches) { BatchOverrideConfiguration config = BatchOverrideConfiguration.builder() - .maxBatchItems(maxBatchItems) + .outboundBatchSizeLimit(maxBatchItems) .maxBatchKeys(maxBatchKeys) - .maxBatchOpenDuration(maxBatchOpenDuration) + .maxOutboundBatchCollectionDuration(maxOutboundBatchCollectionDuration) .visibilityTimeout(visibilityTimeout) .longPollWaitTimeout(longPollWaitTimeout) .minReceiveWaitTime(minReceiveWaitTime) @@ -90,16 +90,16 @@ void testBatchOverrideConfiguration(Integer maxBatchItems, .maxDoneReceiveBatches(maxDoneReceiveBatches) .build(); - assertEquals(maxBatchItems, config.maxBatchItems()); + assertEquals(maxBatchItems, config.outboundBatchSizeLimit()); assertEquals(maxBatchKeys, config.maxBatchKeys()); - assertEquals(maxBatchOpenDuration, config.maxBatchOpenDuration()); - assertEquals(visibilityTimeout, config.visibilityTimeout()); - assertEquals(longPollWaitTimeout, config.longPollWaitTimeout()); - assertEquals(minReceiveWaitTime, config.minReceiveWaitTime()); + assertEquals(maxOutboundBatchCollectionDuration, config.outboundBatchCollectionDuration()); + assertEquals(visibilityTimeout, config.receiveMessageVisibilityTimeout()); + assertEquals(longPollWaitTimeout, config.receiveMessageLongPollWaitDuration()); + assertEquals(minReceiveWaitTime, config.receiveMessageMinWaitTime()); assertEquals(Optional.ofNullable(receiveMessageAttributeNames).orElse(Collections.emptyList()), config.receiveMessageAttributeNames()); assertEquals(Optional.ofNullable(messageSystemAttributeNames).orElse(Collections.emptyList()), - config.messageSystemAttributeName()); + config.receiveMessageSystemAttributeNames()); assertEquals(adaptivePrefetching, config.adaptivePrefetching()); assertEquals(maxInflightReceiveBatches, config.maxInflightReceiveBatches()); assertEquals(maxDoneReceiveBatches, config.maxDoneReceiveBatches()); @@ -115,9 +115,9 @@ void testEqualsAndHashCode() { @Test void testToBuilder() { BatchOverrideConfiguration originalConfig = BatchOverrideConfiguration.builder() - .maxBatchItems(10) + .outboundBatchSizeLimit(10) .maxBatchKeys(5) - .maxBatchOpenDuration(Duration.ofMillis(200)) + .maxOutboundBatchCollectionDuration(Duration.ofMillis(200)) .visibilityTimeout(Duration.ofSeconds(30)) .longPollWaitTimeout(Duration.ofSeconds(20)) .minReceiveWaitTime(Duration.ofMillis(50)) @@ -134,7 +134,7 @@ void testToBuilder() { BatchOverrideConfiguration newConfig = builder.build(); assertEquals(originalConfig, newConfig); // Ensure that modifying the builder does not affect the original config - builder.maxBatchItems(20); - assertNotEquals(originalConfig.maxBatchItems(), builder.build().maxBatchItems()); + builder.outboundBatchSizeLimit(20); + assertNotEquals(originalConfig.outboundBatchSizeLimit(), builder.build().outboundBatchSizeLimit()); } } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java index acde3df91247..0b3d61930362 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java @@ -87,13 +87,13 @@ private ResponseBatchConfiguration createConfig(int maxBatchItems, boolean adapt int maxInflightReceiveBatches, int maxDoneReceiveBatches, Duration minReceiveWaitTime) { return new ResponseBatchConfiguration(BatchOverrideConfiguration.builder() - .maxBatchItems(maxBatchItems) + .outboundBatchSizeLimit(maxBatchItems) .adaptivePrefetching(adaptivePrefetching) .maxInflightReceiveBatches(maxInflightReceiveBatches) .maxDoneReceiveBatches(maxDoneReceiveBatches) .receiveMessageAttributeNames(Collections.emptyList()) - .visibilityTimeout(Duration.ofSeconds(2)) - .minReceiveWaitTime(minReceiveWaitTime) + .receiveMsgVisibilityTimeout(Duration.ofSeconds(2)) + .receiveMessageMinWaitTime(minReceiveWaitTime) .build()); } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java index b51399f7e89b..01c803a272de 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java @@ -20,25 +20,20 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.mockito.ArgumentCaptor.forClass; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; import org.mockito.junit.jupiter.MockitoExtension; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.internal.batchmanager.ReceiveMessageBatchManager; @@ -214,7 +209,7 @@ private static Stream provideBatchOverrideConfigurations() { Arguments.of( "Buffering enabled, with messageSystemAttributeName in Config and simple ReceiveMessageRequest", BatchOverrideConfiguration.builder() - .messageSystemAttributeName(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java index e2dc19458456..01af1efab831 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java @@ -69,11 +69,11 @@ class ReceiveSqsMessageHelperTest { @BeforeEach void setUp() { BatchOverrideConfiguration batchOverrideConfig = BatchOverrideConfiguration.builder() - .maxBatchItems(10) + .outboundBatchSizeLimit(10) .receiveMessageAttributeNames(Arrays.asList( "attribute1", "attribute2")) - .visibilityTimeout(Duration.ofSeconds(20)) - .longPollWaitTimeout(Duration.ofSeconds(15)) + .receiveMsgVisibilityTimeout(Duration.ofSeconds(20)) + .receiveMsgLongPollWaitTimeout(Duration.ofSeconds(15)) .build(); config = new ResponseBatchConfiguration(batchOverrideConfig); @@ -265,11 +265,11 @@ public void asyncReceiveMessageArgs() throws Exception { Duration visibilityTimeout = Duration.ofSeconds(9); BatchOverrideConfiguration batchOverrideConfig = BatchOverrideConfiguration.builder() - .maxBatchItems(10) + .outboundBatchSizeLimit(10) .receiveMessageAttributeNames(Arrays.asList( "custom1", "custom2")) - .visibilityTimeout(visibilityTimeout) - .longPollWaitTimeout(Duration.ofSeconds(15)) + .receiveMsgVisibilityTimeout(visibilityTimeout) + .receiveMsgLongPollWaitTimeout(Duration.ofSeconds(15)) .build(); ReceiveSqsMessageHelper batch = new ReceiveSqsMessageHelper( diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java index 030273fcf7cd..c1da1b2d607a 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java @@ -69,7 +69,7 @@ void batchRequest_OnlyOneInBatch_successful() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager = - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(1).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().outboundBatchSizeLimit(1).build(), scheduledExecutor, mockClient); CompletableFuture response = batchManager.batchRequest(request); assertEquals("testResponse0", response.get(1, TimeUnit.SECONDS)); } @@ -83,7 +83,7 @@ void batchRequest_TwoBatchesMessagesSplitInTwoCalls_successful() throws Exceptio "testResponse")); when(mockClient.sendBatchAsync(any(), eq(batchKey1))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().outboundBatchSizeLimit(2).maxOutboundBatchCollectionDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); assertEquals("testResponse0", response1.get(1, TimeUnit.SECONDS)); @@ -103,7 +103,7 @@ void batchRequest_TwoBatchesWithDifferentKey_successful() throws Exception { when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().outboundBatchSizeLimit(2).maxOutboundBatchCollectionDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); @@ -157,7 +157,7 @@ void close_FlushesAllBatches() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().outboundBatchSizeLimit(2).maxOutboundBatchCollectionDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); @@ -185,7 +185,7 @@ void batchRequest_ClosedWhenWaitingForResponse() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager = - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(1).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().outboundBatchSizeLimit(1).build(), scheduledExecutor, mockClient); CompletableFuture response = batchManager.batchRequest(request); batchManager.close(); @@ -206,7 +206,7 @@ void batchRequest_MoreThanBufferSize_Fails() throws Exception { when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchKeys(1).maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchKeys(1).maxBatchItems(2).maxOutboundBatchCollectionDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java index d083f2502b38..8035e9d45187 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java @@ -33,9 +33,9 @@ protected SampleBatchManager(BatchOverrideConfiguration batchOverrideConfigurati ScheduledExecutorService executorService, CustomClient client) { super(RequestBatchConfiguration.builder() - .maxBatchOpenDuration(batchOverrideConfiguration.maxBatchOpenDuration()) + .maxOutboundBatchCollectionDuration(batchOverrideConfiguration.outboundBatchCollectionDuration()) .maxBatchBytesSize(SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES) - .maxBatchItems(batchOverrideConfiguration.maxBatchItems()) + .maxBatchItems(batchOverrideConfiguration.outboundBatchSizeLimit()) .maxBufferSize(batchOverrideConfiguration.maxBufferSize()) .maxBatchKeys(batchOverrideConfiguration.maxBatchKeys()) .build(), From d2e59b9d54d74e678962dde46b20b485261802fd Mon Sep 17 00:00:00 2001 From: John Viegas Date: Thu, 5 Sep 2024 10:37:35 -0700 Subject: [PATCH 3/6] Update after internal poll --- .../BatchOverrideConfiguration.java | 116 +++++++++--------- .../batchmanager/ReceiveBatchManager.java | 4 +- .../ReceiveMessageBatchManager.java | 9 +- .../batchmanager/ReceiveQueueBuffer.java | 4 +- .../RequestBatchConfiguration.java | 17 +-- .../batchmanager/RequestBatchManager.java | 16 +-- .../ResponseBatchConfiguration.java | 29 +++-- .../BatchOverrideConfigurationTest.java | 23 ++-- .../batchmanager/ReceiveBatchManagerTest.java | 2 +- .../ReceiveMessageBatchManagerTest.java | 2 +- .../ReceiveSqsMessageHelperTest.java | 33 ++--- .../batchmanager/RequestBatchManagerTest.java | 8 +- .../sqs/batchmanager/SampleBatchManager.java | 5 +- 13 files changed, 130 insertions(+), 138 deletions(-) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java index 16d9f0da80eb..9c5a5fb4053a 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java @@ -31,17 +31,17 @@ import software.amazon.awssdk.utils.builder.ToCopyableBuilder; /** - * Configuration values for the BatchManager Implementation. All values are optional, and default values will be used if they - * are not specified. + * Configuration values for the BatchManager implementation used for controlling batch operations. + * All values are optional, and default values will be used if they are not specified. */ @SdkPublicApi public final class BatchOverrideConfiguration implements ToCopyableBuilder { private final Integer maxBatchSize; - private final Duration sendRequestFrequency; + private final Duration sendMessageFrequency; private final Duration receiveMessageVisibilityTimeout; - private final Duration receiveMessageMinWaitTime; + private final Duration receiveMessageMinWaitDuration; private final List receiveMessageSystemAttributeNames; private final List receiveMessageAttributeNames; @@ -50,24 +50,21 @@ private BatchOverrideConfiguration(Builder builder) { this.maxBatchSize = Validate.isPositiveOrNull(builder.maxBatchSize, "maxBatchSize"); Validate.isTrue(this.maxBatchSize == null || this.maxBatchSize <= 10, - "A batch can contain up to 10 messages."); + "The maxBatchSize must be less than or equal to 10. A batch can contain up to 10 messages."); - this.sendRequestFrequency = Validate.isPositiveOrNull(builder.sendRequestFrequency, - "sendRequestFrequency"); + this.sendMessageFrequency = Validate.isPositiveOrNull(builder.sendMessageFrequency, + "sendMessageFrequency"); this.receiveMessageVisibilityTimeout = Validate.isPositiveOrNull(builder.receiveMessageVisibilityTimeout, "receiveMessageVisibilityTimeout"); - - this.receiveMessageMinWaitTime = Validate.isPositiveOrNull(builder.receiveMessageMinWaitTime, - "receiveMessageMinWaitTime"); - - this.receiveMessageSystemAttributeNames = builder.receiveMessageSystemAttributeNames != null ? - Collections.unmodifiableList( - new ArrayList<>(builder.receiveMessageSystemAttributeNames)) : - Collections.emptyList(); - - this.receiveMessageAttributeNames = builder.receiveMessageAttributeNames != null ? - Collections.unmodifiableList(new ArrayList<>(builder.receiveMessageAttributeNames)) : - Collections.emptyList(); + this.receiveMessageMinWaitDuration = Validate.isPositiveOrNull(builder.receiveMessageMinWaitDuration, + "receiveMessageMinWaitDuration"); + this.receiveMessageSystemAttributeNames = + builder.receiveMessageSystemAttributeNames == null ? Collections.emptyList() : + Collections.unmodifiableList(builder.receiveMessageSystemAttributeNames); + + this.receiveMessageAttributeNames = + builder.receiveMessageAttributeNames == null ? Collections.emptyList() : + Collections.unmodifiableList(builder.receiveMessageAttributeNames); } public static Builder builder() { @@ -75,47 +72,51 @@ public static Builder builder() { } /** - * @return the maximum number of items that are batched together in a single outbound request. - * A batch can contain up to a maximum of 10 messages. - * The default value is 10. + * @return the maximum number of items that can be batched together in a single outbound SQS request + * (e.g., for {@link SendMessageBatchRequest}, {@link ChangeMessageVisibilityBatchRequest}, or + * {@link DeleteMessageBatchRequest}). A batch can contain up to a maximum of 10 messages. + * The default value is 10. */ public Integer maxBatchSize() { return maxBatchSize; } /** - * @return the maximum amount of time that an outgoing call waits to be batched with messages of the same type. - * The default value is 200 milliseconds. + * @return the maximum duration an outgoing call waits for additional messages of the same type before being sent. + * If the {@link #maxBatchSize()} is reached before this duration, the batch will be sent immediately. + * The default value is 200 milliseconds. */ - public Duration sendRequestFrequency() { - return sendRequestFrequency; + public Duration sendMessageFrequency() { + return sendMessageFrequency; } /** - * @return the custom visibility timeout to use when retrieving messages from SQS. + * @return the custom visibility timeout to use when retrieving messages from SQS. If not set, + * the default visibility timeout configured on the SQS queue will be used. */ public Duration receiveMessageVisibilityTimeout() { return receiveMessageVisibilityTimeout; } /** - * @return the minimum wait time for incoming receive message requests. + * @return the minimum wait time for incoming receive message requests. Without a non-zero minimum wait time, + * threads can waste CPU resources busy-waiting for messages. The default value is 50 milliseconds. */ - public Duration receiveMessageMinWaitTime() { - return receiveMessageMinWaitTime; + public Duration receiveMessageMinWaitDuration() { + return receiveMessageMinWaitDuration; } /** - * @return the system attribute names specific to the {@link ReceiveMessageRequest} - * that will be requested via {@link ReceiveMessageRequest#messageSystemAttributeNames()}. + * @return the system attribute names to request for {@link ReceiveMessageRequest}. Requests with differing + * system attribute names will bypass the batch manager and make a direct call to SQS. */ public List receiveMessageSystemAttributeNames() { return receiveMessageSystemAttributeNames; } /** - * @return the message attribute names that are specific to receive calls - * and will be requested via {@link ReceiveMessageRequest#messageAttributeNames()}. + * @return the message attribute names to request for {@link ReceiveMessageRequest}. Requests with different + * message attribute names will bypass the batch manager and make a direct call to SQS. */ public List receiveMessageAttributeNames() { return receiveMessageAttributeNames; @@ -126,21 +127,20 @@ public List receiveMessageAttributeNames() { public Builder toBuilder() { return new Builder() .maxBatchSize(maxBatchSize) - .sendRequestFrequency(sendRequestFrequency) + .sendMessageFrequency(sendMessageFrequency) .receiveMessageVisibilityTimeout(receiveMessageVisibilityTimeout) - .receiveMessageMinWaitTime(receiveMessageMinWaitTime) + .receiveMessageMinWaitDuration(receiveMessageMinWaitDuration) .receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames) .receiveMessageAttributeNames(receiveMessageAttributeNames); } - @Override public String toString() { return ToString.builder("BatchOverrideConfiguration") .add("maxBatchSize", maxBatchSize) - .add("sendRequestFrequency", sendRequestFrequency) + .add("sendMessageFrequency", sendMessageFrequency) .add("receiveMessageVisibilityTimeout", receiveMessageVisibilityTimeout) - .add("receiveMessageMinWaitTime", receiveMessageMinWaitTime) + .add("receiveMessageMinWaitDuration", receiveMessageMinWaitDuration) .add("receiveMessageSystemAttributeNames", receiveMessageSystemAttributeNames) .add("receiveMessageAttributeNames", receiveMessageAttributeNames) .build(); @@ -160,20 +160,22 @@ public boolean equals(Object o) { if (maxBatchSize != null ? !maxBatchSize.equals(that.maxBatchSize) : that.maxBatchSize != null) { return false; } - if (sendRequestFrequency != null ? !sendRequestFrequency.equals(that.sendRequestFrequency) : - that.sendRequestFrequency != null) { + if (sendMessageFrequency != null ? !sendMessageFrequency.equals(that.sendMessageFrequency) : + that.sendMessageFrequency != null) { return false; } - if (receiveMessageVisibilityTimeout != null ? !receiveMessageVisibilityTimeout.equals(that.receiveMessageVisibilityTimeout) : + if (receiveMessageVisibilityTimeout != null + ? !receiveMessageVisibilityTimeout.equals(that.receiveMessageVisibilityTimeout) : that.receiveMessageVisibilityTimeout != null) { return false; } - if (receiveMessageMinWaitTime != null ? !receiveMessageMinWaitTime.equals(that.receiveMessageMinWaitTime) : - that.receiveMessageMinWaitTime != null) { + if (receiveMessageMinWaitDuration != null ? !receiveMessageMinWaitDuration.equals(that.receiveMessageMinWaitDuration) : + that.receiveMessageMinWaitDuration != null) { return false; } - if (receiveMessageSystemAttributeNames != null ? !receiveMessageSystemAttributeNames.equals(that.receiveMessageSystemAttributeNames) : - that.receiveMessageSystemAttributeNames != null) { + if (receiveMessageSystemAttributeNames != null ? + !receiveMessageSystemAttributeNames.equals(that.receiveMessageSystemAttributeNames) + : that.receiveMessageSystemAttributeNames != null) { return false; } return receiveMessageAttributeNames != null ? receiveMessageAttributeNames.equals(that.receiveMessageAttributeNames) : @@ -183,9 +185,9 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = maxBatchSize != null ? maxBatchSize.hashCode() : 0; - result = 31 * result + (sendRequestFrequency != null ? sendRequestFrequency.hashCode() : 0); + result = 31 * result + (sendMessageFrequency != null ? sendMessageFrequency.hashCode() : 0); result = 31 * result + (receiveMessageVisibilityTimeout != null ? receiveMessageVisibilityTimeout.hashCode() : 0); - result = 31 * result + (receiveMessageMinWaitTime != null ? receiveMessageMinWaitTime.hashCode() : 0); + result = 31 * result + (receiveMessageMinWaitDuration != null ? receiveMessageMinWaitDuration.hashCode() : 0); result = 31 * result + (receiveMessageSystemAttributeNames != null ? receiveMessageSystemAttributeNames.hashCode() : 0); result = 31 * result + (receiveMessageAttributeNames != null ? receiveMessageAttributeNames.hashCode() : 0); return result; @@ -194,9 +196,9 @@ public int hashCode() { public static final class Builder implements CopyableBuilder { private Integer maxBatchSize = 10; - private Duration sendRequestFrequency = Duration.ofMillis(200); + private Duration sendMessageFrequency ; private Duration receiveMessageVisibilityTimeout; - private Duration receiveMessageMinWaitTime = Duration.ofMillis(50); + private Duration receiveMessageMinWaitDuration ; private List receiveMessageSystemAttributeNames = Collections.emptyList(); private List receiveMessageAttributeNames = Collections.emptyList(); @@ -224,15 +226,15 @@ public Builder maxBatchSize(Integer maxBatchSize) { * requests before being sent. Outbound requests include SendMessageBatchRequest, * ChangeMessageVisibilityBatchRequest, and DeleteMessageBatchRequest. If the maxBatchSize is reached * before this duration, the batch will be sent immediately. - * Increasing the {@code sendRequestFrequency} gives more time for additional messages to be added to + * Increasing the {@code sendMessageFrequency} gives more time for additional messages to be added to * the batch, which can reduce the number of requests and increase throughput. However, a higher * frequency may also result in increased average message latency. The default value is 200 milliseconds. * - * @param sendRequestFrequency The new value for the frequency at which outbound requests are sent. + * @param sendMessageFrequency The new value for the frequency at which outbound requests are sent. * @return This Builder object for method chaining. */ - public Builder sendRequestFrequency(Duration sendRequestFrequency) { - this.sendRequestFrequency = sendRequestFrequency; + public Builder sendMessageFrequency(Duration sendMessageFrequency) { + this.sendMessageFrequency = sendMessageFrequency; return this; } @@ -257,11 +259,11 @@ public Builder receiveMessageVisibilityTimeout(Duration receiveMessageVisibility * The call may return sooner than the configured `WaitTimeSeconds` if there are messages in the buffer. * If no messages are available and the wait time expires, the call will return an empty message list. * - * @param receiveMessageMinWaitTime The new minimum wait time value. + * @param receiveMessageMinWaitDuration The new minimum wait time value. * @return This Builder object for method chaining. */ - public Builder receiveMessageMinWaitTime(Duration receiveMessageMinWaitTime) { - this.receiveMessageMinWaitTime = receiveMessageMinWaitTime; + public Builder receiveMessageMinWaitDuration(Duration receiveMessageMinWaitDuration) { + this.receiveMessageMinWaitDuration = receiveMessageMinWaitDuration; return this; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java index 8d48d3bdb67f..e8fa5ca518c5 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java @@ -17,13 +17,11 @@ import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SUPPORTED_SQS_RECEIVE_MSG; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; @@ -60,7 +58,7 @@ public CompletableFuture processRequest(ReceiveMessageRe } int numMessages = rq.maxNumberOfMessages() != null ? rq.maxNumberOfMessages() : MAX_SUPPORTED_SQS_RECEIVE_MSG; - return queueAttributesManager.getReceiveMessageTimeout(rq, config.minReceiveWaitTime()).thenCompose(waitTimeMs -> { + return queueAttributesManager.getReceiveMessageTimeout(rq, config.messageMinWaitDuration()).thenCompose(waitTimeMs -> { CompletableFuture receiveMessageFuture = new CompletableFuture<>(); receiveQueueBuffer.receiveMessage(receiveMessageFuture, numMessages); CompletableFuture timeoutFuture = new CompletableFuture<>(); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java index 0c4ad8a93b3d..3354c10c2d0e 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java @@ -42,14 +42,7 @@ public ReceiveMessageBatchManager(SqsAsyncClient sqsClient, BatchOverrideConfiguration config) { this.sqsClient = sqsClient; this.executor = executor; - this.config = config != null - ? ResponseBatchConfiguration.builder() - .minReceiveWaitTime(config.receiveMessageMinWaitTime()) - .receiveMessageAttributeNames(config.receiveMessageAttributeNames()) - .messageSystemAttributeNames(config.receiveMessageSystemAttributeNames()) - .visibilityTimeout(config.receiveMessageVisibilityTimeout()) - .build() - : ResponseBatchConfiguration.builder().build(); + this.config = ResponseBatchConfiguration.builder(config).build(); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java index d2fb367e1d84..de28f6c79e09 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SUPPORTED_SQS_RECEIVE_MSG; + import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -124,7 +126,7 @@ private int determineDesiredBatches() { int totalRequested = futures.stream() .mapToInt(FutureRequestWrapper::getRequestedSize) .sum(); - int batchesNeededToFulfillFutures = (int) Math.ceil((float) totalRequested / 10); + int batchesNeededToFulfillFutures = (int) Math.ceil((float) totalRequested / MAX_SUPPORTED_SQS_RECEIVE_MSG); desiredBatches = Math.min(batchesNeededToFulfillFutures, desiredBatches); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java index a001c9be5ecc..98f0e1c145b8 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java @@ -31,7 +31,7 @@ public final class RequestBatchConfiguration { private final Integer maxBatchItems; private final Integer maxBatchKeys; private final Integer maxBufferSize; - private final Duration maxOutboundBatchCollectionDuration; + private final Duration sendMessageFrequency; private final Integer maxBatchBytesSize; private RequestBatchConfiguration(Builder builder) { @@ -39,7 +39,8 @@ private RequestBatchConfiguration(Builder builder) { this.maxBatchItems = builder.maxBatchItems != null ? builder.maxBatchItems : DEFAULT_MAX_BATCH_ITEMS; this.maxBatchKeys = builder.maxBatchKeys != null ? builder.maxBatchKeys : DEFAULT_MAX_BATCH_KEYS; this.maxBufferSize = builder.maxBufferSize != null ? builder.maxBufferSize : DEFAULT_MAX_BUFFER_SIZE; - this.maxOutboundBatchCollectionDuration = builder.maxOutboundBatchCollectionDuration != null ? builder.maxOutboundBatchCollectionDuration : + this.sendMessageFrequency = builder.sendMessageFrequency != null ? + builder.sendMessageFrequency : DEFAULT_MAX_BATCH_OPEN_IN_MS; this.maxBatchBytesSize = builder.maxBatchBytesSize != null ? builder.maxBatchBytesSize : DEFAULT_MAX_BATCH_BYTES_SIZE; @@ -53,14 +54,14 @@ public static Builder builder(BatchOverrideConfiguration configuration) { if (configuration != null) { return new Builder() .maxBatchItems(configuration.maxBatchSize()) - .maxOutboundBatchCollectionDuration(configuration.sendRequestFrequency()) + .sendMessageFrequency(configuration.sendMessageFrequency()) .maxBatchBytesSize(configuration.maxBatchSize()); } return new Builder(); } - public Duration maxOutboundBatchCollectionDuration() { - return maxOutboundBatchCollectionDuration; + public Duration sendMessageFrequency() { + return sendMessageFrequency; } public int maxBatchItems() { @@ -84,7 +85,7 @@ public static final class Builder { private Integer maxBatchItems; private Integer maxBatchKeys; private Integer maxBufferSize; - private Duration maxOutboundBatchCollectionDuration; + private Duration sendMessageFrequency; private Integer maxBatchBytesSize; private Builder() { @@ -105,8 +106,8 @@ public Builder maxBufferSize(Integer maxBufferSize) { return this; } - public Builder maxOutboundBatchCollectionDuration(Duration maxOutboundBatchCollectionDuration) { - this.maxOutboundBatchCollectionDuration = maxOutboundBatchCollectionDuration; + public Builder sendMessageFrequency(Duration sendMessageFrequency) { + this.sendMessageFrequency = sendMessageFrequency; return this; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index 6c50d78e4aac..1c8c01b26532 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -45,10 +45,9 @@ public abstract class RequestBatchManager { protected final RequestBatchConfiguration batchConfiguration ; private final int maxBatchItems; - private final Duration maxOutboundBatchCollectionDuration; + private final Duration sendMessageFrequency; private final BatchingMap requestsAndResponsesMaps; private final ScheduledExecutorService scheduledExecutor; - private final Set> pendingBatchResponses ; private final Set> pendingResponses ; @@ -57,7 +56,7 @@ protected RequestBatchManager(RequestBatchConfiguration overrideConfiguration, ScheduledExecutorService scheduledExecutor) { batchConfiguration = overrideConfiguration; this.maxBatchItems = batchConfiguration.maxBatchItems(); - this.maxOutboundBatchCollectionDuration = batchConfiguration.maxOutboundBatchCollectionDuration(); + this.sendMessageFrequency = batchConfiguration.sendMessageFrequency(); this.scheduledExecutor = Validate.notNull(scheduledExecutor, "Null scheduledExecutor"); pendingBatchResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); pendingResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -80,7 +79,9 @@ public CompletableFuture batchRequest(RequestT request) { // Add request and response to the map, scheduling a flush if necessary requestsAndResponsesMaps.put(batchKey, - () -> scheduleBufferFlush(batchKey, maxOutboundBatchCollectionDuration.toMillis(), scheduledExecutor), + () -> scheduleBufferFlush(batchKey, + sendMessageFrequency.toMillis(), + scheduledExecutor), request, response); @@ -108,8 +109,10 @@ private void manualFlushBuffer(String batchKey, Map> flushableRequests) { requestsAndResponsesMaps.cancelScheduledFlush(batchKey); flushBuffer(batchKey, flushableRequests); - requestsAndResponsesMaps.putScheduledFlush(batchKey, scheduleBufferFlush(batchKey, maxOutboundBatchCollectionDuration.toMillis(), - scheduledExecutor)); + requestsAndResponsesMaps.putScheduledFlush(batchKey, + scheduleBufferFlush(batchKey, + sendMessageFrequency.toMillis(), + scheduledExecutor)); } private void flushBuffer(String batchKey, Map> flushableRequests) { @@ -173,5 +176,4 @@ public void close() { requestsAndResponsesMaps.clear(); } - } \ No newline at end of file diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java index 48b298ed987e..0de53beacb80 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.List; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; @SdkInternalApi @@ -46,7 +47,7 @@ public final class ResponseBatchConfiguration { public static final int ATTRIBUTE_MAPS_PAYLOAD_BYTES = 16 * 1024; // 16 KiB private final Duration visibilityTimeout; - private final Duration minReceiveWaitTime; + private final Duration messageMinWaitDuration; private final List messageSystemAttributeNames; private final List receiveMessageAttributeNames; private final Boolean adaptivePrefetching; @@ -59,8 +60,8 @@ private ResponseBatchConfiguration(Builder builder) { ? builder.visibilityTimeout : VISIBILITY_TIMEOUT_SECONDS_DEFAULT; - this.minReceiveWaitTime = builder.minReceiveWaitTime != null - ? builder.minReceiveWaitTime + this.messageMinWaitDuration = builder.messageMinWaitDuration != null + ? builder.messageMinWaitDuration : MIN_RECEIVE_WAIT_TIME_MS_DEFAULT; this.messageSystemAttributeNames = builder.messageSystemAttributeNames != null @@ -93,8 +94,8 @@ public Duration visibilityTimeout() { return visibilityTimeout; } - public Duration minReceiveWaitTime() { - return minReceiveWaitTime; + public Duration messageMinWaitDuration() { + return messageMinWaitDuration; } public List messageSystemAttributeNames() { @@ -121,14 +122,24 @@ public int maxDoneReceiveBatches() { return maxDoneReceiveBatches; } + public static Builder builder(BatchOverrideConfiguration overrideConfiguration) { + Builder builder = new Builder(); + if (overrideConfiguration != null) { + builder.messageMinWaitDuration(overrideConfiguration.receiveMessageMinWaitDuration()) + .receiveMessageAttributeNames(overrideConfiguration.receiveMessageAttributeNames()) + .messageSystemAttributeNames(overrideConfiguration.receiveMessageSystemAttributeNames()) + .visibilityTimeout(overrideConfiguration.receiveMessageVisibilityTimeout()); + } + return builder; + } + public static Builder builder() { return new Builder(); } - public static class Builder { private Duration visibilityTimeout; - private Duration minReceiveWaitTime; + private Duration messageMinWaitDuration; private List messageSystemAttributeNames; private List receiveMessageAttributeNames; private Boolean adaptivePrefetching; @@ -141,8 +152,8 @@ public Builder visibilityTimeout(Duration visibilityTimeout) { return this; } - public Builder minReceiveWaitTime(Duration minReceiveWaitTime) { - this.minReceiveWaitTime = minReceiveWaitTime; + public Builder messageMinWaitDuration(Duration messageMinWaitDuration) { + this.messageMinWaitDuration = messageMinWaitDuration; return this; } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java index 522ea58bbb33..606c729a8180 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java @@ -56,25 +56,25 @@ private static Stream provideConfigurations() { @ParameterizedTest @MethodSource("provideConfigurations") void testBatchOverrideConfiguration(Integer maxBatchSize, - Duration sendRequestFrequency, + Duration sendMessageFrequency, Duration receiveMessageVisibilityTimeout, - Duration receiveMessageMinWaitTime, + Duration receiveMessageMinWaitDuration, List receiveMessageAttributeNames, List receiveMessageSystemAttributeNames) { BatchOverrideConfiguration config = BatchOverrideConfiguration.builder() .maxBatchSize(maxBatchSize) - .sendRequestFrequency(sendRequestFrequency) + .sendMessageFrequency(sendMessageFrequency) .receiveMessageVisibilityTimeout(receiveMessageVisibilityTimeout) - .receiveMessageMinWaitTime(receiveMessageMinWaitTime) + .receiveMessageMinWaitDuration(receiveMessageMinWaitDuration) .receiveMessageAttributeNames(receiveMessageAttributeNames) .receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames) .build(); assertEquals(maxBatchSize, config.maxBatchSize()); - assertEquals(sendRequestFrequency, config.sendRequestFrequency()); + assertEquals(sendMessageFrequency, config.sendMessageFrequency()); assertEquals(receiveMessageVisibilityTimeout, config.receiveMessageVisibilityTimeout()); - assertEquals(receiveMessageMinWaitTime, config.receiveMessageMinWaitTime()); + assertEquals(receiveMessageMinWaitDuration, config.receiveMessageMinWaitDuration()); assertEquals(Optional.ofNullable(receiveMessageAttributeNames).orElse(Collections.emptyList()), config.receiveMessageAttributeNames()); assertEquals(Optional.ofNullable(receiveMessageSystemAttributeNames).orElse(Collections.emptyList()), @@ -92,9 +92,9 @@ void testEqualsAndHashCode() { void testToBuilder() { BatchOverrideConfiguration originalConfig = BatchOverrideConfiguration.builder() .maxBatchSize(10) - .sendRequestFrequency(Duration.ofMillis(200)) + .sendMessageFrequency(Duration.ofMillis(200)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(30)) - .receiveMessageMinWaitTime(Duration.ofMillis(50)) + .receiveMessageMinWaitDuration(Duration.ofMillis(50)) .receiveMessageAttributeNames(Arrays.asList("msgAttr1")) .receiveMessageSystemAttributeNames(Collections.singletonList( MessageSystemAttributeName.SENDER_ID)) @@ -107,9 +107,9 @@ void testToBuilder() { builder.maxBatchSize(9); assertNotEquals(originalConfig.maxBatchSize(), builder.build().maxBatchSize()); // Ensure that all other fields are still equal after modifying the maxBatchSize - assertEquals(originalConfig.sendRequestFrequency(), builder.build().sendRequestFrequency()); + assertEquals(originalConfig.sendMessageFrequency(), builder.build().sendMessageFrequency()); assertEquals(originalConfig.receiveMessageVisibilityTimeout(), builder.build().receiveMessageVisibilityTimeout()); - assertEquals(originalConfig.receiveMessageMinWaitTime(), builder.build().receiveMessageMinWaitTime()); + assertEquals(originalConfig.receiveMessageMinWaitDuration(), builder.build().receiveMessageMinWaitDuration()); assertEquals(originalConfig.receiveMessageAttributeNames(), builder.build().receiveMessageAttributeNames()); assertEquals(originalConfig.receiveMessageSystemAttributeNames(), builder.build().receiveMessageSystemAttributeNames()); } @@ -124,7 +124,8 @@ void testMaxBatchSizeExceedsLimitThrowsException() { }); // Assert that the exception message matches the expected output - assertEquals("A batch can contain up to 10 messages.", exception.getMessage()); + assertEquals("The maxBatchSize must be less than or equal to 10. A batch can contain up to 10 messages.", + exception.getMessage()); } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java index eeb041e36858..ea94b744f166 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java @@ -87,7 +87,7 @@ private ResponseBatchConfiguration createConfig(Duration minReceiveWaitTime) { return ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.emptyList()) .visibilityTimeout(Duration.ofSeconds(2)) - .minReceiveWaitTime(minReceiveWaitTime) + .messageMinWaitDuration(minReceiveWaitTime) .build(); } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java index 2cede50235f1..7416548731b2 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java @@ -77,7 +77,7 @@ void testBatchRequest_WhenBufferingDisabledAndInCompatible_ShouldNotUseBatchMana .messageSystemAttributeNames(overrideConfig.receiveMessageSystemAttributeNames()) .receiveMessageAttributeNames(overrideConfig.receiveMessageAttributeNames()) .visibilityTimeout(overrideConfig.receiveMessageVisibilityTimeout()) - .minReceiveWaitTime(overrideConfig.receiveMessageMinWaitTime()).build(); + .messageMinWaitDuration(overrideConfig.receiveMessageMinWaitDuration()).build(); receiveMessageBatchManager = new ReceiveMessageBatchManager(sqsClient, executor, overrideConfig); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java index 8b7acf773055..7332c09c4cc2 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java @@ -15,12 +15,10 @@ package software.amazon.awssdk.services.sqs.batchmanager; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -59,7 +57,7 @@ @ExtendWith(MockitoExtension.class) class ReceiveSqsMessageHelperTest { - private final String queueUrl = "test-queue-url"; + private static final String QUEUE_URL = "test-queue-url"; private final Duration visibilityTimeout = Duration.ofSeconds(30); @Mock private ScheduledExecutorService scheduledExecutorService; @@ -76,8 +74,7 @@ void setUp() { "attribute1", "attribute2")) .visibilityTimeout(Duration.ofSeconds(20)) .build(); - - receiveSqsMessageHelper = new ReceiveSqsMessageHelper(queueUrl, sqsClient, visibilityTimeout, config); + receiveSqsMessageHelper = new ReceiveSqsMessageHelper(QUEUE_URL, sqsClient, visibilityTimeout, config); } @AfterEach @@ -102,7 +99,6 @@ void asyncReceiveMessageSuccess() throws Exception { assertEquals(1, result.get().messagesSize()); } - @Test void multipleMessageGetsAdded() throws Exception { ReceiveMessageResponse response = generateMessageResponse(10); @@ -116,10 +112,8 @@ void multipleMessageGetsAdded() throws Exception { assertTrue(result.isDone()); assertNull(result.get().getException()); assertFalse(result.get().isEmpty()); - } - @Test void asyncReceiveMessageFailure() throws Exception { // Mocking receiveMessage to throw an exception @@ -153,7 +147,7 @@ void emptyResponseReceivedFromSQS() throws Exception { } @Test - public void concurrencyTestForRemoveMessage() throws Exception { + void concurrencyTestForRemoveMessage() throws Exception { // Mocking receiveMessage to return 10 messages ReceiveMessageResponse response = generateMessageResponse(10); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); @@ -177,15 +171,12 @@ public void concurrencyTestForRemoveMessage() throws Exception { } })); } - // Start all threads threads.forEach(Thread::start); - // Wait for all threads to finish for (Thread thread : threads) { thread.join(); } - // Verify final state assertEquals(10, successfulRemovals.get()); assertEquals(0, receiveSqsMessageHelper.messagesSize()); @@ -236,12 +227,10 @@ void immediateMessageProcessingWithoutExpiry() throws Exception { ReceiveSqsMessageHelper receiveSqsMessageHelper1 = completableFuture.get(2, TimeUnit.SECONDS); Message message = receiveSqsMessageHelper1.removeMessage(); assertEquals(message, Message.builder().body("Message 1").build()); - - } @Test - public void expiredBatchesClearsItself() throws Exception { + void expiredBatchesClearsItself() throws Exception { // Test setup: creating a new instance of ReceiveSqsMessageHelper ReceiveSqsMessageHelper batch = new ReceiveSqsMessageHelper("queueUrl", sqsClient , Duration.ofNanos(1), config); @@ -261,21 +250,20 @@ public void expiredBatchesClearsItself() throws Exception { } @Test - public void asyncReceiveMessageArgs() throws Exception { + void asyncReceiveMessageArgs() throws Exception { - Duration visibilityTimeout = Duration.ofSeconds(9); ResponseBatchConfiguration batchOverrideConfig = ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Arrays.asList( "custom1", "custom2")) .messageSystemAttributeNames(Arrays.asList( MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT)) - .visibilityTimeout(visibilityTimeout) - .minReceiveWaitTime(Duration.ofMillis(200)) + .visibilityTimeout(Duration.ofSeconds(9)) + .messageMinWaitDuration(Duration.ofMillis(200)) .build(); ReceiveSqsMessageHelper batch = new ReceiveSqsMessageHelper( - queueUrl, sqsClient, visibilityTimeout, batchOverrideConfig); + QUEUE_URL, sqsClient, Duration.ofSeconds(9), batchOverrideConfig); // Mocking receiveMessage to return a single message @@ -289,7 +277,7 @@ public void asyncReceiveMessageArgs() throws Exception { // Verify that receiveMessage was called with the correct arguments ReceiveMessageRequest expectedRequest = ReceiveMessageRequest.builder() - .queueUrl(queueUrl) + .queueUrl(QUEUE_URL) .maxNumberOfMessages(10) .messageAttributeNames("custom1", "custom2") .messageSystemAttributeNames(Arrays.asList( @@ -301,7 +289,4 @@ public void asyncReceiveMessageArgs() throws Exception { verify(sqsClient, times(1)).receiveMessage(eq(expectedRequest)); } - - - } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java index c82984bbecd1..183a95a8639f 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java @@ -88,7 +88,7 @@ void batchRequest_TwoBatchesMessagesSplitInTwoCalls_successful() throws Exceptio when(mockClient.sendBatchAsync(any(), eq(batchKey1))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2) - .sendRequestFrequency(Duration.ofHours(1)).build(), + .sendMessageFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); @@ -109,7 +109,7 @@ void batchRequest_TwoBatchesWithDifferentKey_successful() throws Exception { when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendMessageFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); @@ -163,7 +163,7 @@ void close_FlushesAllBatches() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendMessageFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); @@ -218,7 +218,7 @@ void batchRequest_MoreThanBufferSize_Fails() throws Exception { SampleBatchManager batchManager = new SampleBatchManager( BatchOverrideConfiguration.builder() .maxBatchSize(2) - .sendRequestFrequency(Duration.ofHours(1)) + .sendMessageFrequency(Duration.ofHours(1)) .build(), scheduledExecutor, mockClient diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java index 35c059f81dde..301e176aaa14 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java @@ -33,10 +33,7 @@ public class SampleBatchManager extends RequestBatchManager Date: Thu, 5 Sep 2024 12:41:02 -0700 Subject: [PATCH 4/6] ResponseCOnfiguration construction updated --- .../DefaultSqsAsyncBatchManager.java | 4 ++- .../ReceiveMessageBatchManager.java | 4 +-- .../ReceiveMessageBatchManagerTest.java | 34 +++++++++---------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java index 5d1c673a1aeb..580c23c67770 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java @@ -75,7 +75,9 @@ private DefaultSqsAsyncBatchManager(DefaultBuilder builder) { ); this.receiveMessageBatchManager = - new ReceiveMessageBatchManager(client, scheduledExecutor, builder.overrideConfiguration); + new ReceiveMessageBatchManager(client, + scheduledExecutor, + ResponseBatchConfiguration.builder(builder.overrideConfiguration).build()); } @Override diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java index 3354c10c2d0e..28f1e47a3cda 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java @@ -39,10 +39,10 @@ public class ReceiveMessageBatchManager implements SdkAutoCloseable { public ReceiveMessageBatchManager(SqsAsyncClient sqsClient, ScheduledExecutorService executor, - BatchOverrideConfiguration config) { + ResponseBatchConfiguration config) { this.sqsClient = sqsClient; this.executor = executor; - this.config = ResponseBatchConfiguration.builder(config).build(); + this.config = config; } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java index 7416548731b2..3d6dcb116735 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java @@ -47,7 +47,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -60,7 +59,6 @@ class ReceiveMessageBatchManagerTest { private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); - private ReceiveMessageBatchManager receiveMessageBatchManager; @@ -68,16 +66,16 @@ class ReceiveMessageBatchManagerTest { @MethodSource("provideBatchOverrideConfigurations") @DisplayName("Test BatchRequest with various configurations") void testBatchRequest_WhenBufferingDisabledAndInCompatible_ShouldNotUseBatchManager(String testCaseName, - BatchOverrideConfiguration overrideConfig, + ResponseBatchConfiguration overrideConfig, ReceiveMessageRequest request, boolean useBatchManager) throws Exception { // Initialize the ResponseBatchConfiguration and ReceiveMessageBatchManager ResponseBatchConfiguration config = ResponseBatchConfiguration.builder() - .messageSystemAttributeNames(overrideConfig.receiveMessageSystemAttributeNames()) + .messageSystemAttributeNames(overrideConfig.messageSystemAttributeNames()) .receiveMessageAttributeNames(overrideConfig.receiveMessageAttributeNames()) - .visibilityTimeout(overrideConfig.receiveMessageVisibilityTimeout()) - .messageMinWaitDuration(overrideConfig.receiveMessageMinWaitDuration()).build(); + .visibilityTimeout(overrideConfig.visibilityTimeout()) + .messageMinWaitDuration(overrideConfig.messageMinWaitDuration()).build(); receiveMessageBatchManager = new ReceiveMessageBatchManager(sqsClient, executor, overrideConfig); @@ -122,9 +120,9 @@ private static Stream provideBatchOverrideConfigurations() { return Stream.of( Arguments.of( "Buffering enabled, compatible system and message attributes, and no visibility timeout", - BatchOverrideConfiguration.builder() + ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) - .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") @@ -135,9 +133,9 @@ private static Stream provideBatchOverrideConfigurations() { ), Arguments.of( "Buffering , compatible attributes, and no visibility timeout", - BatchOverrideConfiguration.builder() + ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) - .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") @@ -150,9 +148,9 @@ private static Stream provideBatchOverrideConfigurations() { ), Arguments.of( "Buffering disabled, incompatible system attributes, and no visibility timeout", - BatchOverrideConfiguration.builder() + ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) - .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENT_TIMESTAMP)) + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENT_TIMESTAMP)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") @@ -164,9 +162,9 @@ private static Stream provideBatchOverrideConfigurations() { ), Arguments.of( "Buffering disabled, compatible attributes, but visibility timeout is set", - BatchOverrideConfiguration.builder() + ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) - .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") @@ -178,9 +176,9 @@ private static Stream provideBatchOverrideConfigurations() { ), Arguments.of( "Buffering disabled, compatible attributes, no visibility timeout, but request has attribute names", - BatchOverrideConfiguration.builder() + ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) - .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") @@ -192,8 +190,8 @@ private static Stream provideBatchOverrideConfigurations() { ), Arguments.of( "Buffering enabled, with messageSystemAttributeName in Config and simple ReceiveMessageRequest", - BatchOverrideConfiguration.builder() - .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + ResponseBatchConfiguration.builder() + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") From b19b34a957f60d8963ccb2336c587236bcff203b Mon Sep 17 00:00:00 2001 From: John Viegas Date: Thu, 5 Sep 2024 13:27:39 -0700 Subject: [PATCH 5/6] RequestOverride configuration check added to Bypass batch manager --- .../ReceiveMessageBatchManager.java | 27 ++- .../ReceiveMessageBatchManagerTest.java | 200 ++++++++++-------- 2 files changed, 137 insertions(+), 90 deletions(-) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java index 28f1e47a3cda..0d497081d37e 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java @@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.utils.Logger; @@ -48,12 +47,12 @@ public ReceiveMessageBatchManager(SqsAsyncClient sqsClient, } public CompletableFuture batchRequest(ReceiveMessageRequest request) { - if (canBeRetrievedFromQueueBuffer(request)) { + String ineligibleReason = checkBatchingEligibility(request); + if (ineligibleReason == null) { return receiveBatchManagerMap.computeIfAbsent(generateBatchKey(request), key -> createReceiveBatchManager(request)) .processRequest(request); } else { - log.debug(() -> "canBeRetrievedFromQueueBuffer failed, so skipping batching for request for Queue with URL: " - + request.queueUrl()); + log.debug(() -> String.format("Batching skipped. Reason: %s", ineligibleReason)); return sqsClient.receiveMessage(request); } } @@ -79,11 +78,25 @@ public void close() { receiveBatchManagerMap.values().forEach(ReceiveBatchManager::close); } - private boolean canBeRetrievedFromQueueBuffer(ReceiveMessageRequest rq) { - return hasCompatibleAttributes(rq) && isBufferingEnabled() && rq.visibilityTimeout() == null; + private String checkBatchingEligibility(ReceiveMessageRequest rq) { + if (!hasCompatibleAttributes(rq)) { + return "Incompatible attributes."; + } + if (rq.visibilityTimeout() != null) { + return "Visibility timeout is set."; + } + if (!isBufferingEnabled()) { + return "Buffering is disabled."; + } + if (rq.overrideConfiguration().isPresent()) { + return "Request has override configurations."; + } + if (rq.waitTimeSeconds() != null && rq.waitTimeSeconds() != 0) { + return "Request has long polling enabled."; + } + return null; } - private boolean hasCompatibleAttributes(ReceiveMessageRequest rq) { return !rq.hasAttributeNames() && hasCompatibleSystemAttributes(rq) diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java index 3d6dcb116735..5b9a02ecd140 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java @@ -16,16 +16,26 @@ package software.amazon.awssdk.services.sqs.batchmanager; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.mockito.ArgumentCaptor.forClass; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; + +import org.apache.logging.log4j.Level; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -43,83 +53,92 @@ import software.amazon.awssdk.services.sqs.model.QueueAttributeName; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; - +import software.amazon.awssdk.testutils.LogCaptor; @ExtendWith(MockitoExtension.class) class ReceiveMessageBatchManagerTest { + private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(4); + @Mock private SqsAsyncClient sqsClient; - private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); - private ReceiveMessageBatchManager receiveMessageBatchManager; - @ParameterizedTest(name = "{index} => {0}") @MethodSource("provideBatchOverrideConfigurations") @DisplayName("Test BatchRequest with various configurations") - void testBatchRequest_WhenBufferingDisabledAndInCompatible_ShouldNotUseBatchManager(String testCaseName, - ResponseBatchConfiguration overrideConfig, - ReceiveMessageRequest request, - boolean useBatchManager) throws Exception { - - // Initialize the ResponseBatchConfiguration and ReceiveMessageBatchManager - ResponseBatchConfiguration config = ResponseBatchConfiguration.builder() - .messageSystemAttributeNames(overrideConfig.messageSystemAttributeNames()) - .receiveMessageAttributeNames(overrideConfig.receiveMessageAttributeNames()) - .visibilityTimeout(overrideConfig.visibilityTimeout()) - .messageMinWaitDuration(overrideConfig.messageMinWaitDuration()).build(); - - receiveMessageBatchManager = new ReceiveMessageBatchManager(sqsClient, executor, overrideConfig); - - CompletableFuture mockResponse = - CompletableFuture.completedFuture(ReceiveMessageResponse.builder().build()); - String visibilityTimeout = "1"; - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mockResponse); - if(useBatchManager) { - mockGetQueueAttributesResponse("0", visibilityTimeout); - } + void testBatchRequest(String testCaseName, + ResponseBatchConfiguration overrideConfig, + ReceiveMessageRequest request, + boolean useBatchManager, + String inEligibleReason) throws Exception { + setupBatchManager(overrideConfig); - CompletableFuture result = receiveMessageBatchManager.batchRequest(request); - result.get(2, TimeUnit.SECONDS); + CompletableFuture mockResponse = CompletableFuture.completedFuture( + ReceiveMessageResponse.builder().build()); + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mockResponse); - // Enough time to make sure any spawned task after receiving response is completed - Thread.sleep(500); + try (LogCaptor logCaptor = LogCaptor.create(Level.DEBUG)) { - // Capture the argument passed to receiveMessage - ArgumentCaptor requestCaptor = forClass(ReceiveMessageRequest.class); + if (useBatchManager) { + mockQueueAttributes("0", "1"); + } - if (useBatchManager) { - verify(sqsClient, atLeast(1)).receiveMessage(requestCaptor.capture()); + CompletableFuture result = receiveMessageBatchManager.batchRequest(request); + result.get(2, TimeUnit.SECONDS); + Thread.sleep(500); - // Assertions to verify the behavior when batch manager is used - assertEquals(config.maxBatchItems(), requestCaptor.getValue().maxNumberOfMessages()); - assertEquals(Integer.parseInt(visibilityTimeout), requestCaptor.getValue().visibilityTimeout()); - } else { - verify(sqsClient, times(1)).receiveMessage(requestCaptor.capture()); + ArgumentCaptor requestCaptor = forClass(ReceiveMessageRequest.class); - // Assertions to verify the behavior when batch manager is not used - assertEquals(request.maxNumberOfMessages(), requestCaptor.getValue().maxNumberOfMessages()); - assertEquals(request.visibilityTimeout(), requestCaptor.getValue().visibilityTimeout()); - assertNotEquals(config.maxBatchItems(), - requestCaptor.getValue().maxNumberOfMessages()); + if (useBatchManager) { + verifyBatchManagerUsed(requestCaptor); + } else { + verifyBatchManagerNotUsed(request, requestCaptor, logCaptor, inEligibleReason); + } } } + private void setupBatchManager(ResponseBatchConfiguration overrideConfig) { + receiveMessageBatchManager = new ReceiveMessageBatchManager(sqsClient, EXECUTOR, overrideConfig); + } + private void verifyBatchManagerUsed(ArgumentCaptor requestCaptor) { + verify(sqsClient, atLeast(1)).receiveMessage(requestCaptor.capture()); + assertEquals(ResponseBatchConfiguration.MAX_DONE_RECEIVE_BATCHES_DEFAULT, + requestCaptor.getValue().maxNumberOfMessages()); + } + + private void verifyBatchManagerNotUsed(ReceiveMessageRequest request, + ArgumentCaptor requestCaptor, + LogCaptor logCaptor, + String inEligibleReason) { + verify(sqsClient, times(1)).receiveMessage(requestCaptor.capture()); + assertEquals(request.maxNumberOfMessages(), requestCaptor.getValue().maxNumberOfMessages()); + assertEquals(request.visibilityTimeout(), requestCaptor.getValue().visibilityTimeout()); + assertThat(logCaptor.loggedEvents()) + .anySatisfy(logEvent -> assertThat(logEvent.getMessage().getFormattedMessage()) + .contains(inEligibleReason)); + } + + private void mockQueueAttributes(String receiveMessageWaitTimeSeconds, String visibilityTimeout) { + Map attributes = new HashMap<>(); + attributes.put(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, receiveMessageWaitTimeSeconds); + attributes.put(QueueAttributeName.VISIBILITY_TIMEOUT, visibilityTimeout); + + GetQueueAttributesResponse response = GetQueueAttributesResponse.builder() + .attributes(attributes) + .build(); + + when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + } private static Stream provideBatchOverrideConfigurations() { return Stream.of( Arguments.of( - "Buffering enabled, compatible system and message attributes, and no visibility timeout", + "Buffering enabled, compatible system and message attributes, no visibility timeout", ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) @@ -129,10 +148,11 @@ private static Stream provideBatchOverrideConfigurations() { .messageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(MessageSystemAttributeName.SENDER_ID) .build(), - true + true, + "" ), Arguments.of( - "Buffering , compatible attributes, and no visibility timeout", + "Buffering enabled, compatible attributes, no visibility timeout but deprecated attributeNames", ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) @@ -141,13 +161,13 @@ private static Stream provideBatchOverrideConfigurations() { .queueUrl("testQueueUrl") .messageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNamesWithStrings(Collections.singletonList("SenderId")) - // attributeNames which is Deprecated api not supported for Batching - .attributeNames(QueueAttributeName.ALL) + .attributeNames(QueueAttributeName.ALL) // Deprecated api not supported for Batching .build(), - false + false, + "Incompatible attributes." ), Arguments.of( - "Buffering disabled, incompatible system attributes, and no visibility timeout", + "Buffering disabled, incompatible system attributes, no visibility timeout", ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENT_TIMESTAMP)) @@ -155,13 +175,13 @@ private static Stream provideBatchOverrideConfigurations() { ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") .messageAttributeNames(Collections.singletonList("attr1")) - .messageSystemAttributeNamesWithStrings(Collections.singletonList("SenderId")) // - // Incompatible system attribute + .messageSystemAttributeNamesWithStrings(Collections.singletonList("SenderId")) .build(), - false + false, + "Incompatible attributes." ), Arguments.of( - "Buffering disabled, compatible attributes, but visibility timeout is set", + "Buffering disabled, compatible attributes, visibility timeout is set", ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) @@ -170,12 +190,13 @@ private static Stream provideBatchOverrideConfigurations() { .queueUrl("testQueueUrl") .messageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) - .visibilityTimeout(30) // Visibility timeout is set + .visibilityTimeout(30) .build(), - false + false, + "Visibility timeout is set." ), Arguments.of( - "Buffering disabled, compatible attributes, no visibility timeout, but request has attribute names", + "Buffering disabled, compatible attributes, no visibility timeout but has attribute names", ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) @@ -184,12 +205,13 @@ private static Stream provideBatchOverrideConfigurations() { .queueUrl("testQueueUrl") .messageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) - .attributeNamesWithStrings("All") // Request has attribute names + .attributeNamesWithStrings("All") .build(), - false + false, + "Incompatible attributes." ), Arguments.of( - "Buffering enabled, with messageSystemAttributeName in Config and simple ReceiveMessageRequest", + "Buffering enabled, simple ReceiveMessageRequest, no visibility timeout", ResponseBatchConfiguration.builder() .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), @@ -197,23 +219,35 @@ private static Stream provideBatchOverrideConfigurations() { .queueUrl("testQueueUrl") .maxNumberOfMessages(3) .build(), - true + true, + "" + ), + Arguments.of( + "Buffering disabled, request has override config", + ResponseBatchConfiguration.builder() + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .build(), + ReceiveMessageRequest.builder() + .queueUrl("testQueueUrl") + .maxNumberOfMessages(3) + .overrideConfiguration(o -> o.apiCallTimeout(Duration.ofSeconds(2))) + .build(), + false, + "Request has override configurations." + ), + Arguments.of( + "Buffering disabled, with waitTimeSeconds in ReceiveMessageRequest", + ResponseBatchConfiguration.builder() + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .build(), + ReceiveMessageRequest.builder() + .queueUrl("testQueueUrl") + .maxNumberOfMessages(3) + .waitTimeSeconds(3) + .build(), + false, + "Request has long polling enabled." ) ); } - - private void mockGetQueueAttributesResponse(String receiveMessageWaitTimeSeconds, String visibilityTimeout) { - Map attributes = new HashMap<>(); - attributes.put(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, receiveMessageWaitTimeSeconds); - attributes.put(QueueAttributeName.VISIBILITY_TIMEOUT, visibilityTimeout); - - GetQueueAttributesResponse response = GetQueueAttributesResponse.builder() - .attributes(attributes) - .build(); - - when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))) - .thenReturn(CompletableFuture.completedFuture(response)); - } - - } From 8a62d2d9cc0ce155708d4f9c9c65f1831b66ffa7 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Thu, 5 Sep 2024 15:04:08 -0700 Subject: [PATCH 6/6] Handled review comments --- .../BatchOverrideConfiguration.java | 30 +++--- .../batchmanager/ReceiveQueueBuffer.java | 13 +-- .../RequestBatchConfiguration.java | 18 ++-- .../batchmanager/RequestBatchManager.java | 8 +- .../ResponseBatchConfiguration.java | 16 --- .../BatchOverrideConfigurationTest.java | 10 +- .../batchmanager/ReceiveQueueBufferTest.java | 100 +----------------- .../batchmanager/RequestBatchManagerTest.java | 8 +- 8 files changed, 45 insertions(+), 158 deletions(-) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java index 9c5a5fb4053a..a701f099962a 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java @@ -39,7 +39,7 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder { private final Integer maxBatchSize; - private final Duration sendMessageFrequency; + private final Duration sendRequestFrequency; private final Duration receiveMessageVisibilityTimeout; private final Duration receiveMessageMinWaitDuration; private final List receiveMessageSystemAttributeNames; @@ -52,8 +52,8 @@ private BatchOverrideConfiguration(Builder builder) { Validate.isTrue(this.maxBatchSize == null || this.maxBatchSize <= 10, "The maxBatchSize must be less than or equal to 10. A batch can contain up to 10 messages."); - this.sendMessageFrequency = Validate.isPositiveOrNull(builder.sendMessageFrequency, - "sendMessageFrequency"); + this.sendRequestFrequency = Validate.isPositiveOrNull(builder.sendRequestFrequency, + "sendRequestFrequency"); this.receiveMessageVisibilityTimeout = Validate.isPositiveOrNull(builder.receiveMessageVisibilityTimeout, "receiveMessageVisibilityTimeout"); this.receiveMessageMinWaitDuration = Validate.isPositiveOrNull(builder.receiveMessageMinWaitDuration, @@ -86,8 +86,8 @@ public Integer maxBatchSize() { * If the {@link #maxBatchSize()} is reached before this duration, the batch will be sent immediately. * The default value is 200 milliseconds. */ - public Duration sendMessageFrequency() { - return sendMessageFrequency; + public Duration sendRequestFrequency() { + return sendRequestFrequency; } /** @@ -127,7 +127,7 @@ public List receiveMessageAttributeNames() { public Builder toBuilder() { return new Builder() .maxBatchSize(maxBatchSize) - .sendMessageFrequency(sendMessageFrequency) + .sendRequestFrequency(sendRequestFrequency) .receiveMessageVisibilityTimeout(receiveMessageVisibilityTimeout) .receiveMessageMinWaitDuration(receiveMessageMinWaitDuration) .receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames) @@ -138,7 +138,7 @@ public Builder toBuilder() { public String toString() { return ToString.builder("BatchOverrideConfiguration") .add("maxBatchSize", maxBatchSize) - .add("sendMessageFrequency", sendMessageFrequency) + .add("sendRequestFrequency", sendRequestFrequency) .add("receiveMessageVisibilityTimeout", receiveMessageVisibilityTimeout) .add("receiveMessageMinWaitDuration", receiveMessageMinWaitDuration) .add("receiveMessageSystemAttributeNames", receiveMessageSystemAttributeNames) @@ -160,8 +160,8 @@ public boolean equals(Object o) { if (maxBatchSize != null ? !maxBatchSize.equals(that.maxBatchSize) : that.maxBatchSize != null) { return false; } - if (sendMessageFrequency != null ? !sendMessageFrequency.equals(that.sendMessageFrequency) : - that.sendMessageFrequency != null) { + if (sendRequestFrequency != null ? !sendRequestFrequency.equals(that.sendRequestFrequency) : + that.sendRequestFrequency != null) { return false; } if (receiveMessageVisibilityTimeout != null @@ -185,7 +185,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = maxBatchSize != null ? maxBatchSize.hashCode() : 0; - result = 31 * result + (sendMessageFrequency != null ? sendMessageFrequency.hashCode() : 0); + result = 31 * result + (sendRequestFrequency != null ? sendRequestFrequency.hashCode() : 0); result = 31 * result + (receiveMessageVisibilityTimeout != null ? receiveMessageVisibilityTimeout.hashCode() : 0); result = 31 * result + (receiveMessageMinWaitDuration != null ? receiveMessageMinWaitDuration.hashCode() : 0); result = 31 * result + (receiveMessageSystemAttributeNames != null ? receiveMessageSystemAttributeNames.hashCode() : 0); @@ -196,7 +196,7 @@ public int hashCode() { public static final class Builder implements CopyableBuilder { private Integer maxBatchSize = 10; - private Duration sendMessageFrequency ; + private Duration sendRequestFrequency ; private Duration receiveMessageVisibilityTimeout; private Duration receiveMessageMinWaitDuration ; private List receiveMessageSystemAttributeNames = Collections.emptyList(); @@ -226,15 +226,15 @@ public Builder maxBatchSize(Integer maxBatchSize) { * requests before being sent. Outbound requests include SendMessageBatchRequest, * ChangeMessageVisibilityBatchRequest, and DeleteMessageBatchRequest. If the maxBatchSize is reached * before this duration, the batch will be sent immediately. - * Increasing the {@code sendMessageFrequency} gives more time for additional messages to be added to + * Increasing the {@code sendRequestFrequency} gives more time for additional messages to be added to * the batch, which can reduce the number of requests and increase throughput. However, a higher * frequency may also result in increased average message latency. The default value is 200 milliseconds. * - * @param sendMessageFrequency The new value for the frequency at which outbound requests are sent. + * @param sendRequestFrequency The new value for the frequency at which outbound requests are sent. * @return This Builder object for method chaining. */ - public Builder sendMessageFrequency(Duration sendMessageFrequency) { - this.sendMessageFrequency = sendMessageFrequency; + public Builder sendRequestFrequency(Duration sendRequestFrequency) { + this.sendRequestFrequency = sendRequestFrequency; return this; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java index de28f6c79e09..895fa3d8a57a 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java @@ -121,14 +121,11 @@ private void spawnMoreReceiveTasks() { private int determineDesiredBatches() { int desiredBatches = Math.max(config.maxDoneReceiveBatches(), 1); - - if (config.adaptivePrefetching()) { - int totalRequested = futures.stream() - .mapToInt(FutureRequestWrapper::getRequestedSize) - .sum(); - int batchesNeededToFulfillFutures = (int) Math.ceil((float) totalRequested / MAX_SUPPORTED_SQS_RECEIVE_MSG); - desiredBatches = Math.min(batchesNeededToFulfillFutures, desiredBatches); - } + int totalRequested = futures.stream() + .mapToInt(FutureRequestWrapper::getRequestedSize) + .sum(); + int batchesNeededToFulfillFutures = (int) Math.ceil((float) totalRequested / MAX_SUPPORTED_SQS_RECEIVE_MSG); + desiredBatches = Math.min(batchesNeededToFulfillFutures, desiredBatches); return desiredBatches; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java index 98f0e1c145b8..08cd1c0818ce 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java @@ -31,7 +31,7 @@ public final class RequestBatchConfiguration { private final Integer maxBatchItems; private final Integer maxBatchKeys; private final Integer maxBufferSize; - private final Duration sendMessageFrequency; + private final Duration sendRequestFrequency; private final Integer maxBatchBytesSize; private RequestBatchConfiguration(Builder builder) { @@ -39,8 +39,8 @@ private RequestBatchConfiguration(Builder builder) { this.maxBatchItems = builder.maxBatchItems != null ? builder.maxBatchItems : DEFAULT_MAX_BATCH_ITEMS; this.maxBatchKeys = builder.maxBatchKeys != null ? builder.maxBatchKeys : DEFAULT_MAX_BATCH_KEYS; this.maxBufferSize = builder.maxBufferSize != null ? builder.maxBufferSize : DEFAULT_MAX_BUFFER_SIZE; - this.sendMessageFrequency = builder.sendMessageFrequency != null ? - builder.sendMessageFrequency : + this.sendRequestFrequency = builder.sendRequestFrequency != null ? + builder.sendRequestFrequency : DEFAULT_MAX_BATCH_OPEN_IN_MS; this.maxBatchBytesSize = builder.maxBatchBytesSize != null ? builder.maxBatchBytesSize : DEFAULT_MAX_BATCH_BYTES_SIZE; @@ -54,14 +54,14 @@ public static Builder builder(BatchOverrideConfiguration configuration) { if (configuration != null) { return new Builder() .maxBatchItems(configuration.maxBatchSize()) - .sendMessageFrequency(configuration.sendMessageFrequency()) + .sendRequestFrequency(configuration.sendRequestFrequency()) .maxBatchBytesSize(configuration.maxBatchSize()); } return new Builder(); } - public Duration sendMessageFrequency() { - return sendMessageFrequency; + public Duration sendRequestFrequency() { + return sendRequestFrequency; } public int maxBatchItems() { @@ -85,7 +85,7 @@ public static final class Builder { private Integer maxBatchItems; private Integer maxBatchKeys; private Integer maxBufferSize; - private Duration sendMessageFrequency; + private Duration sendRequestFrequency; private Integer maxBatchBytesSize; private Builder() { @@ -106,8 +106,8 @@ public Builder maxBufferSize(Integer maxBufferSize) { return this; } - public Builder sendMessageFrequency(Duration sendMessageFrequency) { - this.sendMessageFrequency = sendMessageFrequency; + public Builder sendRequestFrequency(Duration sendRequestFrequency) { + this.sendRequestFrequency = sendRequestFrequency; return this; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index 1c8c01b26532..fdee592a7096 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -45,7 +45,7 @@ public abstract class RequestBatchManager { protected final RequestBatchConfiguration batchConfiguration ; private final int maxBatchItems; - private final Duration sendMessageFrequency; + private final Duration sendRequestFrequency; private final BatchingMap requestsAndResponsesMaps; private final ScheduledExecutorService scheduledExecutor; private final Set> pendingBatchResponses ; @@ -56,7 +56,7 @@ protected RequestBatchManager(RequestBatchConfiguration overrideConfiguration, ScheduledExecutorService scheduledExecutor) { batchConfiguration = overrideConfiguration; this.maxBatchItems = batchConfiguration.maxBatchItems(); - this.sendMessageFrequency = batchConfiguration.sendMessageFrequency(); + this.sendRequestFrequency = batchConfiguration.sendRequestFrequency(); this.scheduledExecutor = Validate.notNull(scheduledExecutor, "Null scheduledExecutor"); pendingBatchResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); pendingResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -80,7 +80,7 @@ public CompletableFuture batchRequest(RequestT request) { // Add request and response to the map, scheduling a flush if necessary requestsAndResponsesMaps.put(batchKey, () -> scheduleBufferFlush(batchKey, - sendMessageFrequency.toMillis(), + sendRequestFrequency.toMillis(), scheduledExecutor), request, response); @@ -111,7 +111,7 @@ private void manualFlushBuffer(String batchKey, flushBuffer(batchKey, flushableRequests); requestsAndResponsesMaps.putScheduledFlush(batchKey, scheduleBufferFlush(batchKey, - sendMessageFrequency.toMillis(), + sendRequestFrequency.toMillis(), scheduledExecutor)); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java index 0de53beacb80..87e471e85301 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java @@ -29,7 +29,6 @@ public final class ResponseBatchConfiguration { public static final Duration MIN_RECEIVE_WAIT_TIME_MS_DEFAULT = Duration.ofMillis(50); public static final List RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT = Collections.emptyList(); public static final List MESSAGE_SYSTEM_ATTRIBUTE_NAMES_DEFAULT = Collections.emptyList(); - public static final boolean ADAPTIVE_PREFETCHING_DEFAULT = true; public static final int MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT = 10; public static final int MAX_DONE_RECEIVE_BATCHES_DEFAULT = 10; @@ -50,7 +49,6 @@ public final class ResponseBatchConfiguration { private final Duration messageMinWaitDuration; private final List messageSystemAttributeNames; private final List receiveMessageAttributeNames; - private final Boolean adaptivePrefetching; private final Integer maxBatchItems; private final Integer maxInflightReceiveBatches; private final Integer maxDoneReceiveBatches; @@ -72,10 +70,6 @@ private ResponseBatchConfiguration(Builder builder) { ? builder.receiveMessageAttributeNames : RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT; - this.adaptivePrefetching = builder.adaptivePrefetching != null - ? builder.adaptivePrefetching - : ADAPTIVE_PREFETCHING_DEFAULT; - this.maxBatchItems = builder.maxBatchItems != null ? builder.maxBatchItems : MAX_SUPPORTED_SQS_RECEIVE_MSG; @@ -106,10 +100,6 @@ public List receiveMessageAttributeNames() { return Collections.unmodifiableList(receiveMessageAttributeNames); } - public boolean adaptivePrefetching() { - return adaptivePrefetching; - } - public int maxBatchItems() { return maxBatchItems; } @@ -142,7 +132,6 @@ public static class Builder { private Duration messageMinWaitDuration; private List messageSystemAttributeNames; private List receiveMessageAttributeNames; - private Boolean adaptivePrefetching; private Integer maxBatchItems; private Integer maxInflightReceiveBatches; private Integer maxDoneReceiveBatches; @@ -167,11 +156,6 @@ public Builder receiveMessageAttributeNames(List receiveMessageAttribute return this; } - public Builder adaptivePrefetching(Boolean adaptivePrefetching) { - this.adaptivePrefetching = adaptivePrefetching; - return this; - } - public Builder maxBatchItems(Integer maxBatchItems) { this.maxBatchItems = maxBatchItems; return this; diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java index 606c729a8180..d563f05f2502 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java @@ -56,7 +56,7 @@ private static Stream provideConfigurations() { @ParameterizedTest @MethodSource("provideConfigurations") void testBatchOverrideConfiguration(Integer maxBatchSize, - Duration sendMessageFrequency, + Duration sendRequestFrequency, Duration receiveMessageVisibilityTimeout, Duration receiveMessageMinWaitDuration, List receiveMessageAttributeNames, @@ -64,7 +64,7 @@ void testBatchOverrideConfiguration(Integer maxBatchSize, BatchOverrideConfiguration config = BatchOverrideConfiguration.builder() .maxBatchSize(maxBatchSize) - .sendMessageFrequency(sendMessageFrequency) + .sendRequestFrequency(sendRequestFrequency) .receiveMessageVisibilityTimeout(receiveMessageVisibilityTimeout) .receiveMessageMinWaitDuration(receiveMessageMinWaitDuration) .receiveMessageAttributeNames(receiveMessageAttributeNames) @@ -72,7 +72,7 @@ void testBatchOverrideConfiguration(Integer maxBatchSize, .build(); assertEquals(maxBatchSize, config.maxBatchSize()); - assertEquals(sendMessageFrequency, config.sendMessageFrequency()); + assertEquals(sendRequestFrequency, config.sendRequestFrequency()); assertEquals(receiveMessageVisibilityTimeout, config.receiveMessageVisibilityTimeout()); assertEquals(receiveMessageMinWaitDuration, config.receiveMessageMinWaitDuration()); assertEquals(Optional.ofNullable(receiveMessageAttributeNames).orElse(Collections.emptyList()), @@ -92,7 +92,7 @@ void testEqualsAndHashCode() { void testToBuilder() { BatchOverrideConfiguration originalConfig = BatchOverrideConfiguration.builder() .maxBatchSize(10) - .sendMessageFrequency(Duration.ofMillis(200)) + .sendRequestFrequency(Duration.ofMillis(200)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(30)) .receiveMessageMinWaitDuration(Duration.ofMillis(50)) .receiveMessageAttributeNames(Arrays.asList("msgAttr1")) @@ -107,7 +107,7 @@ void testToBuilder() { builder.maxBatchSize(9); assertNotEquals(originalConfig.maxBatchSize(), builder.build().maxBatchSize()); // Ensure that all other fields are still equal after modifying the maxBatchSize - assertEquals(originalConfig.sendMessageFrequency(), builder.build().sendMessageFrequency()); + assertEquals(originalConfig.sendRequestFrequency(), builder.build().sendRequestFrequency()); assertEquals(originalConfig.receiveMessageVisibilityTimeout(), builder.build().receiveMessageVisibilityTimeout()); assertEquals(originalConfig.receiveMessageMinWaitDuration(), builder.build().receiveMessageMinWaitDuration()); assertEquals(originalConfig.receiveMessageAttributeNames(), builder.build().receiveMessageAttributeNames()); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveQueueBufferTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveQueueBufferTest.java index 78ff00e24a84..10aa9db30a99 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveQueueBufferTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveQueueBufferTest.java @@ -193,7 +193,7 @@ void numberOfBatchesSpawned() throws Exception { } @Test - void testReceiveMessageWithAdaptivePrefetchingTrue() throws Exception { + void testReceiveMessageWithAdaptivePrefetchingOfReceieveMessageApiCalls() throws Exception { // Mock response ReceiveMessageResponse response = generateMessageResponse(10); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) @@ -225,7 +225,7 @@ void testReceiveMessageWithAdaptivePrefetchingTrue() throws Exception { @Test - void testReceiveMessageWithAdaptivePrefetchingTrueForSingleCall() throws Exception { + void testReceiveMessageWithAdaptivePrefetchingForASingleCall() throws Exception { ReceiveMessageResponse response = generateMessageResponse(10); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) @@ -251,101 +251,7 @@ void testReceiveMessageWithAdaptivePrefetchingTrueForSingleCall() throws Excepti } @Test - void testReceiveMessageWithAdaptivePrefetchingFalseForSingleCall() throws Exception { - ReceiveMessageResponse response = generateMessageResponse(10); - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) - .thenReturn(CompletableFuture.completedFuture(response)); - - // Create receiveQueueBuffer with adaptive prefetching - ReceiveQueueBuffer receiveQueueBuffer = ReceiveQueueBuffer.builder() - .executor(executor) - .sqsClient(sqsClient) - .config(ResponseBatchConfiguration.builder().adaptivePrefetching(false).build()) - .queueUrl("queueUrl") - .queueAttributesManager(queueAttributesManager) - .build(); - - - CompletableFuture future = new CompletableFuture<>(); - receiveQueueBuffer.receiveMessage(future, 10); - - ReceiveMessageResponse receiveMessageResponse = future.get(1, TimeUnit.SECONDS); - System.out.println(receiveMessageResponse); - assertThat(receiveMessageResponse.messages().size()).isEqualTo(10); - Thread.sleep(1000); - - verify(sqsClient, times(11)).receiveMessage(any(ReceiveMessageRequest.class)); - } - - @Test - void testReceiveMessageWithAdaptivePrefetchingFalse() throws Exception { - // Mock response - ReceiveMessageResponse response = generateMessageResponse(10); - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); - - ReceiveQueueBuffer receiveQueueBuffer = ReceiveQueueBuffer.builder() - .executor(executor) - .sqsClient(sqsClient) - .config(ResponseBatchConfiguration.builder() - .adaptivePrefetching(false) - .build()) - .queueUrl("queueUrl") - .queueAttributesManager(queueAttributesManager) - .build(); - - List> futures = new ArrayList<>(); - for (int i = 0; i < 30; i++) { - CompletableFuture future = new CompletableFuture<>(); - futures.add(future); - receiveQueueBuffer.receiveMessage(future, 1); - Thread.sleep(10); - } - - // Join all futures to ensure they complete - for (CompletableFuture future : futures) { - future.get(2, TimeUnit.SECONDS); - } - - verify(sqsClient, times(13)).receiveMessage(any(ReceiveMessageRequest.class)); - } - - @Test - void testReceiveMessageWithAdaptivePrefetchingFalse_followsMaxDoneRecieveBatches() throws Exception { - // Mock response - int MAX_BATCH_ITEMS = 10; - ReceiveMessageResponse response = generateMessageResponse(MAX_BATCH_ITEMS); - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); - - ReceiveQueueBuffer receiveQueueBuffer = ReceiveQueueBuffer.builder() - .executor(executor) - .sqsClient(sqsClient) - .config(ResponseBatchConfiguration - .builder() - .adaptivePrefetching(false) - .build()) - .queueUrl("queueUrl") - .queueAttributesManager(queueAttributesManager) - .build(); - - // Create and send multiple futures using a loop - List> futures = new ArrayList<>(); - for (int i = 0; i < 30; i++) { - CompletableFuture future = new CompletableFuture<>(); - futures.add(future); - Thread.sleep(10); - receiveQueueBuffer.receiveMessage(future, 1); - } - - // Join all futures to ensure they complete - for (CompletableFuture future : futures) { - future.get(2, TimeUnit.SECONDS); - } - - verify(sqsClient, times(13)).receiveMessage(any(ReceiveMessageRequest.class)); - } - - @Test - void receiveMessageShutDown() throws Exception { + void receiveMessageShutDown() { ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); // Create future and call receiveMessage diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java index 183a95a8639f..c82984bbecd1 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java @@ -88,7 +88,7 @@ void batchRequest_TwoBatchesMessagesSplitInTwoCalls_successful() throws Exceptio when(mockClient.sendBatchAsync(any(), eq(batchKey1))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2) - .sendMessageFrequency(Duration.ofHours(1)).build(), + .sendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); @@ -109,7 +109,7 @@ void batchRequest_TwoBatchesWithDifferentKey_successful() throws Exception { when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendMessageFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); @@ -163,7 +163,7 @@ void close_FlushesAllBatches() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendMessageFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); @@ -218,7 +218,7 @@ void batchRequest_MoreThanBufferSize_Fails() throws Exception { SampleBatchManager batchManager = new SampleBatchManager( BatchOverrideConfiguration.builder() .maxBatchSize(2) - .sendMessageFrequency(Duration.ofHours(1)) + .sendRequestFrequency(Duration.ofHours(1)) .build(), scheduledExecutor, mockClient