Skip to content

Commit

Permalink
Merge branch 'main' into autoStartup_support
Browse files Browse the repository at this point in the history
  • Loading branch information
tomazfernandes authored Jun 25, 2023
2 parents 1867214 + 4e37bb3 commit 413d492
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ protected TaskExecutor createTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int poolSize = getContainerOptions().getMaxConcurrentMessages() * this.messageSources.size();
executor.setMaxPoolSize(poolSize);
executor.setCorePoolSize(getContainerOptions().getMaxMessagesPerPoll());
executor.setCorePoolSize(poolSize);
// Necessary due to a small racing condition between releasing the permit and releasing the thread.
executor.setQueueCapacity(poolSize);
executor.setAllowCoreThreadTimeOut(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.sqs.CompletableFutures;
Expand Down Expand Up @@ -53,8 +54,10 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -119,6 +122,8 @@ class SqsIntegrationTests extends BaseSqsIntegrationTest {

static final String MANUALLY_CREATE_FACTORY_QUEUE_NAME = "manually_create_factory_test_queue";

static final String MAX_CONCURRENT_MESSAGES_QUEUE_NAME = "max_concurrent_messages_test_queue";

static final String LOW_RESOURCE_FACTORY = "lowResourceFactory";

static final String MANUAL_ACK_FACTORY = "manualAcknowledgementFactory";
Expand All @@ -143,7 +148,8 @@ static void beforeTests() {
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "20")),
createQueue(client, MANUALLY_CREATE_CONTAINER_QUEUE_NAME),
createQueue(client, MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME),
createQueue(client, MANUALLY_CREATE_FACTORY_QUEUE_NAME)).join();
createQueue(client, MANUALLY_CREATE_FACTORY_QUEUE_NAME),
createQueue(client, MAX_CONCURRENT_MESSAGES_QUEUE_NAME)).join();
}

@Autowired
Expand Down Expand Up @@ -294,6 +300,20 @@ void manuallyCreatesFactory() throws Exception {
assertThat(latchContainer.manuallyCreatedFactorySinkLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
void maxConcurrentMessages() {
List<Message<String>> messages1 = IntStream.range(0, 10)
.mapToObj(index -> "maxConcurrentMessages-payload-" + index)
.map(payload -> MessageBuilder.withPayload(payload).build()).collect(Collectors.toList());
List<Message<String>> messages2 = IntStream.range(10, 20)
.mapToObj(index -> "maxConcurrentMessages-payload-" + index)
.map(payload -> MessageBuilder.withPayload(payload).build()).collect(Collectors.toList());
sqsTemplate.sendManyAsync(MAX_CONCURRENT_MESSAGES_QUEUE_NAME, messages1);
sqsTemplate.sendManyAsync(MAX_CONCURRENT_MESSAGES_QUEUE_NAME, messages2);
logger.debug("Sent messages to queue {} with messages {} and {}", MAX_CONCURRENT_MESSAGES_QUEUE_NAME, messages1, messages2);
assertDoesNotThrow(() -> latchContainer.maxConcurrentMessagesBarrier.await(10, TimeUnit.SECONDS));
}

static class ReceivesMessageListener {

@Autowired
Expand Down Expand Up @@ -418,6 +438,18 @@ void listen(Message<String> message, MessageHeaders headers, Acknowledgement ack
}
}

static class MaxConcurrentMessagesListener {

@Autowired
LatchContainer latchContainer;

@SqsListener(queueNames = MAX_CONCURRENT_MESSAGES_QUEUE_NAME, maxMessagesPerPoll = "10", maxConcurrentMessages = "20", id = "max-concurrent-messages")
void listen(String message) throws BrokenBarrierException, InterruptedException {
logger.debug("Received message in Listener Method: " + message);
latchContainer.maxConcurrentMessagesBarrier.await();
}
}

static class LatchContainer {

final CountDownLatch receivesMessageLatch = new CountDownLatch(1);
Expand All @@ -441,6 +473,7 @@ static class LatchContainer {
final CountDownLatch acknowledgementCallbackBatchLatch = new CountDownLatch(1);
final CountDownLatch acknowledgementCallbackErrorLatch = new CountDownLatch(1);
final CountDownLatch manuallyInactiveCreatedContainerLatch = new CountDownLatch(1);
final CyclicBarrier maxConcurrentMessagesBarrier = new CyclicBarrier(21);

}

Expand Down Expand Up @@ -649,6 +682,11 @@ ResolvesParameterTypesListener resolvesParameterTypesListener() {
return new ResolvesParameterTypesListener();
}

@Bean
MaxConcurrentMessagesListener maxConcurrentMessagesListener() {
return new MaxConcurrentMessagesListener();
}

@Bean
SqsListenerConfigurer customizer() {
return registrar -> {
Expand Down

0 comments on commit 413d492

Please sign in to comment.