Skip to content

Commit a946bd6

Browse files
committed
Byte Based batching for SendMessage API
1 parent 63779a5 commit a946bd6

16 files changed

+165
-171
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder<Batch
3737
private final Integer maxBatchItems;
3838
private final Integer maxBatchKeys;
3939
private final Integer maxBufferSize;
40-
private final Duration batchSendRequestFrequency;
40+
private final Duration maxBatchOpenDuration;
4141
private final Duration visibilityTimeout;
4242
private final Duration longPollWaitTimeout;
4343
private final Duration minReceiveWaitTime;
@@ -51,8 +51,7 @@ private BatchOverrideConfiguration(Builder builder) {
5151
this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems");
5252
this.maxBatchKeys = Validate.isPositiveOrNull(builder.maxBatchKeys, "maxBatchKeys");
5353
this.maxBufferSize = Validate.isPositiveOrNull(builder.maxBufferSize, "maxBufferSize");
54-
this.batchSendRequestFrequency = Validate.isPositiveOrNull(builder.batchSendRequestFrequency,
55-
"batchSendRequestFrequency");
54+
this.maxBatchOpenDuration = Validate.isPositiveOrNull(builder.maxBatchOpenDuration, "maxBatchOpenDuration");
5655
this.visibilityTimeout = Validate.isPositiveOrNull(builder.visibilityTimeout, "visibilityTimeout");
5756
this.longPollWaitTimeout = Validate.isPositiveOrNull(builder.longPollWaitTimeout, "longPollWaitTimeout");
5857
this.minReceiveWaitTime = Validate.isPositiveOrNull(builder.minReceiveWaitTime, "minReceiveWaitTime");
@@ -95,8 +94,8 @@ public Integer maxDoneReceiveBatches() {
9594
/**
9695
* @return the optional maximum amount of time that an outgoing call waits to be batched with messages of the same type.
9796
*/
98-
public Duration batchSendRequestFrequency() {
99-
return batchSendRequestFrequency;
97+
public Duration maxBatchOpenDuration() {
98+
return maxBatchOpenDuration;
10099
}
101100

102101
/**
@@ -154,7 +153,7 @@ public Builder toBuilder() {
154153
return new Builder().maxBatchItems(maxBatchItems)
155154
.maxBatchKeys(maxBatchKeys)
156155
.maxBufferSize(maxBufferSize)
157-
.batchSendRequestFrequency(batchSendRequestFrequency)
156+
.maxBatchOpenDuration(maxBatchOpenDuration)
158157
.visibilityTimeout(visibilityTimeout)
159158
.longPollWaitTimeout(longPollWaitTimeout)
160159
.minReceiveWaitTime(minReceiveWaitTime)
@@ -171,7 +170,7 @@ public String toString() {
171170
.add("maxBatchItems", maxBatchItems)
172171
.add("maxBatchKeys", maxBatchKeys)
173172
.add("maxBufferSize", maxBufferSize)
174-
.add("batchSendRequestFrequency", batchSendRequestFrequency)
173+
.add("maxBatchOpenDuration", maxBatchOpenDuration)
175174
.add("visibilityTimeout", visibilityTimeout)
176175
.add("longPollWaitTimeout", longPollWaitTimeout)
177176
.add("minReceiveWaitTime", minReceiveWaitTime)
@@ -204,8 +203,8 @@ public boolean equals(Object o) {
204203
return false;
205204
}
206205

207-
if (batchSendRequestFrequency != null ? !batchSendRequestFrequency.equals(that.batchSendRequestFrequency) :
208-
that.batchSendRequestFrequency != null) {
206+
if (maxBatchOpenDuration != null ? !maxBatchOpenDuration.equals(that.maxBatchOpenDuration) :
207+
that.maxBatchOpenDuration != null) {
209208
return false;
210209
}
211210
if (visibilityTimeout != null ? !visibilityTimeout.equals(that.visibilityTimeout) :
@@ -245,7 +244,7 @@ public int hashCode() {
245244
int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0;
246245
result = 31 * result + (maxBatchKeys != null ? maxBatchKeys.hashCode() : 0);
247246
result = 31 * result + (maxBufferSize != null ? maxBufferSize.hashCode() : 0);
248-
result = 31 * result + (batchSendRequestFrequency != null ? batchSendRequestFrequency.hashCode() : 0);
247+
result = 31 * result + (maxBatchOpenDuration != null ? maxBatchOpenDuration.hashCode() : 0);
249248
result = 31 * result + (visibilityTimeout != null ? visibilityTimeout.hashCode() : 0);
250249
result = 31 * result + (longPollWaitTimeout != null ? longPollWaitTimeout.hashCode() : 0);
251250
result = 31 * result + (minReceiveWaitTime != null ? minReceiveWaitTime.hashCode() : 0);
@@ -262,7 +261,7 @@ public static final class Builder implements CopyableBuilder<Builder, BatchOverr
262261
private Integer maxBatchItems;
263262
private Integer maxBatchKeys;
264263
private Integer maxBufferSize;
265-
private Duration batchSendRequestFrequency;
264+
private Duration maxBatchOpenDuration;
266265
private Duration visibilityTimeout;
267266
private Duration longPollWaitTimeout;
268267
private Duration minReceiveWaitTime;
@@ -316,11 +315,11 @@ public Builder maxBufferSize(Integer maxBufferSize) {
316315
* Define the maximum amount of time that an outgoing call waits for other requests before sending out a
317316
* batch request.
318317
* TODO : Decide if Ms needs to be added to the name in surface API review meeting
319-
* @param batchSendRequestFrequency The new batchSendRequestFrequency value.
318+
* @param maxBatchOpenDuration The new maxBatchOpenDuration value.
320319
* @return This object for method chaining.
321320
*/
322-
public Builder batchSendRequestFrequency(Duration batchSendRequestFrequency) {
323-
this.batchSendRequestFrequency = batchSendRequestFrequency;
321+
public Builder maxBatchOpenDuration(Duration maxBatchOpenDuration) {
322+
this.maxBatchOpenDuration = maxBatchOpenDuration;
324323
return this;
325324
}
326325

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ public final class BatchingExecutionContext<RequestT, ResponseT> {
2525
private final RequestT request;
2626
private final CompletableFuture<ResponseT> response;
2727

28-
private final Optional<Long> responsePayload;
28+
private final Optional<Integer> responsePayloadByteSize;
2929

3030
public BatchingExecutionContext(RequestT request, CompletableFuture<ResponseT> response) {
3131
this.request = request;
3232
this.response = response;
33-
responsePayload = RequestPayloadCalculator.calculateMessageSize(request);
33+
responsePayloadByteSize = RequestPayloadCalculator.calculateMessageSize(request);
3434
}
3535

3636
public RequestT request() {
@@ -41,8 +41,10 @@ public CompletableFuture<ResponseT> response() {
4141
return response;
4242
}
4343

44-
45-
public Optional<Long> responsePayload() {
46-
return responsePayload;
44+
/**
45+
* Optional because responsePayloadByteSize is required only for SendMessageRequests and not for other requests.
46+
*/
47+
public Optional<Integer> responsePayloadByteSize() {
48+
return responsePayloadByteSize;
4749
}
4850
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ public final class BatchingMap<RequestT, ResponseT> {
3434
private final int maxBatchKeys;
3535
private final int maxBatchBytesSize;
3636
private final int maxBatchSize;
37+
private final int maxBufferSize;
3738
private final Map<String, RequestBatchBuffer<RequestT, ResponseT>> batchContextMap;
3839

39-
public BatchingMap(int maxBatchKeys,
40-
int maxBatchBytesSize,
41-
int maxBatchSize) {
40+
public BatchingMap(RequestBatchConfiguration overrideConfiguration) {
4241
this.batchContextMap = new ConcurrentHashMap<>();
43-
this.maxBatchKeys = maxBatchKeys;
44-
this.maxBatchBytesSize = maxBatchBytesSize;
45-
this.maxBatchSize = maxBatchSize;
42+
this.maxBatchKeys = overrideConfiguration.maxBatchKeys();
43+
this.maxBatchBytesSize = overrideConfiguration.maxBatchBytesSize();
44+
this.maxBatchSize = overrideConfiguration.maxBatchItems();
45+
this.maxBufferSize = overrideConfiguration.maxBufferSize();
4646
}
4747

4848
public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, RequestT request,
@@ -51,7 +51,7 @@ public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, Req
5151
if (batchContextMap.size() == maxBatchKeys) {
5252
throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys);
5353
}
54-
return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize);
54+
return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize, maxBufferSize);
5555
}).put(request, response);
5656
}
5757

@@ -67,14 +67,14 @@ public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>>
6767
batchContextMap.forEach(action);
6868
}
6969

70-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey, RequestT request) {
71-
return batchContextMap.get(batchKey).flushableRequests(request);
72-
}
73-
7470
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey) {
75-
return batchContextMap.get(batchKey).flushableRequests(null);
71+
return batchContextMap.get(batchKey).flushableRequests();
7672
}
7773

74+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequestsOnByteLimitBeforeAdd(String batchKey,
75+
RequestT request) {
76+
return batchContextMap.get(batchKey).flushableRequestsOnByteLimitBeforeAdd(request);
77+
}
7878

7979
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(String batchKey,
8080
int maxBatchItems) {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ public class ChangeMessageVisibilityBatchManager extends RequestBatchManager<Cha
4242

4343
private final SqsAsyncClient sqsAsyncClient;
4444

45-
4645
protected ChangeMessageVisibilityBatchManager(RequestBatchConfiguration overrideConfiguration,
4746
ScheduledExecutorService scheduledExecutor,
4847
SqsAsyncClient sqsAsyncClient) {
@@ -103,7 +102,6 @@ private static IdentifiableMessage<Throwable> changeMessageVisibilityCreateThrow
103102
return new IdentifiableMessage<>(key, response);
104103
}
105104

106-
107105
@Override
108106
protected CompletableFuture<ChangeMessageVisibilityBatchResponse> batchAndSend(
109107
List<IdentifiableMessage<ChangeMessageVisibilityRequest>> identifiedRequests, String batchKey) {
@@ -114,8 +112,8 @@ protected CompletableFuture<ChangeMessageVisibilityBatchResponse> batchAndSend(
114112

115113
@Override
116114
protected String getBatchKey(ChangeMessageVisibilityRequest request) {
117-
return request.overrideConfiguration().map(overrideConfig -> request.queueUrl() + overrideConfig.hashCode())
118-
.orElseGet(request::queueUrl);
115+
return request.overrideConfiguration().map(overrideConfig -> request.queueUrl() + overrideConfig.hashCode())
116+
.orElseGet(request::queueUrl);
119117
}
120118

121119
@Override

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18-
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES;
18+
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES;
1919

2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.ScheduledExecutorService;
@@ -48,27 +48,20 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
4848

4949
private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
5050
this.client = Validate.notNull(builder.client, "client cannot be null");
51-
5251
ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor;
53-
54-
RequestBatchConfiguration.Builder configBuilder =
55-
builder.overrideConfiguration != null ?
56-
RequestBatchConfiguration.builder()
57-
.batchSendRequestFrequency(builder.overrideConfiguration.batchSendRequestFrequency())
58-
.maxBatchItems(builder.overrideConfiguration.maxBatchItems())
59-
.maxBufferSize(builder.overrideConfiguration.maxBufferSize())
60-
.maxBatchKeys(builder.overrideConfiguration.maxBatchKeys())
61-
: RequestBatchConfiguration.builder();
62-
63-
this.sendMessageBatchManager = new SendMessageBatchManager(configBuilder
64-
.maxBatchBytesSize(MAX_PAYLOAD_SIZE_BYTES)
65-
.build(),
66-
scheduledExecutor,
67-
client);
68-
this.deleteMessageBatchManager = new DeleteMessageBatchManager(configBuilder.build(),
69-
scheduledExecutor,
70-
client);
71-
this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(configBuilder.build(),
52+
this.sendMessageBatchManager =
53+
new SendMessageBatchManager(RequestBatchConfiguration
54+
.builder(builder.overrideConfiguration)
55+
.maxBatchBytesSize(MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES)
56+
.build(),
57+
scheduledExecutor,
58+
client);
59+
this.deleteMessageBatchManager =
60+
new DeleteMessageBatchManager(RequestBatchConfiguration.builder(builder.overrideConfiguration).build(),
61+
scheduledExecutor,
62+
client);
63+
this.changeMessageVisibilityBatchManager =
64+
new ChangeMessageVisibilityBatchManager(RequestBatchConfiguration.builder(builder.overrideConfiguration).build(),
7265
scheduledExecutor,
7366
client);
7467

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.List;
20-
import java.util.Map;
2120
import java.util.Optional;
2221
import java.util.concurrent.CompletableFuture;
2322
import java.util.concurrent.ScheduledExecutorService;
@@ -36,7 +35,6 @@
3635
import software.amazon.awssdk.services.sqs.model.SqsException;
3736
import software.amazon.awssdk.utils.Either;
3837

39-
4038
@SdkInternalApi
4139
public class DeleteMessageBatchManager extends RequestBatchManager<DeleteMessageRequest, DeleteMessageResponse,
4240
DeleteMessageBatchResponse> {
@@ -50,17 +48,6 @@ protected DeleteMessageBatchManager(RequestBatchConfiguration overrideConfigurat
5048
this.sqsAsyncClient = sqsAsyncClient;
5149
}
5250

53-
54-
private static boolean shouldFlush(Map<String, BatchingExecutionContext<DeleteMessageRequest,
55-
DeleteMessageResponse>> contextMap,
56-
DeleteMessageRequest request, RequestBatchConfiguration configuration) {
57-
if (request != null) {
58-
return false;
59-
}
60-
return contextMap.size() >= configuration.maxBatchItems();
61-
}
62-
63-
6451
private static DeleteMessageBatchRequest createDeleteMessageBatchRequest(
6552
List<IdentifiableMessage<DeleteMessageRequest>> identifiedRequests, String batchKey) {
6653
List<DeleteMessageBatchRequestEntry> entries = identifiedRequests

0 commit comments

Comments
 (0)