From 2d8966fa67259833910dd76ccc49c8c847a09049 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Tue, 27 Aug 2024 11:51:32 -0700 Subject: [PATCH 1/5] Initial changes --- .../BatchOverrideConfiguration.java | 26 +++++++++---------- .../BatchOverrideConfigurationTest.java | 8 +++--- .../batchmanager/RequestBatchBufferTest.java | 6 ++--- .../batchmanager/RequestBatchManagerTest.java | 8 +++--- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java index 9d68dc5ef262..2e00f089b02a 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java @@ -37,7 +37,7 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder provideConfigurations() { @MethodSource("provideConfigurations") void testBatchOverrideConfiguration(Integer maxBatchItems, Integer maxBatchKeys, - Duration maxBatchOpenDuration, + Duration batchSendRequestFrequency, Duration visibilityTimeout, Duration longPollWaitTimeout, Duration minReceiveWaitTime, @@ -79,7 +79,7 @@ void testBatchOverrideConfiguration(Integer maxBatchItems, BatchOverrideConfiguration config = BatchOverrideConfiguration.builder() .maxBatchItems(maxBatchItems) .maxBatchKeys(maxBatchKeys) - .maxBatchOpenDuration(maxBatchOpenDuration) + .batchSendRequestFrequency(batchSendRequestFrequency) .visibilityTimeout(visibilityTimeout) .longPollWaitTimeout(longPollWaitTimeout) .minReceiveWaitTime(minReceiveWaitTime) @@ -92,7 +92,7 @@ void testBatchOverrideConfiguration(Integer maxBatchItems, assertEquals(maxBatchItems, config.maxBatchItems()); assertEquals(maxBatchKeys, config.maxBatchKeys()); - assertEquals(maxBatchOpenDuration, config.maxBatchOpenDuration()); + assertEquals(batchSendRequestFrequency, config.batchSendRequestFrequency()); assertEquals(visibilityTimeout, config.visibilityTimeout()); assertEquals(longPollWaitTimeout, config.longPollWaitTimeout()); assertEquals(minReceiveWaitTime, config.minReceiveWaitTime()); @@ -117,7 +117,7 @@ void testToBuilder() { BatchOverrideConfiguration originalConfig = BatchOverrideConfiguration.builder() .maxBatchItems(10) .maxBatchKeys(5) - .maxBatchOpenDuration(Duration.ofMillis(200)) + .batchSendRequestFrequency(Duration.ofMillis(200)) .visibilityTimeout(Duration.ofSeconds(30)) .longPollWaitTimeout(Duration.ofSeconds(20)) .minReceiveWaitTime(Duration.ofMillis(50)) diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java index 502cf5290ec8..c7d6861b232c 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java @@ -36,7 +36,7 @@ class RequestBatchBufferTest { @BeforeEach void setUp() { scheduledFlush = mock(ScheduledFuture.class); - batchBuffer = new RequestBatchBuffer<>(10, scheduledFlush); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, ); } @Test @@ -117,12 +117,12 @@ void whenExtractFlushedEntriesThenReturnCorrectEntries() { @Test void whenHasNextBatchEntryThenReturnTrue() { batchBuffer.put("request1", new CompletableFuture<>()); - assertTrue(batchBuffer.flushableRequests(1).containsKey("0")); + assertTrue(batchBuffer.flushableRequests("request1").containsKey("0")); } @Test void whenNextBatchEntryThenReturnNextEntryId() { batchBuffer.put("request1", new CompletableFuture<>()); - assertEquals("0", batchBuffer.flushableRequests(1).keySet().iterator().next()); + assertEquals("0", batchBuffer.flushableRequests("request1").keySet().iterator().next()); } } \ No newline at end of file diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java index 030273fcf7cd..1a592b25294d 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java @@ -83,7 +83,7 @@ void batchRequest_TwoBatchesMessagesSplitInTwoCalls_successful() throws Exceptio "testResponse")); when(mockClient.sendBatchAsync(any(), eq(batchKey1))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).batchSendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); assertEquals("testResponse0", response1.get(1, TimeUnit.SECONDS)); @@ -103,7 +103,7 @@ void batchRequest_TwoBatchesWithDifferentKey_successful() throws Exception { when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).batchSendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); @@ -157,7 +157,7 @@ void close_FlushesAllBatches() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).batchSendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); @@ -206,7 +206,7 @@ void batchRequest_MoreThanBufferSize_Fails() throws Exception { when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchKeys(1).maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchKeys(1).maxBatchItems(2).batchSendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); From 3f8a0655ece3b638c3ed320a86fc5e9aebb61408 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Tue, 27 Aug 2024 11:52:06 -0700 Subject: [PATCH 2/5] Initial changes 2 --- .../BatchingExecutionContext.java | 8 ++ .../internal/batchmanager/BatchingMap.java | 22 +++-- .../ChangeMessageVisibilityBatchManager.java | 19 ++++- .../DefaultSqsAsyncBatchManager.java | 6 +- .../DeleteMessageBatchManager.java | 22 ++++- .../batchmanager/RequestBatchBuffer.java | 25 +++--- .../RequestBatchConfiguration.java | 12 +-- .../batchmanager/RequestBatchManager.java | 34 +++++--- .../ResponsePayloadCalculator.java | 77 ++++++++++++++++++ .../batchmanager/SQS_Builder_Attributes.xlsx | Bin 0 -> 6235 bytes .../batchmanager/SendMessageBatchManager.java | 72 +++++++++++++++- .../batchmanager/SqsMessageDefault.java | 8 ++ 12 files changed, 259 insertions(+), 46 deletions(-) create mode 100644 services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponsePayloadCalculator.java create mode 100644 services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SQS_Builder_Attributes.xlsx diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java index ffabc599b13c..503175e51fc0 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java @@ -24,9 +24,12 @@ public final class BatchingExecutionContext { private final RequestT request; private final CompletableFuture response; + private final long responsePayload; + public BatchingExecutionContext(RequestT request, CompletableFuture response) { this.request = request; this.response = response; + responsePayload = ResponsePayloadCalculator.calculateMessageSize(request); } public RequestT request() { @@ -36,4 +39,9 @@ public RequestT request() { public CompletableFuture response() { return response; } + + + public long responsePayload() { + return responsePayload; + } } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java index 406002409814..cfcb5164d96f 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java @@ -20,6 +20,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.function.BiConsumer; +import java.util.function.BiPredicate; +import java.util.function.Predicate; import java.util.function.Supplier; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -32,13 +34,15 @@ public final class BatchingMap { private final int maxBatchKeys; - private final int maxBufferSize; private final Map> batchContextMap; + private final BiPredicate>, RequestT> flushCondition ; - public BatchingMap(int maxBatchKeys, int maxBufferSize) { + + public BatchingMap(int maxBatchKeys, + BiPredicate>, RequestT> flushCondition) { this.batchContextMap = new ConcurrentHashMap<>(); this.maxBatchKeys = maxBatchKeys; - this.maxBufferSize = maxBufferSize; + this.flushCondition = flushCondition; } public void put(String batchKey, Supplier> scheduleFlush, RequestT request, @@ -47,7 +51,7 @@ public void put(String batchKey, Supplier> scheduleFlush, Req if (batchContextMap.size() == maxBatchKeys) { throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys); } - return new RequestBatchBuffer<>(maxBufferSize, scheduleFlush.get()); + return new RequestBatchBuffer<>(scheduleFlush.get(), flushCondition); }).put(request, response); } @@ -59,11 +63,15 @@ public void forEach(BiConsumer> batchContextMap.forEach(action); } - public Map> flushableRequests(String batchKey, - int maxBatchItems) { - return batchContextMap.get(batchKey).flushableRequests(maxBatchItems); + public Map> flushableRequests(String batchKey, RequestT request) { + return batchContextMap.get(batchKey).flushableRequests(request); + } + + public Map> flushableRequests(String batchKey) { + return batchContextMap.get(batchKey).flushableRequests(null); } + public Map> flushableScheduledRequests(String batchKey, int maxBatchItems) { return batchContextMap.get(batchKey).flushableScheduledRequests(maxBatchItems); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java index d8a392308874..a68f0535615c 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java @@ -17,9 +17,11 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiPredicate; import java.util.stream.Collectors; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; @@ -43,13 +45,26 @@ public class ChangeMessageVisibilityBatchManager extends RequestBatchManager shouldFlush(stringBatchingExecutionContextMap, changeMessageVisibilityRequest, overrideConfiguration) + ); this.sqsAsyncClient = sqsAsyncClient; } + private static boolean shouldFlush(Map> contextMap, + ChangeMessageVisibilityRequest request, RequestBatchConfiguration configuration) { + if (request != null) { + return false; + } + return contextMap.size() >= configuration.maxBatchItems(); + } + private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibilityBatchRequest( List> identifiedRequests, String batchKey) { List entries = identifiedRequests diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java index 3dde89462dda..cbe2ec7da793 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java @@ -49,13 +49,13 @@ private DefaultSqsAsyncBatchManager(DefaultBuilder builder) { ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor; - this.sendMessageBatchManager = new SendMessageBatchManager(builder.overrideConfiguration, + this.sendMessageBatchManager = new SendMessageBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration), scheduledExecutor, client); - this.deleteMessageBatchManager = new DeleteMessageBatchManager(builder.overrideConfiguration, + this.deleteMessageBatchManager = new DeleteMessageBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration), scheduledExecutor, client); - this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(builder.overrideConfiguration, + this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration), scheduledExecutor, client); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java index c2c3385507cc..9537ce7aebf0 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -25,8 +26,9 @@ import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; @@ -43,13 +45,27 @@ public class DeleteMessageBatchManager extends RequestBatchManager shouldFlush(stringBatchingExecutionContextMap, changeMessageVisibilityRequest, overrideConfiguration) + ); this.sqsAsyncClient = sqsAsyncClient; } + + private static boolean shouldFlush(Map> contextMap, + DeleteMessageRequest request, RequestBatchConfiguration configuration) { + if (request != null) { + return false; + } + return contextMap.size() >= configuration.maxBatchItems(); + } + + + private static DeleteMessageBatchRequest createDeleteMessageBatchRequest( List> identifiedRequests, String batchKey) { List entries = identifiedRequests diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java index 57baed55d1c1..ce23b15ed50b 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java @@ -15,12 +15,16 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SEND_MESSAGE_BATCH_SIZE; + import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; +import java.util.function.BiPredicate; +import java.util.function.Predicate; import java.util.stream.Collectors; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -30,10 +34,8 @@ public final class RequestBatchBuffer { private final Map> idToBatchContext; - /** - * Maximum number of elements that can be included in the BatchBuffer. - */ - private final int maxBufferSize; + + private BiPredicate>, RequestT> flushCondition ; /** * Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next @@ -41,6 +43,7 @@ public final class RequestBatchBuffer { * response pair is received. */ private int nextId; + private int maxBatchItems; /** * Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a @@ -54,23 +57,25 @@ public final class RequestBatchBuffer { */ private ScheduledFuture scheduledFlush; - public RequestBatchBuffer(int maxBufferSize, ScheduledFuture scheduledFlush) { + public RequestBatchBuffer(ScheduledFuture scheduledFlush, + BiPredicate>, RequestT> flushCondition) { this.idToBatchContext = new ConcurrentHashMap<>(); - this.maxBufferSize = maxBufferSize; this.nextId = 0; this.nextBatchEntry = 0; this.scheduledFlush = scheduledFlush; + this.flushCondition = flushCondition; } - public Map> flushableRequests(int maxBatchItems) { + public Map> flushableRequests(RequestT request) { synchronized (flushLock) { - if (idToBatchContext.size() >= maxBatchItems) { + if (flushCondition.test(idToBatchContext, request)) { return extractFlushedEntries(maxBatchItems); } return new ConcurrentHashMap<>(); } } + public Map> flushableScheduledRequests(int maxBatchItems) { synchronized (flushLock) { if (idToBatchContext.size() > 0) { @@ -93,8 +98,8 @@ private Map> extractFlushe public void put(RequestT request, CompletableFuture response) { synchronized (this) { - if (idToBatchContext.size() == maxBufferSize) { - throw new IllegalStateException("Reached MaxBufferSize of: " + maxBufferSize); + if (idToBatchContext.size() == MAX_SEND_MESSAGE_BATCH_SIZE) { + throw new IllegalStateException("Reached MaxBufferSize of: " + MAX_SEND_MESSAGE_BATCH_SIZE); } if (nextId == Integer.MAX_VALUE) { diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java index 364d04c76101..bfcdd1896ee5 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java @@ -24,14 +24,14 @@ public final class RequestBatchConfiguration { public static final int DEFAULT_MAX_BATCH_ITEMS = 10; - public static final int DEFAULT_MAX_BATCH_KEYS = 100; + public static final int DEFAULT_MAX_BATCH_KEYS = 1000; public static final int DEFAULT_MAX_BUFFER_SIZE = 500; public static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(200); private final Integer maxBatchItems; private final Integer maxBatchKeys; private final Integer maxBufferSize; - private final Duration maxBatchOpenDuration; + private final Duration batchSendRequestFrequency; public RequestBatchConfiguration(BatchOverrideConfiguration overrideConfiguration) { this.maxBatchItems = Optional.ofNullable(overrideConfiguration) @@ -46,13 +46,13 @@ public RequestBatchConfiguration(BatchOverrideConfiguration overrideConfiguratio .map(BatchOverrideConfiguration::maxBufferSize) .orElse(DEFAULT_MAX_BUFFER_SIZE); - this.maxBatchOpenDuration = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxBatchOpenDuration) + this.batchSendRequestFrequency = Optional.ofNullable(overrideConfiguration) + .map(BatchOverrideConfiguration::batchSendRequestFrequency) .orElse(DEFAULT_MAX_BATCH_OPEN_IN_MS); } - public Duration maxBatchOpenDuration() { - return maxBatchOpenDuration; + public Duration batchSendRequestFrequency() { + return batchSendRequestFrequency; } public int maxBatchItems() { diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index 4cde84cd2603..a36d80b01a2b 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.utils.Either; @@ -34,22 +35,26 @@ @SdkInternalApi public abstract class RequestBatchManager { private final int maxBatchItems; - private final Duration maxBatchOpenDuration; + private final Duration batchSendRequestFrequency; private final BatchingMap requestsAndResponsesMaps; private final ScheduledExecutorService scheduledExecutor; private final Set> pendingBatchResponses ; private final Set> pendingResponses ; + protected final RequestBatchConfiguration batchConfiguration ; - protected RequestBatchManager(BatchOverrideConfiguration overrideConfiguration, ScheduledExecutorService scheduledExecutor) { - RequestBatchConfiguration batchConfiguration = new RequestBatchConfiguration(overrideConfiguration); - this.requestsAndResponsesMaps = new BatchingMap<>(batchConfiguration.maxBatchKeys(), - batchConfiguration.maxBufferSize()); + + protected RequestBatchManager(RequestBatchConfiguration overrideConfiguration, + ScheduledExecutorService scheduledExecutor, + BiPredicate>, RequestT> flushCondition ) { + batchConfiguration = overrideConfiguration; this.maxBatchItems = batchConfiguration.maxBatchItems(); - this.maxBatchOpenDuration = batchConfiguration.maxBatchOpenDuration(); + this.batchSendRequestFrequency = batchConfiguration.batchSendRequestFrequency(); this.scheduledExecutor = Validate.notNull(scheduledExecutor, "Null scheduledExecutor"); pendingBatchResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); pendingResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); + this.requestsAndResponsesMaps = new BatchingMap<>(batchConfiguration.maxBatchKeys(), flushCondition); + } public CompletableFuture batchRequest(RequestT request) { @@ -57,11 +62,13 @@ public CompletableFuture batchRequest(RequestT request) { pendingResponses.add(response); try { String batchKey = getBatchKey(request); + // Consider the current request to calculate in advance the size before adding + flushBufferIfNeeded(batchKey, request); requestsAndResponsesMaps.put(batchKey, - () -> scheduleBufferFlush(batchKey, maxBatchOpenDuration.toMillis(), scheduledExecutor), + () -> scheduleBufferFlush(batchKey, batchSendRequestFrequency.toMillis(), scheduledExecutor), request, response); - flushBufferIfNeeded(batchKey); + flushBufferIfNeeded(batchKey, null); } catch (Exception e) { response.completeExceptionally(e); } @@ -76,10 +83,9 @@ protected abstract CompletableFuture batchAndSend(List, IdentifiableMessage>> mapBatchResponse(BatchResponseT batchResponse); - - private void flushBufferIfNeeded(String batchKey) { + private void flushBufferIfNeeded(String batchKey, RequestT requestT) { Map> flushableRequests = - requestsAndResponsesMaps.flushableRequests(batchKey, maxBatchItems); + requestsAndResponsesMaps.flushableRequests(batchKey, requestT); if (!flushableRequests.isEmpty()) { manualFlushBuffer(batchKey, flushableRequests); } @@ -89,7 +95,7 @@ private void manualFlushBuffer(String batchKey, Map> flushableRequests) { requestsAndResponsesMaps.cancelScheduledFlush(batchKey); flushBuffer(batchKey, flushableRequests); - requestsAndResponsesMaps.putScheduledFlush(batchKey, scheduleBufferFlush(batchKey, maxBatchOpenDuration.toMillis(), + requestsAndResponsesMaps.putScheduledFlush(batchKey, scheduleBufferFlush(batchKey, batchSendRequestFrequency.toMillis(), scheduledExecutor)); } @@ -142,7 +148,7 @@ public void close() { requestsAndResponsesMaps.forEach((batchKey, batchBuffer) -> { requestsAndResponsesMaps.cancelScheduledFlush(batchKey); Map> flushableRequests = - requestsAndResponsesMaps.flushableRequests(batchKey, maxBatchItems); + requestsAndResponsesMaps.flushableRequests(batchKey); while (!flushableRequests.isEmpty()) { flushBuffer(batchKey, flushableRequests); @@ -153,4 +159,6 @@ public void close() { pendingResponses.forEach(future -> future.cancel(true)); requestsAndResponsesMaps.clear(); } + + } \ No newline at end of file diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponsePayloadCalculator.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponsePayloadCalculator.java new file mode 100644 index 000000000000..e57eb4b515ff --- /dev/null +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponsePayloadCalculator.java @@ -0,0 +1,77 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.sqs.internal.batchmanager; + +import java.nio.charset.StandardCharsets; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeValue; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +public class ResponsePayloadCalculator { + + private static final int MAX_SIZE_BYTES = 262_144; // 256 KiB + + /** + * Evaluates if the total size of the message body, message attributes, and message system attributes + * exceeds the maximum allowed size for a SendMessageRequest. If the request is not a SendMessageRequest, + * it returns -1. + * + * @param request the request to evaluate + * @param the type of the request + * @return the total size in bytes if the request is a SendMessageRequest, otherwise -1 + */ + public static long calculateMessageSize(RequestT request) { + if (!(request instanceof SendMessageRequest)) { + return -1; + } + + SendMessageRequest sendMessageRequest = (SendMessageRequest) request; + long totalSize = 0; + + // Calculate size of messageBody + if (sendMessageRequest.messageBody() != null) { + totalSize += sendMessageRequest.messageBody().getBytes(StandardCharsets.UTF_8).length; + } + + // Calculate size of messageAttributes + if (sendMessageRequest.messageAttributes() != null) { + totalSize += sendMessageRequest.messageAttributes().entrySet().stream() + .mapToInt(entry -> { + String key = entry.getKey(); + MessageAttributeValue value = entry.getValue(); + return key.getBytes(StandardCharsets.UTF_8).length + + value.dataType().getBytes(StandardCharsets.UTF_8).length + + (value.stringValue() != null ? + value.stringValue().getBytes(StandardCharsets.UTF_8).length : 0); + }).sum(); + } + + // Calculate size of messageSystemAttributes + if (sendMessageRequest.messageSystemAttributes() != null) { + totalSize += sendMessageRequest.messageSystemAttributes().entrySet().stream() + .mapToInt(entry -> { + String key = entry.getKey().toString(); + MessageSystemAttributeValue value = entry.getValue(); + return key.getBytes(StandardCharsets.UTF_8).length + + (value.stringValue() != null ? + value.stringValue().getBytes(StandardCharsets.UTF_8).length : 0); + }).sum(); + } + + return totalSize; + } + +} \ No newline at end of file diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SQS_Builder_Attributes.xlsx b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SQS_Builder_Attributes.xlsx new file mode 100644 index 0000000000000000000000000000000000000000..cdc98dea2f73b12de59f6d40a4ac13f6e2e73ef2 GIT binary patch literal 6235 zcmZ`-WmHt{79K#lk&*6@W@za~QfBB9B!*^&2I=mWlr9k|6{IDVE)hXGKe|CW?)Z7{ z^~(D0IqSUdI%lot`SHAa@3Z#?@M{VI3d>`P1vDo@q87SBN$+P@bm$LFqAyk+%#7<@6yA@lDH% zeXt=UFG#K2Vt2-+J5G$0lOm0iHXpZz_`2XB@4g7>R!OmEuGpFfy{yI@3yD~=87hlH zXIZ-i5Qz_Vq8Ig&WlklsF-*afklwRWi%uZwOh65)<@M?nE!^v)VQKKg{bbE8y0GBK zug7K}Cp9a>TeO&zQQM}R_WH$O9+5Jj(^IbIbNODOkRVY}+;PDr0ql?&6>D~X6hYsr zX1-qXo-Vd8(YAU_Y9q_U^vdHT;ZILK;b=nz-gY-HrXo*Cp{Y_&5)tWthf4H})3x6& zPK(<%sWWnA%NYo_RX_fmE=5+3^xEn3)G#8Kg?B{+Cb>8f(!s) zBX$#KsJ#m}*Uz;gsr^YO2uEtqM}Fn}IUS2_+6k896a(|4{f)TmOxX6FwU@S~vqbuVWET=BnkjDuZ%q z@5gw3pkfc2SK?WoTdlZSI(r;(pL)NI4D0aNMudA?-0_>rJ=crgfYp%F+99c_MyA)- zJfi~kFhlr8;*Mznt(C!5AB%-5Rb=4vvAwJ?-PL?H@W|Rzor$I^@sb?Hk^H>cENeo| z(kDy(2CZ(C(@!zihGSG@>TVk-Uxo%Tc<5uAU-6DU6dLgX{HchXo8kUCLWu^1BIJnO z)1Ld6C?0TUJ2N=k?q^K?Mo1 zlyU}}aQcPGBDy8GkCC0mPd9a|$3?S8{2cox1dG$BT0w`KOPdUT>a*D7Wlf3DCJ7w? zApND!Pi0)Jp-@*B?!PylpAyBUsKjT22mIuucJej8P0R+;<)Oyd z!$)r}$wy<+x@A62-dJ|!#*&t(w_Xl41SD|M;Dx}%mfFCFZ>WaQ50AS0yVDjm&SCYXtlsfS^e{5N+CdJP|0`g zBB##HdmL{?$CUe~1g&bTHY6wwE{?OQBPZ3JJf5Z|s+?4;T{h`T{aDJEcG%Sf9AB?5 z0>3T`Et4~U{O+sAdY%?|1Dkfp_5J;|s{2>vX{%bNbf?hT3$`pQ+mrE9_|*mGluMy& z;(Ci=J8{28dsSR|bbq2+n*3@01J07qYc_6{A}*t?7RH;#$Ekm{Ml74(d#bP!Pk6&y z;2DFlOIUf*q;jF7M8gyoMs3Lt8UP)um7>OMO9Xy`_G69CbeZ!MEnDo5mANz zJEH!RYq5}iL>(e%J_3%<*-+au7zmVpQqhtLkh2+Pp5xb{5`9lKdLzE2#p<|R!?!&9 zWyq^RL@>%X-)(fAGD!&s$IT{W?2=MKFn&U)Eo;rd5z4>nD1%~PsZVQ~36fOYpOIDuLoPqW}vw`9=H)bx`#QkDle z1xrMSgS>&jz2x~3+d|Q7_Uu z`V*^OO1*EaZjYzR`~-bK7_#O`OB8>K&b!AXLXH9e5McrUKYz!FGUsA#>I}8ebal3Y zS-Jd7;SW0V@UMylerr{v=b?vEFDNmx!9-&y+Tj7t{H)EE=ITpdDm;8emy6E#$h@;_ z-f8CR%oKX8O$f~kRa3V%adOy$K+lDg6xKTLjTB2T)a5V=9rdUAGq;%;S(bY|QgZL< zZQ7{jsSaoq$zg0zJa-2z@OO9YH*kO180-u0i0}*h5zrsNyi5tf5jpig{2b4Bc042U4$=rG4+iB^RP7d9Qk6v+DU$ zfM+oKZ!TG;-@#G)B&(LP5&<3~BCcv5%CX=!Y`NEXqbui@FTZ zP+iXt4(+t0s~WNGu2VAuY_^|A%cgaShyJ*#bXEH?T*O-CEwM=lF?DVttF_=P$C*^9 zyj&@AJi#_a`n16BPIjeGoSS{iYrLJ$MbTu}v77nvc(Rf(cjzH$T@!b?L*;D6S4Z}% z`HXPy`}r8zCvOwSZvo{@M}Z;gA&*2>*xfpIoxUYIi}`a}#rG7A+8_%wOIiUvee`w( z*%gf{9GRPvY@hEfeA|Z^pArb1G8{SY24f8vmIarnWw=Esd=Bo#6mZvk z8q`h051S^@SL!cNqWY3tEF+q;-x!2W9r;kTr)Bb>gu$Yu+t~MBD#W%LKQ~WbG%=-h z?Z}_@gVtV~d9Hf`J^n*B@^$sWr)_imp zh`?{pgz(UBjqTVv+Upn#JG7Pp_rfbXiCu>lUDqMkp3krHQ{_ZaSBl4J1^4;WoHI{NZf-gqA5KZRkP5uilkcz zI|ts2kmQL@v%CRxw6+R~0@(+b{P=pth5L!j3$Q%V{#E|VdKC=p@roFS{k+o@^Vb7R zaHRX?>t3xvWJIl)&xpxmpJjhh_3BgUl6Vo)4+gJynkwvk$FNM~I>-yXW){qo!4}Go zXmEXgpTe|i_&q5LWU$!hMAd&tj{lI=}IJrSFxsj^&9n^Dfz2x)78>?;bmQB6{*s9pZ#kCO&i6KToy=O>NWMJb8tmAH1{3etW0YCcRM5dW(n<4T@i*4x-|^ zT0!5fM3g8K(}jdTFv%@ADk{j1 zsDrEIdhI){mPUq|Hi?A=Nku2)v--I1uVYl#qBxTHF*Uiex zV)ct~$y_ln*TemXD2upRAtHnd&3C>aD)#XDWQgn`iTWSH}}3Eg+N>JfDMa zFXnT_pCA0N^7py@4w!%7`MMLGpP6WuzI#2O`&p(mfGp|=f#&HMc@IX2r}z6e1)B^lzr-2)yvwD;c0I#-R4ecPF6SY6`_@LnmT;jDu* zB4Hz>OS}TC82OcFmU~>Aj+SPvvT<(|Suh4vIWWuW4@7cUC@BQGc)lhIy^B&+rhXD? z;#99iu#o_Y^Fm4-m2)vvW#8O2TbJv4yZ6qIpCvSqI&+SfyyvufIb1b2LVZ7Iv7*;$bq(uMae zIWdzLK1h^|ckwoZ+qvLeHAlfwKdNj&Ch*9PvHl|c%{r)qHNwdmGoM30oT%8SL#fT6 zS5)$e21sK5$kIwm%KSrjuvt@nF7BHBp*D+~bZks}V+XnJ+>>-oz(n$@r=`tY#;E;Z zCKwmP3f;d%pIc70>zP8(z#eL_z8;sD9B&5_tN98kt`oUkW5o}zDw|P3a`;1sY}$>b z8Q@Ygc7%oltMZAn@cW`hneMz4-BXU0`pcr_18^KX9n;|%meZ%DoW%F|GJ7c_+BEbqQJ0J1{95k zK95%?CJcu-#)R3^bI3RIZsf< zT64A%s`!6w0~=fOS!1#JKYM$q_41N*jVmP3yncp5KnfL}s;vhThI1t=jO}5QQabDB zkp#W((URftp3mA)j(gx+wgBO80BNA}%LU1lXeQEHn54M3gk@+>SbUov*scTfZ!s4$IL~b4O}^SB54*nIUsUWTd`yf6}alY9A#+;-z{r2GM~=8At2+DuFA^rVCnrGs$C1 z7&IQ`X|l3kD%|w0W6~<8p5jItmXxH zJh;2fjj~|Qtyp)1_GcSVQQySdiaUZ`;@sN?K4LMj4o~Lvf{E++H@HhXR)l`lMXzv+R*K za{5UH7{p(VUN_3^O}jPAB_3%xOH8MWZBJh|}-jubmvuC+{C9oi-Mxqx~lI0bp-u7Kpj-OUIGhww`kSc%=2Uih1 zx5&}EKe^Se>`|Ss$(5yXRv4H$4#}E9f5>b;U8^LzRZiz0CMm=XpMCQVT%yS%rQJ?u z`r!3ME!#en5N`s!CbalT6zS@K^q2zPN$*Ru&vK8}cLs8vE-ntAc-{B)O5#bHQx3dZ zxR<8K=fI8p`8R*GKGUAoNQS&&-)qb_l7DJy+*tC}1EDDaVpJjcUrkLN9e+vM{$%T? zIf#%{+)ab-SW8&pb%5T!T=4@v3ae4QN40;E6{J>mD}75qW`Ezv=i=teTU$T%L2!$< z;GojtGlvz~qaF6;S|;zQne1D0vLxq}_jA&{Cjr~aIg#4ONc3cF988Zm^ks=3WEH~l zv>&OMr!jwsuPD&!;yiV8;Ggllo(p1-7(PeHk%c4N_NmH{RF5i`s9Xf3Llje2H4C zwK|2Duz212OCU21A3u$aolELRwh-zPi|i-`VKemUe9{_RMM%rpSr{%Lf5F@(L32+l z6I&lAjPXgV&po9JVkh$+WQlQh;0DQ(nX(w@E#`9L8oS>y0!FNaU-{9v6ukD`(^t%jq z2a(?lpRs;1{C6z53%%Q&{)TSh{0+U^tKKEJTm60$3=sW8ExZfAo9Mscw1_bKZz=tE X+JjY45qbguScp3San7HCKi~cZoG;*h literal 0 HcmV?d00001 diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java index d52e25b3626e..053bd0116b77 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java @@ -15,11 +15,16 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES; + +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiPredicate; import java.util.stream.Collectors; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; @@ -27,6 +32,10 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeValue; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; @@ -43,13 +52,71 @@ public class SendMessageBatchManager extends RequestBatchManager shouldFlush(stringBatchingExecutionContextMap, changeMessageVisibilityRequest, overrideConfiguration) + ); this.asyncClient = asyncClient; } + public static long sum(SendMessageRequest request) { + int totalSize = 0; + + // Calculate size of messageBody + if (request.messageBody() != null) { + totalSize += request.messageBody().getBytes(StandardCharsets.UTF_8).length; + } + + // Calculate size of messageAttributes + if (request.messageAttributes() != null) { + totalSize += request.messageAttributes().entrySet().stream() + .mapToInt(entry -> { + String key = entry.getKey(); + MessageAttributeValue value = entry.getValue(); + return key.getBytes(StandardCharsets.UTF_8).length + + value.dataType().getBytes(StandardCharsets.UTF_8).length + + (value.stringValue() != null ? value.stringValue().getBytes(StandardCharsets.UTF_8).length + : 0); + }).sum(); + } + + // Calculate size of messageSystemAttributes + if (request.messageSystemAttributes() != null) { + totalSize += request.messageSystemAttributes().entrySet().stream() + .mapToInt(entry -> { + String key = entry.getKey().toString(); + MessageSystemAttributeValue value = entry.getValue(); + return key.getBytes(StandardCharsets.UTF_8).length + + (value.stringValue() != null ? value.stringValue().getBytes(StandardCharsets.UTF_8).length + : 0); + }).sum(); + } + + return totalSize ; + } + + + private static boolean shouldFlush(Map> contextMap, + SendMessageRequest request, RequestBatchConfiguration configuration) { + if (request == null) { + return contextMap.size() > configuration.maxBatchItems(); + } + + if(contextMap.size() >= configuration.maxBatchItems()){ + return true; + } + // Sum up all the responsePayload values in the contextMap + long totalPayloadSize = contextMap.values().stream() + .mapToLong(BatchingExecutionContext::responsePayload) + .sum(); + + return totalPayloadSize >= MAX_PAYLOAD_SIZE_BYTES; + } + + private static IdentifiableMessage sendMessageCreateThrowable(BatchResultErrorEntry failedEntry) { String key = failedEntry.id(); AwsErrorDetails errorDetailsBuilder = AwsErrorDetails.builder() @@ -139,4 +206,5 @@ IdentifiableMessage>> mapBatchResponse(SendMessageBatchResponse batch }); return mappedResponses; } + } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java index 540818a90aa6..55a921dcd7fb 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java @@ -22,6 +22,14 @@ public final class SqsMessageDefault { public static final int MAX_SUPPORTED_SQS_RECEIVE_MSG = 10; + /** + * https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html + * We can send upto up to 10 messages thus the buffer size is set to 10 + */ + public static final int MAX_SEND_MESSAGE_BATCH_SIZE = 10; + + + public static final long MAX_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB private SqsMessageDefault() { } From 63779a5b94b9394c820228b9eb0f24a018d631ca Mon Sep 17 00:00:00 2001 From: John Viegas Date: Tue, 27 Aug 2024 18:25:46 -0700 Subject: [PATCH 3/5] Byte Based batching for SendMessage API --- .../BatchOverrideConfiguration.java | 3 +- .../BatchingExecutionContext.java | 7 +- .../internal/batchmanager/BatchingMap.java | 18 ++-- .../ChangeMessageVisibilityBatchManager.java | 30 ++----- .../DefaultSqsAsyncBatchManager.java | 19 +++- .../DeleteMessageBatchManager.java | 7 +- .../batchmanager/RequestBatchBuffer.java | 49 ++++++---- .../RequestBatchConfiguration.java | 76 ++++++++++++---- .../batchmanager/RequestBatchManager.java | 21 +++-- .../RequestPayloadCalculator.java | 52 +++++++++++ .../ResponsePayloadCalculator.java | 77 ---------------- .../batchmanager/SendMessageBatchManager.java | 72 +-------------- .../batchmanager/SqsMessageDefault.java | 11 ++- .../batchmanager/RequestBatchBufferTest.java | 83 +++++++++++++++-- .../RequestPayloadCalculatorTest.java | 90 +++++++++++++++++++ .../sqs/batchmanager/SampleBatchManager.java | 11 ++- 16 files changed, 388 insertions(+), 238 deletions(-) create mode 100644 services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java delete mode 100644 services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponsePayloadCalculator.java create mode 100644 services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java index 2e00f089b02a..532e460c75a6 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java @@ -51,7 +51,8 @@ private BatchOverrideConfiguration(Builder builder) { this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems"); this.maxBatchKeys = Validate.isPositiveOrNull(builder.maxBatchKeys, "maxBatchKeys"); this.maxBufferSize = Validate.isPositiveOrNull(builder.maxBufferSize, "maxBufferSize"); - this.batchSendRequestFrequency = Validate.isPositiveOrNull(builder.batchSendRequestFrequency, "batchSendRequestFrequency"); + this.batchSendRequestFrequency = Validate.isPositiveOrNull(builder.batchSendRequestFrequency, + "batchSendRequestFrequency"); this.visibilityTimeout = Validate.isPositiveOrNull(builder.visibilityTimeout, "visibilityTimeout"); this.longPollWaitTimeout = Validate.isPositiveOrNull(builder.longPollWaitTimeout, "longPollWaitTimeout"); this.minReceiveWaitTime = Validate.isPositiveOrNull(builder.minReceiveWaitTime, "minReceiveWaitTime"); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java index 503175e51fc0..2ab7cfc57f16 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -24,12 +25,12 @@ public final class BatchingExecutionContext { private final RequestT request; private final CompletableFuture response; - private final long responsePayload; + private final Optional responsePayload; public BatchingExecutionContext(RequestT request, CompletableFuture response) { this.request = request; this.response = response; - responsePayload = ResponsePayloadCalculator.calculateMessageSize(request); + responsePayload = RequestPayloadCalculator.calculateMessageSize(request); } public RequestT request() { @@ -41,7 +42,7 @@ public CompletableFuture response() { } - public long responsePayload() { + public Optional responsePayload() { return responsePayload; } } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java index cfcb5164d96f..563e6c1d778d 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java @@ -20,8 +20,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.function.BiConsumer; -import java.util.function.BiPredicate; -import java.util.function.Predicate; import java.util.function.Supplier; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -34,15 +32,17 @@ public final class BatchingMap { private final int maxBatchKeys; + private final int maxBatchBytesSize; + private final int maxBatchSize; private final Map> batchContextMap; - private final BiPredicate>, RequestT> flushCondition ; - public BatchingMap(int maxBatchKeys, - BiPredicate>, RequestT> flushCondition) { + int maxBatchBytesSize, + int maxBatchSize) { this.batchContextMap = new ConcurrentHashMap<>(); this.maxBatchKeys = maxBatchKeys; - this.flushCondition = flushCondition; + this.maxBatchBytesSize = maxBatchBytesSize; + this.maxBatchSize = maxBatchSize; } public void put(String batchKey, Supplier> scheduleFlush, RequestT request, @@ -51,10 +51,14 @@ public void put(String batchKey, Supplier> scheduleFlush, Req if (batchContextMap.size() == maxBatchKeys) { throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys); } - return new RequestBatchBuffer<>(scheduleFlush.get(), flushCondition); + return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize); }).put(request, response); } + public boolean contains(String batchKey) { + return batchContextMap.containsKey(batchKey); + } + public void putScheduledFlush(String batchKey, ScheduledFuture scheduledFlush) { batchContextMap.get(batchKey).putScheduledFlush(scheduledFlush); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java index a68f0535615c..bd2150d26fa8 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java @@ -17,17 +17,14 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.BiPredicate; import java.util.stream.Collectors; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; @@ -49,22 +46,10 @@ public class ChangeMessageVisibilityBatchManager extends RequestBatchManager shouldFlush(stringBatchingExecutionContextMap, changeMessageVisibilityRequest, overrideConfiguration) - ); + super(overrideConfiguration, scheduledExecutor); this.sqsAsyncClient = sqsAsyncClient; } - private static boolean shouldFlush(Map> contextMap, - ChangeMessageVisibilityRequest request, RequestBatchConfiguration configuration) { - if (request != null) { - return false; - } - return contextMap.size() >= configuration.maxBatchItems(); - } - private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibilityBatchRequest( List> identifiedRequests, String batchKey) { List entries = identifiedRequests @@ -79,10 +64,10 @@ private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibility .overrideConfiguration(); return overrideConfiguration.map( config -> ChangeMessageVisibilityBatchRequest.builder() - .queueUrl(batchKey) - .overrideConfiguration(config) - .entries(entries) - .build()) + .queueUrl(batchKey) + .overrideConfiguration(config) + .entries(entries) + .build()) .orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder() .queueUrl(batchKey) .entries(entries) @@ -119,7 +104,6 @@ private static IdentifiableMessage changeMessageVisibilityCreateThrow } - @Override protected CompletableFuture batchAndSend( List> identifiedRequests, String batchKey) { @@ -130,8 +114,8 @@ protected CompletableFuture batchAndSend( @Override protected String getBatchKey(ChangeMessageVisibilityRequest request) { - return request.overrideConfiguration().map(overrideConfig -> request.queueUrl() + overrideConfig.hashCode()) - .orElseGet(request::queueUrl); + return request.overrideConfiguration().map(overrideConfig -> request.queueUrl() + overrideConfig.hashCode()) + .orElseGet(request::queueUrl); } @Override diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java index cbe2ec7da793..99cce4a9ef25 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -49,13 +51,24 @@ private DefaultSqsAsyncBatchManager(DefaultBuilder builder) { ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor; - this.sendMessageBatchManager = new SendMessageBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration), + RequestBatchConfiguration.Builder configBuilder = + builder.overrideConfiguration != null ? + RequestBatchConfiguration.builder() + .batchSendRequestFrequency(builder.overrideConfiguration.batchSendRequestFrequency()) + .maxBatchItems(builder.overrideConfiguration.maxBatchItems()) + .maxBufferSize(builder.overrideConfiguration.maxBufferSize()) + .maxBatchKeys(builder.overrideConfiguration.maxBatchKeys()) + : RequestBatchConfiguration.builder(); + + this.sendMessageBatchManager = new SendMessageBatchManager(configBuilder + .maxBatchBytesSize(MAX_PAYLOAD_SIZE_BYTES) + .build(), scheduledExecutor, client); - this.deleteMessageBatchManager = new DeleteMessageBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration), + this.deleteMessageBatchManager = new DeleteMessageBatchManager(configBuilder.build(), scheduledExecutor, client); - this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration), + this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(configBuilder.build(), scheduledExecutor, client); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java index 9537ce7aebf0..d078c7108cbc 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java @@ -27,8 +27,6 @@ import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; @@ -48,9 +46,7 @@ public class DeleteMessageBatchManager extends RequestBatchManager shouldFlush(stringBatchingExecutionContextMap, changeMessageVisibilityRequest, overrideConfiguration) - ); + super(overrideConfiguration, scheduledExecutor); this.sqsAsyncClient = sqsAsyncClient; } @@ -65,7 +61,6 @@ private static boolean shouldFlush(Map> identifiedRequests, String batchKey) { List entries = identifiedRequests diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java index ce23b15ed50b..9e5442c52bab 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java @@ -23,32 +23,26 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; -import java.util.function.BiPredicate; -import java.util.function.Predicate; import java.util.stream.Collectors; import software.amazon.awssdk.annotations.SdkInternalApi; @SdkInternalApi -public final class RequestBatchBuffer { +public final class RequestBatchBuffer { private final Object flushLock = new Object(); private final Map> idToBatchContext; - - - private BiPredicate>, RequestT> flushCondition ; - + private final int maxBatchItems; + private final int maxBatchSizeInBytes; /** * Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next * BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented everytime a new request and * response pair is received. */ private int nextId; - private int maxBatchItems; - /** - * Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a - * request that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added - * to idToBatchContext had an id of 22, nextBatchEntry will have a value of 23). + * Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a request + * that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added to + * idToBatchContext had an id of 22, nextBatchEntry will have a value of 23). */ private int nextBatchEntry; @@ -58,27 +52,50 @@ public final class RequestBatchBuffer { private ScheduledFuture scheduledFlush; public RequestBatchBuffer(ScheduledFuture scheduledFlush, - BiPredicate>, RequestT> flushCondition) { + int maxBatchItems, int maxBatchSizeInBytes) { this.idToBatchContext = new ConcurrentHashMap<>(); this.nextId = 0; this.nextBatchEntry = 0; this.scheduledFlush = scheduledFlush; - this.flushCondition = flushCondition; + this.maxBatchItems = maxBatchItems; + this.maxBatchSizeInBytes = maxBatchSizeInBytes; } + /** + * When request is null it checks if current contents in idToBatchContext are flushable. When request is passed it calculate + * if the new request can overflow of ByteSize if yes then it flushes the messages + */ public Map> flushableRequests(RequestT request) { synchronized (flushLock) { - if (flushCondition.test(idToBatchContext, request)) { + if (isByteSizeThresholdCrossed(request) || isMaxBatchSizeLimitReached(request)) { return extractFlushedEntries(maxBatchItems); } + return new ConcurrentHashMap<>(); } } + private boolean isMaxBatchSizeLimitReached(RequestT request) { + int batchSizeLimit = request != null ? this.maxBatchItems + 1 : this.maxBatchItems; + return idToBatchContext.size() >= batchSizeLimit; + } + + private boolean isByteSizeThresholdCrossed(RequestT request) { + if (maxBatchSizeInBytes < 0) { + return false; + } + long incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0L); + long totalPayloadSize = idToBatchContext.values().stream() + .map(BatchingExecutionContext::responsePayload) + .mapToLong(opt -> opt.orElse(0L)) + .sum() + incomingRequestBytes; + return totalPayloadSize > maxBatchSizeInBytes; + } + public Map> flushableScheduledRequests(int maxBatchItems) { synchronized (flushLock) { - if (idToBatchContext.size() > 0) { + if (!idToBatchContext.isEmpty()) { return extractFlushedEntries(maxBatchItems); } return new ConcurrentHashMap<>(); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java index bfcdd1896ee5..851fb6613019 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java @@ -16,39 +16,36 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; import java.time.Duration; -import java.util.Optional; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; @SdkInternalApi public final class RequestBatchConfiguration { public static final int DEFAULT_MAX_BATCH_ITEMS = 10; + public static final int DEFAULT_MAX_BATCH_BYTES_SIZE = -1; public static final int DEFAULT_MAX_BATCH_KEYS = 1000; public static final int DEFAULT_MAX_BUFFER_SIZE = 500; - public static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(200); + public static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(50); private final Integer maxBatchItems; private final Integer maxBatchKeys; private final Integer maxBufferSize; private final Duration batchSendRequestFrequency; + private final Integer maxBatchBytesSize; - public RequestBatchConfiguration(BatchOverrideConfiguration overrideConfiguration) { - this.maxBatchItems = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxBatchItems) - .orElse(DEFAULT_MAX_BATCH_ITEMS); + private RequestBatchConfiguration(Builder builder) { - this.maxBatchKeys = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxBatchKeys) - .orElse(DEFAULT_MAX_BATCH_KEYS); + this.maxBatchItems = builder.maxBatchItems != null ? builder.maxBatchItems : DEFAULT_MAX_BATCH_ITEMS; + this.maxBatchKeys = builder.maxBatchKeys != null ? builder.maxBatchKeys : DEFAULT_MAX_BATCH_KEYS; + this.maxBufferSize = builder.maxBufferSize != null ? builder.maxBufferSize : DEFAULT_MAX_BUFFER_SIZE; + this.batchSendRequestFrequency = builder.batchSendRequestFrequency != null ? builder.batchSendRequestFrequency : + DEFAULT_MAX_BATCH_OPEN_IN_MS; + this.maxBatchBytesSize = builder.maxBatchBytesSize != null ? builder.maxBatchBytesSize : DEFAULT_MAX_BATCH_BYTES_SIZE; - this.maxBufferSize = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxBufferSize) - .orElse(DEFAULT_MAX_BUFFER_SIZE); + } - this.batchSendRequestFrequency = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::batchSendRequestFrequency) - .orElse(DEFAULT_MAX_BATCH_OPEN_IN_MS); + public static Builder builder() { + return new Builder(); } public Duration batchSendRequestFrequency() { @@ -66,4 +63,51 @@ public int maxBatchKeys() { public int maxBufferSize() { return maxBufferSize; } + + public int maxBatchBytesSize() { + return maxBatchBytesSize; + } + + public static final class Builder { + + private Integer maxBatchItems; + private Integer maxBatchKeys; + private Integer maxBufferSize; + private Duration batchSendRequestFrequency; + private Integer maxBatchBytesSize; + + private Builder() { + } + + public Builder maxBatchItems(Integer maxBatchItems) { + this.maxBatchItems = maxBatchItems; + return this; + } + + public Builder maxBatchKeys(Integer maxBatchKeys) { + this.maxBatchKeys = maxBatchKeys; + return this; + } + + public Builder maxBufferSize(Integer maxBufferSize) { + this.maxBufferSize = maxBufferSize; + return this; + } + + public Builder batchSendRequestFrequency(Duration batchSendRequestFrequency) { + this.batchSendRequestFrequency = batchSendRequestFrequency; + return this; + } + + public Builder maxBatchBytesSize(Integer maxBatchBytesSize) { + this.maxBatchBytesSize = maxBatchBytesSize; + return this; + } + + public RequestBatchConfiguration build() { + return new RequestBatchConfiguration(this); + } + } + + } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index a36d80b01a2b..360db264b19d 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -26,14 +26,14 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.function.BiPredicate; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Validate; @SdkInternalApi public abstract class RequestBatchManager { + protected final RequestBatchConfiguration batchConfiguration ; + private final int maxBatchItems; private final Duration batchSendRequestFrequency; private final BatchingMap requestsAndResponsesMaps; @@ -41,19 +41,19 @@ public abstract class RequestBatchManager { private final Set> pendingBatchResponses ; private final Set> pendingResponses ; - protected final RequestBatchConfiguration batchConfiguration ; protected RequestBatchManager(RequestBatchConfiguration overrideConfiguration, - ScheduledExecutorService scheduledExecutor, - BiPredicate>, RequestT> flushCondition ) { + ScheduledExecutorService scheduledExecutor) { batchConfiguration = overrideConfiguration; this.maxBatchItems = batchConfiguration.maxBatchItems(); this.batchSendRequestFrequency = batchConfiguration.batchSendRequestFrequency(); this.scheduledExecutor = Validate.notNull(scheduledExecutor, "Null scheduledExecutor"); pendingBatchResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); pendingResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); - this.requestsAndResponsesMaps = new BatchingMap<>(batchConfiguration.maxBatchKeys(), flushCondition); + this.requestsAndResponsesMaps = new BatchingMap<>(batchConfiguration.maxBatchKeys(), + overrideConfiguration.maxBatchBytesSize(), + overrideConfiguration.maxBatchItems()); } @@ -62,10 +62,13 @@ public CompletableFuture batchRequest(RequestT request) { pendingResponses.add(response); try { String batchKey = getBatchKey(request); - // Consider the current request to calculate in advance the size before adding - flushBufferIfNeeded(batchKey, request); + // If there are bufferred messages then make sure adding a new message will not cause overflow of maxBytesSize + if (requestsAndResponsesMaps.contains(batchKey)) { + flushBufferIfNeeded(batchKey, request); + } requestsAndResponsesMaps.put(batchKey, - () -> scheduleBufferFlush(batchKey, batchSendRequestFrequency.toMillis(), scheduledExecutor), + () -> scheduleBufferFlush(batchKey, batchSendRequestFrequency.toMillis(), + scheduledExecutor), request, response); flushBufferIfNeeded(batchKey, null); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java new file mode 100644 index 000000000000..ec68eddeb643 --- /dev/null +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java @@ -0,0 +1,52 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.sqs.internal.batchmanager; + +import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.ATTRIBUTE_MAPS_PAYLOAD_BYTES; + +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +@SdkInternalApi +public final class RequestPayloadCalculator { + + private RequestPayloadCalculator() { + } + + /** + * Evaluates the total size of the message body, message attributes, and message system attributes for a SendMessageRequest. + * If the request is not a SendMessageRequest, returns an empty Optional. + * + * @param request the request to evaluate + * @param the type of the request + * @return an Optional containing the total size in bytes if the request is a SendMessageRequest, otherwise an empty Optional + */ + public static Optional calculateMessageSize(RequestT request) { + if (!(request instanceof SendMessageRequest)) { + return Optional.empty(); + } + SendMessageRequest sendMessageRequest = (SendMessageRequest) request; + long totalSize = calculateBodySize(sendMessageRequest) + ATTRIBUTE_MAPS_PAYLOAD_BYTES; + return Optional.of(totalSize); + } + + private static long calculateBodySize(SendMessageRequest request) { + return request.messageBody() != null ? request.messageBody().getBytes(StandardCharsets.UTF_8).length : 0; + } + +} diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponsePayloadCalculator.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponsePayloadCalculator.java deleted file mode 100644 index e57eb4b515ff..000000000000 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponsePayloadCalculator.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.services.sqs.internal.batchmanager; - -import java.nio.charset.StandardCharsets; -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; -import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeValue; -import software.amazon.awssdk.services.sqs.model.SendMessageRequest; - -public class ResponsePayloadCalculator { - - private static final int MAX_SIZE_BYTES = 262_144; // 256 KiB - - /** - * Evaluates if the total size of the message body, message attributes, and message system attributes - * exceeds the maximum allowed size for a SendMessageRequest. If the request is not a SendMessageRequest, - * it returns -1. - * - * @param request the request to evaluate - * @param the type of the request - * @return the total size in bytes if the request is a SendMessageRequest, otherwise -1 - */ - public static long calculateMessageSize(RequestT request) { - if (!(request instanceof SendMessageRequest)) { - return -1; - } - - SendMessageRequest sendMessageRequest = (SendMessageRequest) request; - long totalSize = 0; - - // Calculate size of messageBody - if (sendMessageRequest.messageBody() != null) { - totalSize += sendMessageRequest.messageBody().getBytes(StandardCharsets.UTF_8).length; - } - - // Calculate size of messageAttributes - if (sendMessageRequest.messageAttributes() != null) { - totalSize += sendMessageRequest.messageAttributes().entrySet().stream() - .mapToInt(entry -> { - String key = entry.getKey(); - MessageAttributeValue value = entry.getValue(); - return key.getBytes(StandardCharsets.UTF_8).length + - value.dataType().getBytes(StandardCharsets.UTF_8).length + - (value.stringValue() != null ? - value.stringValue().getBytes(StandardCharsets.UTF_8).length : 0); - }).sum(); - } - - // Calculate size of messageSystemAttributes - if (sendMessageRequest.messageSystemAttributes() != null) { - totalSize += sendMessageRequest.messageSystemAttributes().entrySet().stream() - .mapToInt(entry -> { - String key = entry.getKey().toString(); - MessageSystemAttributeValue value = entry.getValue(); - return key.getBytes(StandardCharsets.UTF_8).length + - (value.stringValue() != null ? - value.stringValue().getBytes(StandardCharsets.UTF_8).length : 0); - }).sum(); - } - - return totalSize; - } - -} \ No newline at end of file diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java index 053bd0116b77..1c6dfb6307ed 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java @@ -15,27 +15,17 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES; - -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.BiPredicate; import java.util.stream.Collectors; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse; -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; -import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeValue; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; @@ -55,68 +45,10 @@ public class SendMessageBatchManager extends RequestBatchManager shouldFlush(stringBatchingExecutionContextMap, changeMessageVisibilityRequest, overrideConfiguration) - ); + super(overrideConfiguration, scheduledExecutor); this.asyncClient = asyncClient; } - public static long sum(SendMessageRequest request) { - int totalSize = 0; - - // Calculate size of messageBody - if (request.messageBody() != null) { - totalSize += request.messageBody().getBytes(StandardCharsets.UTF_8).length; - } - - // Calculate size of messageAttributes - if (request.messageAttributes() != null) { - totalSize += request.messageAttributes().entrySet().stream() - .mapToInt(entry -> { - String key = entry.getKey(); - MessageAttributeValue value = entry.getValue(); - return key.getBytes(StandardCharsets.UTF_8).length + - value.dataType().getBytes(StandardCharsets.UTF_8).length + - (value.stringValue() != null ? value.stringValue().getBytes(StandardCharsets.UTF_8).length - : 0); - }).sum(); - } - - // Calculate size of messageSystemAttributes - if (request.messageSystemAttributes() != null) { - totalSize += request.messageSystemAttributes().entrySet().stream() - .mapToInt(entry -> { - String key = entry.getKey().toString(); - MessageSystemAttributeValue value = entry.getValue(); - return key.getBytes(StandardCharsets.UTF_8).length + - (value.stringValue() != null ? value.stringValue().getBytes(StandardCharsets.UTF_8).length - : 0); - }).sum(); - } - - return totalSize ; - } - - - private static boolean shouldFlush(Map> contextMap, - SendMessageRequest request, RequestBatchConfiguration configuration) { - if (request == null) { - return contextMap.size() > configuration.maxBatchItems(); - } - - if(contextMap.size() >= configuration.maxBatchItems()){ - return true; - } - // Sum up all the responsePayload values in the contextMap - long totalPayloadSize = contextMap.values().stream() - .mapToLong(BatchingExecutionContext::responsePayload) - .sum(); - - return totalPayloadSize >= MAX_PAYLOAD_SIZE_BYTES; - } - - private static IdentifiableMessage sendMessageCreateThrowable(BatchResultErrorEntry failedEntry) { String key = failedEntry.id(); AwsErrorDetails errorDetailsBuilder = AwsErrorDetails.builder() @@ -180,7 +112,7 @@ private static SendMessageBatchRequestEntry createSendMessageBatchRequestEntry(S @Override protected CompletableFuture batchAndSend(List> - identifiedRequests, String batchKey) { + identifiedRequests, String batchKey) { SendMessageBatchRequest batchRequest = createSendMessageBatchRequest(identifiedRequests, batchKey); return asyncClient.sendMessageBatch(batchRequest); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java index 55a921dcd7fb..a67ddfd33bad 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java @@ -29,7 +29,16 @@ public final class SqsMessageDefault { public static final int MAX_SEND_MESSAGE_BATCH_SIZE = 10; - public static final long MAX_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB + public static final int MAX_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB + + /** + * + * AWS SQS Message Attributes Documentation + * + * Rounding up max payload due to attribute maps. + * This was not done in V1, thus an issue was reported where batch messages failed with payload size exceeding the maximum. + */ + public static final int ATTRIBUTE_MAPS_PAYLOAD_BYTES = 16 * 1024; // 16 KiB private SqsMessageDefault() { } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java index c7d6861b232c..c10e3085ba8e 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java @@ -24,6 +24,9 @@ import java.util.concurrent.ScheduledFuture; import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchBuffer; import software.amazon.awssdk.services.sqs.internal.batchmanager.BatchingExecutionContext; +import software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -36,11 +39,11 @@ class RequestBatchBufferTest { @BeforeEach void setUp() { scheduledFlush = mock(ScheduledFuture.class); - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, ); } @Test void whenPutRequestThenBufferContainsRequest() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); assertEquals(1, batchBuffer.responses().size()); @@ -48,15 +51,17 @@ void whenPutRequestThenBufferContainsRequest() { @Test void whenFlushableRequestsThenReturnRequestsUpToMaxBatchItems() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); - Map> flushedRequests = batchBuffer.flushableRequests(1); + Map> flushedRequests = batchBuffer.flushableRequests(null); assertEquals(1, flushedRequests.size()); assertTrue(flushedRequests.containsKey("0")); } @Test void whenFlushableScheduledRequestsThenReturnAllRequests() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); Map> flushedRequests = batchBuffer.flushableScheduledRequests(1); @@ -66,6 +71,7 @@ void whenFlushableScheduledRequestsThenReturnAllRequests() { @Test void whenMaxBufferSizeReachedThenThrowException() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); for (int i = 0; i < 10; i++) { batchBuffer.put("request" + i, new CompletableFuture<>()); } @@ -74,6 +80,7 @@ void whenMaxBufferSizeReachedThenThrowException() { @Test void whenPutScheduledFlushThenFlushIsSet() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); ScheduledFuture newScheduledFlush = mock(ScheduledFuture.class); batchBuffer.putScheduledFlush(newScheduledFlush); assertNotNull(newScheduledFlush); @@ -81,12 +88,14 @@ void whenPutScheduledFlushThenFlushIsSet() { @Test void whenCancelScheduledFlushThenFlushIsCancelled() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); batchBuffer.cancelScheduledFlush(); verify(scheduledFlush).cancel(false); } @Test void whenGetResponsesThenReturnAllResponses() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); CompletableFuture response1 = new CompletableFuture<>(); CompletableFuture response2 = new CompletableFuture<>(); batchBuffer.put("request1", response1); @@ -99,6 +108,7 @@ void whenGetResponsesThenReturnAllResponses() { @Test void whenClearBufferThenBufferIsEmpty() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); batchBuffer.clear(); @@ -107,22 +117,85 @@ void whenClearBufferThenBufferIsEmpty() { @Test void whenExtractFlushedEntriesThenReturnCorrectEntries() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); for (int i = 0; i < 5; i++) { batchBuffer.put("request" + i, new CompletableFuture<>()); } - Map> flushedEntries = batchBuffer.flushableRequests(5); + Map> flushedEntries = batchBuffer.flushableRequests(null); assertEquals(5, flushedEntries.size()); } @Test void whenHasNextBatchEntryThenReturnTrue() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); batchBuffer.put("request1", new CompletableFuture<>()); - assertTrue(batchBuffer.flushableRequests("request1").containsKey("0")); + assertTrue(batchBuffer.flushableRequests(null).containsKey("0")); } + @Test void whenNextBatchEntryThenReturnNextEntryId() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); batchBuffer.put("request1", new CompletableFuture<>()); - assertEquals("0", batchBuffer.flushableRequests("request1").keySet().iterator().next()); + assertEquals("0", batchBuffer.flushableRequests(null).keySet().iterator().next()); + } + + + + @Test + void whenRequestPassedWithLessBytesinArgs_thenCheckForSizeOnly_andDonotFlush() { + RequestBatchBuffer batchBuffer + = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + for (int i = 0; i < 5; i++) { + batchBuffer.put(SendMessageRequest.builder().build(), + new CompletableFuture<>()); + } + Map> flushedEntries = + batchBuffer.flushableRequests(SendMessageRequest.builder().messageBody("Hi").build()); + assertEquals(0, flushedEntries.size()); } + + + + @Test + void testFlushWhenPayloadExceedsMaxSize() { + RequestBatchBuffer batchBuffer + = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + + String largeMessageBody = createLargeString('a',245_760); + batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), + new CompletableFuture<>()); + Map> flushedEntries = + batchBuffer.flushableRequests(SendMessageRequest.builder().messageBody("NewMessage").build()); + assertEquals(1, flushedEntries.size()); + } + + @Test + void testFlushWhenCumulativePayloadExceedsMaxSize() { + RequestBatchBuffer batchBuffer + = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + + String largeMessageBody = createLargeString('a',130_000); + batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), + new CompletableFuture<>()); + batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), + new CompletableFuture<>()); + Map> flushedEntries = + batchBuffer.flushableRequests(SendMessageRequest.builder().messageBody("NewMessage").build()); + + //Flushes both the messages since thier sum is greater than 256Kb + assertEquals(2, flushedEntries.size()); + } + + + private String createLargeString(char ch, int length) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append(ch); + } + return sb.toString(); + } + + + } \ No newline at end of file diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java new file mode 100644 index 000000000000..71b54f20dd6b --- /dev/null +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java @@ -0,0 +1,90 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.sqs.batchmanager; + +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestPayloadCalculator; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +import java.util.Optional; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.ATTRIBUTE_MAPS_PAYLOAD_BYTES; + +@TestInstance(Lifecycle.PER_CLASS) +public class RequestPayloadCalculatorTest { + + @ParameterizedTest + @MethodSource("provideRequestsForMessageSizeCalculation") + @DisplayName("Test calculateMessageSize with different SendMessageRequest inputs") + void testCalculateMessageSize(SendMessageRequest request, long expectedSize) { + Optional actualSize = RequestPayloadCalculator.calculateMessageSize(request); + assertEquals(Optional.of(expectedSize), actualSize); + } + + private Stream provideRequestsForMessageSizeCalculation() { + return Stream.of( + Arguments.of( + SendMessageRequest.builder().messageBody("Test message").build(), + "Test message".getBytes(StandardCharsets.UTF_8).length + ATTRIBUTE_MAPS_PAYLOAD_BYTES + ), + Arguments.of( + SendMessageRequest.builder().messageBody("").build(), + 0L + ATTRIBUTE_MAPS_PAYLOAD_BYTES + ), + Arguments.of( + SendMessageRequest.builder().messageBody(null).build(), + 0L + ATTRIBUTE_MAPS_PAYLOAD_BYTES + ), + Arguments.of( + SendMessageRequest.builder().messageBody("Another test message").build(), + "Another test message".getBytes(StandardCharsets.UTF_8).length + ATTRIBUTE_MAPS_PAYLOAD_BYTES + ) + ); + } + + @ParameterizedTest + @MethodSource("provideNonSendMessageRequest") + @DisplayName("Test calculateMessageSize with non-SendMessageRequest inputs") + void testCalculateMessageSizeWithNonSendMessageRequest(Object request) { + Optional actualSize = RequestPayloadCalculator.calculateMessageSize(request); + assertEquals(Optional.empty(), actualSize); + } + + private Stream provideNonSendMessageRequest() { + return Stream.of( + Arguments.of(ChangeMessageVisibilityRequest.builder() + .queueUrl("https://sqs.us-west-2.amazonaws.com/MyQueue") + .receiptHandle("some-receipt-handle") + .visibilityTimeout(60) + .build()), + + Arguments.of(DeleteMessageRequest.builder() + .queueUrl("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue") + .receiptHandle("some-receipt-handle") + .build()) + ); + } +} diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java index ac8b2d00cc9f..628b39aa8d50 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java @@ -20,7 +20,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import software.amazon.awssdk.services.sqs.internal.batchmanager.IdentifiableMessage; +import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchConfiguration; import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchManager; +import software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault; import software.amazon.awssdk.utils.Either; public class SampleBatchManager extends RequestBatchManager { @@ -30,7 +32,14 @@ public class SampleBatchManager extends RequestBatchManager Date: Wed, 28 Aug 2024 10:58:05 -0700 Subject: [PATCH 4/5] Byte Based batching for SendMessage API --- .../BatchOverrideConfiguration.java | 27 +++++---- .../BatchingExecutionContext.java | 12 ++-- .../internal/batchmanager/BatchingMap.java | 24 ++++---- .../ChangeMessageVisibilityBatchManager.java | 6 +- .../DefaultSqsAsyncBatchManager.java | 35 +++++------- .../DeleteMessageBatchManager.java | 13 ----- .../batchmanager/RequestBatchBuffer.java | 57 +++++++++++-------- .../RequestBatchConfiguration.java | 29 +++++++--- .../batchmanager/RequestBatchManager.java | 42 +++++++------- .../RequestPayloadCalculator.java | 6 +- .../batchmanager/SqsMessageDefault.java | 9 +-- .../BatchOverrideConfigurationTest.java | 8 +-- .../batchmanager/RequestBatchBufferTest.java | 44 +++++++------- .../batchmanager/RequestBatchManagerTest.java | 8 +-- .../RequestPayloadCalculatorTest.java | 12 ++-- .../sqs/batchmanager/SampleBatchManager.java | 4 +- 16 files changed, 165 insertions(+), 171 deletions(-) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java index 532e460c75a6..7157fbb8601d 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java @@ -37,7 +37,7 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder { private final RequestT request; private final CompletableFuture response; - private final Optional responsePayload; + private final Optional responsePayloadByteSize; public BatchingExecutionContext(RequestT request, CompletableFuture response) { this.request = request; this.response = response; - responsePayload = RequestPayloadCalculator.calculateMessageSize(request); + responsePayloadByteSize = RequestPayloadCalculator.calculateMessageSize(request); } public RequestT request() { @@ -41,8 +41,10 @@ public CompletableFuture response() { return response; } - - public Optional responsePayload() { - return responsePayload; + /** + * Optional because responsePayloadByteSize is required only for SendMessageRequests and not for other requests. + */ + public Optional responsePayloadByteSize() { + return responsePayloadByteSize; } } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java index 563e6c1d778d..171b76aa4bff 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java @@ -34,15 +34,15 @@ public final class BatchingMap { private final int maxBatchKeys; private final int maxBatchBytesSize; private final int maxBatchSize; + private final int maxBufferSize; private final Map> batchContextMap; - public BatchingMap(int maxBatchKeys, - int maxBatchBytesSize, - int maxBatchSize) { + public BatchingMap(RequestBatchConfiguration overrideConfiguration) { this.batchContextMap = new ConcurrentHashMap<>(); - this.maxBatchKeys = maxBatchKeys; - this.maxBatchBytesSize = maxBatchBytesSize; - this.maxBatchSize = maxBatchSize; + this.maxBatchKeys = overrideConfiguration.maxBatchKeys(); + this.maxBatchBytesSize = overrideConfiguration.maxBatchBytesSize(); + this.maxBatchSize = overrideConfiguration.maxBatchItems(); + this.maxBufferSize = overrideConfiguration.maxBufferSize(); } public void put(String batchKey, Supplier> scheduleFlush, RequestT request, @@ -51,7 +51,7 @@ public void put(String batchKey, Supplier> scheduleFlush, Req if (batchContextMap.size() == maxBatchKeys) { throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys); } - return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize); + return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize, maxBufferSize); }).put(request, response); } @@ -67,14 +67,14 @@ public void forEach(BiConsumer> batchContextMap.forEach(action); } - public Map> flushableRequests(String batchKey, RequestT request) { - return batchContextMap.get(batchKey).flushableRequests(request); - } - public Map> flushableRequests(String batchKey) { - return batchContextMap.get(batchKey).flushableRequests(null); + return batchContextMap.get(batchKey).flushableRequests(); } + public Map> flushableRequestsOnByteLimitBeforeAdd(String batchKey, + RequestT request) { + return batchContextMap.get(batchKey).flushableRequestsOnByteLimitBeforeAdd(request); + } public Map> flushableScheduledRequests(String batchKey, int maxBatchItems) { diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java index bd2150d26fa8..6260552e08e4 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java @@ -42,7 +42,6 @@ public class ChangeMessageVisibilityBatchManager extends RequestBatchManager changeMessageVisibilityCreateThrow return new IdentifiableMessage<>(key, response); } - @Override protected CompletableFuture batchAndSend( List> identifiedRequests, String batchKey) { @@ -114,8 +112,8 @@ protected CompletableFuture batchAndSend( @Override protected String getBatchKey(ChangeMessageVisibilityRequest request) { - return request.overrideConfiguration().map(overrideConfig -> request.queueUrl() + overrideConfig.hashCode()) - .orElseGet(request::queueUrl); + return request.overrideConfiguration().map(overrideConfig -> request.queueUrl() + overrideConfig.hashCode()) + .orElseGet(request::queueUrl); } @Override diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java index 99cce4a9ef25..eded3419fcfc 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java @@ -15,7 +15,7 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -48,27 +48,20 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager { private DefaultSqsAsyncBatchManager(DefaultBuilder builder) { this.client = Validate.notNull(builder.client, "client cannot be null"); - ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor; - - RequestBatchConfiguration.Builder configBuilder = - builder.overrideConfiguration != null ? - RequestBatchConfiguration.builder() - .batchSendRequestFrequency(builder.overrideConfiguration.batchSendRequestFrequency()) - .maxBatchItems(builder.overrideConfiguration.maxBatchItems()) - .maxBufferSize(builder.overrideConfiguration.maxBufferSize()) - .maxBatchKeys(builder.overrideConfiguration.maxBatchKeys()) - : RequestBatchConfiguration.builder(); - - this.sendMessageBatchManager = new SendMessageBatchManager(configBuilder - .maxBatchBytesSize(MAX_PAYLOAD_SIZE_BYTES) - .build(), - scheduledExecutor, - client); - this.deleteMessageBatchManager = new DeleteMessageBatchManager(configBuilder.build(), - scheduledExecutor, - client); - this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(configBuilder.build(), + this.sendMessageBatchManager = + new SendMessageBatchManager(RequestBatchConfiguration + .builder(builder.overrideConfiguration) + .maxBatchBytesSize(MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES) + .build(), + scheduledExecutor, + client); + this.deleteMessageBatchManager = + new DeleteMessageBatchManager(RequestBatchConfiguration.builder(builder.overrideConfiguration).build(), + scheduledExecutor, + client); + this.changeMessageVisibilityBatchManager = + new ChangeMessageVisibilityBatchManager(RequestBatchConfiguration.builder(builder.overrideConfiguration).build(), scheduledExecutor, client); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java index d078c7108cbc..cb33985e5a12 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java @@ -17,7 +17,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -36,7 +35,6 @@ import software.amazon.awssdk.services.sqs.model.SqsException; import software.amazon.awssdk.utils.Either; - @SdkInternalApi public class DeleteMessageBatchManager extends RequestBatchManager { @@ -50,17 +48,6 @@ protected DeleteMessageBatchManager(RequestBatchConfiguration overrideConfigurat this.sqsAsyncClient = sqsAsyncClient; } - - private static boolean shouldFlush(Map> contextMap, - DeleteMessageRequest request, RequestBatchConfiguration configuration) { - if (request != null) { - return false; - } - return contextMap.size() >= configuration.maxBatchItems(); - } - - private static DeleteMessageBatchRequest createDeleteMessageBatchRequest( List> identifiedRequests, String batchKey) { List entries = identifiedRequests diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java index 9e5442c52bab..84a1337efbdb 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java @@ -15,9 +15,9 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; -import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SEND_MESSAGE_BATCH_SIZE; import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.stream.Collectors; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.utils.CollectionUtils; @SdkInternalApi public final class RequestBatchBuffer { @@ -32,6 +33,7 @@ public final class RequestBatchBuffer { private final Map> idToBatchContext; private final int maxBatchItems; + private final int maxBufferSize; private final int maxBatchSizeInBytes; /** * Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next @@ -52,53 +54,58 @@ public final class RequestBatchBuffer { private ScheduledFuture scheduledFlush; public RequestBatchBuffer(ScheduledFuture scheduledFlush, - int maxBatchItems, int maxBatchSizeInBytes) { + int maxBatchItems, int maxBatchSizeInBytes, int maxBufferSize) { this.idToBatchContext = new ConcurrentHashMap<>(); this.nextId = 0; this.nextBatchEntry = 0; this.scheduledFlush = scheduledFlush; this.maxBatchItems = maxBatchItems; + this.maxBufferSize = maxBufferSize; this.maxBatchSizeInBytes = maxBatchSizeInBytes; } - /** - * When request is null it checks if current contents in idToBatchContext are flushable. When request is passed it calculate - * if the new request can overflow of ByteSize if yes then it flushes the messages - */ - public Map> flushableRequests(RequestT request) { + public Map> flushableRequests() { synchronized (flushLock) { - if (isByteSizeThresholdCrossed(request) || isMaxBatchSizeLimitReached(request)) { - return extractFlushedEntries(maxBatchItems); - } - - return new ConcurrentHashMap<>(); + return (isByteSizeThresholdCrossed(0) || isMaxBatchSizeLimitReached()) + ? extractFlushedEntries(maxBatchItems) + : Collections.emptyMap(); } } - private boolean isMaxBatchSizeLimitReached(RequestT request) { - int batchSizeLimit = request != null ? this.maxBatchItems + 1 : this.maxBatchItems; - return idToBatchContext.size() >= batchSizeLimit; + + private boolean isMaxBatchSizeLimitReached() { + return idToBatchContext.size() >= maxBatchItems; + } + + public Map> flushableRequestsOnByteLimitBeforeAdd(RequestT request) { + synchronized (flushLock) { + if (maxBatchSizeInBytes > 0 && !idToBatchContext.isEmpty()) { + int incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0); + if (isByteSizeThresholdCrossed(incomingRequestBytes)) { + return extractFlushedEntries(maxBatchItems); + } + } + return Collections.emptyMap(); + } } - private boolean isByteSizeThresholdCrossed(RequestT request) { + private boolean isByteSizeThresholdCrossed(int incomingRequestBytes) { if (maxBatchSizeInBytes < 0) { return false; } - long incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0L); - long totalPayloadSize = idToBatchContext.values().stream() - .map(BatchingExecutionContext::responsePayload) - .mapToLong(opt -> opt.orElse(0L)) - .sum() + incomingRequestBytes; + int totalPayloadSize = idToBatchContext.values().stream() + .map(BatchingExecutionContext::responsePayloadByteSize) + .mapToInt(opt -> opt.orElse(0)) + .sum() + incomingRequestBytes; return totalPayloadSize > maxBatchSizeInBytes; } - public Map> flushableScheduledRequests(int maxBatchItems) { synchronized (flushLock) { if (!idToBatchContext.isEmpty()) { return extractFlushedEntries(maxBatchItems); } - return new ConcurrentHashMap<>(); + return Collections.emptyMap(); } } @@ -115,8 +122,8 @@ private Map> extractFlushe public void put(RequestT request, CompletableFuture response) { synchronized (this) { - if (idToBatchContext.size() == MAX_SEND_MESSAGE_BATCH_SIZE) { - throw new IllegalStateException("Reached MaxBufferSize of: " + MAX_SEND_MESSAGE_BATCH_SIZE); + if (idToBatchContext.size() == maxBufferSize) { + throw new IllegalStateException("Reached MaxBufferSize of: " + maxBufferSize); } if (nextId == Integer.MAX_VALUE) { diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java index 851fb6613019..dc407c4c84f3 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java @@ -17,6 +17,7 @@ import java.time.Duration; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; @SdkInternalApi public final class RequestBatchConfiguration { @@ -25,12 +26,12 @@ public final class RequestBatchConfiguration { public static final int DEFAULT_MAX_BATCH_BYTES_SIZE = -1; public static final int DEFAULT_MAX_BATCH_KEYS = 1000; public static final int DEFAULT_MAX_BUFFER_SIZE = 500; - public static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(50); + public static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(200); private final Integer maxBatchItems; private final Integer maxBatchKeys; private final Integer maxBufferSize; - private final Duration batchSendRequestFrequency; + private final Duration maxBatchOpenDuration; private final Integer maxBatchBytesSize; private RequestBatchConfiguration(Builder builder) { @@ -38,7 +39,7 @@ private RequestBatchConfiguration(Builder builder) { this.maxBatchItems = builder.maxBatchItems != null ? builder.maxBatchItems : DEFAULT_MAX_BATCH_ITEMS; this.maxBatchKeys = builder.maxBatchKeys != null ? builder.maxBatchKeys : DEFAULT_MAX_BATCH_KEYS; this.maxBufferSize = builder.maxBufferSize != null ? builder.maxBufferSize : DEFAULT_MAX_BUFFER_SIZE; - this.batchSendRequestFrequency = builder.batchSendRequestFrequency != null ? builder.batchSendRequestFrequency : + this.maxBatchOpenDuration = builder.maxBatchOpenDuration != null ? builder.maxBatchOpenDuration : DEFAULT_MAX_BATCH_OPEN_IN_MS; this.maxBatchBytesSize = builder.maxBatchBytesSize != null ? builder.maxBatchBytesSize : DEFAULT_MAX_BATCH_BYTES_SIZE; @@ -48,8 +49,19 @@ public static Builder builder() { return new Builder(); } - public Duration batchSendRequestFrequency() { - return batchSendRequestFrequency; + public static Builder builder(BatchOverrideConfiguration configuration) { + if (configuration != null) { + return new Builder() + .maxBatchKeys(configuration.maxBatchKeys()) + .maxBatchItems(configuration.maxBatchItems()) + .maxBatchOpenDuration(configuration.maxBatchOpenDuration()) + .maxBufferSize(configuration.maxBufferSize()); + } + return new Builder(); + } + + public Duration maxBatchOpenDuration() { + return maxBatchOpenDuration; } public int maxBatchItems() { @@ -73,7 +85,7 @@ public static final class Builder { private Integer maxBatchItems; private Integer maxBatchKeys; private Integer maxBufferSize; - private Duration batchSendRequestFrequency; + private Duration maxBatchOpenDuration; private Integer maxBatchBytesSize; private Builder() { @@ -94,8 +106,8 @@ public Builder maxBufferSize(Integer maxBufferSize) { return this; } - public Builder batchSendRequestFrequency(Duration batchSendRequestFrequency) { - this.batchSendRequestFrequency = batchSendRequestFrequency; + public Builder maxBatchOpenDuration(Duration maxBatchOpenDuration) { + this.maxBatchOpenDuration = maxBatchOpenDuration; return this; } @@ -109,5 +121,4 @@ public RequestBatchConfiguration build() { } } - } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index 360db264b19d..dc25430613c1 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -35,7 +36,7 @@ public abstract class RequestBatchManager { protected final RequestBatchConfiguration batchConfiguration ; private final int maxBatchItems; - private final Duration batchSendRequestFrequency; + private final Duration maxBatchOpenDuration; private final BatchingMap requestsAndResponsesMaps; private final ScheduledExecutorService scheduledExecutor; @@ -47,34 +48,43 @@ protected RequestBatchManager(RequestBatchConfiguration overrideConfiguration, ScheduledExecutorService scheduledExecutor) { batchConfiguration = overrideConfiguration; this.maxBatchItems = batchConfiguration.maxBatchItems(); - this.batchSendRequestFrequency = batchConfiguration.batchSendRequestFrequency(); + this.maxBatchOpenDuration = batchConfiguration.maxBatchOpenDuration(); this.scheduledExecutor = Validate.notNull(scheduledExecutor, "Null scheduledExecutor"); pendingBatchResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); pendingResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); - this.requestsAndResponsesMaps = new BatchingMap<>(batchConfiguration.maxBatchKeys(), - overrideConfiguration.maxBatchBytesSize(), - overrideConfiguration.maxBatchItems()); + this.requestsAndResponsesMaps = new BatchingMap<>(overrideConfiguration); } public CompletableFuture batchRequest(RequestT request) { CompletableFuture response = new CompletableFuture<>(); pendingResponses.add(response); + try { String batchKey = getBatchKey(request); - // If there are bufferred messages then make sure adding a new message will not cause overflow of maxBytesSize - if (requestsAndResponsesMaps.contains(batchKey)) { - flushBufferIfNeeded(batchKey, request); + + // Handle potential byte size overflow only if there are request in map and if feature enabled + if (requestsAndResponsesMaps.contains(batchKey) && batchConfiguration.maxBatchBytesSize() > 0) { + Optional.of(requestsAndResponsesMaps.flushableRequestsOnByteLimitBeforeAdd(batchKey, request)) + .filter(flushableRequests -> !flushableRequests.isEmpty()) + .ifPresent(flushableRequests -> manualFlushBuffer(batchKey, flushableRequests)); } + + // Add request and response to the map, scheduling a flush if necessary requestsAndResponsesMaps.put(batchKey, - () -> scheduleBufferFlush(batchKey, batchSendRequestFrequency.toMillis(), - scheduledExecutor), + () -> scheduleBufferFlush(batchKey, maxBatchOpenDuration.toMillis(), scheduledExecutor), request, response); - flushBufferIfNeeded(batchKey, null); + + // Immediately flush if the batch is full + Optional.of(requestsAndResponsesMaps.flushableRequests(batchKey)) + .filter(flushableRequests -> !flushableRequests.isEmpty()) + .ifPresent(flushableRequests -> manualFlushBuffer(batchKey, flushableRequests)); + } catch (Exception e) { response.completeExceptionally(e); } + return response; } @@ -86,19 +96,11 @@ protected abstract CompletableFuture batchAndSend(List, IdentifiableMessage>> mapBatchResponse(BatchResponseT batchResponse); - private void flushBufferIfNeeded(String batchKey, RequestT requestT) { - Map> flushableRequests = - requestsAndResponsesMaps.flushableRequests(batchKey, requestT); - if (!flushableRequests.isEmpty()) { - manualFlushBuffer(batchKey, flushableRequests); - } - } - private void manualFlushBuffer(String batchKey, Map> flushableRequests) { requestsAndResponsesMaps.cancelScheduledFlush(batchKey); flushBuffer(batchKey, flushableRequests); - requestsAndResponsesMaps.putScheduledFlush(batchKey, scheduleBufferFlush(batchKey, batchSendRequestFrequency.toMillis(), + requestsAndResponsesMaps.putScheduledFlush(batchKey, scheduleBufferFlush(batchKey, maxBatchOpenDuration.toMillis(), scheduledExecutor)); } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java index ec68eddeb643..6d2927c7cd9b 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java @@ -36,16 +36,16 @@ private RequestPayloadCalculator() { * @param the type of the request * @return an Optional containing the total size in bytes if the request is a SendMessageRequest, otherwise an empty Optional */ - public static Optional calculateMessageSize(RequestT request) { + public static Optional calculateMessageSize(RequestT request) { if (!(request instanceof SendMessageRequest)) { return Optional.empty(); } SendMessageRequest sendMessageRequest = (SendMessageRequest) request; - long totalSize = calculateBodySize(sendMessageRequest) + ATTRIBUTE_MAPS_PAYLOAD_BYTES; + Integer totalSize = calculateBodySize(sendMessageRequest) + ATTRIBUTE_MAPS_PAYLOAD_BYTES; return Optional.of(totalSize); } - private static long calculateBodySize(SendMessageRequest request) { + private static int calculateBodySize(SendMessageRequest request) { return request.messageBody() != null ? request.messageBody().getBytes(StandardCharsets.UTF_8).length : 0; } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java index a67ddfd33bad..a44402fa2d4b 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java @@ -22,14 +22,7 @@ public final class SqsMessageDefault { public static final int MAX_SUPPORTED_SQS_RECEIVE_MSG = 10; - /** - * https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html - * We can send upto up to 10 messages thus the buffer size is set to 10 - */ - public static final int MAX_SEND_MESSAGE_BATCH_SIZE = 10; - - - public static final int MAX_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB + public static final int MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB /** * diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java index f92a7c653e96..0b6492d520c9 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfigurationTest.java @@ -66,7 +66,7 @@ private static Stream provideConfigurations() { @MethodSource("provideConfigurations") void testBatchOverrideConfiguration(Integer maxBatchItems, Integer maxBatchKeys, - Duration batchSendRequestFrequency, + Duration maxBatchOpenDuration, Duration visibilityTimeout, Duration longPollWaitTimeout, Duration minReceiveWaitTime, @@ -79,7 +79,7 @@ void testBatchOverrideConfiguration(Integer maxBatchItems, BatchOverrideConfiguration config = BatchOverrideConfiguration.builder() .maxBatchItems(maxBatchItems) .maxBatchKeys(maxBatchKeys) - .batchSendRequestFrequency(batchSendRequestFrequency) + .maxBatchOpenDuration(maxBatchOpenDuration) .visibilityTimeout(visibilityTimeout) .longPollWaitTimeout(longPollWaitTimeout) .minReceiveWaitTime(minReceiveWaitTime) @@ -92,7 +92,7 @@ void testBatchOverrideConfiguration(Integer maxBatchItems, assertEquals(maxBatchItems, config.maxBatchItems()); assertEquals(maxBatchKeys, config.maxBatchKeys()); - assertEquals(batchSendRequestFrequency, config.batchSendRequestFrequency()); + assertEquals(maxBatchOpenDuration, config.maxBatchOpenDuration()); assertEquals(visibilityTimeout, config.visibilityTimeout()); assertEquals(longPollWaitTimeout, config.longPollWaitTimeout()); assertEquals(minReceiveWaitTime, config.minReceiveWaitTime()); @@ -117,7 +117,7 @@ void testToBuilder() { BatchOverrideConfiguration originalConfig = BatchOverrideConfiguration.builder() .maxBatchItems(10) .maxBatchKeys(5) - .batchSendRequestFrequency(Duration.ofMillis(200)) + .maxBatchOpenDuration(Duration.ofMillis(200)) .visibilityTimeout(Duration.ofSeconds(30)) .longPollWaitTimeout(Duration.ofSeconds(20)) .minReceiveWaitTime(Duration.ofMillis(50)) diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java index c10e3085ba8e..a5d57a238272 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java @@ -36,6 +36,8 @@ class RequestBatchBufferTest { private RequestBatchBuffer batchBuffer; private ScheduledFuture scheduledFlush; + private static int maxBufferSize = 1000; + @BeforeEach void setUp() { scheduledFlush = mock(ScheduledFuture.class); @@ -43,7 +45,7 @@ void setUp() { @Test void whenPutRequestThenBufferContainsRequest() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); assertEquals(1, batchBuffer.responses().size()); @@ -51,17 +53,17 @@ void whenPutRequestThenBufferContainsRequest() { @Test void whenFlushableRequestsThenReturnRequestsUpToMaxBatchItems() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); - Map> flushedRequests = batchBuffer.flushableRequests(null); + Map> flushedRequests = batchBuffer.flushableRequests(); assertEquals(1, flushedRequests.size()); assertTrue(flushedRequests.containsKey("0")); } @Test void whenFlushableScheduledRequestsThenReturnAllRequests() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); Map> flushedRequests = batchBuffer.flushableScheduledRequests(1); @@ -71,7 +73,7 @@ void whenFlushableScheduledRequestsThenReturnAllRequests() { @Test void whenMaxBufferSizeReachedThenThrowException() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 3, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, 10); for (int i = 0; i < 10; i++) { batchBuffer.put("request" + i, new CompletableFuture<>()); } @@ -80,7 +82,7 @@ void whenMaxBufferSizeReachedThenThrowException() { @Test void whenPutScheduledFlushThenFlushIsSet() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); ScheduledFuture newScheduledFlush = mock(ScheduledFuture.class); batchBuffer.putScheduledFlush(newScheduledFlush); assertNotNull(newScheduledFlush); @@ -88,14 +90,14 @@ void whenPutScheduledFlushThenFlushIsSet() { @Test void whenCancelScheduledFlushThenFlushIsCancelled() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.cancelScheduledFlush(); verify(scheduledFlush).cancel(false); } @Test void whenGetResponsesThenReturnAllResponses() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response1 = new CompletableFuture<>(); CompletableFuture response2 = new CompletableFuture<>(); batchBuffer.put("request1", response1); @@ -108,7 +110,7 @@ void whenGetResponsesThenReturnAllResponses() { @Test void whenClearBufferThenBufferIsEmpty() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); batchBuffer.clear(); @@ -117,27 +119,27 @@ void whenClearBufferThenBufferIsEmpty() { @Test void whenExtractFlushedEntriesThenReturnCorrectEntries() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); for (int i = 0; i < 5; i++) { batchBuffer.put("request" + i, new CompletableFuture<>()); } - Map> flushedEntries = batchBuffer.flushableRequests(null); + Map> flushedEntries = batchBuffer.flushableRequests(); assertEquals(5, flushedEntries.size()); } @Test void whenHasNextBatchEntryThenReturnTrue() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.put("request1", new CompletableFuture<>()); - assertTrue(batchBuffer.flushableRequests(null).containsKey("0")); + assertTrue(batchBuffer.flushableRequests().containsKey("0")); } @Test void whenNextBatchEntryThenReturnNextEntryId() { - batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.put("request1", new CompletableFuture<>()); - assertEquals("0", batchBuffer.flushableRequests(null).keySet().iterator().next()); + assertEquals("0", batchBuffer.flushableRequests().keySet().iterator().next()); } @@ -145,13 +147,13 @@ void whenNextBatchEntryThenReturnNextEntryId() { @Test void whenRequestPassedWithLessBytesinArgs_thenCheckForSizeOnly_andDonotFlush() { RequestBatchBuffer batchBuffer - = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); for (int i = 0; i < 5; i++) { batchBuffer.put(SendMessageRequest.builder().build(), new CompletableFuture<>()); } Map> flushedEntries = - batchBuffer.flushableRequests(SendMessageRequest.builder().messageBody("Hi").build()); + batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("Hi").build()); assertEquals(0, flushedEntries.size()); } @@ -160,20 +162,20 @@ void whenRequestPassedWithLessBytesinArgs_thenCheckForSizeOnly_andDonotFlush() { @Test void testFlushWhenPayloadExceedsMaxSize() { RequestBatchBuffer batchBuffer - = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); String largeMessageBody = createLargeString('a',245_760); batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), new CompletableFuture<>()); Map> flushedEntries = - batchBuffer.flushableRequests(SendMessageRequest.builder().messageBody("NewMessage").build()); + batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("NewMessage").build()); assertEquals(1, flushedEntries.size()); } @Test void testFlushWhenCumulativePayloadExceedsMaxSize() { RequestBatchBuffer batchBuffer - = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES); + = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); String largeMessageBody = createLargeString('a',130_000); batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), @@ -181,7 +183,7 @@ void testFlushWhenCumulativePayloadExceedsMaxSize() { batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), new CompletableFuture<>()); Map> flushedEntries = - batchBuffer.flushableRequests(SendMessageRequest.builder().messageBody("NewMessage").build()); + batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("NewMessage").build()); //Flushes both the messages since thier sum is greater than 256Kb assertEquals(2, flushedEntries.size()); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java index 1a592b25294d..030273fcf7cd 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchManagerTest.java @@ -83,7 +83,7 @@ void batchRequest_TwoBatchesMessagesSplitInTwoCalls_successful() throws Exceptio "testResponse")); when(mockClient.sendBatchAsync(any(), eq(batchKey1))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).batchSendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); assertEquals("testResponse0", response1.get(1, TimeUnit.SECONDS)); @@ -103,7 +103,7 @@ void batchRequest_TwoBatchesWithDifferentKey_successful() throws Exception { when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).batchSendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); @@ -157,7 +157,7 @@ void close_FlushesAllBatches() throws Exception { when(mockClient.sendBatchAsync(any(), eq(batchKey))).thenReturn(batchResponseFuture); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).batchSendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(request1); CompletableFuture response2 = batchManager.batchRequest(request2); @@ -206,7 +206,7 @@ void batchRequest_MoreThanBufferSize_Fails() throws Exception { when(mockClient.sendBatchAsync(any(), eq(KEY_TWO))).thenReturn(batchResponseFutureTwo); SampleBatchManager batchManager= - new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchKeys(1).maxBatchItems(2).batchSendRequestFrequency(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); + new SampleBatchManager(BatchOverrideConfiguration.builder().maxBatchKeys(1).maxBatchItems(2).maxBatchOpenDuration(Duration.ofHours(1)).build(), scheduledExecutor, mockClient); CompletableFuture response1 = batchManager.batchRequest(KEY_ONE + ":0"); CompletableFuture response2 = batchManager.batchRequest(KEY_TWO + ":0"); CompletableFuture response3 = batchManager.batchRequest(KEY_ONE + ":1"); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java index 71b54f20dd6b..fd58fcaf5a8c 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java @@ -34,13 +34,13 @@ import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.ATTRIBUTE_MAPS_PAYLOAD_BYTES; @TestInstance(Lifecycle.PER_CLASS) -public class RequestPayloadCalculatorTest { +class RequestPayloadCalculatorTest { @ParameterizedTest @MethodSource("provideRequestsForMessageSizeCalculation") @DisplayName("Test calculateMessageSize with different SendMessageRequest inputs") - void testCalculateMessageSize(SendMessageRequest request, long expectedSize) { - Optional actualSize = RequestPayloadCalculator.calculateMessageSize(request); + void testCalculateMessageSize(SendMessageRequest request, int expectedSize) { + Optional actualSize = RequestPayloadCalculator.calculateMessageSize(request); assertEquals(Optional.of(expectedSize), actualSize); } @@ -52,11 +52,11 @@ private Stream provideRequestsForMessageSizeCalculation() { ), Arguments.of( SendMessageRequest.builder().messageBody("").build(), - 0L + ATTRIBUTE_MAPS_PAYLOAD_BYTES + ATTRIBUTE_MAPS_PAYLOAD_BYTES ), Arguments.of( SendMessageRequest.builder().messageBody(null).build(), - 0L + ATTRIBUTE_MAPS_PAYLOAD_BYTES + ATTRIBUTE_MAPS_PAYLOAD_BYTES ), Arguments.of( SendMessageRequest.builder().messageBody("Another test message").build(), @@ -69,7 +69,7 @@ private Stream provideRequestsForMessageSizeCalculation() { @MethodSource("provideNonSendMessageRequest") @DisplayName("Test calculateMessageSize with non-SendMessageRequest inputs") void testCalculateMessageSizeWithNonSendMessageRequest(Object request) { - Optional actualSize = RequestPayloadCalculator.calculateMessageSize(request); + Optional actualSize = RequestPayloadCalculator.calculateMessageSize(request); assertEquals(Optional.empty(), actualSize); } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java index 628b39aa8d50..d083f2502b38 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java @@ -33,8 +33,8 @@ protected SampleBatchManager(BatchOverrideConfiguration batchOverrideConfigurati ScheduledExecutorService executorService, CustomClient client) { super(RequestBatchConfiguration.builder() - .batchSendRequestFrequency(batchOverrideConfiguration.batchSendRequestFrequency()) - .maxBatchBytesSize(SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES) + .maxBatchOpenDuration(batchOverrideConfiguration.maxBatchOpenDuration()) + .maxBatchBytesSize(SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES) .maxBatchItems(batchOverrideConfiguration.maxBatchItems()) .maxBufferSize(batchOverrideConfiguration.maxBufferSize()) .maxBatchKeys(batchOverrideConfiguration.maxBatchKeys()) From 2fbbf7d46b820ae34d431f50775d0e40292f7eec Mon Sep 17 00:00:00 2001 From: John Viegas Date: Thu, 29 Aug 2024 10:28:40 -0700 Subject: [PATCH 5/5] Handled comments --- .../DefaultSqsAsyncBatchManager.java | 38 +++++++++++------- .../batchmanager/SQS_Builder_Attributes.xlsx | Bin 6235 -> 0 bytes .../batchmanager/RequestBatchBufferTest.java | 2 - 3 files changed, 23 insertions(+), 17 deletions(-) delete mode 100644 services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SQS_Builder_Attributes.xlsx diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java index eded3419fcfc..ab4600a65c97 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java @@ -49,24 +49,32 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager { private DefaultSqsAsyncBatchManager(DefaultBuilder builder) { this.client = Validate.notNull(builder.client, "client cannot be null"); ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor; + this.sendMessageBatchManager = - new SendMessageBatchManager(RequestBatchConfiguration - .builder(builder.overrideConfiguration) - .maxBatchBytesSize(MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES) - .build(), - scheduledExecutor, - client); + new SendMessageBatchManager( + RequestBatchConfiguration.builder(builder.overrideConfiguration) + .maxBatchBytesSize(MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES) + .build(), + scheduledExecutor, + client + ); + this.deleteMessageBatchManager = - new DeleteMessageBatchManager(RequestBatchConfiguration.builder(builder.overrideConfiguration).build(), - scheduledExecutor, - client); - this.changeMessageVisibilityBatchManager = - new ChangeMessageVisibilityBatchManager(RequestBatchConfiguration.builder(builder.overrideConfiguration).build(), - scheduledExecutor, - client); + new DeleteMessageBatchManager( + RequestBatchConfiguration.builder(builder.overrideConfiguration).build(), + scheduledExecutor, + client + ); - this.receiveMessageBatchManager = new ReceiveMessageBatchManager(client, scheduledExecutor, - builder.overrideConfiguration); + this.changeMessageVisibilityBatchManager = + new ChangeMessageVisibilityBatchManager( + RequestBatchConfiguration.builder(builder.overrideConfiguration).build(), + scheduledExecutor, + client + ); + + this.receiveMessageBatchManager = + new ReceiveMessageBatchManager(client, scheduledExecutor, builder.overrideConfiguration); } @Override diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SQS_Builder_Attributes.xlsx b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SQS_Builder_Attributes.xlsx deleted file mode 100644 index cdc98dea2f73b12de59f6d40a4ac13f6e2e73ef2..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6235 zcmZ`-WmHt{79K#lk&*6@W@za~QfBB9B!*^&2I=mWlr9k|6{IDVE)hXGKe|CW?)Z7{ z^~(D0IqSUdI%lot`SHAa@3Z#?@M{VI3d>`P1vDo@q87SBN$+P@bm$LFqAyk+%#7<@6yA@lDH% zeXt=UFG#K2Vt2-+J5G$0lOm0iHXpZz_`2XB@4g7>R!OmEuGpFfy{yI@3yD~=87hlH zXIZ-i5Qz_Vq8Ig&WlklsF-*afklwRWi%uZwOh65)<@M?nE!^v)VQKKg{bbE8y0GBK zug7K}Cp9a>TeO&zQQM}R_WH$O9+5Jj(^IbIbNODOkRVY}+;PDr0ql?&6>D~X6hYsr zX1-qXo-Vd8(YAU_Y9q_U^vdHT;ZILK;b=nz-gY-HrXo*Cp{Y_&5)tWthf4H})3x6& zPK(<%sWWnA%NYo_RX_fmE=5+3^xEn3)G#8Kg?B{+Cb>8f(!s) zBX$#KsJ#m}*Uz;gsr^YO2uEtqM}Fn}IUS2_+6k896a(|4{f)TmOxX6FwU@S~vqbuVWET=BnkjDuZ%q z@5gw3pkfc2SK?WoTdlZSI(r;(pL)NI4D0aNMudA?-0_>rJ=crgfYp%F+99c_MyA)- zJfi~kFhlr8;*Mznt(C!5AB%-5Rb=4vvAwJ?-PL?H@W|Rzor$I^@sb?Hk^H>cENeo| z(kDy(2CZ(C(@!zihGSG@>TVk-Uxo%Tc<5uAU-6DU6dLgX{HchXo8kUCLWu^1BIJnO z)1Ld6C?0TUJ2N=k?q^K?Mo1 zlyU}}aQcPGBDy8GkCC0mPd9a|$3?S8{2cox1dG$BT0w`KOPdUT>a*D7Wlf3DCJ7w? zApND!Pi0)Jp-@*B?!PylpAyBUsKjT22mIuucJej8P0R+;<)Oyd z!$)r}$wy<+x@A62-dJ|!#*&t(w_Xl41SD|M;Dx}%mfFCFZ>WaQ50AS0yVDjm&SCYXtlsfS^e{5N+CdJP|0`g zBB##HdmL{?$CUe~1g&bTHY6wwE{?OQBPZ3JJf5Z|s+?4;T{h`T{aDJEcG%Sf9AB?5 z0>3T`Et4~U{O+sAdY%?|1Dkfp_5J;|s{2>vX{%bNbf?hT3$`pQ+mrE9_|*mGluMy& z;(Ci=J8{28dsSR|bbq2+n*3@01J07qYc_6{A}*t?7RH;#$Ekm{Ml74(d#bP!Pk6&y z;2DFlOIUf*q;jF7M8gyoMs3Lt8UP)um7>OMO9Xy`_G69CbeZ!MEnDo5mANz zJEH!RYq5}iL>(e%J_3%<*-+au7zmVpQqhtLkh2+Pp5xb{5`9lKdLzE2#p<|R!?!&9 zWyq^RL@>%X-)(fAGD!&s$IT{W?2=MKFn&U)Eo;rd5z4>nD1%~PsZVQ~36fOYpOIDuLoPqW}vw`9=H)bx`#QkDle z1xrMSgS>&jz2x~3+d|Q7_Uu z`V*^OO1*EaZjYzR`~-bK7_#O`OB8>K&b!AXLXH9e5McrUKYz!FGUsA#>I}8ebal3Y zS-Jd7;SW0V@UMylerr{v=b?vEFDNmx!9-&y+Tj7t{H)EE=ITpdDm;8emy6E#$h@;_ z-f8CR%oKX8O$f~kRa3V%adOy$K+lDg6xKTLjTB2T)a5V=9rdUAGq;%;S(bY|QgZL< zZQ7{jsSaoq$zg0zJa-2z@OO9YH*kO180-u0i0}*h5zrsNyi5tf5jpig{2b4Bc042U4$=rG4+iB^RP7d9Qk6v+DU$ zfM+oKZ!TG;-@#G)B&(LP5&<3~BCcv5%CX=!Y`NEXqbui@FTZ zP+iXt4(+t0s~WNGu2VAuY_^|A%cgaShyJ*#bXEH?T*O-CEwM=lF?DVttF_=P$C*^9 zyj&@AJi#_a`n16BPIjeGoSS{iYrLJ$MbTu}v77nvc(Rf(cjzH$T@!b?L*;D6S4Z}% z`HXPy`}r8zCvOwSZvo{@M}Z;gA&*2>*xfpIoxUYIi}`a}#rG7A+8_%wOIiUvee`w( z*%gf{9GRPvY@hEfeA|Z^pArb1G8{SY24f8vmIarnWw=Esd=Bo#6mZvk z8q`h051S^@SL!cNqWY3tEF+q;-x!2W9r;kTr)Bb>gu$Yu+t~MBD#W%LKQ~WbG%=-h z?Z}_@gVtV~d9Hf`J^n*B@^$sWr)_imp zh`?{pgz(UBjqTVv+Upn#JG7Pp_rfbXiCu>lUDqMkp3krHQ{_ZaSBl4J1^4;WoHI{NZf-gqA5KZRkP5uilkcz zI|ts2kmQL@v%CRxw6+R~0@(+b{P=pth5L!j3$Q%V{#E|VdKC=p@roFS{k+o@^Vb7R zaHRX?>t3xvWJIl)&xpxmpJjhh_3BgUl6Vo)4+gJynkwvk$FNM~I>-yXW){qo!4}Go zXmEXgpTe|i_&q5LWU$!hMAd&tj{lI=}IJrSFxsj^&9n^Dfz2x)78>?;bmQB6{*s9pZ#kCO&i6KToy=O>NWMJb8tmAH1{3etW0YCcRM5dW(n<4T@i*4x-|^ zT0!5fM3g8K(}jdTFv%@ADk{j1 zsDrEIdhI){mPUq|Hi?A=Nku2)v--I1uVYl#qBxTHF*Uiex zV)ct~$y_ln*TemXD2upRAtHnd&3C>aD)#XDWQgn`iTWSH}}3Eg+N>JfDMa zFXnT_pCA0N^7py@4w!%7`MMLGpP6WuzI#2O`&p(mfGp|=f#&HMc@IX2r}z6e1)B^lzr-2)yvwD;c0I#-R4ecPF6SY6`_@LnmT;jDu* zB4Hz>OS}TC82OcFmU~>Aj+SPvvT<(|Suh4vIWWuW4@7cUC@BQGc)lhIy^B&+rhXD? z;#99iu#o_Y^Fm4-m2)vvW#8O2TbJv4yZ6qIpCvSqI&+SfyyvufIb1b2LVZ7Iv7*;$bq(uMae zIWdzLK1h^|ckwoZ+qvLeHAlfwKdNj&Ch*9PvHl|c%{r)qHNwdmGoM30oT%8SL#fT6 zS5)$e21sK5$kIwm%KSrjuvt@nF7BHBp*D+~bZks}V+XnJ+>>-oz(n$@r=`tY#;E;Z zCKwmP3f;d%pIc70>zP8(z#eL_z8;sD9B&5_tN98kt`oUkW5o}zDw|P3a`;1sY}$>b z8Q@Ygc7%oltMZAn@cW`hneMz4-BXU0`pcr_18^KX9n;|%meZ%DoW%F|GJ7c_+BEbqQJ0J1{95k zK95%?CJcu-#)R3^bI3RIZsf< zT64A%s`!6w0~=fOS!1#JKYM$q_41N*jVmP3yncp5KnfL}s;vhThI1t=jO}5QQabDB zkp#W((URftp3mA)j(gx+wgBO80BNA}%LU1lXeQEHn54M3gk@+>SbUov*scTfZ!s4$IL~b4O}^SB54*nIUsUWTd`yf6}alY9A#+;-z{r2GM~=8At2+DuFA^rVCnrGs$C1 z7&IQ`X|l3kD%|w0W6~<8p5jItmXxH zJh;2fjj~|Qtyp)1_GcSVQQySdiaUZ`;@sN?K4LMj4o~Lvf{E++H@HhXR)l`lMXzv+R*K za{5UH7{p(VUN_3^O}jPAB_3%xOH8MWZBJh|}-jubmvuC+{C9oi-Mxqx~lI0bp-u7Kpj-OUIGhww`kSc%=2Uih1 zx5&}EKe^Se>`|Ss$(5yXRv4H$4#}E9f5>b;U8^LzRZiz0CMm=XpMCQVT%yS%rQJ?u z`r!3ME!#en5N`s!CbalT6zS@K^q2zPN$*Ru&vK8}cLs8vE-ntAc-{B)O5#bHQx3dZ zxR<8K=fI8p`8R*GKGUAoNQS&&-)qb_l7DJy+*tC}1EDDaVpJjcUrkLN9e+vM{$%T? zIf#%{+)ab-SW8&pb%5T!T=4@v3ae4QN40;E6{J>mD}75qW`Ezv=i=teTU$T%L2!$< z;GojtGlvz~qaF6;S|;zQne1D0vLxq}_jA&{Cjr~aIg#4ONc3cF988Zm^ks=3WEH~l zv>&OMr!jwsuPD&!;yiV8;Ggllo(p1-7(PeHk%c4N_NmH{RF5i`s9Xf3Llje2H4C zwK|2Duz212OCU21A3u$aolELRwh-zPi|i-`VKemUe9{_RMM%rpSr{%Lf5F@(L32+l z6I&lAjPXgV&po9JVkh$+WQlQh;0DQ(nX(w@E#`9L8oS>y0!FNaU-{9v6ukD`(^t%jq z2a(?lpRs;1{C6z53%%Q&{)TSh{0+U^tKKEJTm60$3=sW8ExZfAo9Mscw1_bKZz=tE X+JjY45qbguScp3San7HCKi~cZoG;*h diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java index a5d57a238272..e90bc2590170 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java @@ -142,8 +142,6 @@ void whenNextBatchEntryThenReturnNextEntryId() { assertEquals("0", batchBuffer.flushableRequests().keySet().iterator().next()); } - - @Test void whenRequestPassedWithLessBytesinArgs_thenCheckForSizeOnly_andDonotFlush() { RequestBatchBuffer batchBuffer