diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java index 7157fbb8601d..a701f099962a 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java @@ -16,50 +16,55 @@ package software.amazon.awssdk.services.sqs.batchmanager; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.utils.ToString; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.builder.CopyableBuilder; import software.amazon.awssdk.utils.builder.ToCopyableBuilder; /** - * Configuration values for the BatchManager Implementation. All values are optional, and the default values will be used if they - * are not specified. + * Configuration values for the BatchManager implementation used for controlling batch operations. + * All values are optional, and default values will be used if they are not specified. */ @SdkPublicApi public final class BatchOverrideConfiguration implements ToCopyableBuilder { - private final Integer maxBatchItems; - private final Integer maxBatchKeys; - private final Integer maxBufferSize; - private final Duration maxBatchOpenDuration; - private final Duration visibilityTimeout; - private final Duration longPollWaitTimeout; - private final Duration minReceiveWaitTime; - private final Integer maxDoneReceiveBatches; - private final List messageSystemAttributeNames; + private final Integer maxBatchSize; + private final Duration sendRequestFrequency; + private final Duration receiveMessageVisibilityTimeout; + private final Duration receiveMessageMinWaitDuration; + private final List receiveMessageSystemAttributeNames; private final List receiveMessageAttributeNames; - private final Boolean adaptivePrefetching; - private final Integer maxInflightReceiveBatches; + private BatchOverrideConfiguration(Builder builder) { - this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems"); - this.maxBatchKeys = Validate.isPositiveOrNull(builder.maxBatchKeys, "maxBatchKeys"); - this.maxBufferSize = Validate.isPositiveOrNull(builder.maxBufferSize, "maxBufferSize"); - this.maxBatchOpenDuration = Validate.isPositiveOrNull(builder.maxBatchOpenDuration, "maxBatchOpenDuration"); - this.visibilityTimeout = Validate.isPositiveOrNull(builder.visibilityTimeout, "visibilityTimeout"); - this.longPollWaitTimeout = Validate.isPositiveOrNull(builder.longPollWaitTimeout, "longPollWaitTimeout"); - this.minReceiveWaitTime = Validate.isPositiveOrNull(builder.minReceiveWaitTime, "minReceiveWaitTime"); - this.messageSystemAttributeNames = builder.messageSystemAttributeNames; - this.receiveMessageAttributeNames = builder.receiveMessageAttributeNames; - this.adaptivePrefetching = builder.adaptivePrefetching; - this.maxInflightReceiveBatches = builder.maxInflightReceiveBatches; - this.maxDoneReceiveBatches = builder.maxDoneReceiveBatches; + this.maxBatchSize = Validate.isPositiveOrNull(builder.maxBatchSize, + "maxBatchSize"); + Validate.isTrue(this.maxBatchSize == null || this.maxBatchSize <= 10, + "The maxBatchSize must be less than or equal to 10. A batch can contain up to 10 messages."); + + this.sendRequestFrequency = Validate.isPositiveOrNull(builder.sendRequestFrequency, + "sendRequestFrequency"); + this.receiveMessageVisibilityTimeout = Validate.isPositiveOrNull(builder.receiveMessageVisibilityTimeout, + "receiveMessageVisibilityTimeout"); + this.receiveMessageMinWaitDuration = Validate.isPositiveOrNull(builder.receiveMessageMinWaitDuration, + "receiveMessageMinWaitDuration"); + this.receiveMessageSystemAttributeNames = + builder.receiveMessageSystemAttributeNames == null ? Collections.emptyList() : + Collections.unmodifiableList(builder.receiveMessageSystemAttributeNames); + + this.receiveMessageAttributeNames = + builder.receiveMessageAttributeNames == null ? Collections.emptyList() : + Collections.unmodifiableList(builder.receiveMessageAttributeNames); } public static Builder builder() { @@ -67,118 +72,77 @@ public static Builder builder() { } /** - * @return the optional maximum number of messages that are batched together in a single request. - */ - public Integer maxBatchItems() { - return maxBatchItems; - } - - /** - * @return the optional maximum number of batchKeys to keep track of. - */ - public Integer maxBatchKeys() { - return maxBatchKeys; - } - - /** - * @return the maximum number of items to allow to be buffered for each batchKey. + * @return the maximum number of items that can be batched together in a single outbound SQS request + * (e.g., for {@link SendMessageBatchRequest}, {@link ChangeMessageVisibilityBatchRequest}, or + * {@link DeleteMessageBatchRequest}). A batch can contain up to a maximum of 10 messages. + * The default value is 10. */ - public Integer maxBufferSize() { - return maxBufferSize; - } - - public Integer maxDoneReceiveBatches() { - return maxDoneReceiveBatches; + public Integer maxBatchSize() { + return maxBatchSize; } /** - * @return the optional maximum amount of time that an outgoing call waits to be batched with messages of the same type. + * @return the maximum duration an outgoing call waits for additional messages of the same type before being sent. + * If the {@link #maxBatchSize()} is reached before this duration, the batch will be sent immediately. + * The default value is 200 milliseconds. */ - public Duration maxBatchOpenDuration() { - return maxBatchOpenDuration; + public Duration sendRequestFrequency() { + return sendRequestFrequency; } /** - * @return the custom visibility timeout to use when retrieving messages from SQS. + * @return the custom visibility timeout to use when retrieving messages from SQS. If not set, + * the default visibility timeout configured on the SQS queue will be used. */ - public Duration visibilityTimeout() { - return visibilityTimeout; + public Duration receiveMessageVisibilityTimeout() { + return receiveMessageVisibilityTimeout; } /** - * @return the amount of time, the receive call will block on the server waiting for messages to arrive if the - * queue is empty when the receive call is first made. + * @return the minimum wait time for incoming receive message requests. Without a non-zero minimum wait time, + * threads can waste CPU resources busy-waiting for messages. The default value is 50 milliseconds. */ - public Duration longPollWaitTimeout() { - return longPollWaitTimeout; + public Duration receiveMessageMinWaitDuration() { + return receiveMessageMinWaitDuration; } /** - * @return the minimum wait time for incoming receive message requests. + * @return the system attribute names to request for {@link ReceiveMessageRequest}. Requests with differing + * system attribute names will bypass the batch manager and make a direct call to SQS. */ - public Duration minReceiveWaitTime() { - return minReceiveWaitTime; + public List receiveMessageSystemAttributeNames() { + return receiveMessageSystemAttributeNames; } /** - * @return the message systemAttribute Name will request {@link ReceiveMessageRequest#messageSystemAttributeNames()}. - */ - public List messageSystemAttributeName() { - return messageSystemAttributeNames; - } - - /** - * @return the message attributes receive calls will request. + * @return the message attribute names to request for {@link ReceiveMessageRequest}. Requests with different + * message attribute names will bypass the batch manager and make a direct call to SQS. */ public List receiveMessageAttributeNames() { return receiveMessageAttributeNames; } - /** - * @return the behavior for prefetching with respect to the number of in-flight incoming receive requests. - */ - public Boolean adaptivePrefetching() { - return adaptivePrefetching; - } - - /** - * @return the maximum number of concurrent receive message batches. - */ - public Integer maxInflightReceiveBatches() { - return maxInflightReceiveBatches; - } @Override public Builder toBuilder() { - return new Builder().maxBatchItems(maxBatchItems) - .maxBatchKeys(maxBatchKeys) - .maxBufferSize(maxBufferSize) - .maxBatchOpenDuration(maxBatchOpenDuration) - .visibilityTimeout(visibilityTimeout) - .longPollWaitTimeout(longPollWaitTimeout) - .minReceiveWaitTime(minReceiveWaitTime) - .maxInflightReceiveBatches(maxInflightReceiveBatches) - .messageSystemAttributeName(messageSystemAttributeNames) - .receiveMessageAttributeNames(receiveMessageAttributeNames) - .adaptivePrefetching(adaptivePrefetching) - .maxDoneReceiveBatches(maxDoneReceiveBatches); + return new Builder() + .maxBatchSize(maxBatchSize) + .sendRequestFrequency(sendRequestFrequency) + .receiveMessageVisibilityTimeout(receiveMessageVisibilityTimeout) + .receiveMessageMinWaitDuration(receiveMessageMinWaitDuration) + .receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames) + .receiveMessageAttributeNames(receiveMessageAttributeNames); } @Override public String toString() { return ToString.builder("BatchOverrideConfiguration") - .add("maxBatchItems", maxBatchItems) - .add("maxBatchKeys", maxBatchKeys) - .add("maxBufferSize", maxBufferSize) - .add("maxBatchOpenDuration", maxBatchOpenDuration) - .add("visibilityTimeout", visibilityTimeout) - .add("longPollWaitTimeout", longPollWaitTimeout) - .add("minReceiveWaitTime", minReceiveWaitTime) - .add("receiveAttributeNames", messageSystemAttributeNames) + .add("maxBatchSize", maxBatchSize) + .add("sendRequestFrequency", sendRequestFrequency) + .add("receiveMessageVisibilityTimeout", receiveMessageVisibilityTimeout) + .add("receiveMessageMinWaitDuration", receiveMessageMinWaitDuration) + .add("receiveMessageSystemAttributeNames", receiveMessageSystemAttributeNames) .add("receiveMessageAttributeNames", receiveMessageAttributeNames) - .add("adaptivePrefetching", adaptivePrefetching) - .add("maxInflightReceiveBatches", maxInflightReceiveBatches) - .add("maxDoneReceiveBatches", maxDoneReceiveBatches) .build(); } @@ -193,255 +157,158 @@ public boolean equals(Object o) { BatchOverrideConfiguration that = (BatchOverrideConfiguration) o; - if (maxBatchItems != null ? !maxBatchItems.equals(that.maxBatchItems) : that.maxBatchItems != null) { - return false; - } - if (maxBatchKeys != null ? !maxBatchKeys.equals(that.maxBatchKeys) : that.maxBatchKeys != null) { - return false; - } - if (maxBufferSize != null ? !maxBufferSize.equals(that.maxBufferSize) : that.maxBufferSize != null) { - return false; - } - - if (maxBatchOpenDuration != null ? !maxBatchOpenDuration.equals(that.maxBatchOpenDuration) : - that.maxBatchOpenDuration != null) { + if (maxBatchSize != null ? !maxBatchSize.equals(that.maxBatchSize) : that.maxBatchSize != null) { return false; } - if (visibilityTimeout != null ? !visibilityTimeout.equals(that.visibilityTimeout) : - that.visibilityTimeout != null) { + if (sendRequestFrequency != null ? !sendRequestFrequency.equals(that.sendRequestFrequency) : + that.sendRequestFrequency != null) { return false; } - if (longPollWaitTimeout != null ? !longPollWaitTimeout.equals(that.longPollWaitTimeout) : - that.longPollWaitTimeout != null) { + if (receiveMessageVisibilityTimeout != null + ? !receiveMessageVisibilityTimeout.equals(that.receiveMessageVisibilityTimeout) : + that.receiveMessageVisibilityTimeout != null) { return false; } - if (minReceiveWaitTime != null ? !minReceiveWaitTime.equals(that.minReceiveWaitTime) : - that.minReceiveWaitTime != null) { + if (receiveMessageMinWaitDuration != null ? !receiveMessageMinWaitDuration.equals(that.receiveMessageMinWaitDuration) : + that.receiveMessageMinWaitDuration != null) { return false; } - if (messageSystemAttributeNames != null ? !messageSystemAttributeNames.equals(that.messageSystemAttributeNames) : - that.messageSystemAttributeNames != null) { + if (receiveMessageSystemAttributeNames != null ? + !receiveMessageSystemAttributeNames.equals(that.receiveMessageSystemAttributeNames) + : that.receiveMessageSystemAttributeNames != null) { return false; } - if (receiveMessageAttributeNames != null ? !receiveMessageAttributeNames.equals(that.receiveMessageAttributeNames) : - that.receiveMessageAttributeNames != null) { - return false; - } - if (adaptivePrefetching != null ? !adaptivePrefetching.equals(that.adaptivePrefetching) : - that.adaptivePrefetching != null) { - return false; - } - if (maxInflightReceiveBatches != null ? !maxInflightReceiveBatches.equals(that.maxInflightReceiveBatches) : - that.maxInflightReceiveBatches != null) { - return false; - } - return maxDoneReceiveBatches != null ? maxDoneReceiveBatches.equals(that.maxDoneReceiveBatches) : - that.maxDoneReceiveBatches == null; + return receiveMessageAttributeNames != null ? receiveMessageAttributeNames.equals(that.receiveMessageAttributeNames) : + that.receiveMessageAttributeNames == null; } @Override public int hashCode() { - int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0; - result = 31 * result + (maxBatchKeys != null ? maxBatchKeys.hashCode() : 0); - result = 31 * result + (maxBufferSize != null ? maxBufferSize.hashCode() : 0); - result = 31 * result + (maxBatchOpenDuration != null ? maxBatchOpenDuration.hashCode() : 0); - result = 31 * result + (visibilityTimeout != null ? visibilityTimeout.hashCode() : 0); - result = 31 * result + (longPollWaitTimeout != null ? longPollWaitTimeout.hashCode() : 0); - result = 31 * result + (minReceiveWaitTime != null ? minReceiveWaitTime.hashCode() : 0); - result = 31 * result + (messageSystemAttributeNames != null ? messageSystemAttributeNames.hashCode() : 0); + int result = maxBatchSize != null ? maxBatchSize.hashCode() : 0; + result = 31 * result + (sendRequestFrequency != null ? sendRequestFrequency.hashCode() : 0); + result = 31 * result + (receiveMessageVisibilityTimeout != null ? receiveMessageVisibilityTimeout.hashCode() : 0); + result = 31 * result + (receiveMessageMinWaitDuration != null ? receiveMessageMinWaitDuration.hashCode() : 0); + result = 31 * result + (receiveMessageSystemAttributeNames != null ? receiveMessageSystemAttributeNames.hashCode() : 0); result = 31 * result + (receiveMessageAttributeNames != null ? receiveMessageAttributeNames.hashCode() : 0); - result = 31 * result + (adaptivePrefetching != null ? adaptivePrefetching.hashCode() : 0); - result = 31 * result + (maxInflightReceiveBatches != null ? maxInflightReceiveBatches.hashCode() : 0); - result = 31 * result + (maxDoneReceiveBatches != null ? maxDoneReceiveBatches.hashCode() : 0); return result; } public static final class Builder implements CopyableBuilder { - private Integer maxBatchItems; - private Integer maxBatchKeys; - private Integer maxBufferSize; - private Duration maxBatchOpenDuration; - private Duration visibilityTimeout; - private Duration longPollWaitTimeout; - private Duration minReceiveWaitTime; - private Integer maxDoneReceiveBatches; - private Integer maxInflightReceiveBatches; - private List messageSystemAttributeNames = Collections.emptyList(); + private Integer maxBatchSize = 10; + private Duration sendRequestFrequency ; + private Duration receiveMessageVisibilityTimeout; + private Duration receiveMessageMinWaitDuration ; + private List receiveMessageSystemAttributeNames = Collections.emptyList(); private List receiveMessageAttributeNames = Collections.emptyList(); - private Boolean adaptivePrefetching; - private Builder() { - } - - /** - * Define the maximum number of messages that are batched together in a single request. - * - * @param maxBatchItems The new maxBatchItems value. - * @return This object for method chaining. - */ - public Builder maxBatchItems(Integer maxBatchItems) { - this.maxBatchItems = maxBatchItems; - return this; - } - - /** - * Define the maximum number of batchKeys to keep track of. A batchKey determines which requests are batched together and - * is calculated by the client based on the information in a request. - *

- * Ex. SQS determines a batchKey based on a request's queueUrl in combination with its overrideConfiguration, so requests - * with the same queueUrl and overrideConfiguration will have the same batchKey and be batched together. - * - * @param maxBatchKeys the new maxBatchKeys value. - * @return This object for method chaining. - */ - public Builder maxBatchKeys(Integer maxBatchKeys) { - this.maxBatchKeys = maxBatchKeys; - return this; - } - /** - * Define the maximum number of items to allow to be buffered for each batchKey. - * - * @param maxBufferSize the new maxBufferSize value. - * @return This object for method chaining. - */ - public Builder maxBufferSize(Integer maxBufferSize) { - this.maxBufferSize = maxBufferSize; - return this; - } - - /** - * Define the maximum amount of time that an outgoing call waits for other requests before sending out a - * batch request. - * TODO : Decide if Ms needs to be added to the name in surface API review meeting - * @param maxBatchOpenDuration The new maxBatchOpenDuration value. - * @return This object for method chaining. - */ - public Builder maxBatchOpenDuration(Duration maxBatchOpenDuration) { - this.maxBatchOpenDuration = maxBatchOpenDuration; - return this; - } - - /** - * Define the custom visibility timeout to use when retrieving messages from SQS. If set to a value greater than zero, - * this timeout will override the default visibility timeout set on the SQS queue. Set it to -1 to use the default - * visibility timeout of the queue. Visibility timeout of 0 seconds is not supported. - * - * @param visibilityTimeout The new visibilityTimeout value. - * @return This object for method chaining. - */ - public Builder visibilityTimeout(Duration visibilityTimeout) { - this.visibilityTimeout = visibilityTimeout; - return this; + private Builder() { } /** - * Define the amount of time, the receive call will block on the server waiting for messages to arrive if the - * queue is empty when the receive call is first made. This setting has no effect if long polling is disabled. + * Specifies the maximum number of items that the buffered client will include in a single outbound batch request. + * Outbound requests include {@link SendMessageBatchRequest}, {@link ChangeMessageVisibilityBatchRequest}, + * and {@link DeleteMessageBatchRequest}. + * A batch can contain up to a maximum of 10 messages. The default value is 10. * - * @param longPollWaitTimeout The new longPollWaitTimeout value. - * @return This object for method chaining. + * @param maxBatchSize The maximum number of items to be batched together in a single request. + * @return This Builder object for method chaining. */ - public Builder longPollWaitTimeout(Duration longPollWaitTimeout) { - this.longPollWaitTimeout = longPollWaitTimeout; + public Builder maxBatchSize(Integer maxBatchSize) { + this.maxBatchSize = maxBatchSize; return this; } /** - * Define the minimum wait time for incoming receive message requests. Without a non-zero minimum wait time, threads can - * easily waste CPU time busy-waiting against empty local buffers. Avoid setting this to 0 unless you are confident - * threads will do useful work in-between each call to receive messages! + * Specifies the frequency at which outbound batches are sent. + * This defines the maximum duration that an outbound batch is held open for additional outbound + * requests before being sent. Outbound requests include SendMessageBatchRequest, + * ChangeMessageVisibilityBatchRequest, and DeleteMessageBatchRequest. If the maxBatchSize is reached + * before this duration, the batch will be sent immediately. + * Increasing the {@code sendRequestFrequency} gives more time for additional messages to be added to + * the batch, which can reduce the number of requests and increase throughput. However, a higher + * frequency may also result in increased average message latency. The default value is 200 milliseconds. * - * @param minReceiveWaitTime The new minReceiveWaitTime value. - * @return This object for method chaining. + * @param sendRequestFrequency The new value for the frequency at which outbound requests are sent. + * @return This Builder object for method chaining. */ - public Builder minReceiveWaitTime(Duration minReceiveWaitTime) { - this.minReceiveWaitTime = minReceiveWaitTime; + public Builder sendRequestFrequency(Duration sendRequestFrequency) { + this.sendRequestFrequency = sendRequestFrequency; return this; } /** - * Define the maximum number of concurrent receive message batches. The greater this number, the faster the queue will be - * pulling messages from the SQS servers (at the expense of consuming more threads). + * Defines the custom visibility timeout to use when retrieving messages from SQS. If set to a positive value, + * this timeout will override the default visibility timeout set on the SQS queue. If no value is set, + * then by default, the visibility timeout of the queue will be used. Only positive values are supported. * - * @param maxInflightReceiveBatches The new maxInflightReceiveBatches value. - * @return This object for method chaining. + * @param receiveMessageVisibilityTimeout The new visibilityTimeout value. + * @return This Builder object for method chaining. */ - public Builder maxInflightReceiveBatches(Integer maxInflightReceiveBatches) { - this.maxInflightReceiveBatches = maxInflightReceiveBatches; + public Builder receiveMessageVisibilityTimeout(Duration receiveMessageVisibilityTimeout) { + this.receiveMessageVisibilityTimeout = receiveMessageVisibilityTimeout; return this; } /** - * Define the maximum number of done receive batches. If more than that number of completed receive batches are waiting in - * the buffer, the querying for new messages will stop. The larger this number, the more messages the buffer queue will - * pre-fetch and keep in the buffer on the client side, and the faster receive requests will be satisfied. The visibility - * timeout of a pre-fetched message starts at the point of pre-fetch, which means that while the message is in the local - * buffer it is unavailable for other clients to process, and when this client retrieves it, part of the visibility - * timeout may have already expired. The number of messages prefetched will not exceed maxBatchSize * - * maxDoneReceiveBatches. + * Configures the minimum wait time for incoming receive message requests. The default value is 50 milliseconds. + * Without a non-zero minimum wait time, threads can easily waste CPU time by busy-waiting against empty local buffers. + * Avoid setting this to 0 unless you are confident that threads will perform useful work between each call + * to receive messages. + * The call may return sooner than the configured `WaitTimeSeconds` if there are messages in the buffer. + * If no messages are available and the wait time expires, the call will return an empty message list. * - * @param maxDoneReceiveBatches The new maxDoneReceiveBatches value. - * @return This object for method chaining. + * @param receiveMessageMinWaitDuration The new minimum wait time value. + * @return This Builder object for method chaining. */ - public Builder maxDoneReceiveBatches(Integer maxDoneReceiveBatches) { - this.maxDoneReceiveBatches = maxDoneReceiveBatches; + public Builder receiveMessageMinWaitDuration(Duration receiveMessageMinWaitDuration) { + this.receiveMessageMinWaitDuration = receiveMessageMinWaitDuration; return this; } /** - * Defines the list of message system attribute names that should be requested in receive message calls. - * Only receive message requests that request the same set of message system attributes will be satisfied - * from the receive buffers. Refer to {@link ReceiveMessageRequest#messageSystemAttributeNames()}. - * - * Note that if requests to the BatchManager are sent with different messageSystemAttributeNames than what is - * configured, the request will bypass the BatchManager and will instead make a direct call to SQS. + * Defines the list of message system attribute names to request in receive message calls. + * If no `messageSystemAttributeNames` are set in the individual request, the ones configured here will be used. + *

+ * Requests with different `messageSystemAttributeNames` than those configured here will bypass the + * BatchManager and make a direct call to SQS. Only requests with matching attribute names will be + * batched and fulfilled from the receive buffers. * - * @param messageSystemAttributeNames The list of message system attribute names to be requested. + * @param receiveMessageSystemAttributeNames The list of message system attribute names to request. * If null, an empty list will be used. * @return This builder object for method chaining. */ - public Builder messageSystemAttributeName(List messageSystemAttributeNames) { - this.messageSystemAttributeNames = messageSystemAttributeNames != null ? - Collections.unmodifiableList(messageSystemAttributeNames) : - Collections.emptyList() ; + public Builder receiveMessageSystemAttributeNames(List receiveMessageSystemAttributeNames) { + this.receiveMessageSystemAttributeNames = receiveMessageSystemAttributeNames != null ? + new ArrayList<>(receiveMessageSystemAttributeNames) : + Collections.emptyList(); return this; } /** - * Defines the message attributes that receive calls will request. Only receive message requests that request the same set - * of message attributes will be satisfied from the Receive buffer .Refer to - * {@link ReceiveMessageRequest#messageAttributeNames()}. + * Defines the list of message attribute names to request in receive message calls. + * If no `receiveMessageAttributeNames` are set in the individual requests, the ones configured here will be used. *

- * Note that if requests to the BatchManager are sent with different receiveMessageAttributeNames than what is configured, - * the request will bypass the BatchManager and will instead make a direct call to SQS. + * Requests with different `receiveMessageAttributeNames` than those configured here will bypass the batched and + * fulfilled from the receive buffers. * - * @param receiveMessageAttributeNames The list of message attributes to be requested. If null, an empty list will be - * used. + * @param receiveMessageAttributeNames The list of message attribute names to request. + * If null, an empty list will be used. * @return This builder object for method chaining. */ public Builder receiveMessageAttributeNames(List receiveMessageAttributeNames) { this.receiveMessageAttributeNames = receiveMessageAttributeNames != null ? - Collections.unmodifiableList(receiveMessageAttributeNames) : + new ArrayList<>(receiveMessageAttributeNames) : Collections.emptyList(); return this; } /** - * Define the behavior for prefetching with respect to the number of in-flight incoming receive requests made to the - * client. The advantage of this is reducing the number of outgoing requests made to SQS when incoming requests are - * reduced: in particular, if all incoming requests stop no future requests to SQS will be made. The disadvantage is - * increased latency when incoming requests first start occurring. + * Builds a new {@link BatchOverrideConfiguration} object based on the values set in this builder. * - * @param adaptivePrefetching The new adaptivePrefetching value. - * @return This object for method chaining. + * @return A new {@link BatchOverrideConfiguration} object. */ - public Builder adaptivePrefetching(Boolean adaptivePrefetching) { - this.adaptivePrefetching = adaptivePrefetching; - return this; - } - public BatchOverrideConfiguration build() { return new BatchOverrideConfiguration(this); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java index 5d1c673a1aeb..580c23c67770 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java @@ -75,7 +75,9 @@ private DefaultSqsAsyncBatchManager(DefaultBuilder builder) { ); this.receiveMessageBatchManager = - new ReceiveMessageBatchManager(client, scheduledExecutor, builder.overrideConfiguration); + new ReceiveMessageBatchManager(client, + scheduledExecutor, + ResponseBatchConfiguration.builder(builder.overrideConfiguration).build()); } @Override diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java index ccb3280c4ab8..e8fa5ca518c5 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java @@ -58,7 +58,7 @@ public CompletableFuture processRequest(ReceiveMessageRe } int numMessages = rq.maxNumberOfMessages() != null ? rq.maxNumberOfMessages() : MAX_SUPPORTED_SQS_RECEIVE_MSG; - return queueAttributesManager.getReceiveMessageTimeout(rq, config.minReceiveWaitTime()).thenCompose(waitTimeMs -> { + return queueAttributesManager.getReceiveMessageTimeout(rq, config.messageMinWaitDuration()).thenCompose(waitTimeMs -> { CompletableFuture receiveMessageFuture = new CompletableFuture<>(); receiveQueueBuffer.receiveMessage(receiveMessageFuture, numMessages); CompletableFuture timeoutFuture = new CompletableFuture<>(); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java index 0ac4c7b8892d..0d497081d37e 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java @@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.utils.Logger; @@ -39,19 +38,21 @@ public class ReceiveMessageBatchManager implements SdkAutoCloseable { public ReceiveMessageBatchManager(SqsAsyncClient sqsClient, ScheduledExecutorService executor, - BatchOverrideConfiguration config) { + ResponseBatchConfiguration config) { this.sqsClient = sqsClient; this.executor = executor; - this.config = new ResponseBatchConfiguration(config); + this.config = config; + + } public CompletableFuture batchRequest(ReceiveMessageRequest request) { - if (canBeRetrievedFromQueueBuffer(request)) { + String ineligibleReason = checkBatchingEligibility(request); + if (ineligibleReason == null) { return receiveBatchManagerMap.computeIfAbsent(generateBatchKey(request), key -> createReceiveBatchManager(request)) .processRequest(request); } else { - log.debug(() -> "canBeRetrievedFromQueueBuffer failed, so skipping batching for request for Queue with URL: " - + request.queueUrl()); + log.debug(() -> String.format("Batching skipped. Reason: %s", ineligibleReason)); return sqsClient.receiveMessage(request); } } @@ -77,11 +78,25 @@ public void close() { receiveBatchManagerMap.values().forEach(ReceiveBatchManager::close); } - private boolean canBeRetrievedFromQueueBuffer(ReceiveMessageRequest rq) { - return hasCompatibleAttributes(rq) && isBufferingEnabled() && rq.visibilityTimeout() == null; + private String checkBatchingEligibility(ReceiveMessageRequest rq) { + if (!hasCompatibleAttributes(rq)) { + return "Incompatible attributes."; + } + if (rq.visibilityTimeout() != null) { + return "Visibility timeout is set."; + } + if (!isBufferingEnabled()) { + return "Buffering is disabled."; + } + if (rq.overrideConfiguration().isPresent()) { + return "Request has override configurations."; + } + if (rq.waitTimeSeconds() != null && rq.waitTimeSeconds() != 0) { + return "Request has long polling enabled."; + } + return null; } - private boolean hasCompatibleAttributes(ReceiveMessageRequest rq) { return !rq.hasAttributeNames() && hasCompatibleSystemAttributes(rq) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java index 39cbcbd6038c..895fa3d8a57a 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SUPPORTED_SQS_RECEIVE_MSG; + import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -119,14 +121,11 @@ private void spawnMoreReceiveTasks() { private int determineDesiredBatches() { int desiredBatches = Math.max(config.maxDoneReceiveBatches(), 1); - - if (config.adaptivePrefetching()) { - int totalRequested = futures.stream() - .mapToInt(FutureRequestWrapper::getRequestedSize) - .sum(); - int batchesNeededToFulfillFutures = (int) Math.ceil((float) totalRequested / config.maxBatchItems()); - desiredBatches = Math.min(batchesNeededToFulfillFutures, desiredBatches); - } + int totalRequested = futures.stream() + .mapToInt(FutureRequestWrapper::getRequestedSize) + .sum(); + int batchesNeededToFulfillFutures = (int) Math.ceil((float) totalRequested / MAX_SUPPORTED_SQS_RECEIVE_MSG); + desiredBatches = Math.min(batchesNeededToFulfillFutures, desiredBatches); return desiredBatches; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveSqsMessageHelper.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveSqsMessageHelper.java index b0f982f0adee..07314d2e86e2 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveSqsMessageHelper.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveSqsMessageHelper.java @@ -87,9 +87,6 @@ public CompletableFuture asyncReceiveMessage() { request.visibilityTimeout(NumericUtils.saturatedCast(this.visibilityTimeout.getSeconds())); - if (config.longPollWaitTimeout() != null) { - request.waitTimeSeconds(NumericUtils.saturatedCast(config.longPollWaitTimeout().getSeconds())); - } try { return asyncClient.receiveMessage(request.build()) .handle((response, throwable) -> { diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java index dc407c4c84f3..08cd1c0818ce 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java @@ -24,14 +24,14 @@ public final class RequestBatchConfiguration { public static final int DEFAULT_MAX_BATCH_ITEMS = 10; public static final int DEFAULT_MAX_BATCH_BYTES_SIZE = -1; - public static final int DEFAULT_MAX_BATCH_KEYS = 1000; + public static final int DEFAULT_MAX_BATCH_KEYS = 10000; public static final int DEFAULT_MAX_BUFFER_SIZE = 500; public static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(200); private final Integer maxBatchItems; private final Integer maxBatchKeys; private final Integer maxBufferSize; - private final Duration maxBatchOpenDuration; + private final Duration sendRequestFrequency; private final Integer maxBatchBytesSize; private RequestBatchConfiguration(Builder builder) { @@ -39,7 +39,8 @@ private RequestBatchConfiguration(Builder builder) { this.maxBatchItems = builder.maxBatchItems != null ? builder.maxBatchItems : DEFAULT_MAX_BATCH_ITEMS; this.maxBatchKeys = builder.maxBatchKeys != null ? builder.maxBatchKeys : DEFAULT_MAX_BATCH_KEYS; this.maxBufferSize = builder.maxBufferSize != null ? builder.maxBufferSize : DEFAULT_MAX_BUFFER_SIZE; - this.maxBatchOpenDuration = builder.maxBatchOpenDuration != null ? builder.maxBatchOpenDuration : + this.sendRequestFrequency = builder.sendRequestFrequency != null ? + builder.sendRequestFrequency : DEFAULT_MAX_BATCH_OPEN_IN_MS; this.maxBatchBytesSize = builder.maxBatchBytesSize != null ? builder.maxBatchBytesSize : DEFAULT_MAX_BATCH_BYTES_SIZE; @@ -52,16 +53,15 @@ public static Builder builder() { public static Builder builder(BatchOverrideConfiguration configuration) { if (configuration != null) { return new Builder() - .maxBatchKeys(configuration.maxBatchKeys()) - .maxBatchItems(configuration.maxBatchItems()) - .maxBatchOpenDuration(configuration.maxBatchOpenDuration()) - .maxBufferSize(configuration.maxBufferSize()); + .maxBatchItems(configuration.maxBatchSize()) + .sendRequestFrequency(configuration.sendRequestFrequency()) + .maxBatchBytesSize(configuration.maxBatchSize()); } return new Builder(); } - public Duration maxBatchOpenDuration() { - return maxBatchOpenDuration; + public Duration sendRequestFrequency() { + return sendRequestFrequency; } public int maxBatchItems() { @@ -85,7 +85,7 @@ public static final class Builder { private Integer maxBatchItems; private Integer maxBatchKeys; private Integer maxBufferSize; - private Duration maxBatchOpenDuration; + private Duration sendRequestFrequency; private Integer maxBatchBytesSize; private Builder() { @@ -106,8 +106,8 @@ public Builder maxBufferSize(Integer maxBufferSize) { return this; } - public Builder maxBatchOpenDuration(Duration maxBatchOpenDuration) { - this.maxBatchOpenDuration = maxBatchOpenDuration; + public Builder sendRequestFrequency(Duration sendRequestFrequency) { + this.sendRequestFrequency = sendRequestFrequency; return this; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index 0e1507ceace0..fdee592a7096 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -45,10 +45,9 @@ public abstract class RequestBatchManager { protected final RequestBatchConfiguration batchConfiguration ; private final int maxBatchItems; - private final Duration maxBatchOpenDuration; + private final Duration sendRequestFrequency; private final BatchingMap requestsAndResponsesMaps; private final ScheduledExecutorService scheduledExecutor; - private final Set> pendingBatchResponses ; private final Set> pendingResponses ; @@ -57,7 +56,7 @@ protected RequestBatchManager(RequestBatchConfiguration overrideConfiguration, ScheduledExecutorService scheduledExecutor) { batchConfiguration = overrideConfiguration; this.maxBatchItems = batchConfiguration.maxBatchItems(); - this.maxBatchOpenDuration = batchConfiguration.maxBatchOpenDuration(); + this.sendRequestFrequency = batchConfiguration.sendRequestFrequency(); this.scheduledExecutor = Validate.notNull(scheduledExecutor, "Null scheduledExecutor"); pendingBatchResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); pendingResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -71,7 +70,6 @@ public CompletableFuture batchRequest(RequestT request) { try { String batchKey = getBatchKey(request); - // Handle potential byte size overflow only if there are request in map and if feature enabled if (requestsAndResponsesMaps.contains(batchKey) && batchConfiguration.maxBatchBytesSize() > 0) { Optional.of(requestsAndResponsesMaps.flushableRequestsOnByteLimitBeforeAdd(batchKey, request)) @@ -81,7 +79,9 @@ public CompletableFuture batchRequest(RequestT request) { // Add request and response to the map, scheduling a flush if necessary requestsAndResponsesMaps.put(batchKey, - () -> scheduleBufferFlush(batchKey, maxBatchOpenDuration.toMillis(), scheduledExecutor), + () -> scheduleBufferFlush(batchKey, + sendRequestFrequency.toMillis(), + scheduledExecutor), request, response); @@ -109,8 +109,10 @@ private void manualFlushBuffer(String batchKey, Map> flushableRequests) { requestsAndResponsesMaps.cancelScheduledFlush(batchKey); flushBuffer(batchKey, flushableRequests); - requestsAndResponsesMaps.putScheduledFlush(batchKey, scheduleBufferFlush(batchKey, maxBatchOpenDuration.toMillis(), - scheduledExecutor)); + requestsAndResponsesMaps.putScheduledFlush(batchKey, + scheduleBufferFlush(batchKey, + sendRequestFrequency.toMillis(), + scheduledExecutor)); } private void flushBuffer(String batchKey, Map> flushableRequests) { @@ -174,5 +176,4 @@ public void close() { requestsAndResponsesMaps.clear(); } - } \ No newline at end of file diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java index 357d8a2bb8ea..87e471e85301 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java @@ -15,8 +15,6 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; - - import java.time.Duration; import java.util.Collections; import java.util.List; @@ -27,13 +25,10 @@ @SdkInternalApi public final class ResponseBatchConfiguration { - public static final boolean LONG_POLL_DEFAULT = true; public static final Duration VISIBILITY_TIMEOUT_SECONDS_DEFAULT = null; - public static final Duration LONG_POLL_WAIT_TIMEOUT_DEFAULT = Duration.ofSeconds(20); - public static final Duration MIN_RECEIVE_WAIT_TIME_MS_DEFAULT = Duration.ofMillis(300); + public static final Duration MIN_RECEIVE_WAIT_TIME_MS_DEFAULT = Duration.ofMillis(50); public static final List RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT = Collections.emptyList(); public static final List MESSAGE_SYSTEM_ATTRIBUTE_NAMES_DEFAULT = Collections.emptyList(); - public static final boolean ADAPTIVE_PREFETCHING_DEFAULT = false; public static final int MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT = 10; public static final int MAX_DONE_RECEIVE_BATCHES_DEFAULT = 10; @@ -50,86 +45,61 @@ public final class ResponseBatchConfiguration { */ public static final int ATTRIBUTE_MAPS_PAYLOAD_BYTES = 16 * 1024; // 16 KiB - private final Duration visibilityTimeout; - private final Duration longPollWaitTimeout; - private final Duration minReceiveWaitTime; - private final List messageSystemAttributeValues; + private final Duration messageMinWaitDuration; + private final List messageSystemAttributeNames; private final List receiveMessageAttributeNames; - private final Boolean adaptivePrefetching; private final Integer maxBatchItems; private final Integer maxInflightReceiveBatches; private final Integer maxDoneReceiveBatches; - public ResponseBatchConfiguration(BatchOverrideConfiguration overrideConfiguration) { - this.visibilityTimeout = overrideConfiguration != null && overrideConfiguration.visibilityTimeout() != null - ? overrideConfiguration.visibilityTimeout() + private ResponseBatchConfiguration(Builder builder) { + this.visibilityTimeout = builder.visibilityTimeout != null + ? builder.visibilityTimeout : VISIBILITY_TIMEOUT_SECONDS_DEFAULT; - this.longPollWaitTimeout = overrideConfiguration != null && overrideConfiguration.longPollWaitTimeout() != null - ? overrideConfiguration.longPollWaitTimeout() - : LONG_POLL_WAIT_TIMEOUT_DEFAULT; - - this.minReceiveWaitTime = overrideConfiguration != null && overrideConfiguration.minReceiveWaitTime() != null - ? overrideConfiguration.minReceiveWaitTime() + this.messageMinWaitDuration = builder.messageMinWaitDuration != null + ? builder.messageMinWaitDuration : MIN_RECEIVE_WAIT_TIME_MS_DEFAULT; - this.messageSystemAttributeValues = overrideConfiguration != null - && overrideConfiguration.messageSystemAttributeName() != null - && !overrideConfiguration.messageSystemAttributeName().isEmpty() - ? overrideConfiguration.messageSystemAttributeName() + this.messageSystemAttributeNames = builder.messageSystemAttributeNames != null + ? builder.messageSystemAttributeNames : MESSAGE_SYSTEM_ATTRIBUTE_NAMES_DEFAULT; - this.receiveMessageAttributeNames = overrideConfiguration != null - && overrideConfiguration.receiveMessageAttributeNames() != null - && !overrideConfiguration.receiveMessageAttributeNames().isEmpty() - ? overrideConfiguration.receiveMessageAttributeNames() + this.receiveMessageAttributeNames = builder.receiveMessageAttributeNames != null + ? builder.receiveMessageAttributeNames : RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT; - this.adaptivePrefetching = overrideConfiguration != null && overrideConfiguration.adaptivePrefetching() != null - ? overrideConfiguration.adaptivePrefetching() - : ADAPTIVE_PREFETCHING_DEFAULT; - - this.maxBatchItems = overrideConfiguration != null && overrideConfiguration.maxBatchItems() != null - ? overrideConfiguration.maxBatchItems() + this.maxBatchItems = builder.maxBatchItems != null + ? builder.maxBatchItems : MAX_SUPPORTED_SQS_RECEIVE_MSG; - this.maxInflightReceiveBatches = overrideConfiguration != null - && overrideConfiguration.maxInflightReceiveBatches() != null - ? overrideConfiguration.maxInflightReceiveBatches() + this.maxInflightReceiveBatches = builder.maxInflightReceiveBatches != null + ? builder.maxInflightReceiveBatches : MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT; - this.maxDoneReceiveBatches = overrideConfiguration != null && overrideConfiguration.maxDoneReceiveBatches() != null - ? overrideConfiguration.maxDoneReceiveBatches() + this.maxDoneReceiveBatches = builder.maxDoneReceiveBatches != null + ? builder.maxDoneReceiveBatches : MAX_DONE_RECEIVE_BATCHES_DEFAULT; } - public Duration visibilityTimeout() { return visibilityTimeout; } - public Duration longPollWaitTimeout() { - return longPollWaitTimeout; - } - - public Duration minReceiveWaitTime() { - return minReceiveWaitTime; + public Duration messageMinWaitDuration() { + return messageMinWaitDuration; } public List messageSystemAttributeNames() { - return Collections.unmodifiableList(messageSystemAttributeValues); + return Collections.unmodifiableList(messageSystemAttributeNames); } public List receiveMessageAttributeNames() { return Collections.unmodifiableList(receiveMessageAttributeNames); } - public boolean adaptivePrefetching() { - return adaptivePrefetching; - } - public int maxBatchItems() { return maxBatchItems; } @@ -141,4 +111,68 @@ public int maxInflightReceiveBatches() { public int maxDoneReceiveBatches() { return maxDoneReceiveBatches; } + + public static Builder builder(BatchOverrideConfiguration overrideConfiguration) { + Builder builder = new Builder(); + if (overrideConfiguration != null) { + builder.messageMinWaitDuration(overrideConfiguration.receiveMessageMinWaitDuration()) + .receiveMessageAttributeNames(overrideConfiguration.receiveMessageAttributeNames()) + .messageSystemAttributeNames(overrideConfiguration.receiveMessageSystemAttributeNames()) + .visibilityTimeout(overrideConfiguration.receiveMessageVisibilityTimeout()); + } + return builder; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Duration visibilityTimeout; + private Duration messageMinWaitDuration; + private List messageSystemAttributeNames; + private List receiveMessageAttributeNames; + private Integer maxBatchItems; + private Integer maxInflightReceiveBatches; + private Integer maxDoneReceiveBatches; + + public Builder visibilityTimeout(Duration visibilityTimeout) { + this.visibilityTimeout = visibilityTimeout; + return this; + } + + public Builder messageMinWaitDuration(Duration messageMinWaitDuration) { + this.messageMinWaitDuration = messageMinWaitDuration; + return this; + } + + public Builder messageSystemAttributeNames(List messageSystemAttributeNames) { + this.messageSystemAttributeNames = messageSystemAttributeNames; + return this; + } + + public Builder receiveMessageAttributeNames(List receiveMessageAttributeNames) { + this.receiveMessageAttributeNames = receiveMessageAttributeNames; + return this; + } + + public Builder maxBatchItems(Integer maxBatchItems) { + this.maxBatchItems = maxBatchItems; + return this; + } + + public Builder maxInflightReceiveBatches(Integer maxInflightReceiveBatches) { + this.maxInflightReceiveBatches = maxInflightReceiveBatches; + return this; + } + + public Builder maxDoneReceiveBatches(Integer maxDoneReceiveBatches) { + this.maxDoneReceiveBatches = maxDoneReceiveBatches; + return this; + } + + public ResponseBatchConfiguration build() { + return new ResponseBatchConfiguration(this); + } + } } \ No newline at end of file diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java index 0b6492d520c9..d563f05f2502 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java @@ -31,78 +31,54 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; class BatchOverrideConfigurationTest { private static Stream provideConfigurations() { return Stream.of( Arguments.of(10, - 5, Duration.ofMillis(200), Duration.ofSeconds(30), - Duration.ofSeconds(20), Duration.ofMillis(50), Arrays.asList("msgAttr1"), - Arrays.asList(MessageSystemAttributeName.SENDER_ID), - true, - 10, - 5), - Arguments.of(null, null, null, null, null, null, null, null, null, null, null), + Arrays.asList(MessageSystemAttributeName.SENDER_ID)), + Arguments.of(null, null, null, null, null, null), Arguments.of(1, - 1, - Duration.ofMillis(1), Duration.ofMillis(1), Duration.ofMillis(1), Duration.ofMillis(1), Collections.emptyList(), - Collections.singletonList(MessageSystemAttributeName.SEQUENCE_NUMBER), - false, - 5, - 2) + Collections.singletonList(MessageSystemAttributeName.SEQUENCE_NUMBER)) ); } @ParameterizedTest @MethodSource("provideConfigurations") - void testBatchOverrideConfiguration(Integer maxBatchItems, - Integer maxBatchKeys, - Duration maxBatchOpenDuration, - Duration visibilityTimeout, - Duration longPollWaitTimeout, - Duration minReceiveWaitTime, + void testBatchOverrideConfiguration(Integer maxBatchSize, + Duration sendRequestFrequency, + Duration receiveMessageVisibilityTimeout, + Duration receiveMessageMinWaitDuration, List receiveMessageAttributeNames, - List messageSystemAttributeNames, - Boolean adaptivePrefetching, - Integer maxInflightReceiveBatches, - Integer maxDoneReceiveBatches) { + List receiveMessageSystemAttributeNames) { BatchOverrideConfiguration config = BatchOverrideConfiguration.builder() - .maxBatchItems(maxBatchItems) - .maxBatchKeys(maxBatchKeys) - .maxBatchOpenDuration(maxBatchOpenDuration) - .visibilityTimeout(visibilityTimeout) - .longPollWaitTimeout(longPollWaitTimeout) - .minReceiveWaitTime(minReceiveWaitTime) + .maxBatchSize(maxBatchSize) + .sendRequestFrequency(sendRequestFrequency) + .receiveMessageVisibilityTimeout(receiveMessageVisibilityTimeout) + .receiveMessageMinWaitDuration(receiveMessageMinWaitDuration) .receiveMessageAttributeNames(receiveMessageAttributeNames) - .messageSystemAttributeName(messageSystemAttributeNames) - .adaptivePrefetching(adaptivePrefetching) - .maxInflightReceiveBatches(maxInflightReceiveBatches) - .maxDoneReceiveBatches(maxDoneReceiveBatches) + .receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames) .build(); - assertEquals(maxBatchItems, config.maxBatchItems()); - assertEquals(maxBatchKeys, config.maxBatchKeys()); - assertEquals(maxBatchOpenDuration, config.maxBatchOpenDuration()); - assertEquals(visibilityTimeout, config.visibilityTimeout()); - assertEquals(longPollWaitTimeout, config.longPollWaitTimeout()); - assertEquals(minReceiveWaitTime, config.minReceiveWaitTime()); + assertEquals(maxBatchSize, config.maxBatchSize()); + assertEquals(sendRequestFrequency, config.sendRequestFrequency()); + assertEquals(receiveMessageVisibilityTimeout, config.receiveMessageVisibilityTimeout()); + assertEquals(receiveMessageMinWaitDuration, config.receiveMessageMinWaitDuration()); assertEquals(Optional.ofNullable(receiveMessageAttributeNames).orElse(Collections.emptyList()), config.receiveMessageAttributeNames()); - assertEquals(Optional.ofNullable(messageSystemAttributeNames).orElse(Collections.emptyList()), - config.messageSystemAttributeName()); - assertEquals(adaptivePrefetching, config.adaptivePrefetching()); - assertEquals(maxInflightReceiveBatches, config.maxInflightReceiveBatches()); - assertEquals(maxDoneReceiveBatches, config.maxDoneReceiveBatches()); + assertEquals(Optional.ofNullable(receiveMessageSystemAttributeNames).orElse(Collections.emptyList()), + config.receiveMessageSystemAttributeNames()); } @Test @@ -115,26 +91,42 @@ void testEqualsAndHashCode() { @Test void testToBuilder() { BatchOverrideConfiguration originalConfig = BatchOverrideConfiguration.builder() - .maxBatchItems(10) - .maxBatchKeys(5) - .maxBatchOpenDuration(Duration.ofMillis(200)) - .visibilityTimeout(Duration.ofSeconds(30)) - .longPollWaitTimeout(Duration.ofSeconds(20)) - .minReceiveWaitTime(Duration.ofMillis(50)) - .receiveMessageAttributeNames(Arrays.asList( - "msgAttr1")) - .messageSystemAttributeName(Collections.singletonList( + .maxBatchSize(10) + .sendRequestFrequency(Duration.ofMillis(200)) + .receiveMessageVisibilityTimeout(Duration.ofSeconds(30)) + .receiveMessageMinWaitDuration(Duration.ofMillis(50)) + .receiveMessageAttributeNames(Arrays.asList("msgAttr1")) + .receiveMessageSystemAttributeNames(Collections.singletonList( MessageSystemAttributeName.SENDER_ID)) - .adaptivePrefetching(true) - .maxInflightReceiveBatches(10) - .maxDoneReceiveBatches(5) .build(); BatchOverrideConfiguration.Builder builder = originalConfig.toBuilder(); BatchOverrideConfiguration newConfig = builder.build(); assertEquals(originalConfig, newConfig); // Ensure that modifying the builder does not affect the original config - builder.maxBatchItems(20); - assertNotEquals(originalConfig.maxBatchItems(), builder.build().maxBatchItems()); + builder.maxBatchSize(9); + assertNotEquals(originalConfig.maxBatchSize(), builder.build().maxBatchSize()); + // Ensure that all other fields are still equal after modifying the maxBatchSize + assertEquals(originalConfig.sendRequestFrequency(), builder.build().sendRequestFrequency()); + assertEquals(originalConfig.receiveMessageVisibilityTimeout(), builder.build().receiveMessageVisibilityTimeout()); + assertEquals(originalConfig.receiveMessageMinWaitDuration(), builder.build().receiveMessageMinWaitDuration()); + assertEquals(originalConfig.receiveMessageAttributeNames(), builder.build().receiveMessageAttributeNames()); + assertEquals(originalConfig.receiveMessageSystemAttributeNames(), builder.build().receiveMessageSystemAttributeNames()); } -} + + @Test + void testMaxBatchSizeExceedsLimitThrowsException() { + // Act & Assert + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + BatchOverrideConfiguration.builder() + .maxBatchSize(11) // Set an invalid max batch size (exceeds limit) + .build(); // This should throw IllegalArgumentException + }); + + // Assert that the exception message matches the expected output + assertEquals("The maxBatchSize must be less than or equal to 10. A batch can contain up to 10 messages.", + exception.getMessage()); + } + + +} \ No newline at end of file diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java index acde3df91247..ea94b744f166 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java @@ -52,7 +52,7 @@ import static org.mockito.Mockito.*; @ExtendWith(MockitoExtension.class) -public class ReceiveBatchManagerTest { +class ReceiveBatchManagerTest { @Mock private SqsAsyncClient sqsClient; @@ -79,22 +79,16 @@ public void setUp() { } @AfterEach - public void tearDown() { + void tearDown() { executor.shutdownNow(); } - private ResponseBatchConfiguration createConfig(int maxBatchItems, boolean adaptivePrefetching, - int maxInflightReceiveBatches, int maxDoneReceiveBatches, - Duration minReceiveWaitTime) { - return new ResponseBatchConfiguration(BatchOverrideConfiguration.builder() - .maxBatchItems(maxBatchItems) - .adaptivePrefetching(adaptivePrefetching) - .maxInflightReceiveBatches(maxInflightReceiveBatches) - .maxDoneReceiveBatches(maxDoneReceiveBatches) - .receiveMessageAttributeNames(Collections.emptyList()) - .visibilityTimeout(Duration.ofSeconds(2)) - .minReceiveWaitTime(minReceiveWaitTime) - .build()); + private ResponseBatchConfiguration createConfig(Duration minReceiveWaitTime) { + return ResponseBatchConfiguration.builder() + .receiveMessageAttributeNames(Collections.emptyList()) + .visibilityTimeout(Duration.ofSeconds(2)) + .messageMinWaitDuration(minReceiveWaitTime) + .build(); } private ReceiveMessageResponse generateMessageResponse(int count) { @@ -105,12 +99,12 @@ private ReceiveMessageResponse generateMessageResponse(int count) { } @Test - public void testProcessRequestSuccessful() throws Exception { + void testProcessRequestSuccessful() throws Exception { ReceiveMessageResponse response = generateMessageResponse(10); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(CompletableFuture.completedFuture(response)); - receiveBatchManager = new ReceiveBatchManager(sqsClient, executor, createConfig(10, true, 2, 1, Duration.ofMillis(50)), "queueUrl"); + receiveBatchManager = new ReceiveBatchManager(sqsClient, executor, createConfig( Duration.ofMillis(50)), "queueUrl"); ReceiveMessageRequest request = ReceiveMessageRequest.builder().maxNumberOfMessages(10).build(); CompletableFuture futureResponse = receiveBatchManager.processRequest(request); @@ -121,12 +115,12 @@ public void testProcessRequestSuccessful() throws Exception { } @Test - public void testProcessRequestWithCustomMaxMessages() throws Exception { + void testProcessRequestWithCustomMaxMessages() throws Exception { ReceiveMessageResponse response = generateMessageResponse(5); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(CompletableFuture.completedFuture(response)); - receiveBatchManager = new ReceiveBatchManager(sqsClient, executor, createConfig(10, true, 2, 1, Duration.ofMillis(50)), "queueUrl"); + receiveBatchManager = new ReceiveBatchManager(sqsClient, executor, createConfig(Duration.ofMillis(50)), "queueUrl"); ReceiveMessageRequest request = ReceiveMessageRequest.builder().maxNumberOfMessages(5).build(); CompletableFuture futureResponse = receiveBatchManager.processRequest(request); @@ -137,12 +131,12 @@ public void testProcessRequestWithCustomMaxMessages() throws Exception { } @Test - public void testProcessRequestErrorHandling() throws Exception { + void testProcessRequestErrorHandling() throws Exception { CompletableFuture futureResponse = new CompletableFuture<>(); futureResponse.completeExceptionally(new RuntimeException("SQS error")); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(futureResponse); - receiveBatchManager = new ReceiveBatchManager(sqsClient, executor, createConfig(10, true, 2, 1, Duration.ofMillis(50)), "queueUrl"); + receiveBatchManager = new ReceiveBatchManager(sqsClient, executor, createConfig(Duration.ofMillis(50)), "queueUrl"); ReceiveMessageRequest request = ReceiveMessageRequest.builder().maxNumberOfMessages(10).build(); @@ -158,9 +152,9 @@ public void testProcessRequestErrorHandling() throws Exception { } @Test - public void testShutdown() throws Exception { + void testShutdown() throws Exception { - receiveBatchManager = new ReceiveBatchManager(sqsClient, executor, createConfig(10, true, 2, 1, Duration.ofMillis(50)), "queueUrl"); + receiveBatchManager = new ReceiveBatchManager(sqsClient, executor, createConfig(Duration.ofMillis(50)), "queueUrl"); ReceiveMessageRequest request = ReceiveMessageRequest.builder().maxNumberOfMessages(10).build(); CompletableFuture futureResponse = receiveBatchManager.processRequest(request); @@ -173,12 +167,12 @@ public void testShutdown() throws Exception { } @Test - public void testProcessRequestMultipleMessages() throws Exception { + void testProcessRequestMultipleMessages() throws Exception { ReceiveMessageResponse response = generateMessageResponse(10); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(CompletableFuture.completedFuture(response)); - receiveBatchManager = new ReceiveBatchManager(sqsClient, executor, createConfig(10, true, 2, 1, Duration.ofMillis(50)), "queueUrl"); + receiveBatchManager = new ReceiveBatchManager(sqsClient, executor, createConfig( Duration.ofMillis(50)), "queueUrl"); List requests = new ArrayList<>(); for (int i = 0; i < 5; i++) { @@ -202,14 +196,11 @@ public void testProcessRequestMultipleMessages() throws Exception { @Test - public void testProcessRequestWithQueueAttributes() throws Exception { + void testProcessRequestWithQueueAttributes() throws Exception { // Prepare configuration with specific message attribute names - ResponseBatchConfiguration configuration = new ResponseBatchConfiguration( - BatchOverrideConfiguration.builder() + ResponseBatchConfiguration configuration = ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Arrays.asList("AttributeValue7", "AttributeValue9")) - .adaptivePrefetching(true) - .maxDoneReceiveBatches(1).build() - ); + .build(); // Mock response for receiveMessage when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java index b51399f7e89b..5b9a02ecd140 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java @@ -16,29 +16,33 @@ package software.amazon.awssdk.services.sqs.batchmanager; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.mockito.ArgumentCaptor.forClass; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import java.util.Arrays; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; + +import org.apache.logging.log4j.Level; import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; import org.mockito.junit.jupiter.MockitoExtension; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.internal.batchmanager.ReceiveMessageBatchManager; @@ -49,196 +53,201 @@ import software.amazon.awssdk.services.sqs.model.QueueAttributeName; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import software.amazon.awssdk.utils.NumericUtils; - -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; - +import software.amazon.awssdk.testutils.LogCaptor; @ExtendWith(MockitoExtension.class) -public class ReceiveMessageBatchManagerTest { +class ReceiveMessageBatchManagerTest { + + private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(4); @Mock private SqsAsyncClient sqsClient; - private ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); - - private ReceiveMessageBatchManager receiveMessageBatchManager; - private ResponseBatchConfiguration config; - - @BeforeEach - public void setUp() { - - } @ParameterizedTest(name = "{index} => {0}") @MethodSource("provideBatchOverrideConfigurations") @DisplayName("Test BatchRequest with various configurations") - public void testBatchRequest_WhenBufferingDisabledAndInCompatible_ShouldNotUseBatchManager(String testCaseName, - BatchOverrideConfiguration overrideConfig, - ReceiveMessageRequest request, - boolean useBatchManager) throws Exception { - - // Initialize the ResponseBatchConfiguration and ReceiveMessageBatchManager - ResponseBatchConfiguration config = new ResponseBatchConfiguration(overrideConfig); - receiveMessageBatchManager = new ReceiveMessageBatchManager(sqsClient, executor, overrideConfig); - - CompletableFuture mockResponse = - CompletableFuture.completedFuture(ReceiveMessageResponse.builder().build()); - String visibilityTimeout = "1"; - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mockResponse); - if(useBatchManager) { - mockGetQueueAttributesResponse("0", visibilityTimeout); - } + void testBatchRequest(String testCaseName, + ResponseBatchConfiguration overrideConfig, + ReceiveMessageRequest request, + boolean useBatchManager, + String inEligibleReason) throws Exception { + setupBatchManager(overrideConfig); - CompletableFuture result = receiveMessageBatchManager.batchRequest(request); - result.get(2, TimeUnit.SECONDS); + CompletableFuture mockResponse = CompletableFuture.completedFuture( + ReceiveMessageResponse.builder().build()); + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mockResponse); - // Enough time to make sure any spawned task after receiving response is completed - Thread.sleep(500); + try (LogCaptor logCaptor = LogCaptor.create(Level.DEBUG)) { - // Capture the argument passed to receiveMessage - ArgumentCaptor requestCaptor = forClass(ReceiveMessageRequest.class); + if (useBatchManager) { + mockQueueAttributes("0", "1"); + } - if (useBatchManager) { - // Verify that receiveMessage was called at least twice - verify(sqsClient, atLeast(2)).receiveMessage(requestCaptor.capture()); + CompletableFuture result = receiveMessageBatchManager.batchRequest(request); + result.get(2, TimeUnit.SECONDS); + Thread.sleep(500); - // Assertions to verify the behavior when batch manager is used - assertEquals(config.maxBatchItems(), requestCaptor.getValue().maxNumberOfMessages()); - assertEquals(Integer.parseInt(visibilityTimeout), requestCaptor.getValue().visibilityTimeout()); - assertEquals(NumericUtils.saturatedCast(config.longPollWaitTimeout().getSeconds()), - requestCaptor.getValue().waitTimeSeconds()); - } else { - // Verify that receiveMessage was called exactly once - verify(sqsClient, times(1)).receiveMessage(requestCaptor.capture()); + ArgumentCaptor requestCaptor = forClass(ReceiveMessageRequest.class); - // Assertions to verify the behavior when batch manager is not used - assertEquals(request.maxNumberOfMessages(), requestCaptor.getValue().maxNumberOfMessages()); - assertEquals(request.visibilityTimeout(), requestCaptor.getValue().visibilityTimeout()); - assertNotEquals(NumericUtils.saturatedCast(config.longPollWaitTimeout().getSeconds()), - requestCaptor.getValue().waitTimeSeconds()); - assertNotEquals(config.maxBatchItems(), - requestCaptor.getValue().maxNumberOfMessages()); + if (useBatchManager) { + verifyBatchManagerUsed(requestCaptor); + } else { + verifyBatchManagerNotUsed(request, requestCaptor, logCaptor, inEligibleReason); + } } } + private void setupBatchManager(ResponseBatchConfiguration overrideConfig) { + receiveMessageBatchManager = new ReceiveMessageBatchManager(sqsClient, EXECUTOR, overrideConfig); + } + + private void verifyBatchManagerUsed(ArgumentCaptor requestCaptor) { + verify(sqsClient, atLeast(1)).receiveMessage(requestCaptor.capture()); + assertEquals(ResponseBatchConfiguration.MAX_DONE_RECEIVE_BATCHES_DEFAULT, + requestCaptor.getValue().maxNumberOfMessages()); + } + + private void verifyBatchManagerNotUsed(ReceiveMessageRequest request, + ArgumentCaptor requestCaptor, + LogCaptor logCaptor, + String inEligibleReason) { + verify(sqsClient, times(1)).receiveMessage(requestCaptor.capture()); + assertEquals(request.maxNumberOfMessages(), requestCaptor.getValue().maxNumberOfMessages()); + assertEquals(request.visibilityTimeout(), requestCaptor.getValue().visibilityTimeout()); + assertThat(logCaptor.loggedEvents()) + .anySatisfy(logEvent -> assertThat(logEvent.getMessage().getFormattedMessage()) + .contains(inEligibleReason)); + } + + private void mockQueueAttributes(String receiveMessageWaitTimeSeconds, String visibilityTimeout) { + Map attributes = new HashMap<>(); + attributes.put(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, receiveMessageWaitTimeSeconds); + attributes.put(QueueAttributeName.VISIBILITY_TIMEOUT, visibilityTimeout); + + GetQueueAttributesResponse response = GetQueueAttributesResponse.builder() + .attributes(attributes) + .build(); + when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))) + .thenReturn(CompletableFuture.completedFuture(response)); + } private static Stream provideBatchOverrideConfigurations() { return Stream.of( Arguments.of( - "Buffering enabled, compatible system and message attributes, and no visibility timeout", - BatchOverrideConfiguration.builder() - .maxInflightReceiveBatches(10) - .maxDoneReceiveBatches(5) + "Buffering enabled, compatible system and message attributes, no visibility timeout", + ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) - .messageSystemAttributeName(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") .messageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(MessageSystemAttributeName.SENDER_ID) .build(), - true + true, + "" ), Arguments.of( - "Buffering disabled, compatible attributes, and no visibility timeout", - BatchOverrideConfiguration.builder() - .maxInflightReceiveBatches(0) // Buffering disabled - .maxDoneReceiveBatches(0) // Buffering disabled + "Buffering enabled, compatible attributes, no visibility timeout but deprecated attributeNames", + ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) - .messageSystemAttributeName(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") .messageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNamesWithStrings(Collections.singletonList("SenderId")) + .attributeNames(QueueAttributeName.ALL) // Deprecated api not supported for Batching .build(), - false + false, + "Incompatible attributes." ), Arguments.of( - "Buffering disabled, incompatible system attributes, and no visibility timeout", - BatchOverrideConfiguration.builder() - .maxInflightReceiveBatches(10) - .maxDoneReceiveBatches(5) + "Buffering disabled, incompatible system attributes, no visibility timeout", + ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) - .messageSystemAttributeName(Collections.singletonList(MessageSystemAttributeName.SENT_TIMESTAMP)) + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENT_TIMESTAMP)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") .messageAttributeNames(Collections.singletonList("attr1")) - .messageSystemAttributeNamesWithStrings(Collections.singletonList("SenderId")) // - // Incompatible system attribute + .messageSystemAttributeNamesWithStrings(Collections.singletonList("SenderId")) .build(), - false + false, + "Incompatible attributes." ), Arguments.of( - "Buffering disabled, compatible attributes, but visibility timeout is set", - BatchOverrideConfiguration.builder() - .maxInflightReceiveBatches(10) - .maxDoneReceiveBatches(5) + "Buffering disabled, compatible attributes, visibility timeout is set", + ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) - .messageSystemAttributeName(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") .messageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) - .visibilityTimeout(30) // Visibility timeout is set + .visibilityTimeout(30) .build(), - false + false, + "Visibility timeout is set." ), Arguments.of( - "Buffering disabled, compatible attributes, no visibility timeout, but request has attribute names", - BatchOverrideConfiguration.builder() - .maxInflightReceiveBatches(10) - .maxDoneReceiveBatches(5) + "Buffering disabled, compatible attributes, no visibility timeout but has attribute names", + ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.singletonList("attr1")) - .messageSystemAttributeName(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") .messageAttributeNames(Collections.singletonList("attr1")) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) - .attributeNamesWithStrings("All") // Request has attribute names + .attributeNamesWithStrings("All") + .build(), + false, + "Incompatible attributes." + ), + Arguments.of( + "Buffering enabled, simple ReceiveMessageRequest, no visibility timeout", + ResponseBatchConfiguration.builder() + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .build(), + ReceiveMessageRequest.builder() + .queueUrl("testQueueUrl") + .maxNumberOfMessages(3) + .build(), + true, + "" + ), + Arguments.of( + "Buffering disabled, request has override config", + ResponseBatchConfiguration.builder() + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + .build(), + ReceiveMessageRequest.builder() + .queueUrl("testQueueUrl") + .maxNumberOfMessages(3) + .overrideConfiguration(o -> o.apiCallTimeout(Duration.ofSeconds(2))) .build(), - false + false, + "Request has override configurations." ), Arguments.of( - "Buffering enabled, with messageSystemAttributeName in Config and simple ReceiveMessageRequest", - BatchOverrideConfiguration.builder() - .messageSystemAttributeName(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) + "Buffering disabled, with waitTimeSeconds in ReceiveMessageRequest", + ResponseBatchConfiguration.builder() + .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID)) .build(), ReceiveMessageRequest.builder() .queueUrl("testQueueUrl") .maxNumberOfMessages(3) + .waitTimeSeconds(3) .build(), - true + false, + "Request has long polling enabled." ) ); } - - - - private void mockGetQueueAttributesResponse(String receiveMessageWaitTimeSeconds, String visibilityTimeout) { - Map attributes = new HashMap<>(); - attributes.put(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, receiveMessageWaitTimeSeconds); - attributes.put(QueueAttributeName.VISIBILITY_TIMEOUT, visibilityTimeout); - - GetQueueAttributesResponse response = GetQueueAttributesResponse.builder() - .attributes(attributes) - .build(); - - when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))) - .thenReturn(CompletableFuture.completedFuture(response)); - } - - } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveQueueBufferTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveQueueBufferTest.java index 6219997d6af9..10aa9db30a99 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveQueueBufferTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveQueueBufferTest.java @@ -16,13 +16,18 @@ package software.amazon.awssdk.services.sqs.batchmanager; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,22 +36,19 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; - import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.logging.log4j.Level; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; - import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.internal.batchmanager.ReceiveQueueBuffer; import software.amazon.awssdk.services.sqs.internal.batchmanager.QueueAttributesManager; +import software.amazon.awssdk.services.sqs.internal.batchmanager.ReceiveQueueBuffer; import software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse; @@ -61,7 +63,7 @@ @ExtendWith(MockitoExtension.class) -public class ReceiveQueueBufferTest { +class ReceiveQueueBufferTest { @Mock private SqsAsyncClient sqsClient; @@ -83,7 +85,7 @@ public void setUp() { when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))) .thenReturn(CompletableFuture.completedFuture(response)); - queueAttributesManager = new QueueAttributesManager(sqsClient,"queueUrl"); + queueAttributesManager = new QueueAttributesManager(sqsClient, "queueUrl"); } @AfterEach @@ -95,10 +97,10 @@ public void clear() { } @Test - public void testReceiveMessageSuccessful() throws Exception { + void testReceiveMessageSuccessful() throws Exception { // Mock response ReceiveMessageResponse response = generateMessageResponse(10); - ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(batchConfig().build()); + ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); // Create future and call receiveMessage @@ -111,21 +113,21 @@ public void testReceiveMessageSuccessful() throws Exception { assertEquals(10, receiveMessageResponse.messages().size()); } - private ReceiveQueueBuffer receiveQueueBuffer(BatchOverrideConfiguration configuration) { + private ReceiveQueueBuffer receiveQueueBuffer(ResponseBatchConfiguration configuration) { return ReceiveQueueBuffer.builder() .executor(executor) .sqsClient(sqsClient) - .config(new ResponseBatchConfiguration(configuration)) + .config(configuration) .queueUrl("queueUrl") .queueAttributesManager(queueAttributesManager) .build(); } @Test - public void testReceiveMessageSuccessful_customRequestSize() throws Exception { + void testReceiveMessageSuccessful_customRequestSize() throws Exception { // Mock response ReceiveMessageResponse response = generateMessageResponse(10); - ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(BatchOverrideConfiguration.builder().build()); + ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); // Create future and call receiveMessage @@ -139,8 +141,8 @@ public void testReceiveMessageSuccessful_customRequestSize() throws Exception { } @Test - public void multipleReceiveMessagesWithDifferentBatchSizes() throws Exception { - ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(batchConfig().build()); + void multipleReceiveMessagesWithDifferentBatchSizes() throws Exception { + ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); // Mock response CompletableFuture delayedResponse = new CompletableFuture<>(); executor.schedule(() -> delayedResponse.complete(generateMessageResponse(5)), 500, TimeUnit.MILLISECONDS); @@ -168,13 +170,13 @@ public void multipleReceiveMessagesWithDifferentBatchSizes() throws Exception { @Test - public void numberOfBatchesSpawned() throws Exception { + void numberOfBatchesSpawned() throws Exception { // Mock response ReceiveMessageResponse response = generateMessageResponse(10); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); // Create futures and call receiveMessage - ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(batchConfig().build()); + ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); CompletableFuture future1 = new CompletableFuture<>(); CompletableFuture future2 = new CompletableFuture<>(); receiveQueueBuffer.receiveMessage(future1, 10); @@ -191,16 +193,15 @@ public void numberOfBatchesSpawned() throws Exception { } @Test - public void testReceiveMessageWithAdaptivePrefetchingTrue() throws Exception { + void testReceiveMessageWithAdaptivePrefetchingOfReceieveMessageApiCalls() throws Exception { // Mock response - int MAX_BATCH_ITEMS = 10; - ReceiveMessageResponse response = generateMessageResponse(MAX_BATCH_ITEMS); + ReceiveMessageResponse response = generateMessageResponse(10); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(CompletableFuture.completedFuture(response)); // Create receiveQueueBuffer with adaptive prefetching ReceiveQueueBuffer receiveQueueBuffer = - receiveQueueBuffer(BatchOverrideConfiguration.builder().adaptivePrefetching(true).build()); + receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); // Create and send multiple futures using a loop @@ -224,10 +225,9 @@ public void testReceiveMessageWithAdaptivePrefetchingTrue() throws Exception { @Test - public void testReceiveMessageWithAdaptivePrefetchingTrueForSingleCall() throws Exception { - // Mock response - int MAX_BATCH_ITEMS = 10; - ReceiveMessageResponse response = generateMessageResponse(MAX_BATCH_ITEMS); + void testReceiveMessageWithAdaptivePrefetchingForASingleCall() throws Exception { + + ReceiveMessageResponse response = generateMessageResponse(10); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(CompletableFuture.completedFuture(response)); @@ -235,9 +235,7 @@ public void testReceiveMessageWithAdaptivePrefetchingTrueForSingleCall() throws ReceiveQueueBuffer receiveQueueBuffer = ReceiveQueueBuffer.builder() .executor(executor) .sqsClient(sqsClient) - .config(new ResponseBatchConfiguration(BatchOverrideConfiguration.builder() - .adaptivePrefetching(true) - .build())) + .config(ResponseBatchConfiguration.builder().build()) .queueUrl("queueUrl") .queueAttributesManager(queueAttributesManager) .build(); @@ -246,116 +244,15 @@ public void testReceiveMessageWithAdaptivePrefetchingTrueForSingleCall() throws receiveQueueBuffer.receiveMessage(future, 10); ReceiveMessageResponse receiveMessageResponse = future.get(1, TimeUnit.SECONDS); - System.out.println(receiveMessageResponse); - assertThat(receiveMessageResponse.messages().size()).isEqualTo(10); + assertThat(receiveMessageResponse.messages()).hasSize(10); Thread.sleep(500); verify(sqsClient, times(1)).receiveMessage(any(ReceiveMessageRequest.class)); } @Test - public void testReceiveMessageWithAdaptivePrefetchingFalseForSingleCall() throws Exception { - // Mock response - int MAX_BATCH_ITEMS = 10; - ReceiveMessageResponse response = generateMessageResponse(MAX_BATCH_ITEMS); - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) - .thenReturn(CompletableFuture.completedFuture(response)); - - // Create receiveQueueBuffer with adaptive prefetching - ReceiveQueueBuffer receiveQueueBuffer = ReceiveQueueBuffer.builder() - .executor(executor) - .sqsClient(sqsClient) - .config(new ResponseBatchConfiguration( - batchConfig().build())) - .queueUrl("queueUrl") - .queueAttributesManager(queueAttributesManager) - .build(); - - - CompletableFuture future = new CompletableFuture<>(); - receiveQueueBuffer.receiveMessage(future, 10); - - ReceiveMessageResponse receiveMessageResponse = future.get(1, TimeUnit.SECONDS); - System.out.println(receiveMessageResponse); - assertThat(receiveMessageResponse.messages().size()).isEqualTo(10); - Thread.sleep(1000); - - verify(sqsClient, times(11)).receiveMessage(any(ReceiveMessageRequest.class)); - } - - @Test - public void testReceiveMessageWithAdaptivePrefetchingFalse() throws Exception { - // Mock response - int MAX_BATCH_ITEMS = 10; - ReceiveMessageResponse response = generateMessageResponse(MAX_BATCH_ITEMS); - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); - - ReceiveQueueBuffer receiveQueueBuffer = ReceiveQueueBuffer.builder() - .executor(executor) - .sqsClient(sqsClient) - .config(new ResponseBatchConfiguration(batchConfig().build())) - .queueUrl("queueUrl") - .queueAttributesManager(queueAttributesManager) - .build(); - - List> futures = new ArrayList<>(); - for (int i = 0; i < 30; i++) { - CompletableFuture future = new CompletableFuture<>(); - futures.add(future); - receiveQueueBuffer.receiveMessage(future, 1); - Thread.sleep(10); - } - - // Join all futures to ensure they complete - for (CompletableFuture future : futures) { - future.get(2, TimeUnit.SECONDS); - } - - verify(sqsClient, times(13)).receiveMessage(any(ReceiveMessageRequest.class)); - } - - private static BatchOverrideConfiguration.Builder batchConfig() { - return BatchOverrideConfiguration.builder(); - } - - @Test - public void testReceiveMessageWithAdaptivePrefetchingFalse_followsMaxDoneRecieveBatches() throws Exception { - // Mock response - int MAX_BATCH_ITEMS = 10; - ReceiveMessageResponse response = generateMessageResponse(MAX_BATCH_ITEMS); - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); - - // Create receiveQueueBuffer with adaptive prefetching - int MAX_DONE_RECEIVE_BATCH = 1; - - ReceiveQueueBuffer receiveQueueBuffer = ReceiveQueueBuffer.builder() - .executor(executor) - .sqsClient(sqsClient) - .config(new ResponseBatchConfiguration(batchConfig().maxDoneReceiveBatches(MAX_BATCH_ITEMS).build())) - .queueUrl("queueUrl") - .queueAttributesManager(queueAttributesManager) - .build(); - - // Create and send multiple futures using a loop - List> futures = new ArrayList<>(); - for (int i = 0; i < 30; i++) { - CompletableFuture future = new CompletableFuture<>(); - futures.add(future); - Thread.sleep(10); - receiveQueueBuffer.receiveMessage(future, 1); - } - - // Join all futures to ensure they complete - for (CompletableFuture future : futures) { - future.get(2, TimeUnit.SECONDS); - } - - verify(sqsClient, times(13)).receiveMessage(any(ReceiveMessageRequest.class)); - } - - @Test - public void receiveMessageShutDown() throws Exception { - ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(BatchOverrideConfiguration.builder().build()); + void receiveMessageShutDown() { + ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); // Create future and call receiveMessage CompletableFuture future = new CompletableFuture<>(); @@ -369,8 +266,8 @@ public void receiveMessageShutDown() throws Exception { } @Test - public void testConcurrentExecutionWithResponses() throws Exception { - ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(BatchOverrideConfiguration.builder().build()); + void testConcurrentExecutionWithResponses() throws Exception { + ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); // Mock response ReceiveMessageResponse response = generateMessageResponse(4); @@ -400,9 +297,10 @@ public void testConcurrentExecutionWithResponses() throws Exception { // Since each mocked response has 4 messages, we expect 10 * 4 = 40 messages for 10 futures assertEquals(40, totalMessages); } + @Test - public void receiveMessageErrorHandlingForSimpleError() throws Exception { - ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(BatchOverrideConfiguration.builder().build()); + void receiveMessageErrorHandlingForSimpleError() throws Exception { + ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); // Mock response with exception CompletableFuture futureResponse = new CompletableFuture<>(); @@ -427,8 +325,8 @@ public void receiveMessageErrorHandlingForSimpleError() throws Exception { } @Test - public void receiveMessageErrorHandlingWhenErrorFollowSuccess() throws Exception { - ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(BatchOverrideConfiguration.builder().build()); + void receiveMessageErrorHandlingWhenErrorFollowSuccess() throws Exception { + ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); // Mock responses CompletableFuture errorResponse = new CompletableFuture<>(); @@ -481,46 +379,46 @@ public void receiveMessageErrorHandlingWhenErrorFollowSuccess() throws Exception } -@Test -public void testShutdownExceptionallyCompletesAllIncompleteFutures() throws Exception { - // Initialize ReceiveQueueBuffer with required configuration - ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(BatchOverrideConfiguration.builder().build()); + @Test + void testShutdownExceptionallyCompletesAllIncompleteFutures() throws Exception { + // Initialize ReceiveQueueBuffer with required configuration + ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); - // Mock SQS response and visibility timeout configuration - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) - .thenReturn(CompletableFuture.completedFuture(generateMessageResponse(10))) - .thenReturn(new CompletableFuture<>()); // Incomplete future for later use + // Mock SQS response and visibility timeout configuration + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(CompletableFuture.completedFuture(generateMessageResponse(10))) + .thenReturn(new CompletableFuture<>()); // Incomplete future for later use - // Create and complete a successful future - CompletableFuture successFuture = new CompletableFuture<>(); - receiveQueueBuffer.receiveMessage(successFuture, 10); - successFuture.get(3, TimeUnit.SECONDS); + // Create and complete a successful future + CompletableFuture successFuture = new CompletableFuture<>(); + receiveQueueBuffer.receiveMessage(successFuture, 10); + successFuture.get(3, TimeUnit.SECONDS); - // Create multiple futures - List> futures = new ArrayList<>(); - for (int i = 0; i < 5; i++) { - CompletableFuture future = new CompletableFuture<>(); - futures.add(future); - receiveQueueBuffer.receiveMessage(future, 10); - } + // Create multiple futures + List> futures = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + CompletableFuture future = new CompletableFuture<>(); + futures.add(future); + receiveQueueBuffer.receiveMessage(future, 10); + } - // Shutdown the queue buffer and assert no exceptions are thrown - assertDoesNotThrow(() -> receiveQueueBuffer.close()); + // Shutdown the queue buffer and assert no exceptions are thrown + assertDoesNotThrow(() -> receiveQueueBuffer.close()); - // Verify that each future completes exceptionally with CancellationException - for (CompletableFuture future : futures) { - CancellationException thrown = assertThrows(CancellationException.class, () -> { - future.get(1, TimeUnit.SECONDS); - }); - assertEquals("Shutdown in progress", thrown.getMessage()); + // Verify that each future completes exceptionally with CancellationException + for (CompletableFuture future : futures) { + CancellationException thrown = assertThrows(CancellationException.class, () -> { + future.get(1, TimeUnit.SECONDS); + }); + assertEquals("Shutdown in progress", thrown.getMessage()); + } } -} @Test - public void visibilityTimeOutErrorsAreLogged() throws Exception { + void visibilityTimeOutErrorsAreLogged() throws Exception { // Initialize ReceiveQueueBuffer with required configuration - ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(BatchOverrideConfiguration.builder().build()); + ReceiveQueueBuffer receiveQueueBuffer = receiveQueueBuffer(ResponseBatchConfiguration.builder().build()); // Mock SQS response and visibility timeout configuration when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) @@ -547,18 +445,20 @@ public void visibilityTimeOutErrorsAreLogged() throws Exception { assertThat(logCaptor.loggedEvents()).anySatisfy(logEvent -> { assertThat(logEvent.getLevel()).isEqualTo(Level.WARN); assertThat(logEvent.getMessage().getFormattedMessage()) - .contains("Failed to reset the visibility timeout of unprocessed messages for queueUrl: queueUrl. As a result," - + " these unprocessed messages will remain invisible in the queue for the duration of the visibility" + .contains("Failed to reset the visibility timeout of unprocessed messages for queueUrl: queueUrl. As a " + + "result," + + " these unprocessed messages will remain invisible in the queue for the duration of the " + + "visibility" + " timeout (PT30S)"); }); } } - private ReceiveMessageResponse generateMessageResponse(int count) { - List messages = IntStream.range(0, count) - .mapToObj(i -> Message.builder().body("Message " + i).receiptHandle("handle" + i).build()) - .collect(Collectors.toList()); - return ReceiveMessageResponse.builder().messages(messages).build(); - } + private ReceiveMessageResponse generateMessageResponse(int count) { + List messages = IntStream.range(0, count) + .mapToObj(i -> Message.builder().body("Message " + i).receiptHandle("handle" + i).build()) + .collect(Collectors.toList()); + return ReceiveMessageResponse.builder().messages(messages).build(); + } } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java index 93a460832d77..7332c09c4cc2 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java @@ -15,12 +15,10 @@ package software.amazon.awssdk.services.sqs.batchmanager; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -52,13 +50,14 @@ import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse; import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; @ExtendWith(MockitoExtension.class) class ReceiveSqsMessageHelperTest { - private final String queueUrl = "test-queue-url"; + private static final String QUEUE_URL = "test-queue-url"; private final Duration visibilityTimeout = Duration.ofSeconds(30); @Mock private ScheduledExecutorService scheduledExecutorService; @@ -69,16 +68,13 @@ class ReceiveSqsMessageHelperTest { @BeforeEach void setUp() { - BatchOverrideConfiguration batchOverrideConfig = BatchOverrideConfiguration.builder() - .maxBatchItems(10) - .receiveMessageAttributeNames(Arrays.asList( - "attribute1", "attribute2")) - .visibilityTimeout(Duration.ofSeconds(20)) - .longPollWaitTimeout(Duration.ofSeconds(15)) - .build(); - config = new ResponseBatchConfiguration(batchOverrideConfig); - receiveSqsMessageHelper = new ReceiveSqsMessageHelper(queueUrl, sqsClient, visibilityTimeout, config); + config = ResponseBatchConfiguration.builder() + .receiveMessageAttributeNames(Arrays.asList( + "attribute1", "attribute2")) + .visibilityTimeout(Duration.ofSeconds(20)) + .build(); + receiveSqsMessageHelper = new ReceiveSqsMessageHelper(QUEUE_URL, sqsClient, visibilityTimeout, config); } @AfterEach @@ -103,7 +99,6 @@ void asyncReceiveMessageSuccess() throws Exception { assertEquals(1, result.get().messagesSize()); } - @Test void multipleMessageGetsAdded() throws Exception { ReceiveMessageResponse response = generateMessageResponse(10); @@ -117,10 +112,8 @@ void multipleMessageGetsAdded() throws Exception { assertTrue(result.isDone()); assertNull(result.get().getException()); assertFalse(result.get().isEmpty()); - } - @Test void asyncReceiveMessageFailure() throws Exception { // Mocking receiveMessage to throw an exception @@ -154,7 +147,7 @@ void emptyResponseReceivedFromSQS() throws Exception { } @Test - public void concurrencyTestForRemoveMessage() throws Exception { + void concurrencyTestForRemoveMessage() throws Exception { // Mocking receiveMessage to return 10 messages ReceiveMessageResponse response = generateMessageResponse(10); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); @@ -178,15 +171,12 @@ public void concurrencyTestForRemoveMessage() throws Exception { } })); } - // Start all threads threads.forEach(Thread::start); - // Wait for all threads to finish for (Thread thread : threads) { thread.join(); } - // Verify final state assertEquals(10, successfulRemovals.get()); assertEquals(0, receiveSqsMessageHelper.messagesSize()); @@ -237,12 +227,10 @@ void immediateMessageProcessingWithoutExpiry() throws Exception { ReceiveSqsMessageHelper receiveSqsMessageHelper1 = completableFuture.get(2, TimeUnit.SECONDS); Message message = receiveSqsMessageHelper1.removeMessage(); assertEquals(message, Message.builder().body("Message 1").build()); - - } @Test - public void expiredBatchesClearsItself() throws Exception { + void expiredBatchesClearsItself() throws Exception { // Test setup: creating a new instance of ReceiveSqsMessageHelper ReceiveSqsMessageHelper batch = new ReceiveSqsMessageHelper("queueUrl", sqsClient , Duration.ofNanos(1), config); @@ -262,19 +250,20 @@ public void expiredBatchesClearsItself() throws Exception { } @Test - public void asyncReceiveMessageArgs() throws Exception { + void asyncReceiveMessageArgs() throws Exception { + + ResponseBatchConfiguration batchOverrideConfig = ResponseBatchConfiguration.builder() - Duration visibilityTimeout = Duration.ofSeconds(9); - BatchOverrideConfiguration batchOverrideConfig = BatchOverrideConfiguration.builder() - .maxBatchItems(10) .receiveMessageAttributeNames(Arrays.asList( "custom1", "custom2")) - .visibilityTimeout(visibilityTimeout) - .longPollWaitTimeout(Duration.ofSeconds(15)) + .messageSystemAttributeNames(Arrays.asList( + MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT)) + .visibilityTimeout(Duration.ofSeconds(9)) + .messageMinWaitDuration(Duration.ofMillis(200)) .build(); ReceiveSqsMessageHelper batch = new ReceiveSqsMessageHelper( - queueUrl, sqsClient, visibilityTimeout, new ResponseBatchConfiguration(batchOverrideConfig)); + QUEUE_URL, sqsClient, Duration.ofSeconds(9), batchOverrideConfig); // Mocking receiveMessage to return a single message @@ -288,18 +277,16 @@ public void asyncReceiveMessageArgs() throws Exception { // Verify that receiveMessage was called with the correct arguments ReceiveMessageRequest expectedRequest = ReceiveMessageRequest.builder() - .queueUrl(queueUrl) + .queueUrl(QUEUE_URL) .maxNumberOfMessages(10) .messageAttributeNames("custom1", "custom2") + .messageSystemAttributeNames(Arrays.asList( + MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT)) .visibilityTimeout(9) - .waitTimeSeconds(15) .overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER)) .build(); verify(sqsClient, times(1)).receiveMessage(eq(expectedRequest)); } - - - } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java index 030273fcf7cd..c82984bbecd1 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java @@ -18,13 +18,17 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -69,7 +73,7 @@ void batchRequest_OnlyOneInBatch_successful() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager = - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(1).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(1).build(), scheduledExecutor, mockClient); CompletableFuture response = batchManager.batchRequest(request); assertEquals("testResponse0", response.get(1, TimeUnit.SECONDS)); } @@ -83,7 +87,9 @@ void batchRequest_TwoBatchesMessagesSplitInTwoCalls_successful() throws Exceptio "testResponse")); when(mockClient.sendBatchAsync(any(), eq(batchKey1))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2) + .sendRequestFrequency(Duration.ofHours(1)).build(), + scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); assertEquals("testResponse0", response1.get(1, TimeUnit.SECONDS)); @@ -103,7 +109,7 @@ void batchRequest_TwoBatchesWithDifferentKey_successful() throws Exception { when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); @@ -157,7 +163,7 @@ void close_FlushesAllBatches() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); @@ -185,7 +191,7 @@ void batchRequest_ClosedWhenWaitingForResponse() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager = - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(1).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(1).build(), scheduledExecutor, mockClient); CompletableFuture response = batchManager.batchRequest(request); batchManager.close(); @@ -196,27 +202,36 @@ void batchRequest_ClosedWhenWaitingForResponse() throws Exception { @Test void batchRequest_MoreThanBufferSize_Fails() throws Exception { - String KEY_ONE = "testRequestOne"; - String KEY_TWO = "testRequestTwo"; - - CompletableFuture batchResponseFutureOne = CompletableFuture.completedFuture(batchedResponse(2, KEY_ONE)); - CompletableFuture batchResponseFutureTwo = CompletableFuture.completedFuture(batchedResponse(2, KEY_TWO)); - - when(mockClient.sendBatchAsync(any(), eq(KEY_ONE))).thenReturn(batchResponseFutureOne); - when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); - - SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchKeys(1).maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); - CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); - CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); - CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); - CompletableFuture response4 = batchManager.batchRequest(KEY_TWO + ":1"); - - assertEquals("testRequestOne0", response1.get(1, TimeUnit.SECONDS)); - assertEquals("testRequestOne1", response3.get(1, TimeUnit.SECONDS)); + final int MAX_QUEUES_THRESHOLD = 10000; + + // Generate unique keys up to MAX_QUEUES_THRESHOLD + List keys = IntStream.range(0, MAX_QUEUES_THRESHOLD) + .mapToObj(i -> String.format("testRequest%d:%d", i, i)) + .collect(Collectors.toList()); + + // Create mock responses for all keys + keys.forEach(key -> + when(mockClient.sendBatchAsync(any(), eq(key))) + .thenReturn(CompletableFuture.completedFuture(batchedResponse(2, key))) + ); + + SampleBatchManager batchManager = new SampleBatchManager( + BatchOverrideConfiguration.builder() + .maxBatchSize(2) + .sendRequestFrequency(Duration.ofHours(1)) + .build(), + scheduledExecutor, + mockClient + ); + + List> responses = new ArrayList<>(); + for (String key : keys) { + responses.add(batchManager.batchRequest(key)); + } - ExecutionException exception = assertThrows(ExecutionException.class, () -> response2.get(1, TimeUnit.SECONDS)); - assertEquals("java.lang.IllegalStateException: Reached MaxBatchKeys of: 1", exception.getCause().toString()); + CompletableFuture extraResponse = batchManager.batchRequest(String.format("testRequest%d:%d", MAX_QUEUES_THRESHOLD, MAX_QUEUES_THRESHOLD)); + ExecutionException exception = assertThrows(ExecutionException.class, () -> extraResponse.get(1, TimeUnit.SECONDS)); + assertEquals(String.format("java.lang.IllegalStateException: Reached MaxBatchKeys of: %d", MAX_QUEUES_THRESHOLD), exception.getCause().toString()); } @AfterAll diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java index 4d3fff0cfaad..301e176aaa14 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java @@ -33,14 +33,7 @@ public class SampleBatchManager extends RequestBatchManager