Skip to content

Commit 910356b

Browse files
authored
Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages (#5571)
* Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages * Handled review comments
1 parent dd9379d commit 910356b

File tree

4 files changed

+244
-10
lines changed

4 files changed

+244
-10
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.ScheduledExecutorService;
2222
import java.util.concurrent.TimeUnit;
23-
import java.util.function.Function;
2423
import software.amazon.awssdk.annotations.SdkInternalApi;
2524
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2625
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
@@ -61,10 +60,10 @@ public CompletableFuture<ReceiveMessageResponse> processRequest(ReceiveMessageRe
6160
return queueAttributesManager.getReceiveMessageTimeout(rq, config.messageMinWaitDuration()).thenCompose(waitTimeMs -> {
6261
CompletableFuture<ReceiveMessageResponse> receiveMessageFuture = new CompletableFuture<>();
6362
receiveQueueBuffer.receiveMessage(receiveMessageFuture, numMessages);
64-
CompletableFuture<ReceiveMessageResponse> timeoutFuture = new CompletableFuture<>();
65-
executor.schedule(() -> timeoutFuture.complete(ReceiveMessageResponse.builder().build()), waitTimeMs.toMillis(),
63+
executor.schedule(() -> receiveMessageFuture.complete(ReceiveMessageResponse.builder().build()),
64+
waitTimeMs.toMillis(),
6665
TimeUnit.MILLISECONDS);
67-
return receiveMessageFuture.applyToEither(timeoutFuture, Function.identity());
66+
return receiveMessageFuture;
6867

6968
});
7069
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,6 @@ private String checkBatchingEligibility(ReceiveMessageRequest rq) {
9191
if (rq.overrideConfiguration().isPresent()) {
9292
return "Request has override configurations.";
9393
}
94-
if (rq.waitTimeSeconds() != null && rq.waitTimeSeconds() != 0) {
95-
return "Request has long polling enabled.";
96-
}
9794
return null;
9895
}
9996

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.batchmanager;
17+
18+
19+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
21+
import static com.github.tomakehurst.wiremock.client.WireMock.post;
22+
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
23+
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
24+
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
25+
import static org.assertj.core.api.Assertions.assertThat;
26+
27+
import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
28+
import java.net.URI;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.api.extension.ExtendWith;
34+
import org.junit.jupiter.api.extension.RegisterExtension;
35+
import org.mockito.junit.jupiter.MockitoExtension;
36+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
37+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
38+
import software.amazon.awssdk.core.interceptor.Context;
39+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
40+
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
41+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
42+
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
43+
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
44+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
45+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
46+
47+
@ExtendWith(MockitoExtension.class)
48+
class ReceiveBatchesMockTest {
49+
50+
private static final int OFFSET_DELAY = 100;
51+
// Default queue attribute response with placeholders for parameters
52+
private static final String QUEUE_ATTRIBUTE_RESPONSE = "{\n" +
53+
" \"Attributes\": {\n" +
54+
" \"ReceiveMessageWaitTimeSeconds\": \"%s\",\n" +
55+
" \"VisibilityTimeout\": \"%s\"\n" +
56+
" }\n" +
57+
"}";
58+
@RegisterExtension
59+
static WireMockExtension wireMock = WireMockExtension.newInstance()
60+
.options(wireMockConfig().dynamicPort().dynamicHttpsPort())
61+
.configureStaticDsl(true)
62+
.build();
63+
private SqsAsyncBatchManager receiveMessageBatchManager;
64+
65+
@Test
66+
void testTimeoutOccursBeforeSqsResponds() throws Exception {
67+
setupBatchManager();
68+
69+
// Delays for testing
70+
int queueAttributesApiDelay = 51;
71+
int receiveMessagesDelay = 150;
72+
73+
// Stub the WireMock server to simulate delayed responses
74+
mockQueueAttributes(queueAttributesApiDelay);
75+
mockReceiveMessages(receiveMessagesDelay, 2);
76+
77+
CompletableFuture<ReceiveMessageResponse> future = batchManagerReceiveMessage();
78+
assertThat(future.get(1000, TimeUnit.MILLISECONDS).messages()).isEmpty();
79+
80+
Thread.sleep(queueAttributesApiDelay + receiveMessagesDelay + OFFSET_DELAY);
81+
82+
CompletableFuture<ReceiveMessageResponse> secondCall = batchManagerReceiveMessage();
83+
assertThat(secondCall.get(1000, TimeUnit.MILLISECONDS).messages()).hasSize(2);
84+
}
85+
86+
@Test
87+
void testResponseReceivedBeforeTimeout() throws Exception {
88+
setupBatchManager();
89+
90+
// Delays for testing
91+
int queueAttributesApiDelay = 5;
92+
int receiveMessagesDelay = 5;
93+
94+
// Set short delays to ensure response before timeout
95+
mockQueueAttributes(queueAttributesApiDelay);
96+
mockReceiveMessages(receiveMessagesDelay, 2);
97+
98+
CompletableFuture<ReceiveMessageResponse> future = batchManagerReceiveMessage();
99+
assertThat(future.get(1000, TimeUnit.MILLISECONDS).messages()).hasSize(2);
100+
}
101+
102+
@Test
103+
void testTimeoutOccursBasedOnUserSetWaitTime() throws Exception {
104+
setupBatchManager();
105+
106+
// Delays for testing
107+
int queueAttributesApiDelay = 100;
108+
int receiveMessagesDelay = 100;
109+
110+
// Configure response delays
111+
mockQueueAttributes(queueAttributesApiDelay);
112+
mockReceiveMessages(receiveMessagesDelay, 2);
113+
114+
CompletableFuture<ReceiveMessageResponse> future = receiveMessageWithWaitTime(1);
115+
assertThat(future.get(1000, TimeUnit.MILLISECONDS).messages()).hasSize(2);
116+
}
117+
118+
@Test
119+
void testMessagesAreFetchedFromBufferWhenAvailable() throws Exception {
120+
ApiCaptureInterceptor interceptor = new ApiCaptureInterceptor();
121+
SqsAsyncClient sqsAsyncClient = getAsyncClientBuilder()
122+
.overrideConfiguration(o -> o.addExecutionInterceptor(interceptor))
123+
.build();
124+
125+
SqsAsyncBatchManager batchManager = sqsAsyncClient.batchManager();
126+
127+
// Delays for testing
128+
int queueAttributesApiDelay = 100;
129+
int receiveMessagesDelay = 1000;
130+
131+
// Setup delayed responses
132+
mockQueueAttributes(queueAttributesApiDelay);
133+
mockReceiveMessages(receiveMessagesDelay, 10);
134+
135+
// First message should be empty due to delay
136+
CompletableFuture<ReceiveMessageResponse> firstMessage =
137+
batchManager.receiveMessage(r -> r.queueUrl("test").maxNumberOfMessages(1));
138+
assertThat(firstMessage.get(1000, TimeUnit.MILLISECONDS).messages()).isEmpty();
139+
140+
// Wait for SQS message to be processed
141+
Thread.sleep(queueAttributesApiDelay + receiveMessagesDelay + OFFSET_DELAY);
142+
assertThat(interceptor.receiveApiCalls.get()).isEqualTo(1);
143+
assertThat(interceptor.getQueueAttributesApiCalls.get()).isEqualTo(1);
144+
interceptor.reset();
145+
146+
// Fetch 10 messages from the buffer
147+
for (int i = 0; i < 10; i++) {
148+
CompletableFuture<ReceiveMessageResponse> future =
149+
batchManager.receiveMessage(r -> r.queueUrl("test").maxNumberOfMessages(1));
150+
ReceiveMessageResponse response = future.get(500, TimeUnit.MILLISECONDS);
151+
assertThat(response.messages()).hasSize(1);
152+
}
153+
assertThat(interceptor.receiveApiCalls.get()).isEqualTo(0);
154+
assertThat(interceptor.getQueueAttributesApiCalls.get()).isEqualTo(0);
155+
}
156+
157+
// Utility methods for reuse across tests
158+
159+
private void setupBatchManager() {
160+
SqsAsyncClient sqsAsyncClient = getAsyncClientBuilder().build();
161+
receiveMessageBatchManager = sqsAsyncClient.batchManager();
162+
}
163+
164+
private void mockQueueAttributes(int delay) {
165+
stubFor(post(urlEqualTo("/"))
166+
.withHeader("x-amz-target", equalTo("AmazonSQS.GetQueueAttributes"))
167+
.willReturn(aResponse()
168+
.withStatus(200)
169+
.withBody(String.format(QUEUE_ATTRIBUTE_RESPONSE, "0", "30"))
170+
.withFixedDelay(delay)));
171+
}
172+
173+
private void mockReceiveMessages(int delay, int numMessages) {
174+
stubFor(post(urlEqualTo("/"))
175+
.withHeader("x-amz-target", equalTo("AmazonSQS.ReceiveMessage"))
176+
.willReturn(aResponse()
177+
.withStatus(200)
178+
.withBody(generateMessagesJson(numMessages))
179+
.withFixedDelay(delay)));
180+
}
181+
182+
private CompletableFuture<ReceiveMessageResponse> batchManagerReceiveMessage() {
183+
return receiveMessageBatchManager.receiveMessage(r -> r.queueUrl("test"));
184+
}
185+
186+
private CompletableFuture<ReceiveMessageResponse> receiveMessageWithWaitTime(int waitTimeSeconds) {
187+
return receiveMessageBatchManager.receiveMessage(r -> r.queueUrl("test").waitTimeSeconds(waitTimeSeconds));
188+
}
189+
190+
// Helper method for building the async client
191+
private SqsAsyncClientBuilder getAsyncClientBuilder() {
192+
return SqsAsyncClient.builder()
193+
.endpointOverride(URI.create(String.format("http://localhost:%s/", wireMock.getPort())))
194+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret")));
195+
}
196+
197+
// Utility to generate the response for multiple messages in JSON format
198+
private String generateMessagesJson(int numMessages) {
199+
StringBuilder sb = new StringBuilder();
200+
sb.append("{\n \"Messages\": [\n");
201+
for (int i = 0; i < numMessages; i++) {
202+
sb.append(" {\n");
203+
sb.append(" \"Body\": \"Message 6\",\n");
204+
sb.append(" \"MD5OfBody\": \"05d2a129ebdb00cfa6e92aaf9f090547\",\n");
205+
sb.append(" \"MessageId\": \"57d2\",\n");
206+
sb.append(" \"ReceiptHandle\": \"AQEB\"\n");
207+
sb.append(" }");
208+
if (i < numMessages - 1) {
209+
sb.append(",");
210+
}
211+
sb.append("\n");
212+
}
213+
sb.append(" ]\n}");
214+
return sb.toString();
215+
}
216+
217+
// Interceptor to capture the API call counts
218+
static class ApiCaptureInterceptor implements ExecutionInterceptor {
219+
220+
AtomicInteger receiveApiCalls = new AtomicInteger();
221+
AtomicInteger getQueueAttributesApiCalls = new AtomicInteger();
222+
223+
void reset() {
224+
receiveApiCalls.set(0);
225+
getQueueAttributesApiCalls.set(0);
226+
}
227+
228+
@Override
229+
public void afterExecution(Context.AfterExecution context, ExecutionAttributes executionAttributes) {
230+
if (context.request() instanceof ReceiveMessageRequest) {
231+
receiveApiCalls.incrementAndGet();
232+
}
233+
if (context.request() instanceof GetQueueAttributesRequest) {
234+
getQueueAttributesApiCalls.incrementAndGet();
235+
}
236+
}
237+
}
238+
}

services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ private static Stream<Arguments> provideBatchOverrideConfigurations() {
236236
"Request has override configurations."
237237
),
238238
Arguments.of(
239-
"Buffering disabled, with waitTimeSeconds in ReceiveMessageRequest",
239+
"Buffering enabled, with waitTimeSeconds in ReceiveMessageRequest",
240240
ResponseBatchConfiguration.builder()
241241
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID))
242242
.build(),
@@ -245,8 +245,8 @@ private static Stream<Arguments> provideBatchOverrideConfigurations() {
245245
.maxNumberOfMessages(3)
246246
.waitTimeSeconds(3)
247247
.build(),
248-
false,
249-
"Request has long polling enabled."
248+
true,
249+
""
250250
)
251251
);
252252
}

0 commit comments

Comments
 (0)