Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bytes Based batching for SendMessageRequest Batching #5540

Merged
merged 5 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public String toString() {
.add("maxBatchItems", maxBatchItems)
.add("maxBatchKeys", maxBatchKeys)
.add("maxBufferSize", maxBufferSize)
.add("maxBatchOpenDuration", maxBatchOpenDuration.toMillis())
.add("maxBatchOpenDuration", maxBatchOpenDuration)
.add("visibilityTimeout", visibilityTimeout)
.add("longPollWaitTimeout", longPollWaitTimeout)
.add("minReceiveWaitTime", minReceiveWaitTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;

Expand All @@ -24,9 +25,12 @@ public final class BatchingExecutionContext<RequestT, ResponseT> {
private final RequestT request;
private final CompletableFuture<ResponseT> response;

private final Optional<Integer> responsePayloadByteSize;

public BatchingExecutionContext(RequestT request, CompletableFuture<ResponseT> response) {
this.request = request;
this.response = response;
responsePayloadByteSize = RequestPayloadCalculator.calculateMessageSize(request);
}

public RequestT request() {
Expand All @@ -36,4 +40,11 @@ public RequestT request() {
public CompletableFuture<ResponseT> response() {
return response;
}

/**
* Optional because responsePayloadByteSize is required only for SendMessageRequests and not for other requests.
*/
public Optional<Integer> responsePayloadByteSize() {
return responsePayloadByteSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@
public final class BatchingMap<RequestT, ResponseT> {

private final int maxBatchKeys;
private final int maxBatchBytesSize;
private final int maxBatchSize;
private final int maxBufferSize;
private final Map<String, RequestBatchBuffer<RequestT, ResponseT>> batchContextMap;

public BatchingMap(int maxBatchKeys, int maxBufferSize) {
public BatchingMap(RequestBatchConfiguration overrideConfiguration) {
this.batchContextMap = new ConcurrentHashMap<>();
this.maxBatchKeys = maxBatchKeys;
this.maxBufferSize = maxBufferSize;
this.maxBatchKeys = overrideConfiguration.maxBatchKeys();
this.maxBatchBytesSize = overrideConfiguration.maxBatchBytesSize();
this.maxBatchSize = overrideConfiguration.maxBatchItems();
this.maxBufferSize = overrideConfiguration.maxBufferSize();
}

public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, RequestT request,
Expand All @@ -47,10 +51,14 @@ public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, Req
if (batchContextMap.size() == maxBatchKeys) {
throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys);
}
return new RequestBatchBuffer<>(maxBufferSize, scheduleFlush.get());
return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize, maxBufferSize);
}).put(request, response);
}

public boolean contains(String batchKey) {
return batchContextMap.containsKey(batchKey);
}

public void putScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
batchContextMap.get(batchKey).putScheduledFlush(scheduledFlush);
}
Expand All @@ -59,9 +67,13 @@ public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>>
batchContextMap.forEach(action);
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey,
int maxBatchItems) {
return batchContextMap.get(batchKey).flushableRequests(maxBatchItems);
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey) {
return batchContextMap.get(batchKey).flushableRequests();
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequestsOnByteLimitBeforeAdd(String batchKey,
RequestT request) {
return batchContextMap.get(batchKey).flushableRequestsOnByteLimitBeforeAdd(request);
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(String batchKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
Expand All @@ -43,7 +42,7 @@ public class ChangeMessageVisibilityBatchManager extends RequestBatchManager<Cha

private final SqsAsyncClient sqsAsyncClient;

protected ChangeMessageVisibilityBatchManager(BatchOverrideConfiguration overrideConfiguration,
protected ChangeMessageVisibilityBatchManager(RequestBatchConfiguration overrideConfiguration,
ScheduledExecutorService scheduledExecutor,
SqsAsyncClient sqsAsyncClient) {
super(overrideConfiguration, scheduledExecutor);
Expand All @@ -64,10 +63,10 @@ private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibility
.overrideConfiguration();
return overrideConfiguration.map(
config -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(config)
.entries(entries)
.build())
.queueUrl(batchKey)
.overrideConfiguration(config)
.entries(entries)
.build())
.orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.entries(entries)
Expand Down Expand Up @@ -103,8 +102,6 @@ private static IdentifiableMessage<Throwable> changeMessageVisibilityCreateThrow
return new IdentifiableMessage<>(key, response);
}



@Override
protected CompletableFuture<ChangeMessageVisibilityBatchResponse> batchAndSend(
List<IdentifiableMessage<ChangeMessageVisibilityRequest>> identifiedRequests, String batchKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand Down Expand Up @@ -46,16 +48,20 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {

private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
this.client = Validate.notNull(builder.client, "client cannot be null");

ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor;

this.sendMessageBatchManager = new SendMessageBatchManager(builder.overrideConfiguration,
scheduledExecutor,
client);
this.deleteMessageBatchManager = new DeleteMessageBatchManager(builder.overrideConfiguration,
scheduledExecutor,
client);
this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(builder.overrideConfiguration,
this.sendMessageBatchManager =
new SendMessageBatchManager(RequestBatchConfiguration
.builder(builder.overrideConfiguration)
.maxBatchBytesSize(MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES)
.build(),
scheduledExecutor,
client);
this.deleteMessageBatchManager =
new DeleteMessageBatchManager(RequestBatchConfiguration.builder(builder.overrideConfiguration).build(),
scheduledExecutor,
client);
this.changeMessageVisibilityBatchManager =
new ChangeMessageVisibilityBatchManager(RequestBatchConfiguration.builder(builder.overrideConfiguration).build(),
scheduledExecutor,
client);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation is weird


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
Expand All @@ -36,14 +35,13 @@
import software.amazon.awssdk.services.sqs.model.SqsException;
import software.amazon.awssdk.utils.Either;


@SdkInternalApi
public class DeleteMessageBatchManager extends RequestBatchManager<DeleteMessageRequest, DeleteMessageResponse,
DeleteMessageBatchResponse> {

private final SqsAsyncClient sqsAsyncClient;

protected DeleteMessageBatchManager(BatchOverrideConfiguration overrideConfiguration,
protected DeleteMessageBatchManager(RequestBatchConfiguration overrideConfiguration,
ScheduledExecutorService scheduledExecutor,
SqsAsyncClient sqsAsyncClient) {
super(overrideConfiguration, scheduledExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,36 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;


import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.CollectionUtils;

@SdkInternalApi
public final class RequestBatchBuffer<RequestT, ResponseT> {
public final class RequestBatchBuffer<RequestT, ResponseT> {
private final Object flushLock = new Object();

private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;

/**
* Maximum number of elements that can be included in the BatchBuffer.
*/
private final int maxBatchItems;
private final int maxBufferSize;

private final int maxBatchSizeInBytes;
/**
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
* BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented everytime a new request and
* response pair is received.
*/
private int nextId;

/**
* Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a
* request that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added
* to idToBatchContext had an id of 22, nextBatchEntry will have a value of 23).
* Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a request
* that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added to
* idToBatchContext had an id of 22, nextBatchEntry will have a value of 23).
*/
private int nextBatchEntry;

Expand All @@ -54,29 +53,59 @@ public final class RequestBatchBuffer<RequestT, ResponseT> {
*/
private ScheduledFuture<?> scheduledFlush;

public RequestBatchBuffer(int maxBufferSize, ScheduledFuture<?> scheduledFlush) {
public RequestBatchBuffer(ScheduledFuture<?> scheduledFlush,
int maxBatchItems, int maxBatchSizeInBytes, int maxBufferSize) {
this.idToBatchContext = new ConcurrentHashMap<>();
this.maxBufferSize = maxBufferSize;
this.nextId = 0;
this.nextBatchEntry = 0;
this.scheduledFlush = scheduledFlush;
this.maxBatchItems = maxBatchItems;
this.maxBufferSize = maxBufferSize;
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(int maxBatchItems) {
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests() {
synchronized (flushLock) {
if (idToBatchContext.size() >= maxBatchItems) {
return extractFlushedEntries(maxBatchItems);
return (isByteSizeThresholdCrossed(0) || isMaxBatchSizeLimitReached())
? extractFlushedEntries(maxBatchItems)
: Collections.emptyMap();
}
}


private boolean isMaxBatchSizeLimitReached() {
return idToBatchContext.size() >= maxBatchItems;
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequestsOnByteLimitBeforeAdd(RequestT request) {
synchronized (flushLock) {
if (maxBatchSizeInBytes > 0 && !idToBatchContext.isEmpty()) {
int incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0);
if (isByteSizeThresholdCrossed(incomingRequestBytes)) {
return extractFlushedEntries(maxBatchItems);
}
}
return new ConcurrentHashMap<>();
return Collections.emptyMap();
}
}

private boolean isByteSizeThresholdCrossed(int incomingRequestBytes) {
if (maxBatchSizeInBytes < 0) {
return false;
}
int totalPayloadSize = idToBatchContext.values().stream()
.map(BatchingExecutionContext::responsePayloadByteSize)
.mapToInt(opt -> opt.orElse(0))
.sum() + incomingRequestBytes;
return totalPayloadSize > maxBatchSizeInBytes;
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(int maxBatchItems) {
synchronized (flushLock) {
if (idToBatchContext.size() > 0) {
if (!idToBatchContext.isEmpty()) {
return extractFlushedEntries(maxBatchItems);
}
return new ConcurrentHashMap<>();
return Collections.emptyMap();
}
}

Expand Down
Loading
Loading