Skip to content

Commit

Permalink
Add autoconfiguration for queue not found strategy (#1190)
Browse files Browse the repository at this point in the history
Add SQS QueueNotFoundStrategy auto-configuration support

Fixes #1175

---------

Co-authored-by: wmz7year <wmz7year@users.noreply.github.com>
  • Loading branch information
tomazfernandes and wmz7year authored Aug 17, 2024
1 parent 7570672 commit 4a90b73
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 12 deletions.
3 changes: 2 additions & 1 deletion docs/src/main/asciidoc/_configprops.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
|spring.cloud.aws.sqs.listener.max-concurrent-messages | | The maximum concurrent messages that can be processed simultaneously for each queue. Note that if acknowledgement batching is being used, the actual maximum number of messages inflight might be higher.
|spring.cloud.aws.sqs.listener.max-messages-per-poll | | The maximum number of messages to be retrieved in a single poll to SQS.
|spring.cloud.aws.sqs.listener.poll-timeout | | The maximum amount of time for a poll to SQS.
|spring.cloud.aws.sqs.queue-not-found-strategy | `+++CREATE+++` | Configures strategy when queue not found.
|spring.cloud.aws.sqs.region | | Overrides the default region.

|===
|===
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ The Spring Boot Starter for SQS provides the following auto-configuration proper
| <<maxConcurrentMessages, `spring.cloud.aws.sqs.listener.max-inflight-messages-per-queue`>> | Maximum number of inflight messages per queue. | No | 10
| <<maxMessagesPerPoll, `spring.cloud.aws.sqs.listener.max-messages-per-poll`>> | Maximum number of messages to be received per poll. | No | 10
| <<pollTimeout, `spring.cloud.aws.sqs.listener.poll-timeout`>> | Maximum amount of time to wait for messages in a poll. | No | 10 seconds
| `spring.cloud.aws.sqs.queue-not-found-strategy` | The strategy to be used by SqsTemplate and SqsListeners when a queue does not exist. | No | CREATE
|===


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
Expand All @@ -51,6 +52,7 @@
*
* @author Tomaz Fernandes
* @author Maciej Walkowiak
* @author Wei Jiang
* @since 3.0
*/
@AutoConfiguration
Expand Down Expand Up @@ -82,6 +84,9 @@ public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<Obj
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient);
objectMapperProvider
.ifAvailable(om -> builder.configureDefaultConverter(converter -> converter.setObjectMapper(om)));
if (sqsProperties.getQueueNotFoundStrategy() != null) {
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
}
return builder.build();
}

Expand Down Expand Up @@ -112,8 +117,9 @@ private void setObjectMapper(SqsMessageListenerContainerFactory<Object> factory,
factory.configure(options -> options.messageConverter(messageConverter));
}

private void configureContainerOptions(ContainerOptionsBuilder<?, ?> options) {
private void configureContainerOptions(SqsContainerOptionsBuilder options) {
PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull();
mapper.from(this.sqsProperties.getQueueNotFoundStrategy()).to(options::queueNotFoundStrategy);
mapper.from(this.sqsProperties.getListener().getMaxConcurrentMessages()).to(options::maxConcurrentMessages);
mapper.from(this.sqsProperties.getListener().getMaxMessagesPerPoll()).to(options::maxMessagesPerPoll);
mapper.from(this.sqsProperties.getListener().getPollTimeout()).to(options::pollTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.awspring.cloud.autoconfigure.sqs;

import io.awspring.cloud.autoconfigure.AwsClientProperties;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;

import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.lang.Nullable;
Expand All @@ -24,6 +26,7 @@
* Properties related to AWS SQS.
*
* @author Tomaz Fernandes
* @author Wei Jiang
* @since 3.0
*/
@ConfigurationProperties(prefix = SqsProperties.PREFIX)
Expand All @@ -44,6 +47,26 @@ public void setListener(Listener listener) {
this.listener = listener;
}

@Nullable
private QueueNotFoundStrategy queueNotFoundStrategy;

/**
* Return the strategy to use if the queue is not found.
* @return the {@link QueueNotFoundStrategy}
*/
@Nullable
public QueueNotFoundStrategy getQueueNotFoundStrategy() {
return queueNotFoundStrategy;
}

/**
* Set the strategy to use if the queue is not found.
* @param queueNotFoundStrategy the strategy to set.
*/
public void setQueueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy) {
this.queueNotFoundStrategy = queueNotFoundStrategy;
}

public static class Listener {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,26 @@
package io.awspring.cloud.autoconfigure.sqs;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration;
import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
import io.awspring.cloud.sqs.QueueAttributesResolvingException;
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import org.testcontainers.shaded.org.bouncycastle.util.Arrays;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;

import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.ApplicationContextException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.testcontainers.containers.localstack.LocalStackContainer;
Expand All @@ -40,6 +47,7 @@
* Integration tests for {@link SqsAutoConfiguration}.
*
* @author Tomaz Fernandes
* @author Wei Jiang
*/
@SpringBootTest
@Testcontainers
Expand All @@ -49,18 +57,30 @@ class SqsAutoConfigurationIntegrationTest {

private static final String PAYLOAD = "Test";

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues("spring.cloud.aws.sqs.region=eu-west-1",
"spring.cloud.aws.sqs.endpoint=" + localstack.getEndpoint(),
"spring.cloud.aws.credentials.access-key=noop", "spring.cloud.aws.credentials.secret-key=noop",
"spring.cloud.aws.region.static=eu-west-1")
.withConfiguration(AutoConfigurations.of(RegionProviderAutoConfiguration.class,
CredentialsProviderAutoConfiguration.class, SqsAutoConfiguration.class, AwsAutoConfiguration.class,
ListenerConfiguration.class));

@Container
static LocalStackContainer localstack = new LocalStackContainer(
DockerImageName.parse("localstack/localstack:3.2.0"));
DockerImageName.parse("localstack/localstack:3.2.0"));

static {
localstack.start();
}

private static final String[] BASE_PARAMS = {"spring.cloud.aws.sqs.region=eu-west-1",
"spring.cloud.aws.sqs.endpoint=" + localstack.getEndpoint(),
"spring.cloud.aws.credentials.access-key=noop", "spring.cloud.aws.credentials.secret-key=noop",
"spring.cloud.aws.region.static=eu-west-1"};

private static final AutoConfigurations BASE_CONFIGURATIONS = AutoConfigurations.of(RegionProviderAutoConfiguration.class,
CredentialsProviderAutoConfiguration.class, SqsAutoConfiguration.class, AwsAutoConfiguration.class,
ListenerConfiguration.class);

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues(BASE_PARAMS)
.withConfiguration(BASE_CONFIGURATIONS);

private final ApplicationContextRunner applicationContextRunnerWithFailStrategy = new ApplicationContextRunner()
.withPropertyValues(Arrays.append(BASE_PARAMS, "spring.cloud.aws.sqs.queue-not-found-strategy=fail"))
.withConfiguration(BASE_CONFIGURATIONS);

@SuppressWarnings("unchecked")
@Test
Expand All @@ -73,6 +93,25 @@ void sendsAndReceivesMessage() {
});
}

@Test
void containerReceivesMessageWithFailQueueNotFoundStrategy() {
applicationContextRunnerWithFailStrategy.run(context ->
assertThatThrownBy(() -> context.getBean(SqsTemplate.class).send(to -> to.queue("QUEUE_DOES_NOT_EXISTS").payload(PAYLOAD)))
.isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class).cause()
.isInstanceOf(CompletionException.class).cause().isInstanceOf(QueueAttributesResolvingException.class)
.cause().isInstanceOf(QueueDoesNotExistException.class));
}

@Test
void templateReceivesMessageWithFailQueueNotFoundStrategy() {
applicationContextRunnerWithFailStrategy
.run(context ->
assertThatThrownBy(() -> context.getBean(SqsTemplate.class).receive("QUEUE_DOES_NOT_EXIST", String.class))
.isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class).cause()
.isInstanceOf(CompletionException.class).cause().isInstanceOf(QueueAttributesResolvingException.class)
.cause().isInstanceOf(QueueDoesNotExistException.class));
}

static class Listener {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.operations.SqsTemplate;
Expand Down Expand Up @@ -60,6 +61,7 @@
* Tests for {@link SqsAutoConfiguration}.
*
* @author Tomaz Fernandes
* @author Wei Jiang
*/
class SqsAutoConfigurationTest {

Expand Down Expand Up @@ -110,6 +112,18 @@ void withCustomEndpoint() {
});
}

@Test
void withCustomQueueNotFoundStrategy() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.queue-not-found-strategy=fail").run(context -> {
assertThat(context).hasSingleBean(SqsProperties.class);
SqsProperties sqsProperties = context.getBean(SqsProperties.class);
assertThat(context).hasSingleBean(SqsAsyncClient.class);
assertThat(context).hasSingleBean(SqsTemplate.class);
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
assertThat(sqsProperties.getQueueNotFoundStrategy()).isEqualTo(QueueNotFoundStrategy.FAIL);
});
}

// @formatter:off
@Test
void customSqsClientConfigurer() {
Expand Down

0 comments on commit 4a90b73

Please sign in to comment.