Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing BatchManager in sdk-core #2613

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8abe56f
Adding batchBuffer and testing code
Jul 13, 2021
55d6849
Adding sdk-core tests and fixing request to response mapping
Jul 14, 2021
f6e485a
Updating sdk-core tests with object to mimic batch request entry
Jul 14, 2021
fa01f37
Adding wrapper classes for functions
Jul 14, 2021
7a2c0b9
Making some changes for thread safety and updating tests
Jul 15, 2021
be0d218
Refactoring code and how requests are cleared. Simplifying scheduling…
Jul 15, 2021
d89d35f
Changing scheduled flush code. Need to test in next commit
Jul 15, 2021
7191141
Adding more test cases and scheduling periodically instead of just once
Jul 15, 2021
6efeb64
Refactoring responseMap into a wrapper class
Jul 15, 2021
aec0d5c
Fixing issues related to periodic scheduling. Need to test with multi…
Jul 19, 2021
db83454
Using map of currentIds instead of a singular currentId.
Jul 19, 2021
2c55e01
Fixed race conditions related to cancelling scheduled buffer flush
Jul 20, 2021
4b77e0b
Resetting cancellableFlush's flags after flush is completed
Jul 20, 2021
fe6950a
Multithreading test works. need to clean up code and recommit
Jul 20, 2021
1e3d0d8
Found another way to reset cancellableFlush flags. Also cleaning up code
Jul 20, 2021
ec2f127
Updating request batching tests with multi threading tests
Jul 20, 2021
1f9bf88
Renamed to batchManager and cleaned up variable names to be more self…
Jul 21, 2021
b11b1b9
Refactored sdk-core tests
Jul 21, 2021
581a620
Fixing checkstyle issues
Jul 21, 2021
043619d
Fixing checkstyle issues in the tests as well as refactoring some tests
Jul 21, 2021
102424b
Updating and adding SQS batching tests and removing xml.bind
Jul 21, 2021
4b6ea92
Using existing Md5Utils for Sqs batch tests
Jul 21, 2021
d1efa49
Naming and style changes to address PR comments
Jul 22, 2021
4b4cb85
Refactoring tests to use maps instead of arrays for better request-to…
Jul 22, 2021
2528a65
Name changes, refactoring code and addressing minor PR comments
Jul 22, 2021
d76a4ff
Refactoring SQS integration tests to use identifiable responses
Jul 23, 2021
cff589a
Combining request and response maps into one BatchingMap
Jul 23, 2021
e8089e6
Renaming generics as RequestT, ResponseT etc.
Jul 23, 2021
f478261
Refactoring tests to calculate batchGroupId from request and removing…
Jul 23, 2021
cbfc34a
Cleaning up names and methods
Jul 26, 2021
5c94dd2
Adding the batchGroupId function that I forgot last time
Jul 26, 2021
915be6f
Refactoring batchManager to use builder pattern and an overrideConfig…
Jul 26, 2021
e387b21
Removing need for separate executor (just use scheduled executor)
Jul 26, 2021
0b17fcb
Adding javadocs and renaming variables/classes to be more intuitive
Jul 26, 2021
7849b42
Refactoring incrementing currentId to a separate BatchUtils class
Jul 26, 2021
660803b
Refactoring currentIds map and scheduledFlushes map into the BatchBuffer
Jul 26, 2021
d9a40c3
Removing need to check if flush has executed but need to check if flu…
Jul 26, 2021
826fab3
Small changes to address github PR comments
Jul 27, 2021
424dd62
Removing completionService and just using CompletableFuture.supplyAsync
Jul 27, 2021
d2725ec
Adding code to cover exception thrown from the sendAndBatch function
Jul 27, 2021
89d55f1
Adding exception test and fixing batchingExcution locking
Jul 27, 2021
137b340
Adding lock object back into batchingExecutionContext for both read a…
Jul 27, 2021
7ac8916
Addressing minor github PR comments (making classes final, cleaning u…
Jul 28, 2021
52e6316
Just renaming cancelScheduledFlushIfNeeded method
Jul 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.batchutilities.BatchManager;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;

/**
* Configuration values for the {@link BatchManager}. All values are optional, and the default values will be used
* if they are not specified.
*/
@SdkPublicApi
public final class BatchOverrideConfiguration implements ToCopyableBuilder<BatchOverrideConfiguration.Builder,
BatchOverrideConfiguration> {

private final Integer maxBatchItems;
private final Duration maxBatchOpenInMs;
private final ScheduledExecutorService scheduledExecutor;

public BatchOverrideConfiguration(Builder builder) {
Validate.notNull(builder.maxBatchItems, "maxBatchItems cannot be null");
this.maxBatchItems = Validate.isPositive(builder.maxBatchItems, "maxBatchItems");
Validate.notNull(builder.maxBatchOpenInMs, "maxBatchOpenInMs cannot be null");
this.maxBatchOpenInMs = Validate.isPositive(builder.maxBatchOpenInMs, "maxBachOpenInMs");
this.scheduledExecutor = Validate.notNull(builder.scheduledExecutor, "scheduledExecutor cannot be null");
}

public static Builder builder() {
return new Builder();
}

/**
* @return the optional maximum number of messages that are batched together in a single request.
*/
public Integer maxBatchItems() {
return maxBatchItems;
}

/**
* @return the optional maximum amount of time (in milliseconds) that an outgoing call waits to be batched with messages of
* the same type.
*/
public Duration maxBatchOpenInMs() {
return maxBatchOpenInMs;
}

public ScheduledExecutorService scheduledExecutor() {
return scheduledExecutor;
}

@Override
public Builder toBuilder() {
return new Builder().maxBatchItems(maxBatchItems)
.maxBatchOpenInMs(maxBatchOpenInMs)
.scheduledExecutor(scheduledExecutor);
}

@Override
public String toString() {
return ToString.builder("BatchOverrideConfiguration")
.add("maxBatchItems", maxBatchItems)
.add("maxBatchOpenInMs", maxBatchOpenInMs)
.build();
}

public static final class Builder implements CopyableBuilder<Builder, BatchOverrideConfiguration> {

private Integer maxBatchItems;
private Duration maxBatchOpenInMs;
private ScheduledExecutorService scheduledExecutor;

private Builder() {
}

public Builder maxBatchItems(Integer maxBatchItems) {
this.maxBatchItems = maxBatchItems;
return this;
}

public Builder maxBatchOpenInMs(Duration maxBatchOpenInMs) {
this.maxBatchOpenInMs = maxBatchOpenInMs;
return this;
}

public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
this.scheduledExecutor = scheduledExecutor;
return this;
}

public BatchOverrideConfiguration build() {
return new BatchOverrideConfiguration(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkProtectedApi;

/**
* Takes a list of identified requests in addition to a destination and batches the requests into a batch request.
* It then sends the batch request and returns a CompletableFuture of the response.
* @param <RequestT> the type of an outgoing request.
* @param <BatchResponseT> the type of an outgoing batch response.
*/
@FunctionalInterface
@SdkProtectedApi
public interface BatchAndSend<RequestT, BatchResponseT> {
CompletableFuture<BatchResponseT> batchAndSend(List<IdentifiableRequest<RequestT>> identifiedRequests, String batchGroupId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import software.amazon.awssdk.annotations.SdkInternalApi;

@SdkInternalApi
public final class BatchBuffer<RequestT, ResponseT> {
private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
private final AtomicInteger numRequests;

/**
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
* BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented everytime a new request and
* response pair is received.
*/
private final AtomicInteger nextId;

/**
* The scheduled flush tasks associated with this batchBuffer.
*/
private ScheduledFlush scheduledFlush;

public BatchBuffer(ScheduledFlush scheduledFlush) {
this.idToBatchContext = new ConcurrentHashMap<>();
this.numRequests = new AtomicInteger(0);
this.nextId = new AtomicInteger(0);
this.scheduledFlush = scheduledFlush;
}

public int size() {
return idToBatchContext.size();
}

public int requestSize() {
return numRequests.get();
}

public boolean hasRequests() {
return numRequests.get() != 0;
}

public boolean hasResponses() {
return !idToBatchContext.isEmpty();
}

public boolean containsKey(String key) {
return idToBatchContext.containsKey(key);
}

public RequestT getRequest(String key) {
return idToBatchContext.get(key).request();
}

public CompletableFuture<ResponseT> getResponse(String key) {
return idToBatchContext.get(key).response();
}

public ScheduledFlush getScheduledFlush() {
return scheduledFlush;
}

public BatchingExecutionContext<RequestT, ResponseT> put(RequestT request, CompletableFuture<ResponseT> response) {
numRequests.getAndIncrement();
String id = BatchUtils.getAndIncrementId(nextId);
return idToBatchContext.put(id, new BatchingExecutionContext<>(request, response));
}

public void putScheduledFlush(ScheduledFlush scheduledFlush) {
this.scheduledFlush = scheduledFlush;
}

public void cancelScheduledFlush() {
scheduledFlush.cancel();
}

public void removeRequest(String key) {
if (idToBatchContext.get(key).removeRequest()) {
numRequests.getAndDecrement();
}
}

public BatchingExecutionContext<RequestT, ResponseT> remove(String key) {
return idToBatchContext.remove(key);
}

public Collection<BatchingExecutionContext<RequestT, ResponseT>> values() {
return idToBatchContext.values();
}

public Collection<CompletableFuture<ResponseT>> responses() {
return idToBatchContext.values()
.stream()
.map(BatchingExecutionContext::response)
.collect(Collectors.toList());
}

public Set<Map.Entry<String, BatchingExecutionContext<RequestT, ResponseT>>> entrySet() {
return idToBatchContext.entrySet();
}

public void clear() {
numRequests.set(0);
idToBatchContext.clear();
}

public void forEach(BiConsumer<String, BatchingExecutionContext<RequestT, ResponseT>> action) {
idToBatchContext.forEach(action);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;

import software.amazon.awssdk.annotations.SdkProtectedApi;

/**
* Takes a request and extracts a batchGroupId as determined by the caller.
* TODO: For right now, the batchKey is a String but this may change as needed in the future.
* @param <RequestT> the request.
*/
@FunctionalInterface
@SdkProtectedApi
public interface BatchKeyMapper<RequestT> {
String getBatchKey(RequestT request);
}
Loading