From d2e59b9d54d74e678962dde46b20b485261802fd Mon Sep 17 00:00:00 2001 From: John Viegas Date: Thu, 5 Sep 2024 10:37:35 -0700 Subject: [PATCH] Update after internal poll --- .../BatchOverrideConfiguration.java | 116 +++++++++--------- .../batchmanager/ReceiveBatchManager.java | 4 +- .../ReceiveMessageBatchManager.java | 9 +- .../batchmanager/ReceiveQueueBuffer.java | 4 +- .../RequestBatchConfiguration.java | 17 +-- .../batchmanager/RequestBatchManager.java | 16 +-- .../ResponseBatchConfiguration.java | 29 +++-- .../BatchOverrideConfigurationTest.java | 23 ++-- .../batchmanager/ReceiveBatchManagerTest.java | 2 +- .../ReceiveMessageBatchManagerTest.java | 2 +- .../ReceiveSqsMessageHelperTest.java | 33 ++--- .../batchmanager/RequestBatchManagerTest.java | 8 +- .../sqs/batchmanager/SampleBatchManager.java | 5 +- 13 files changed, 130 insertions(+), 138 deletions(-) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java index 16d9f0da80eb..9c5a5fb4053a 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java @@ -31,17 +31,17 @@ import software.amazon.awssdk.utils.builder.ToCopyableBuilder; /** - * Configuration values for the BatchManager Implementation. All values are optional, and default values will be used if they - * are not specified. + * Configuration values for the BatchManager implementation used for controlling batch operations. + * All values are optional, and default values will be used if they are not specified. */ @SdkPublicApi public final class BatchOverrideConfiguration implements ToCopyableBuilder { private final Integer maxBatchSize; - private final Duration sendRequestFrequency; + private final Duration sendMessageFrequency; private final Duration receiveMessageVisibilityTimeout; - private final Duration receiveMessageMinWaitTime; + private final Duration receiveMessageMinWaitDuration; private final List receiveMessageSystemAttributeNames; private final List receiveMessageAttributeNames; @@ -50,24 +50,21 @@ private BatchOverrideConfiguration(Builder builder) { this.maxBatchSize = Validate.isPositiveOrNull(builder.maxBatchSize, "maxBatchSize"); Validate.isTrue(this.maxBatchSize == null || this.maxBatchSize <= 10, - "A batch can contain up to 10 messages."); + "The maxBatchSize must be less than or equal to 10. A batch can contain up to 10 messages."); - this.sendRequestFrequency = Validate.isPositiveOrNull(builder.sendRequestFrequency, - "sendRequestFrequency"); + this.sendMessageFrequency = Validate.isPositiveOrNull(builder.sendMessageFrequency, + "sendMessageFrequency"); this.receiveMessageVisibilityTimeout = Validate.isPositiveOrNull(builder.receiveMessageVisibilityTimeout, "receiveMessageVisibilityTimeout"); - - this.receiveMessageMinWaitTime = Validate.isPositiveOrNull(builder.receiveMessageMinWaitTime, - "receiveMessageMinWaitTime"); - - this.receiveMessageSystemAttributeNames = builder.receiveMessageSystemAttributeNames != null ? - Collections.unmodifiableList( - new ArrayList<>(builder.receiveMessageSystemAttributeNames)) : - Collections.emptyList(); - - this.receiveMessageAttributeNames = builder.receiveMessageAttributeNames != null ? - Collections.unmodifiableList(new ArrayList<>(builder.receiveMessageAttributeNames)) : - Collections.emptyList(); + this.receiveMessageMinWaitDuration = Validate.isPositiveOrNull(builder.receiveMessageMinWaitDuration, + "receiveMessageMinWaitDuration"); + this.receiveMessageSystemAttributeNames = + builder.receiveMessageSystemAttributeNames == null ? Collections.emptyList() : + Collections.unmodifiableList(builder.receiveMessageSystemAttributeNames); + + this.receiveMessageAttributeNames = + builder.receiveMessageAttributeNames == null ? Collections.emptyList() : + Collections.unmodifiableList(builder.receiveMessageAttributeNames); } public static Builder builder() { @@ -75,47 +72,51 @@ public static Builder builder() { } /** - * @return the maximum number of items that are batched together in a single outbound request. - * A batch can contain up to a maximum of 10 messages. - * The default value is 10. + * @return the maximum number of items that can be batched together in a single outbound SQS request + * (e.g., for {@link SendMessageBatchRequest}, {@link ChangeMessageVisibilityBatchRequest}, or + * {@link DeleteMessageBatchRequest}). A batch can contain up to a maximum of 10 messages. + * The default value is 10. */ public Integer maxBatchSize() { return maxBatchSize; } /** - * @return the maximum amount of time that an outgoing call waits to be batched with messages of the same type. - * The default value is 200 milliseconds. + * @return the maximum duration an outgoing call waits for additional messages of the same type before being sent. + * If the {@link #maxBatchSize()} is reached before this duration, the batch will be sent immediately. + * The default value is 200 milliseconds. */ - public Duration sendRequestFrequency() { - return sendRequestFrequency; + public Duration sendMessageFrequency() { + return sendMessageFrequency; } /** - * @return the custom visibility timeout to use when retrieving messages from SQS. + * @return the custom visibility timeout to use when retrieving messages from SQS. If not set, + * the default visibility timeout configured on the SQS queue will be used. */ public Duration receiveMessageVisibilityTimeout() { return receiveMessageVisibilityTimeout; } /** - * @return the minimum wait time for incoming receive message requests. + * @return the minimum wait time for incoming receive message requests. Without a non-zero minimum wait time, + * threads can waste CPU resources busy-waiting for messages. The default value is 50 milliseconds. */ - public Duration receiveMessageMinWaitTime() { - return receiveMessageMinWaitTime; + public Duration receiveMessageMinWaitDuration() { + return receiveMessageMinWaitDuration; } /** - * @return the system attribute names specific to the {@link ReceiveMessageRequest} - * that will be requested via {@link ReceiveMessageRequest#messageSystemAttributeNames()}. + * @return the system attribute names to request for {@link ReceiveMessageRequest}. Requests with differing + * system attribute names will bypass the batch manager and make a direct call to SQS. */ public List receiveMessageSystemAttributeNames() { return receiveMessageSystemAttributeNames; } /** - * @return the message attribute names that are specific to receive calls - * and will be requested via {@link ReceiveMessageRequest#messageAttributeNames()}. + * @return the message attribute names to request for {@link ReceiveMessageRequest}. Requests with different + * message attribute names will bypass the batch manager and make a direct call to SQS. */ public List receiveMessageAttributeNames() { return receiveMessageAttributeNames; @@ -126,21 +127,20 @@ public List receiveMessageAttributeNames() { public Builder toBuilder() { return new Builder() .maxBatchSize(maxBatchSize) - .sendRequestFrequency(sendRequestFrequency) + .sendMessageFrequency(sendMessageFrequency) .receiveMessageVisibilityTimeout(receiveMessageVisibilityTimeout) - .receiveMessageMinWaitTime(receiveMessageMinWaitTime) + .receiveMessageMinWaitDuration(receiveMessageMinWaitDuration) .receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames) .receiveMessageAttributeNames(receiveMessageAttributeNames); } - @Override public String toString() { return ToString.builder("BatchOverrideConfiguration") .add("maxBatchSize", maxBatchSize) - .add("sendRequestFrequency", sendRequestFrequency) + .add("sendMessageFrequency", sendMessageFrequency) .add("receiveMessageVisibilityTimeout", receiveMessageVisibilityTimeout) - .add("receiveMessageMinWaitTime", receiveMessageMinWaitTime) + .add("receiveMessageMinWaitDuration", receiveMessageMinWaitDuration) .add("receiveMessageSystemAttributeNames", receiveMessageSystemAttributeNames) .add("receiveMessageAttributeNames", receiveMessageAttributeNames) .build(); @@ -160,20 +160,22 @@ public boolean equals(Object o) { if (maxBatchSize != null ? !maxBatchSize.equals(that.maxBatchSize) : that.maxBatchSize != null) { return false; } - if (sendRequestFrequency != null ? !sendRequestFrequency.equals(that.sendRequestFrequency) : - that.sendRequestFrequency != null) { + if (sendMessageFrequency != null ? !sendMessageFrequency.equals(that.sendMessageFrequency) : + that.sendMessageFrequency != null) { return false; } - if (receiveMessageVisibilityTimeout != null ? !receiveMessageVisibilityTimeout.equals(that.receiveMessageVisibilityTimeout) : + if (receiveMessageVisibilityTimeout != null + ? !receiveMessageVisibilityTimeout.equals(that.receiveMessageVisibilityTimeout) : that.receiveMessageVisibilityTimeout != null) { return false; } - if (receiveMessageMinWaitTime != null ? !receiveMessageMinWaitTime.equals(that.receiveMessageMinWaitTime) : - that.receiveMessageMinWaitTime != null) { + if (receiveMessageMinWaitDuration != null ? !receiveMessageMinWaitDuration.equals(that.receiveMessageMinWaitDuration) : + that.receiveMessageMinWaitDuration != null) { return false; } - if (receiveMessageSystemAttributeNames != null ? !receiveMessageSystemAttributeNames.equals(that.receiveMessageSystemAttributeNames) : - that.receiveMessageSystemAttributeNames != null) { + if (receiveMessageSystemAttributeNames != null ? + !receiveMessageSystemAttributeNames.equals(that.receiveMessageSystemAttributeNames) + : that.receiveMessageSystemAttributeNames != null) { return false; } return receiveMessageAttributeNames != null ? receiveMessageAttributeNames.equals(that.receiveMessageAttributeNames) : @@ -183,9 +185,9 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = maxBatchSize != null ? maxBatchSize.hashCode() : 0; - result = 31 * result + (sendRequestFrequency != null ? sendRequestFrequency.hashCode() : 0); + result = 31 * result + (sendMessageFrequency != null ? sendMessageFrequency.hashCode() : 0); result = 31 * result + (receiveMessageVisibilityTimeout != null ? receiveMessageVisibilityTimeout.hashCode() : 0); - result = 31 * result + (receiveMessageMinWaitTime != null ? receiveMessageMinWaitTime.hashCode() : 0); + result = 31 * result + (receiveMessageMinWaitDuration != null ? receiveMessageMinWaitDuration.hashCode() : 0); result = 31 * result + (receiveMessageSystemAttributeNames != null ? receiveMessageSystemAttributeNames.hashCode() : 0); result = 31 * result + (receiveMessageAttributeNames != null ? receiveMessageAttributeNames.hashCode() : 0); return result; @@ -194,9 +196,9 @@ public int hashCode() { public static final class Builder implements CopyableBuilder { private Integer maxBatchSize = 10; - private Duration sendRequestFrequency = Duration.ofMillis(200); + private Duration sendMessageFrequency ; private Duration receiveMessageVisibilityTimeout; - private Duration receiveMessageMinWaitTime = Duration.ofMillis(50); + private Duration receiveMessageMinWaitDuration ; private List receiveMessageSystemAttributeNames = Collections.emptyList(); private List receiveMessageAttributeNames = Collections.emptyList(); @@ -224,15 +226,15 @@ public Builder maxBatchSize(Integer maxBatchSize) { * requests before being sent. Outbound requests include SendMessageBatchRequest, * ChangeMessageVisibilityBatchRequest, and DeleteMessageBatchRequest. If the maxBatchSize is reached * before this duration, the batch will be sent immediately. - * Increasing the {@code sendRequestFrequency} gives more time for additional messages to be added to + * Increasing the {@code sendMessageFrequency} gives more time for additional messages to be added to * the batch, which can reduce the number of requests and increase throughput. However, a higher * frequency may also result in increased average message latency. The default value is 200 milliseconds. * - * @param sendRequestFrequency The new value for the frequency at which outbound requests are sent. + * @param sendMessageFrequency The new value for the frequency at which outbound requests are sent. * @return This Builder object for method chaining. */ - public Builder sendRequestFrequency(Duration sendRequestFrequency) { - this.sendRequestFrequency = sendRequestFrequency; + public Builder sendMessageFrequency(Duration sendMessageFrequency) { + this.sendMessageFrequency = sendMessageFrequency; return this; } @@ -257,11 +259,11 @@ public Builder receiveMessageVisibilityTimeout(Duration receiveMessageVisibility * The call may return sooner than the configured `WaitTimeSeconds` if there are messages in the buffer. * If no messages are available and the wait time expires, the call will return an empty message list. * - * @param receiveMessageMinWaitTime The new minimum wait time value. + * @param receiveMessageMinWaitDuration The new minimum wait time value. * @return This Builder object for method chaining. */ - public Builder receiveMessageMinWaitTime(Duration receiveMessageMinWaitTime) { - this.receiveMessageMinWaitTime = receiveMessageMinWaitTime; + public Builder receiveMessageMinWaitDuration(Duration receiveMessageMinWaitDuration) { + this.receiveMessageMinWaitDuration = receiveMessageMinWaitDuration; return this; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java index 8d48d3bdb67f..e8fa5ca518c5 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java @@ -17,13 +17,11 @@ import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SUPPORTED_SQS_RECEIVE_MSG; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; @@ -60,7 +58,7 @@ public CompletableFuture processRequest(ReceiveMessageRe } int numMessages = rq.maxNumberOfMessages() != null ? rq.maxNumberOfMessages() : MAX_SUPPORTED_SQS_RECEIVE_MSG; - return queueAttributesManager.getReceiveMessageTimeout(rq, config.minReceiveWaitTime()).thenCompose(waitTimeMs -> { + return queueAttributesManager.getReceiveMessageTimeout(rq, config.messageMinWaitDuration()).thenCompose(waitTimeMs -> { CompletableFuture receiveMessageFuture = new CompletableFuture<>(); receiveQueueBuffer.receiveMessage(receiveMessageFuture, numMessages); CompletableFuture timeoutFuture = new CompletableFuture<>(); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java index 0c4ad8a93b3d..3354c10c2d0e 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java @@ -42,14 +42,7 @@ public ReceiveMessageBatchManager(SqsAsyncClient sqsClient, BatchOverrideConfiguration config) { this.sqsClient = sqsClient; this.executor = executor; - this.config = config != null - ? ResponseBatchConfiguration.builder() - .minReceiveWaitTime(config.receiveMessageMinWaitTime()) - .receiveMessageAttributeNames(config.receiveMessageAttributeNames()) - .messageSystemAttributeNames(config.receiveMessageSystemAttributeNames()) - .visibilityTimeout(config.receiveMessageVisibilityTimeout()) - .build() - : ResponseBatchConfiguration.builder().build(); + this.config = ResponseBatchConfiguration.builder(config).build(); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java index d2fb367e1d84..de28f6c79e09 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SUPPORTED_SQS_RECEIVE_MSG; + import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -124,7 +126,7 @@ private int determineDesiredBatches() { int totalRequested = futures.stream() .mapToInt(FutureRequestWrapper::getRequestedSize) .sum(); - int batchesNeededToFulfillFutures = (int) Math.ceil((float) totalRequested / 10); + int batchesNeededToFulfillFutures = (int) Math.ceil((float) totalRequested / MAX_SUPPORTED_SQS_RECEIVE_MSG); desiredBatches = Math.min(batchesNeededToFulfillFutures, desiredBatches); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java index a001c9be5ecc..98f0e1c145b8 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java @@ -31,7 +31,7 @@ public final class RequestBatchConfiguration { private final Integer maxBatchItems; private final Integer maxBatchKeys; private final Integer maxBufferSize; - private final Duration maxOutboundBatchCollectionDuration; + private final Duration sendMessageFrequency; private final Integer maxBatchBytesSize; private RequestBatchConfiguration(Builder builder) { @@ -39,7 +39,8 @@ private RequestBatchConfiguration(Builder builder) { this.maxBatchItems = builder.maxBatchItems != null ? builder.maxBatchItems : DEFAULT_MAX_BATCH_ITEMS; this.maxBatchKeys = builder.maxBatchKeys != null ? builder.maxBatchKeys : DEFAULT_MAX_BATCH_KEYS; this.maxBufferSize = builder.maxBufferSize != null ? builder.maxBufferSize : DEFAULT_MAX_BUFFER_SIZE; - this.maxOutboundBatchCollectionDuration = builder.maxOutboundBatchCollectionDuration != null ? builder.maxOutboundBatchCollectionDuration : + this.sendMessageFrequency = builder.sendMessageFrequency != null ? + builder.sendMessageFrequency : DEFAULT_MAX_BATCH_OPEN_IN_MS; this.maxBatchBytesSize = builder.maxBatchBytesSize != null ? builder.maxBatchBytesSize : DEFAULT_MAX_BATCH_BYTES_SIZE; @@ -53,14 +54,14 @@ public static Builder builder(BatchOverrideConfiguration configuration) { if (configuration != null) { return new Builder() .maxBatchItems(configuration.maxBatchSize()) - .maxOutboundBatchCollectionDuration(configuration.sendRequestFrequency()) + .sendMessageFrequency(configuration.sendMessageFrequency()) .maxBatchBytesSize(configuration.maxBatchSize()); } return new Builder(); } - public Duration maxOutboundBatchCollectionDuration() { - return maxOutboundBatchCollectionDuration; + public Duration sendMessageFrequency() { + return sendMessageFrequency; } public int maxBatchItems() { @@ -84,7 +85,7 @@ public static final class Builder { private Integer maxBatchItems; private Integer maxBatchKeys; private Integer maxBufferSize; - private Duration maxOutboundBatchCollectionDuration; + private Duration sendMessageFrequency; private Integer maxBatchBytesSize; private Builder() { @@ -105,8 +106,8 @@ public Builder maxBufferSize(Integer maxBufferSize) { return this; } - public Builder maxOutboundBatchCollectionDuration(Duration maxOutboundBatchCollectionDuration) { - this.maxOutboundBatchCollectionDuration = maxOutboundBatchCollectionDuration; + public Builder sendMessageFrequency(Duration sendMessageFrequency) { + this.sendMessageFrequency = sendMessageFrequency; return this; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index 6c50d78e4aac..1c8c01b26532 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -45,10 +45,9 @@ public abstract class RequestBatchManager { protected final RequestBatchConfiguration batchConfiguration ; private final int maxBatchItems; - private final Duration maxOutboundBatchCollectionDuration; + private final Duration sendMessageFrequency; private final BatchingMap requestsAndResponsesMaps; private final ScheduledExecutorService scheduledExecutor; - private final Set> pendingBatchResponses ; private final Set> pendingResponses ; @@ -57,7 +56,7 @@ protected RequestBatchManager(RequestBatchConfiguration overrideConfiguration, ScheduledExecutorService scheduledExecutor) { batchConfiguration = overrideConfiguration; this.maxBatchItems = batchConfiguration.maxBatchItems(); - this.maxOutboundBatchCollectionDuration = batchConfiguration.maxOutboundBatchCollectionDuration(); + this.sendMessageFrequency = batchConfiguration.sendMessageFrequency(); this.scheduledExecutor = Validate.notNull(scheduledExecutor, "Null scheduledExecutor"); pendingBatchResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); pendingResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -80,7 +79,9 @@ public CompletableFuture batchRequest(RequestT request) { // Add request and response to the map, scheduling a flush if necessary requestsAndResponsesMaps.put(batchKey, - () -> scheduleBufferFlush(batchKey, maxOutboundBatchCollectionDuration.toMillis(), scheduledExecutor), + () -> scheduleBufferFlush(batchKey, + sendMessageFrequency.toMillis(), + scheduledExecutor), request, response); @@ -108,8 +109,10 @@ private void manualFlushBuffer(String batchKey, Map> flushableRequests) { requestsAndResponsesMaps.cancelScheduledFlush(batchKey); flushBuffer(batchKey, flushableRequests); - requestsAndResponsesMaps.putScheduledFlush(batchKey, scheduleBufferFlush(batchKey, maxOutboundBatchCollectionDuration.toMillis(), - scheduledExecutor)); + requestsAndResponsesMaps.putScheduledFlush(batchKey, + scheduleBufferFlush(batchKey, + sendMessageFrequency.toMillis(), + scheduledExecutor)); } private void flushBuffer(String batchKey, Map> flushableRequests) { @@ -173,5 +176,4 @@ public void close() { requestsAndResponsesMaps.clear(); } - } \ No newline at end of file diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java index 48b298ed987e..0de53beacb80 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.List; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; @SdkInternalApi @@ -46,7 +47,7 @@ public final class ResponseBatchConfiguration { public static final int ATTRIBUTE_MAPS_PAYLOAD_BYTES = 16 * 1024; // 16 KiB private final Duration visibilityTimeout; - private final Duration minReceiveWaitTime; + private final Duration messageMinWaitDuration; private final List messageSystemAttributeNames; private final List receiveMessageAttributeNames; private final Boolean adaptivePrefetching; @@ -59,8 +60,8 @@ private ResponseBatchConfiguration(Builder builder) { ? builder.visibilityTimeout : VISIBILITY_TIMEOUT_SECONDS_DEFAULT; - this.minReceiveWaitTime = builder.minReceiveWaitTime != null - ? builder.minReceiveWaitTime + this.messageMinWaitDuration = builder.messageMinWaitDuration != null + ? builder.messageMinWaitDuration : MIN_RECEIVE_WAIT_TIME_MS_DEFAULT; this.messageSystemAttributeNames = builder.messageSystemAttributeNames != null @@ -93,8 +94,8 @@ public Duration visibilityTimeout() { return visibilityTimeout; } - public Duration minReceiveWaitTime() { - return minReceiveWaitTime; + public Duration messageMinWaitDuration() { + return messageMinWaitDuration; } public List messageSystemAttributeNames() { @@ -121,14 +122,24 @@ public int maxDoneReceiveBatches() { return maxDoneReceiveBatches; } + public static Builder builder(BatchOverrideConfiguration overrideConfiguration) { + Builder builder = new Builder(); + if (overrideConfiguration != null) { + builder.messageMinWaitDuration(overrideConfiguration.receiveMessageMinWaitDuration()) + .receiveMessageAttributeNames(overrideConfiguration.receiveMessageAttributeNames()) + .messageSystemAttributeNames(overrideConfiguration.receiveMessageSystemAttributeNames()) + .visibilityTimeout(overrideConfiguration.receiveMessageVisibilityTimeout()); + } + return builder; + } + public static Builder builder() { return new Builder(); } - public static class Builder { private Duration visibilityTimeout; - private Duration minReceiveWaitTime; + private Duration messageMinWaitDuration; private List messageSystemAttributeNames; private List receiveMessageAttributeNames; private Boolean adaptivePrefetching; @@ -141,8 +152,8 @@ public Builder visibilityTimeout(Duration visibilityTimeout) { return this; } - public Builder minReceiveWaitTime(Duration minReceiveWaitTime) { - this.minReceiveWaitTime = minReceiveWaitTime; + public Builder messageMinWaitDuration(Duration messageMinWaitDuration) { + this.messageMinWaitDuration = messageMinWaitDuration; return this; } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java index 522ea58bbb33..606c729a8180 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java @@ -56,25 +56,25 @@ private static Stream provideConfigurations() { @ParameterizedTest @MethodSource("provideConfigurations") void testBatchOverrideConfiguration(Integer maxBatchSize, - Duration sendRequestFrequency, + Duration sendMessageFrequency, Duration receiveMessageVisibilityTimeout, - Duration receiveMessageMinWaitTime, + Duration receiveMessageMinWaitDuration, List receiveMessageAttributeNames, List receiveMessageSystemAttributeNames) { BatchOverrideConfiguration config = BatchOverrideConfiguration.builder() .maxBatchSize(maxBatchSize) - .sendRequestFrequency(sendRequestFrequency) + .sendMessageFrequency(sendMessageFrequency) .receiveMessageVisibilityTimeout(receiveMessageVisibilityTimeout) - .receiveMessageMinWaitTime(receiveMessageMinWaitTime) + .receiveMessageMinWaitDuration(receiveMessageMinWaitDuration) .receiveMessageAttributeNames(receiveMessageAttributeNames) .receiveMessageSystemAttributeNames(receiveMessageSystemAttributeNames) .build(); assertEquals(maxBatchSize, config.maxBatchSize()); - assertEquals(sendRequestFrequency, config.sendRequestFrequency()); + assertEquals(sendMessageFrequency, config.sendMessageFrequency()); assertEquals(receiveMessageVisibilityTimeout, config.receiveMessageVisibilityTimeout()); - assertEquals(receiveMessageMinWaitTime, config.receiveMessageMinWaitTime()); + assertEquals(receiveMessageMinWaitDuration, config.receiveMessageMinWaitDuration()); assertEquals(Optional.ofNullable(receiveMessageAttributeNames).orElse(Collections.emptyList()), config.receiveMessageAttributeNames()); assertEquals(Optional.ofNullable(receiveMessageSystemAttributeNames).orElse(Collections.emptyList()), @@ -92,9 +92,9 @@ void testEqualsAndHashCode() { void testToBuilder() { BatchOverrideConfiguration originalConfig = BatchOverrideConfiguration.builder() .maxBatchSize(10) - .sendRequestFrequency(Duration.ofMillis(200)) + .sendMessageFrequency(Duration.ofMillis(200)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(30)) - .receiveMessageMinWaitTime(Duration.ofMillis(50)) + .receiveMessageMinWaitDuration(Duration.ofMillis(50)) .receiveMessageAttributeNames(Arrays.asList("msgAttr1")) .receiveMessageSystemAttributeNames(Collections.singletonList( MessageSystemAttributeName.SENDER_ID)) @@ -107,9 +107,9 @@ void testToBuilder() { builder.maxBatchSize(9); assertNotEquals(originalConfig.maxBatchSize(), builder.build().maxBatchSize()); // Ensure that all other fields are still equal after modifying the maxBatchSize - assertEquals(originalConfig.sendRequestFrequency(), builder.build().sendRequestFrequency()); + assertEquals(originalConfig.sendMessageFrequency(), builder.build().sendMessageFrequency()); assertEquals(originalConfig.receiveMessageVisibilityTimeout(), builder.build().receiveMessageVisibilityTimeout()); - assertEquals(originalConfig.receiveMessageMinWaitTime(), builder.build().receiveMessageMinWaitTime()); + assertEquals(originalConfig.receiveMessageMinWaitDuration(), builder.build().receiveMessageMinWaitDuration()); assertEquals(originalConfig.receiveMessageAttributeNames(), builder.build().receiveMessageAttributeNames()); assertEquals(originalConfig.receiveMessageSystemAttributeNames(), builder.build().receiveMessageSystemAttributeNames()); } @@ -124,7 +124,8 @@ void testMaxBatchSizeExceedsLimitThrowsException() { }); // Assert that the exception message matches the expected output - assertEquals("A batch can contain up to 10 messages.", exception.getMessage()); + assertEquals("The maxBatchSize must be less than or equal to 10. A batch can contain up to 10 messages.", + exception.getMessage()); } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java index eeb041e36858..ea94b744f166 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java @@ -87,7 +87,7 @@ private ResponseBatchConfiguration createConfig(Duration minReceiveWaitTime) { return ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Collections.emptyList()) .visibilityTimeout(Duration.ofSeconds(2)) - .minReceiveWaitTime(minReceiveWaitTime) + .messageMinWaitDuration(minReceiveWaitTime) .build(); } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java index 2cede50235f1..7416548731b2 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java @@ -77,7 +77,7 @@ void testBatchRequest_WhenBufferingDisabledAndInCompatible_ShouldNotUseBatchMana .messageSystemAttributeNames(overrideConfig.receiveMessageSystemAttributeNames()) .receiveMessageAttributeNames(overrideConfig.receiveMessageAttributeNames()) .visibilityTimeout(overrideConfig.receiveMessageVisibilityTimeout()) - .minReceiveWaitTime(overrideConfig.receiveMessageMinWaitTime()).build(); + .messageMinWaitDuration(overrideConfig.receiveMessageMinWaitDuration()).build(); receiveMessageBatchManager = new ReceiveMessageBatchManager(sqsClient, executor, overrideConfig); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java index 8b7acf773055..7332c09c4cc2 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java @@ -15,12 +15,10 @@ package software.amazon.awssdk.services.sqs.batchmanager; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -59,7 +57,7 @@ @ExtendWith(MockitoExtension.class) class ReceiveSqsMessageHelperTest { - private final String queueUrl = "test-queue-url"; + private static final String QUEUE_URL = "test-queue-url"; private final Duration visibilityTimeout = Duration.ofSeconds(30); @Mock private ScheduledExecutorService scheduledExecutorService; @@ -76,8 +74,7 @@ void setUp() { "attribute1", "attribute2")) .visibilityTimeout(Duration.ofSeconds(20)) .build(); - - receiveSqsMessageHelper = new ReceiveSqsMessageHelper(queueUrl, sqsClient, visibilityTimeout, config); + receiveSqsMessageHelper = new ReceiveSqsMessageHelper(QUEUE_URL, sqsClient, visibilityTimeout, config); } @AfterEach @@ -102,7 +99,6 @@ void asyncReceiveMessageSuccess() throws Exception { assertEquals(1, result.get().messagesSize()); } - @Test void multipleMessageGetsAdded() throws Exception { ReceiveMessageResponse response = generateMessageResponse(10); @@ -116,10 +112,8 @@ void multipleMessageGetsAdded() throws Exception { assertTrue(result.isDone()); assertNull(result.get().getException()); assertFalse(result.get().isEmpty()); - } - @Test void asyncReceiveMessageFailure() throws Exception { // Mocking receiveMessage to throw an exception @@ -153,7 +147,7 @@ void emptyResponseReceivedFromSQS() throws Exception { } @Test - public void concurrencyTestForRemoveMessage() throws Exception { + void concurrencyTestForRemoveMessage() throws Exception { // Mocking receiveMessage to return 10 messages ReceiveMessageResponse response = generateMessageResponse(10); when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(CompletableFuture.completedFuture(response)); @@ -177,15 +171,12 @@ public void concurrencyTestForRemoveMessage() throws Exception { } })); } - // Start all threads threads.forEach(Thread::start); - // Wait for all threads to finish for (Thread thread : threads) { thread.join(); } - // Verify final state assertEquals(10, successfulRemovals.get()); assertEquals(0, receiveSqsMessageHelper.messagesSize()); @@ -236,12 +227,10 @@ void immediateMessageProcessingWithoutExpiry() throws Exception { ReceiveSqsMessageHelper receiveSqsMessageHelper1 = completableFuture.get(2, TimeUnit.SECONDS); Message message = receiveSqsMessageHelper1.removeMessage(); assertEquals(message, Message.builder().body("Message 1").build()); - - } @Test - public void expiredBatchesClearsItself() throws Exception { + void expiredBatchesClearsItself() throws Exception { // Test setup: creating a new instance of ReceiveSqsMessageHelper ReceiveSqsMessageHelper batch = new ReceiveSqsMessageHelper("queueUrl", sqsClient , Duration.ofNanos(1), config); @@ -261,21 +250,20 @@ public void expiredBatchesClearsItself() throws Exception { } @Test - public void asyncReceiveMessageArgs() throws Exception { + void asyncReceiveMessageArgs() throws Exception { - Duration visibilityTimeout = Duration.ofSeconds(9); ResponseBatchConfiguration batchOverrideConfig = ResponseBatchConfiguration.builder() .receiveMessageAttributeNames(Arrays.asList( "custom1", "custom2")) .messageSystemAttributeNames(Arrays.asList( MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT)) - .visibilityTimeout(visibilityTimeout) - .minReceiveWaitTime(Duration.ofMillis(200)) + .visibilityTimeout(Duration.ofSeconds(9)) + .messageMinWaitDuration(Duration.ofMillis(200)) .build(); ReceiveSqsMessageHelper batch = new ReceiveSqsMessageHelper( - queueUrl, sqsClient, visibilityTimeout, batchOverrideConfig); + QUEUE_URL, sqsClient, Duration.ofSeconds(9), batchOverrideConfig); // Mocking receiveMessage to return a single message @@ -289,7 +277,7 @@ public void asyncReceiveMessageArgs() throws Exception { // Verify that receiveMessage was called with the correct arguments ReceiveMessageRequest expectedRequest = ReceiveMessageRequest.builder() - .queueUrl(queueUrl) + .queueUrl(QUEUE_URL) .maxNumberOfMessages(10) .messageAttributeNames("custom1", "custom2") .messageSystemAttributeNames(Arrays.asList( @@ -301,7 +289,4 @@ public void asyncReceiveMessageArgs() throws Exception { verify(sqsClient, times(1)).receiveMessage(eq(expectedRequest)); } - - - } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java index c82984bbecd1..183a95a8639f 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java @@ -88,7 +88,7 @@ void batchRequest_TwoBatchesMessagesSplitInTwoCalls_successful() throws Exceptio when(mockClient.sendBatchAsync(any(), eq(batchKey1))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2) - .sendRequestFrequency(Duration.ofHours(1)).build(), + .sendMessageFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); @@ -109,7 +109,7 @@ void batchRequest_TwoBatchesWithDifferentKey_successful() throws Exception { when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendMessageFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); @@ -163,7 +163,7 @@ void close_FlushesAllBatches() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchSize(2).sendMessageFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); @@ -218,7 +218,7 @@ void batchRequest_MoreThanBufferSize_Fails() throws Exception { SampleBatchManager batchManager = new SampleBatchManager( BatchOverrideConfiguration.builder() .maxBatchSize(2) - .sendRequestFrequency(Duration.ofHours(1)) + .sendMessageFrequency(Duration.ofHours(1)) .build(), scheduledExecutor, mockClient diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java index 35c059f81dde..301e176aaa14 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java @@ -33,10 +33,7 @@ public class SampleBatchManager extends RequestBatchManager