diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java index 9d68dc5ef262..7157fbb8601d 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java @@ -170,7 +170,7 @@ public String toString() { .add("maxBatchItems", maxBatchItems) .add("maxBatchKeys", maxBatchKeys) .add("maxBufferSize", maxBufferSize) - .add("maxBatchOpenDuration", maxBatchOpenDuration.toMillis()) + .add("maxBatchOpenDuration", maxBatchOpenDuration) .add("visibilityTimeout", visibilityTimeout) .add("longPollWaitTimeout", longPollWaitTimeout) .add("minReceiveWaitTime", minReceiveWaitTime) diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java index ffabc599b13c..d3cd056f7b11 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -24,9 +25,12 @@ public final class BatchingExecutionContext { private final RequestT request; private final CompletableFuture response; + private final Optional responsePayloadByteSize; + public BatchingExecutionContext(RequestT request, CompletableFuture response) { this.request = request; this.response = response; + responsePayloadByteSize = RequestPayloadCalculator.calculateMessageSize(request); } public RequestT request() { @@ -36,4 +40,11 @@ public RequestT request() { public CompletableFuture response() { return response; } + + /** + * Optional because responsePayloadByteSize is required only for SendMessageRequests and not for other requests. + */ + public Optional responsePayloadByteSize() { + return responsePayloadByteSize; + } } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java index 406002409814..171b76aa4bff 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java @@ -32,13 +32,17 @@ public final class BatchingMap { private final int maxBatchKeys; + private final int maxBatchBytesSize; + private final int maxBatchSize; private final int maxBufferSize; private final Map> batchContextMap; - public BatchingMap(int maxBatchKeys, int maxBufferSize) { + public BatchingMap(RequestBatchConfiguration overrideConfiguration) { this.batchContextMap = new ConcurrentHashMap<>(); - this.maxBatchKeys = maxBatchKeys; - this.maxBufferSize = maxBufferSize; + this.maxBatchKeys = overrideConfiguration.maxBatchKeys(); + this.maxBatchBytesSize = overrideConfiguration.maxBatchBytesSize(); + this.maxBatchSize = overrideConfiguration.maxBatchItems(); + this.maxBufferSize = overrideConfiguration.maxBufferSize(); } public void put(String batchKey, Supplier> scheduleFlush, RequestT request, @@ -47,10 +51,14 @@ public void put(String batchKey, Supplier> scheduleFlush, Req if (batchContextMap.size() == maxBatchKeys) { throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys); } - return new RequestBatchBuffer<>(maxBufferSize, scheduleFlush.get()); + return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize, maxBufferSize); }).put(request, response); } + public boolean contains(String batchKey) { + return batchContextMap.containsKey(batchKey); + } + public void putScheduledFlush(String batchKey, ScheduledFuture scheduledFlush) { batchContextMap.get(batchKey).putScheduledFlush(scheduledFlush); } @@ -59,9 +67,13 @@ public void forEach(BiConsumer> batchContextMap.forEach(action); } - public Map> flushableRequests(String batchKey, - int maxBatchItems) { - return batchContextMap.get(batchKey).flushableRequests(maxBatchItems); + public Map> flushableRequests(String batchKey) { + return batchContextMap.get(batchKey).flushableRequests(); + } + + public Map> flushableRequestsOnByteLimitBeforeAdd(String batchKey, + RequestT request) { + return batchContextMap.get(batchKey).flushableRequestsOnByteLimitBeforeAdd(request); } public Map> flushableScheduledRequests(String batchKey, diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java index d8a392308874..6260552e08e4 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java @@ -25,7 +25,6 @@ import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; @@ -43,7 +42,7 @@ public class ChangeMessageVisibilityBatchManager extends RequestBatchManager ChangeMessageVisibilityBatchRequest.builder() - .queueUrl(batchKey) - .overrideConfiguration(config) - .entries(entries) - .build()) + .queueUrl(batchKey) + .overrideConfiguration(config) + .entries(entries) + .build()) .orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder() .queueUrl(batchKey) .entries(entries) @@ -103,8 +102,6 @@ private static IdentifiableMessage changeMessageVisibilityCreateThrow return new IdentifiableMessage<>(key, response); } - - @Override protected CompletableFuture batchAndSend( List> identifiedRequests, String batchKey) { diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java index 3dde89462dda..ab4600a65c97 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -46,21 +48,33 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager { private DefaultSqsAsyncBatchManager(DefaultBuilder builder) { this.client = Validate.notNull(builder.client, "client cannot be null"); - ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor; - this.sendMessageBatchManager = new SendMessageBatchManager(builder.overrideConfiguration, - scheduledExecutor, - client); - this.deleteMessageBatchManager = new DeleteMessageBatchManager(builder.overrideConfiguration, - scheduledExecutor, - client); - this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(builder.overrideConfiguration, - scheduledExecutor, - client); - - this.receiveMessageBatchManager = new ReceiveMessageBatchManager(client, scheduledExecutor, - builder.overrideConfiguration); + this.sendMessageBatchManager = + new SendMessageBatchManager( + RequestBatchConfiguration.builder(builder.overrideConfiguration) + .maxBatchBytesSize(MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES) + .build(), + scheduledExecutor, + client + ); + + this.deleteMessageBatchManager = + new DeleteMessageBatchManager( + RequestBatchConfiguration.builder(builder.overrideConfiguration).build(), + scheduledExecutor, + client + ); + + this.changeMessageVisibilityBatchManager = + new ChangeMessageVisibilityBatchManager( + RequestBatchConfiguration.builder(builder.overrideConfiguration).build(), + scheduledExecutor, + client + ); + + this.receiveMessageBatchManager = + new ReceiveMessageBatchManager(client, scheduledExecutor, builder.overrideConfiguration); } @Override diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java index c2c3385507cc..cb33985e5a12 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java @@ -25,7 +25,6 @@ import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; @@ -36,14 +35,13 @@ import software.amazon.awssdk.services.sqs.model.SqsException; import software.amazon.awssdk.utils.Either; - @SdkInternalApi public class DeleteMessageBatchManager extends RequestBatchManager { private final SqsAsyncClient sqsAsyncClient; - protected DeleteMessageBatchManager(BatchOverrideConfiguration overrideConfiguration, + protected DeleteMessageBatchManager(RequestBatchConfiguration overrideConfiguration, ScheduledExecutorService scheduledExecutor, SqsAsyncClient sqsAsyncClient) { super(overrideConfiguration, scheduledExecutor); diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java index 57baed55d1c1..84a1337efbdb 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java @@ -15,7 +15,9 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; + import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -23,29 +25,26 @@ import java.util.concurrent.ScheduledFuture; import java.util.stream.Collectors; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.utils.CollectionUtils; @SdkInternalApi -public final class RequestBatchBuffer { +public final class RequestBatchBuffer { private final Object flushLock = new Object(); private final Map> idToBatchContext; - - /** - * Maximum number of elements that can be included in the BatchBuffer. - */ + private final int maxBatchItems; private final int maxBufferSize; - + private final int maxBatchSizeInBytes; /** * Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next * BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented everytime a new request and * response pair is received. */ private int nextId; - /** - * Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a - * request that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added - * to idToBatchContext had an id of 22, nextBatchEntry will have a value of 23). + * Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a request + * that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added to + * idToBatchContext had an id of 22, nextBatchEntry will have a value of 23). */ private int nextBatchEntry; @@ -54,29 +53,59 @@ public final class RequestBatchBuffer { */ private ScheduledFuture scheduledFlush; - public RequestBatchBuffer(int maxBufferSize, ScheduledFuture scheduledFlush) { + public RequestBatchBuffer(ScheduledFuture scheduledFlush, + int maxBatchItems, int maxBatchSizeInBytes, int maxBufferSize) { this.idToBatchContext = new ConcurrentHashMap<>(); - this.maxBufferSize = maxBufferSize; this.nextId = 0; this.nextBatchEntry = 0; this.scheduledFlush = scheduledFlush; + this.maxBatchItems = maxBatchItems; + this.maxBufferSize = maxBufferSize; + this.maxBatchSizeInBytes = maxBatchSizeInBytes; } - public Map> flushableRequests(int maxBatchItems) { + public Map> flushableRequests() { synchronized (flushLock) { - if (idToBatchContext.size() >= maxBatchItems) { - return extractFlushedEntries(maxBatchItems); + return (isByteSizeThresholdCrossed(0) || isMaxBatchSizeLimitReached()) + ? extractFlushedEntries(maxBatchItems) + : Collections.emptyMap(); + } + } + + + private boolean isMaxBatchSizeLimitReached() { + return idToBatchContext.size() >= maxBatchItems; + } + + public Map> flushableRequestsOnByteLimitBeforeAdd(RequestT request) { + synchronized (flushLock) { + if (maxBatchSizeInBytes > 0 && !idToBatchContext.isEmpty()) { + int incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0); + if (isByteSizeThresholdCrossed(incomingRequestBytes)) { + return extractFlushedEntries(maxBatchItems); + } } - return new ConcurrentHashMap<>(); + return Collections.emptyMap(); + } + } + + private boolean isByteSizeThresholdCrossed(int incomingRequestBytes) { + if (maxBatchSizeInBytes < 0) { + return false; } + int totalPayloadSize = idToBatchContext.values().stream() + .map(BatchingExecutionContext::responsePayloadByteSize) + .mapToInt(opt -> opt.orElse(0)) + .sum() + incomingRequestBytes; + return totalPayloadSize > maxBatchSizeInBytes; } public Map> flushableScheduledRequests(int maxBatchItems) { synchronized (flushLock) { - if (idToBatchContext.size() > 0) { + if (!idToBatchContext.isEmpty()) { return extractFlushedEntries(maxBatchItems); } - return new ConcurrentHashMap<>(); + return Collections.emptyMap(); } } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java index 364d04c76101..dc407c4c84f3 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java @@ -16,7 +16,6 @@ package software.amazon.awssdk.services.sqs.internal.batchmanager; import java.time.Duration; -import java.util.Optional; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; @@ -24,7 +23,8 @@ public final class RequestBatchConfiguration { public static final int DEFAULT_MAX_BATCH_ITEMS = 10; - public static final int DEFAULT_MAX_BATCH_KEYS = 100; + public static final int DEFAULT_MAX_BATCH_BYTES_SIZE = -1; + public static final int DEFAULT_MAX_BATCH_KEYS = 1000; public static final int DEFAULT_MAX_BUFFER_SIZE = 500; public static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(200); @@ -32,23 +32,32 @@ public final class RequestBatchConfiguration { private final Integer maxBatchKeys; private final Integer maxBufferSize; private final Duration maxBatchOpenDuration; + private final Integer maxBatchBytesSize; - public RequestBatchConfiguration(BatchOverrideConfiguration overrideConfiguration) { - this.maxBatchItems = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxBatchItems) - .orElse(DEFAULT_MAX_BATCH_ITEMS); + private RequestBatchConfiguration(Builder builder) { - this.maxBatchKeys = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxBatchKeys) - .orElse(DEFAULT_MAX_BATCH_KEYS); + this.maxBatchItems = builder.maxBatchItems != null ? builder.maxBatchItems : DEFAULT_MAX_BATCH_ITEMS; + this.maxBatchKeys = builder.maxBatchKeys != null ? builder.maxBatchKeys : DEFAULT_MAX_BATCH_KEYS; + this.maxBufferSize = builder.maxBufferSize != null ? builder.maxBufferSize : DEFAULT_MAX_BUFFER_SIZE; + this.maxBatchOpenDuration = builder.maxBatchOpenDuration != null ? builder.maxBatchOpenDuration : + DEFAULT_MAX_BATCH_OPEN_IN_MS; + this.maxBatchBytesSize = builder.maxBatchBytesSize != null ? builder.maxBatchBytesSize : DEFAULT_MAX_BATCH_BYTES_SIZE; - this.maxBufferSize = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxBufferSize) - .orElse(DEFAULT_MAX_BUFFER_SIZE); + } + + public static Builder builder() { + return new Builder(); + } - this.maxBatchOpenDuration = Optional.ofNullable(overrideConfiguration) - .map(BatchOverrideConfiguration::maxBatchOpenDuration) - .orElse(DEFAULT_MAX_BATCH_OPEN_IN_MS); + public static Builder builder(BatchOverrideConfiguration configuration) { + if (configuration != null) { + return new Builder() + .maxBatchKeys(configuration.maxBatchKeys()) + .maxBatchItems(configuration.maxBatchItems()) + .maxBatchOpenDuration(configuration.maxBatchOpenDuration()) + .maxBufferSize(configuration.maxBufferSize()); + } + return new Builder(); } public Duration maxBatchOpenDuration() { @@ -66,4 +75,50 @@ public int maxBatchKeys() { public int maxBufferSize() { return maxBufferSize; } + + public int maxBatchBytesSize() { + return maxBatchBytesSize; + } + + public static final class Builder { + + private Integer maxBatchItems; + private Integer maxBatchKeys; + private Integer maxBufferSize; + private Duration maxBatchOpenDuration; + private Integer maxBatchBytesSize; + + private Builder() { + } + + public Builder maxBatchItems(Integer maxBatchItems) { + this.maxBatchItems = maxBatchItems; + return this; + } + + public Builder maxBatchKeys(Integer maxBatchKeys) { + this.maxBatchKeys = maxBatchKeys; + return this; + } + + public Builder maxBufferSize(Integer maxBufferSize) { + this.maxBufferSize = maxBufferSize; + return this; + } + + public Builder maxBatchOpenDuration(Duration maxBatchOpenDuration) { + this.maxBatchOpenDuration = maxBatchOpenDuration; + return this; + } + + public Builder maxBatchBytesSize(Integer maxBatchBytesSize) { + this.maxBatchBytesSize = maxBatchBytesSize; + return this; + } + + public RequestBatchConfiguration build() { + return new RequestBatchConfiguration(this); + } + } + } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index 4cde84cd2603..dc25430613c1 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -27,12 +28,13 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.utils.Either; import software.amazon.awssdk.utils.Validate; @SdkInternalApi public abstract class RequestBatchManager { + protected final RequestBatchConfiguration batchConfiguration ; + private final int maxBatchItems; private final Duration maxBatchOpenDuration; private final BatchingMap requestsAndResponsesMaps; @@ -41,30 +43,48 @@ public abstract class RequestBatchManager { private final Set> pendingBatchResponses ; private final Set> pendingResponses ; - protected RequestBatchManager(BatchOverrideConfiguration overrideConfiguration, ScheduledExecutorService scheduledExecutor) { - RequestBatchConfiguration batchConfiguration = new RequestBatchConfiguration(overrideConfiguration); - this.requestsAndResponsesMaps = new BatchingMap<>(batchConfiguration.maxBatchKeys(), - batchConfiguration.maxBufferSize()); + + protected RequestBatchManager(RequestBatchConfiguration overrideConfiguration, + ScheduledExecutorService scheduledExecutor) { + batchConfiguration = overrideConfiguration; this.maxBatchItems = batchConfiguration.maxBatchItems(); this.maxBatchOpenDuration = batchConfiguration.maxBatchOpenDuration(); this.scheduledExecutor = Validate.notNull(scheduledExecutor, "Null scheduledExecutor"); pendingBatchResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); pendingResponses = Collections.newSetFromMap(new ConcurrentHashMap<>()); + this.requestsAndResponsesMaps = new BatchingMap<>(overrideConfiguration); + } public CompletableFuture batchRequest(RequestT request) { CompletableFuture response = new CompletableFuture<>(); pendingResponses.add(response); + try { String batchKey = getBatchKey(request); + + // Handle potential byte size overflow only if there are request in map and if feature enabled + if (requestsAndResponsesMaps.contains(batchKey) && batchConfiguration.maxBatchBytesSize() > 0) { + Optional.of(requestsAndResponsesMaps.flushableRequestsOnByteLimitBeforeAdd(batchKey, request)) + .filter(flushableRequests -> !flushableRequests.isEmpty()) + .ifPresent(flushableRequests -> manualFlushBuffer(batchKey, flushableRequests)); + } + + // Add request and response to the map, scheduling a flush if necessary requestsAndResponsesMaps.put(batchKey, () -> scheduleBufferFlush(batchKey, maxBatchOpenDuration.toMillis(), scheduledExecutor), request, response); - flushBufferIfNeeded(batchKey); + + // Immediately flush if the batch is full + Optional.of(requestsAndResponsesMaps.flushableRequests(batchKey)) + .filter(flushableRequests -> !flushableRequests.isEmpty()) + .ifPresent(flushableRequests -> manualFlushBuffer(batchKey, flushableRequests)); + } catch (Exception e) { response.completeExceptionally(e); } + return response; } @@ -76,15 +96,6 @@ protected abstract CompletableFuture batchAndSend(List, IdentifiableMessage>> mapBatchResponse(BatchResponseT batchResponse); - - private void flushBufferIfNeeded(String batchKey) { - Map> flushableRequests = - requestsAndResponsesMaps.flushableRequests(batchKey, maxBatchItems); - if (!flushableRequests.isEmpty()) { - manualFlushBuffer(batchKey, flushableRequests); - } - } - private void manualFlushBuffer(String batchKey, Map> flushableRequests) { requestsAndResponsesMaps.cancelScheduledFlush(batchKey); @@ -142,7 +153,7 @@ public void close() { requestsAndResponsesMaps.forEach((batchKey, batchBuffer) -> { requestsAndResponsesMaps.cancelScheduledFlush(batchKey); Map> flushableRequests = - requestsAndResponsesMaps.flushableRequests(batchKey, maxBatchItems); + requestsAndResponsesMaps.flushableRequests(batchKey); while (!flushableRequests.isEmpty()) { flushBuffer(batchKey, flushableRequests); @@ -153,4 +164,6 @@ public void close() { pendingResponses.forEach(future -> future.cancel(true)); requestsAndResponsesMaps.clear(); } + + } \ No newline at end of file diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java new file mode 100644 index 000000000000..6d2927c7cd9b --- /dev/null +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestPayloadCalculator.java @@ -0,0 +1,52 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.sqs.internal.batchmanager; + +import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.ATTRIBUTE_MAPS_PAYLOAD_BYTES; + +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +@SdkInternalApi +public final class RequestPayloadCalculator { + + private RequestPayloadCalculator() { + } + + /** + * Evaluates the total size of the message body, message attributes, and message system attributes for a SendMessageRequest. + * If the request is not a SendMessageRequest, returns an empty Optional. + * + * @param request the request to evaluate + * @param the type of the request + * @return an Optional containing the total size in bytes if the request is a SendMessageRequest, otherwise an empty Optional + */ + public static Optional calculateMessageSize(RequestT request) { + if (!(request instanceof SendMessageRequest)) { + return Optional.empty(); + } + SendMessageRequest sendMessageRequest = (SendMessageRequest) request; + Integer totalSize = calculateBodySize(sendMessageRequest) + ATTRIBUTE_MAPS_PAYLOAD_BYTES; + return Optional.of(totalSize); + } + + private static int calculateBodySize(SendMessageRequest request) { + return request.messageBody() != null ? request.messageBody().getBytes(StandardCharsets.UTF_8).length : 0; + } + +} diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java index d52e25b3626e..1c6dfb6307ed 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java @@ -25,7 +25,6 @@ import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; @@ -43,7 +42,7 @@ public class SendMessageBatchManager extends RequestBatchManager batchAndSend(List> - identifiedRequests, String batchKey) { + identifiedRequests, String batchKey) { SendMessageBatchRequest batchRequest = createSendMessageBatchRequest(identifiedRequests, batchKey); return asyncClient.sendMessageBatch(batchRequest); } @@ -139,4 +138,5 @@ IdentifiableMessage>> mapBatchResponse(SendMessageBatchResponse batch }); return mappedResponses; } + } diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java index 540818a90aa6..a44402fa2d4b 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java @@ -22,6 +22,16 @@ public final class SqsMessageDefault { public static final int MAX_SUPPORTED_SQS_RECEIVE_MSG = 10; + public static final int MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB + + /** + * + * AWS SQS Message Attributes Documentation + * + * Rounding up max payload due to attribute maps. + * This was not done in V1, thus an issue was reported where batch messages failed with payload size exceeding the maximum. + */ + public static final int ATTRIBUTE_MAPS_PAYLOAD_BYTES = 16 * 1024; // 16 KiB private SqsMessageDefault() { } diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java index 502cf5290ec8..e90bc2590170 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java @@ -24,6 +24,9 @@ import java.util.concurrent.ScheduledFuture; import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchBuffer; import software.amazon.awssdk.services.sqs.internal.batchmanager.BatchingExecutionContext; +import software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -33,14 +36,16 @@ class RequestBatchBufferTest { private RequestBatchBuffer batchBuffer; private ScheduledFuture scheduledFlush; + private static int maxBufferSize = 1000; + @BeforeEach void setUp() { scheduledFlush = mock(ScheduledFuture.class); - batchBuffer = new RequestBatchBuffer<>(10, scheduledFlush); } @Test void whenPutRequestThenBufferContainsRequest() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); assertEquals(1, batchBuffer.responses().size()); @@ -48,15 +53,17 @@ void whenPutRequestThenBufferContainsRequest() { @Test void whenFlushableRequestsThenReturnRequestsUpToMaxBatchItems() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); - Map> flushedRequests = batchBuffer.flushableRequests(1); + Map> flushedRequests = batchBuffer.flushableRequests(); assertEquals(1, flushedRequests.size()); assertTrue(flushedRequests.containsKey("0")); } @Test void whenFlushableScheduledRequestsThenReturnAllRequests() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); Map> flushedRequests = batchBuffer.flushableScheduledRequests(1); @@ -66,6 +73,7 @@ void whenFlushableScheduledRequestsThenReturnAllRequests() { @Test void whenMaxBufferSizeReachedThenThrowException() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 3, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, 10); for (int i = 0; i < 10; i++) { batchBuffer.put("request" + i, new CompletableFuture<>()); } @@ -74,6 +82,7 @@ void whenMaxBufferSizeReachedThenThrowException() { @Test void whenPutScheduledFlushThenFlushIsSet() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); ScheduledFuture newScheduledFlush = mock(ScheduledFuture.class); batchBuffer.putScheduledFlush(newScheduledFlush); assertNotNull(newScheduledFlush); @@ -81,12 +90,14 @@ void whenPutScheduledFlushThenFlushIsSet() { @Test void whenCancelScheduledFlushThenFlushIsCancelled() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.cancelScheduledFlush(); verify(scheduledFlush).cancel(false); } @Test void whenGetResponsesThenReturnAllResponses() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response1 = new CompletableFuture<>(); CompletableFuture response2 = new CompletableFuture<>(); batchBuffer.put("request1", response1); @@ -99,6 +110,7 @@ void whenGetResponsesThenReturnAllResponses() { @Test void whenClearBufferThenBufferIsEmpty() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); batchBuffer.clear(); @@ -107,22 +119,83 @@ void whenClearBufferThenBufferIsEmpty() { @Test void whenExtractFlushedEntriesThenReturnCorrectEntries() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); for (int i = 0; i < 5; i++) { batchBuffer.put("request" + i, new CompletableFuture<>()); } - Map> flushedEntries = batchBuffer.flushableRequests(5); + Map> flushedEntries = batchBuffer.flushableRequests(); assertEquals(5, flushedEntries.size()); } @Test void whenHasNextBatchEntryThenReturnTrue() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.put("request1", new CompletableFuture<>()); - assertTrue(batchBuffer.flushableRequests(1).containsKey("0")); + assertTrue(batchBuffer.flushableRequests().containsKey("0")); } + @Test void whenNextBatchEntryThenReturnNextEntryId() { + batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.put("request1", new CompletableFuture<>()); - assertEquals("0", batchBuffer.flushableRequests(1).keySet().iterator().next()); + assertEquals("0", batchBuffer.flushableRequests().keySet().iterator().next()); + } + + @Test + void whenRequestPassedWithLessBytesinArgs_thenCheckForSizeOnly_andDonotFlush() { + RequestBatchBuffer batchBuffer + = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + for (int i = 0; i < 5; i++) { + batchBuffer.put(SendMessageRequest.builder().build(), + new CompletableFuture<>()); + } + Map> flushedEntries = + batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("Hi").build()); + assertEquals(0, flushedEntries.size()); + } + + + + @Test + void testFlushWhenPayloadExceedsMaxSize() { + RequestBatchBuffer batchBuffer + = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + + String largeMessageBody = createLargeString('a',245_760); + batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), + new CompletableFuture<>()); + Map> flushedEntries = + batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("NewMessage").build()); + assertEquals(1, flushedEntries.size()); + } + + @Test + void testFlushWhenCumulativePayloadExceedsMaxSize() { + RequestBatchBuffer batchBuffer + = new RequestBatchBuffer<>(scheduledFlush, 5, SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); + + String largeMessageBody = createLargeString('a',130_000); + batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), + new CompletableFuture<>()); + batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), + new CompletableFuture<>()); + Map> flushedEntries = + batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("NewMessage").build()); + + //Flushes both the messages since thier sum is greater than 256Kb + assertEquals(2, flushedEntries.size()); } + + + private String createLargeString(char ch, int length) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append(ch); + } + return sb.toString(); + } + + + } \ No newline at end of file diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java new file mode 100644 index 000000000000..fd58fcaf5a8c --- /dev/null +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestPayloadCalculatorTest.java @@ -0,0 +1,90 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.sqs.batchmanager; + +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestPayloadCalculator; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +import java.util.Optional; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.ATTRIBUTE_MAPS_PAYLOAD_BYTES; + +@TestInstance(Lifecycle.PER_CLASS) +class RequestPayloadCalculatorTest { + + @ParameterizedTest + @MethodSource("provideRequestsForMessageSizeCalculation") + @DisplayName("Test calculateMessageSize with different SendMessageRequest inputs") + void testCalculateMessageSize(SendMessageRequest request, int expectedSize) { + Optional actualSize = RequestPayloadCalculator.calculateMessageSize(request); + assertEquals(Optional.of(expectedSize), actualSize); + } + + private Stream provideRequestsForMessageSizeCalculation() { + return Stream.of( + Arguments.of( + SendMessageRequest.builder().messageBody("Test message").build(), + "Test message".getBytes(StandardCharsets.UTF_8).length + ATTRIBUTE_MAPS_PAYLOAD_BYTES + ), + Arguments.of( + SendMessageRequest.builder().messageBody("").build(), + ATTRIBUTE_MAPS_PAYLOAD_BYTES + ), + Arguments.of( + SendMessageRequest.builder().messageBody(null).build(), + ATTRIBUTE_MAPS_PAYLOAD_BYTES + ), + Arguments.of( + SendMessageRequest.builder().messageBody("Another test message").build(), + "Another test message".getBytes(StandardCharsets.UTF_8).length + ATTRIBUTE_MAPS_PAYLOAD_BYTES + ) + ); + } + + @ParameterizedTest + @MethodSource("provideNonSendMessageRequest") + @DisplayName("Test calculateMessageSize with non-SendMessageRequest inputs") + void testCalculateMessageSizeWithNonSendMessageRequest(Object request) { + Optional actualSize = RequestPayloadCalculator.calculateMessageSize(request); + assertEquals(Optional.empty(), actualSize); + } + + private Stream provideNonSendMessageRequest() { + return Stream.of( + Arguments.of(ChangeMessageVisibilityRequest.builder() + .queueUrl("https://sqs.us-west-2.amazonaws.com/MyQueue") + .receiptHandle("some-receipt-handle") + .visibilityTimeout(60) + .build()), + + Arguments.of(DeleteMessageRequest.builder() + .queueUrl("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueue") + .receiptHandle("some-receipt-handle") + .build()) + ); + } +} diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java index ac8b2d00cc9f..d083f2502b38 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SampleBatchManager.java @@ -20,7 +20,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import software.amazon.awssdk.services.sqs.internal.batchmanager.IdentifiableMessage; +import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchConfiguration; import software.amazon.awssdk.services.sqs.internal.batchmanager.RequestBatchManager; +import software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault; import software.amazon.awssdk.utils.Either; public class SampleBatchManager extends RequestBatchManager { @@ -30,7 +32,14 @@ public class SampleBatchManager extends RequestBatchManager