diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java index aea4c91ca1d4..910866e3088b 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java @@ -15,8 +15,6 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER; - import java.util.ArrayList; import java.util.List; import java.util.Optional; diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java index ab4600a65c97..5d1c673a1aeb 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java @@ -15,7 +15,8 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES; + +import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java index 4877b6e9551e..683d1f8a97ab 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java @@ -15,8 +15,6 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER; - import java.util.ArrayList; import java.util.List; import java.util.Optional; diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/QueueAttributesManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/QueueAttributesManager.java index 8351fe90ecd3..268b563ac694 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/QueueAttributesManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/QueueAttributesManager.java @@ -15,7 +15,8 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER; + +import static software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchManager.USER_AGENT_APPLIER; import java.time.Duration; import java.util.Arrays; diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java index f695990c8914..ccb3280c4ab8 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java @@ -15,7 +15,7 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SUPPORTED_SQS_RECEIVE_MSG; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SUPPORTED_SQS_RECEIVE_MSG; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java index 4937ec2ce370..39cbcbd6038c 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java @@ -47,13 +47,12 @@ public class ReceiveQueueBuffer implements SdkAutoCloseable { private final AtomicBoolean processingFutures = new AtomicBoolean(false); - private ReceiveQueueBuffer(ScheduledExecutorService executor, SqsAsyncClient sqsClient, - ResponseBatchConfiguration config, String queueUrl, QueueAttributesManager queueAttributesManager) { - this.executor = executor; - this.sqsClient = sqsClient; - this.config = config; - this.queueUrl = queueUrl; - this.queueAttributesManager = queueAttributesManager; + private ReceiveQueueBuffer(Builder builder) { + this.executor = builder.executor; + this.sqsClient = builder.sqsClient; + this.config = builder.config; + this.queueUrl = builder.queueUrl; + this.queueAttributesManager = builder.queueAttributesManager; } public static Builder builder() { @@ -242,7 +241,7 @@ public Builder queueAttributesManager(QueueAttributesManager queueAttributesMana } public ReceiveQueueBuffer build() { - return new ReceiveQueueBuffer(executor, sqsClient, config, queueUrl, queueAttributesManager); + return new ReceiveQueueBuffer(this); } } } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveSqsMessageHelper.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveSqsMessageHelper.java index ab24cddee89b..b0f982f0adee 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveSqsMessageHelper.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveSqsMessageHelper.java @@ -16,7 +16,7 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchManager.USER_AGENT_APPLIER; import java.time.Duration; import java.util.List; diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index dc25430613c1..0e1507ceace0 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -27,12 +27,21 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.core.ApiName; import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Validate; @SdkInternalApi public abstract class RequestBatchManager { + + + // abm stands for Automatic Batching Manager + public static final Consumer USER_AGENT_APPLIER = + b -> b.addApiName(ApiName.builder().version("abm").name("hll").build()); + protected final RequestBatchConfiguration batchConfiguration ; private final int maxBatchItems; diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java index 6d2927c7cd9b..bd5c2c9b41f4 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java @@ -15,7 +15,7 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.ATTRIBUTE_MAPS_PAYLOAD_BYTES; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.ATTRIBUTE_MAPS_PAYLOAD_BYTES; import java.nio.charset.StandardCharsets; import java.util.Optional; diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java index 16acb845f8ca..357d8a2bb8ea 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponseBatchConfiguration.java @@ -15,10 +15,11 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; + + import java.time.Duration; import java.util.Collections; import java.util.List; -import java.util.Optional; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; @@ -33,10 +34,23 @@ public final class ResponseBatchConfiguration { public static final List RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT = Collections.emptyList(); public static final List MESSAGE_SYSTEM_ATTRIBUTE_NAMES_DEFAULT = Collections.emptyList(); public static final boolean ADAPTIVE_PREFETCHING_DEFAULT = false; - public static final int MAX_BATCH_ITEMS_DEFAULT = 10; public static final int MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT = 10; public static final int MAX_DONE_RECEIVE_BATCHES_DEFAULT = 10; + public static final int MAX_SUPPORTED_SQS_RECEIVE_MSG = 10; + + public static final int MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB + + /** + * + * AWS SQS Message Attributes Documentation + * + * Rounding up max payload due to attribute maps. + * This was not done in V1, thus an issue was reported where batch messages failed with payload size exceeding the maximum. + */ + public static final int ATTRIBUTE_MAPS_PAYLOAD_BYTES = 16 * 1024; // 16 KiB + + private final Duration visibilityTimeout; private final Duration longPollWaitTimeout; private final Duration minReceiveWaitTime; @@ -48,47 +62,50 @@ public final class ResponseBatchConfiguration { private final Integer maxDoneReceiveBatches; public ResponseBatchConfiguration(BatchOverrideConfiguration overrideConfiguration) { - this.visibilityTimeout = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::visibilityTimeout) - .orElse(VISIBILITY_TIMEOUT_SECONDS_DEFAULT); - - this.longPollWaitTimeout = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::longPollWaitTimeout) - .orElse(LONG_POLL_WAIT_TIMEOUT_DEFAULT); - - this.minReceiveWaitTime = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::minReceiveWaitTime) - .orElse(MIN_RECEIVE_WAIT_TIME_MS_DEFAULT); - - this.messageSystemAttributeValues = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::messageSystemAttributeName) - .filter(list -> !list.isEmpty()) - .orElse(MESSAGE_SYSTEM_ATTRIBUTE_NAMES_DEFAULT); - - this.receiveMessageAttributeNames = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::receiveMessageAttributeNames) - .filter(list -> !list.isEmpty()) - .orElse(RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT); - - this.adaptivePrefetching = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::adaptivePrefetching) - .orElse(ADAPTIVE_PREFETCHING_DEFAULT); - - this.maxBatchItems = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxBatchItems) - .orElse(MAX_BATCH_ITEMS_DEFAULT); - - - this.maxInflightReceiveBatches = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxInflightReceiveBatches) - .orElse(MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT); - - this.maxDoneReceiveBatches = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxDoneReceiveBatches) - .orElse(MAX_DONE_RECEIVE_BATCHES_DEFAULT); + this.visibilityTimeout = overrideConfiguration != null && overrideConfiguration.visibilityTimeout() != null + ? overrideConfiguration.visibilityTimeout() + : VISIBILITY_TIMEOUT_SECONDS_DEFAULT; + + this.longPollWaitTimeout = overrideConfiguration != null && overrideConfiguration.longPollWaitTimeout() != null + ? overrideConfiguration.longPollWaitTimeout() + : LONG_POLL_WAIT_TIMEOUT_DEFAULT; + + this.minReceiveWaitTime = overrideConfiguration != null && overrideConfiguration.minReceiveWaitTime() != null + ? overrideConfiguration.minReceiveWaitTime() + : MIN_RECEIVE_WAIT_TIME_MS_DEFAULT; + + this.messageSystemAttributeValues = overrideConfiguration != null + && overrideConfiguration.messageSystemAttributeName() != null + && !overrideConfiguration.messageSystemAttributeName().isEmpty() + ? overrideConfiguration.messageSystemAttributeName() + : MESSAGE_SYSTEM_ATTRIBUTE_NAMES_DEFAULT; + + this.receiveMessageAttributeNames = overrideConfiguration != null + && overrideConfiguration.receiveMessageAttributeNames() != null + && !overrideConfiguration.receiveMessageAttributeNames().isEmpty() + ? overrideConfiguration.receiveMessageAttributeNames() + : RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT; + + this.adaptivePrefetching = overrideConfiguration != null && overrideConfiguration.adaptivePrefetching() != null + ? overrideConfiguration.adaptivePrefetching() + : ADAPTIVE_PREFETCHING_DEFAULT; + + this.maxBatchItems = overrideConfiguration != null && overrideConfiguration.maxBatchItems() != null + ? overrideConfiguration.maxBatchItems() + : MAX_SUPPORTED_SQS_RECEIVE_MSG; + + this.maxInflightReceiveBatches = overrideConfiguration != null + && overrideConfiguration.maxInflightReceiveBatches() != null + ? overrideConfiguration.maxInflightReceiveBatches() + : MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT; + + this.maxDoneReceiveBatches = overrideConfiguration != null && overrideConfiguration.maxDoneReceiveBatches() != null + ? overrideConfiguration.maxDoneReceiveBatches() + : MAX_DONE_RECEIVE_BATCHES_DEFAULT; } + public Duration visibilityTimeout() { return visibilityTimeout; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java index ad612dbb8895..18fcdc42192f 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java @@ -15,8 +15,6 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER; - import java.util.ArrayList; import java.util.List; import java.util.Optional; diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java deleted file mode 100644 index 2179a29c4925..000000000000 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.services.sqs.internal.batchmanager; - -import java.util.function.Consumer; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; -import software.amazon.awssdk.core.ApiName; - -@SdkInternalApi -public final class SqsMessageDefault { - - public static final int MAX_SUPPORTED_SQS_RECEIVE_MSG = 10; - - public static final int MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB - - // abm stands for Automatic Batching Manager - public static final Consumer USER_AGENT_APPLIER = - b -> b.addApiName(ApiName.builder().version("abm").name("hll").build()); - - - /** - * - * AWS SQS Message Attributes Documentation - * - * Rounding up max payload due to attribute maps. - * This was not done in V1, thus an issue was reported where batch messages failed with payload size exceeding the maximum. - */ - public static final int ATTRIBUTE_MAPS_PAYLOAD_BYTES = 16 * 1024; // 16 KiB - - private SqsMessageDefault() { - } -} diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java index b8a6c1a1758b..93a460832d77 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveSqsMessageHelperTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchManager.USER_AGENT_APPLIER; import java.time.Duration; import java.util.ArrayList; @@ -48,7 +49,6 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.internal.batchmanager.ReceiveSqsMessageHelper; import software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration; -import software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse; import software.amazon.awssdk.services.sqs.model.Message; @@ -293,7 +293,7 @@ public void asyncReceiveMessageArgs() throws Exception { .messageAttributeNames("custom1", "custom2") .visibilityTimeout(9) .waitTimeSeconds(15) - .overrideConfiguration(o -> o.applyMutation(SqsMessageDefault.USER_AGENT_APPLIER)) + .overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER)) .build(); verify(sqsClient, times(1)).receiveMessage(eq(expectedRequest)); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java index e90bc2590170..0829d8cd5693 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java @@ -24,12 +24,12 @@ import java.util.concurrent.ScheduledFuture; import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchBuffer; import software.amazon.awssdk.services.sqs.internal.batchmanager.BatchingExecutionContext; -import software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES; class RequestBatchBufferTest { @@ -45,7 +45,7 @@ void setUp() { @Test void whenPutRequestThenBufferContainsRequest() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); assertEquals(1, batchBuffer.responses().size()); @@ -53,7 +53,7 @@ void whenPutRequestThenBufferContainsRequest() { @Test void whenFlushableRequestsThenReturnRequestsUpToMaxBatchItems() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); Map> flushedRequests = batchBuffer.flushableRequests(); @@ -63,7 +63,7 @@ void whenFlushableRequestsThenReturnRequestsUpToMaxBatchItems() { @Test void whenFlushableScheduledRequestsThenReturnAllRequests() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); Map> flushedRequests = batchBuffer.flushableScheduledRequests(1); @@ -73,7 +73,7 @@ void whenFlushableScheduledRequestsThenReturnAllRequests() { @Test void whenMaxBufferSizeReachedThenThrowException() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 3, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, 10); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 3, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, 10); for (int i = 0; i < 10; i++) { batchBuffer.put("request" + i, new CompletableFuture<>()); } @@ -82,7 +82,7 @@ void whenMaxBufferSizeReachedThenThrowException() { @Test void whenPutScheduledFlushThenFlushIsSet() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); ScheduledFuture newScheduledFlush = mock(ScheduledFuture.class); batchBuffer.putScheduledFlush(newScheduledFlush); assertNotNull(newScheduledFlush); @@ -90,14 +90,14 @@ void whenPutScheduledFlushThenFlushIsSet() { @Test void whenCancelScheduledFlushThenFlushIsCancelled() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.cancelScheduledFlush(); verify(scheduledFlush).cancel(false); } @Test void whenGetResponsesThenReturnAllResponses() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response1 = new CompletableFuture<>(); CompletableFuture response2 = new CompletableFuture<>(); batchBuffer.put("request1", response1); @@ -110,7 +110,7 @@ void whenGetResponsesThenReturnAllResponses() { @Test void whenClearBufferThenBufferIsEmpty() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); batchBuffer.clear(); @@ -119,7 +119,7 @@ void whenClearBufferThenBufferIsEmpty() { @Test void whenExtractFlushedEntriesThenReturnCorrectEntries() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 5, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); for (int i = 0; i < 5; i++) { batchBuffer.put("request" + i, new CompletableFuture<>()); } @@ -129,7 +129,7 @@ void whenExtractFlushedEntriesThenReturnCorrectEntries() { @Test void whenHasNextBatchEntryThenReturnTrue() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.put("request1", new CompletableFuture<>()); assertTrue(batchBuffer.flushableRequests().containsKey("0")); } @@ -137,7 +137,7 @@ void whenHasNextBatchEntryThenReturnTrue() { @Test void whenNextBatchEntryThenReturnNextEntryId() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.put("request1", new CompletableFuture<>()); assertEquals("0", batchBuffer.flushableRequests().keySet().iterator().next()); } @@ -145,7 +145,7 @@ void whenNextBatchEntryThenReturnNextEntryId() { @Test void whenRequestPassedWithLessBytesinArgs_thenCheckForSizeOnly_andDonotFlush() { RequestBatchBuffer batchBuffer - = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + = new RequestBatchBuffer<>(scheduledFlush, 5, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); for (int i = 0; i < 5; i++) { batchBuffer.put(SendMessageRequest.builder().build(), new CompletableFuture<>()); @@ -160,7 +160,7 @@ void whenRequestPassedWithLessBytesinArgs_thenCheckForSizeOnly_andDonotFlush() { @Test void testFlushWhenPayloadExceedsMaxSize() { RequestBatchBuffer batchBuffer - = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + = new RequestBatchBuffer<>(scheduledFlush, 5, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); String largeMessageBody = createLargeString('a',245_760); batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), @@ -173,7 +173,7 @@ void testFlushWhenPayloadExceedsMaxSize() { @Test void testFlushWhenCumulativePayloadExceedsMaxSize() { RequestBatchBuffer batchBuffer - = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + = new RequestBatchBuffer<>(scheduledFlush, 5, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); String largeMessageBody = createLargeString('a',130_000); batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java index fd58fcaf5a8c..89ac8a35496e 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java @@ -31,7 +31,7 @@ import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.ATTRIBUTE_MAPS_PAYLOAD_BYTES; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.ATTRIBUTE_MAPS_PAYLOAD_BYTES; @TestInstance(Lifecycle.PER_CLASS) class RequestPayloadCalculatorTest { diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java index d083f2502b38..4d3fff0cfaad 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.services.sqs.batchmanager; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -22,7 +24,6 @@ import software.amazon.awssdk.services.sqs.internal.batchmanager.IdentifiableMessage; import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchConfiguration; import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchManager; -import software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault; import software.amazon.awssdk.utils.Either; public class SampleBatchManager extends RequestBatchManager { @@ -34,7 +35,7 @@ protected SampleBatchManager(BatchOverrideConfiguration batchOverrideConfigurati CustomClient client) { super(RequestBatchConfiguration.builder() .maxBatchOpenDuration(batchOverrideConfiguration.maxBatchOpenDuration()) - .maxBatchBytesSize(SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES) + .maxBatchBytesSize(MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES) .maxBatchItems(batchOverrideConfiguration.maxBatchItems()) .maxBufferSize(batchOverrideConfiguration.maxBufferSize()) .maxBatchKeys(batchOverrideConfiguration.maxBatchKeys())