-
Notifications
You must be signed in to change notification settings - Fork 864
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages #5571
Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages #5571
Conversation
…futures with empty messages
TimeUnit.MILLISECONDS); | ||
return receiveMessageFuture.applyToEither(timeoutFuture, Function.identity()); | ||
executor.schedule(() -> { | ||
if (!receiveMessageFuture.isDone()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to check isDone here, complete will handle it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -91,9 +91,6 @@ private String checkBatchingEligibility(ReceiveMessageRequest rq) { | |||
if (rq.overrideConfiguration().isPresent()) { | |||
return "Request has override configurations."; | |||
} | |||
if (rq.waitTimeSeconds() != null && rq.waitTimeSeconds() != 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we removing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the user passes this value then user can control the Polling time out , this is behaviour same as https://github.com/aws/aws-sdk-java/blob/61d73631fac8535ad70666bbce9e70a1d2cea2ca/aws-java-sdk-sqs/src/main/java/com/amazonaws/services/sqs/buffered/ReceiveQueueBuffer.java#L192-L197
I removed this in earlier PR but when I was doing V1 comparison I realized that V1 uses this value to determine the Client side Wait time for that particular request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already supported in V2
Lines 76 to 82 in dd9379d
Integer waitTimeSeconds = rq.waitTimeSeconds(); | |
if (waitTimeSeconds != null) { | |
long waitTimeMillis = TimeUnit.SECONDS.toMillis(waitTimeSeconds); | |
return CompletableFuture.completedFuture(Duration.ofMillis(Math.max(configuredWaitTime.toMillis(), waitTimeMillis))); | |
} | |
CompletableFuture<Map<QueueAttributeName, String>> attributeFuture = getAttributeMap(); |
But after handling the Surface API review comments where I added a check to bypass, I went ahead and added check to this since it was an additional Attribute related to long polling but when used with Automatic Batch manager we use it client side polling time so thus we need this to go through the batch manager
Quality Gate failedFailed conditions See analysis details on SonarCloud Catch issues before they fail your Quality Gate with our IDE extension SonarLint |
910356b
into
feature/master/sqs-batch-manager
* Codegenerate BatchManager API under AsyncClient and Initail Interfaces for BatchManager (#5321) * Codegenerate BatchManager API under AsyncClient and Add empty initial Batchmanager interfaces and Implementations * Addressed review comments * Added Internal classes required for BatchManager Implementation * Revert "Added Internal classes required for BatchManager Implementation" This reverts commit 318969b. * Internal classes and RequestBatchManager Impelementation (#5418) * Added Internal classes required for BatchManager Implementation * Added Batch Send Implementation * Handled review comments * Handled review comments * Handled review comments * Made RequestBatchManager class a Abstract class * Checkstyle issues * Removed unused methods * New lines removed * Made public static to private state for sqsBatch functions * Constants added * Sonar cloud issues fixed * commit to check why test on codebuild * Increased Timeouts for get * Added abstract methods * Handled comments to remove Builders * Handled comments to take care when batchmanager closed while pending requests * Handled comments * Checkstyle issue * Added Consumer builders args for existing APIs of BatchManager (#5514) * Receive Batch Manager Implementation (#5488) * Add Recieve Buffer Queue And its related configuration * Update ReceiveBatch manager * Recieve Batch Manager Implementation * Receive Batch Manager Implemetation * Handled review comments * Checkstyle failure * Flsky test case fixed * Flaky test case fixed * Hamdled review comments * Handled comments * Removed ReceiveMessageCompletableFuture * SdkClosable implemented * Added ReceiveMessageBatchManager class for completeness * Checkstyle issues * Null checks * Handled comments from Zoe * Updated the defaults to 50ms same as V1 after surface area review * Revert "Updated the defaults to 50ms same as V1 after surface area review" This reverts commit e7d2295. * Bytes Based batching for SendMessageRequest Batching (#5540) * Initial changes * Initial changes 2 * Byte Based batching for SendMessage API * Byte Based batching for SendMessage API * Handled comments * Checkstyle issue * Add User Agent for Sqs Calls made using Automatic Batching Manager (#5546) * Add User Agent for Sqs Calls made using Automatic Batching Manager as hll/abm * Review comments * Update comments from PR 5488 (#5550) * Update comments of PR 5488 * Update comments from PR 5488 * Handled surface area review comments (#5563) * Initial version * Intermediate changes * Update after internal poll * ResponseCOnfiguration construction updated * RequestOverride configuration check added to Bypass batch manager * Handled review comments * Removed TODO since validations are handled in BatchPverrideConfiguration * Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages (#5571) * Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages * Handled review comments * Integ test for Automatic Request Batching (#5576) * feat(sqs): add BatchManager for client-side request batching to Amazon SQS The new BatchManager allows for simple request batching using client-side buffering, improving cost efficiency and reducing the number of requests sent to Amazon SQS. The client-side buffering supports up to 10 requests per batch and is supported by the SqsAsyncClient. Batched requests, along with receive message polling, help to increase throughput. * Add check for scheduledExecutor such that it not null when creating SqsAsyncBatchManager (#5582) * Add check for scheduledExecutor such that it not null when creating SqsAsyncBatchManager * Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java Co-authored-by: David Ho <70000000+davidh44@users.noreply.github.com> * Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java Co-authored-by: David Ho <70000000+davidh44@users.noreply.github.com> * Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java Co-authored-by: David Ho <70000000+davidh44@users.noreply.github.com> --------- Co-authored-by: David Ho <70000000+davidh44@users.noreply.github.com> * Updating Timeouts in gets so that we dont wait infinitely --------- Co-authored-by: David Ho <70000000+davidh44@users.noreply.github.com>
Motivation and Context
The key modification was ensuring that the same future passed to
receiveQueueBuffer.receiveMessage
is the one that gets completed, regardless of whether a timeout occurs.Previously, a new future (timeoutFuture) was scheduled to complete after the timeout period, but this future was independent of the one added to the futures queue. Now, instead of creating a new timeout future, we directly complete the original future (receiveMessageFuture) if it hasn't already been completed before the timeout occurs.
Root Cause
The issue occurred because a new CompletableFuture (timeout future) was being created and completed during timeouts, rather than completing the original future added to the ReceiveBuffer. This left the original future uncompleted, causing the buffer to return empty responses.
Fix
The fix ensures that the same
receiveMessageFuture
passed to the buffer is completed, even when a timeout occurs. The timeout logic now checks if the original future is complete and directly completes it, avoiding the creation of a separate timeout future. This ensures that the buffer behaves correctly and returns the expected messages.Modifications
The key modification was ensuring that the same future passed to
receiveQueueBuffer.receiveMessage
is the one that gets completed, regardless of whether a timeout occurs.Previously, a new future (timeoutFuture) was scheduled to complete after the timeout period, but this future was independent of the one added to the futures queue. Now, instead of creating a new timeout future, we directly complete the original future (receiveMessageFuture) if it hasn't already been completed before the timeout occurs.
Testing
License