diff --git a/docs/src/main/asciidoc/_configprops.adoc b/docs/src/main/asciidoc/_configprops.adoc index e44304c89..53545d674 100644 --- a/docs/src/main/asciidoc/_configprops.adoc +++ b/docs/src/main/asciidoc/_configprops.adoc @@ -76,6 +76,7 @@ |spring.cloud.aws.sqs.listener.max-concurrent-messages | | The maximum concurrent messages that can be processed simultaneously for each queue. Note that if acknowledgement batching is being used, the actual maximum number of messages inflight might be higher. |spring.cloud.aws.sqs.listener.max-messages-per-poll | | The maximum number of messages to be retrieved in a single poll to SQS. |spring.cloud.aws.sqs.listener.poll-timeout | | The maximum amount of time for a poll to SQS. +|spring.cloud.aws.sqs.queue-not-found-strategy | `+++CREATE+++` | Configures strategy when queue not found. |spring.cloud.aws.sqs.region | | Overrides the default region. -|=== \ No newline at end of file +|=== diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index d1d4ca766..df32b8cee 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -786,6 +786,7 @@ The Spring Boot Starter for SQS provides the following auto-configuration proper | <> | Maximum number of inflight messages per queue. | No | 10 | <> | Maximum number of messages to be received per poll. | No | 10 | <> | Maximum amount of time to wait for messages in a poll. | No | 10 seconds +| `spring.cloud.aws.sqs.queue-not-found-strategy` | The strategy to be used by SqsTemplate and SqsListeners when a queue does not exist. | No | CREATE |=== diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index aa41805a9..a8c25161e 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -29,6 +29,7 @@ import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; +import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.awspring.cloud.sqs.operations.SqsTemplateBuilder; import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; @@ -51,6 +52,7 @@ * * @author Tomaz Fernandes * @author Maciej Walkowiak + * @author Wei Jiang * @since 3.0 */ @AutoConfiguration @@ -82,6 +84,9 @@ public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider builder.configureDefaultConverter(converter -> converter.setObjectMapper(om))); + if (sqsProperties.getQueueNotFoundStrategy() != null) { + builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy())); + } return builder.build(); } @@ -112,8 +117,9 @@ private void setObjectMapper(SqsMessageListenerContainerFactory factory, factory.configure(options -> options.messageConverter(messageConverter)); } - private void configureContainerOptions(ContainerOptionsBuilder options) { + private void configureContainerOptions(SqsContainerOptionsBuilder options) { PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull(); + mapper.from(this.sqsProperties.getQueueNotFoundStrategy()).to(options::queueNotFoundStrategy); mapper.from(this.sqsProperties.getListener().getMaxConcurrentMessages()).to(options::maxConcurrentMessages); mapper.from(this.sqsProperties.getListener().getMaxMessagesPerPoll()).to(options::maxMessagesPerPoll); mapper.from(this.sqsProperties.getListener().getPollTimeout()).to(options::pollTimeout); diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java index 7facf18cf..d372ccad1 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java @@ -16,6 +16,8 @@ package io.awspring.cloud.autoconfigure.sqs; import io.awspring.cloud.autoconfigure.AwsClientProperties; +import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; + import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.lang.Nullable; @@ -24,6 +26,7 @@ * Properties related to AWS SQS. * * @author Tomaz Fernandes + * @author Wei Jiang * @since 3.0 */ @ConfigurationProperties(prefix = SqsProperties.PREFIX) @@ -44,6 +47,26 @@ public void setListener(Listener listener) { this.listener = listener; } + @Nullable + private QueueNotFoundStrategy queueNotFoundStrategy; + + /** + * Return the strategy to use if the queue is not found. + * @return the {@link QueueNotFoundStrategy} + */ + @Nullable + public QueueNotFoundStrategy getQueueNotFoundStrategy() { + return queueNotFoundStrategy; + } + + /** + * Set the strategy to use if the queue is not found. + * @param queueNotFoundStrategy the strategy to set. + */ + public void setQueueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy) { + this.queueNotFoundStrategy = queueNotFoundStrategy; + } + public static class Listener { /** diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationIntegrationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationIntegrationTest.java index 461d4d930..6d3f5ee2e 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationIntegrationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationIntegrationTest.java @@ -16,12 +16,18 @@ package io.awspring.cloud.autoconfigure.sqs; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import io.awspring.cloud.sqs.QueueAttributesResolvingException; import io.awspring.cloud.sqs.annotation.SqsListener; import io.awspring.cloud.sqs.operations.SqsTemplate; +import org.testcontainers.shaded.org.bouncycastle.util.Arrays; +import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; + +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -29,6 +35,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.ApplicationContextException; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.testcontainers.containers.localstack.LocalStackContainer; @@ -40,6 +47,7 @@ * Integration tests for {@link SqsAutoConfiguration}. * * @author Tomaz Fernandes + * @author Wei Jiang */ @SpringBootTest @Testcontainers @@ -49,18 +57,30 @@ class SqsAutoConfigurationIntegrationTest { private static final String PAYLOAD = "Test"; - private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withPropertyValues("spring.cloud.aws.sqs.region=eu-west-1", - "spring.cloud.aws.sqs.endpoint=" + localstack.getEndpoint(), - "spring.cloud.aws.credentials.access-key=noop", "spring.cloud.aws.credentials.secret-key=noop", - "spring.cloud.aws.region.static=eu-west-1") - .withConfiguration(AutoConfigurations.of(RegionProviderAutoConfiguration.class, - CredentialsProviderAutoConfiguration.class, SqsAutoConfiguration.class, AwsAutoConfiguration.class, - ListenerConfiguration.class)); - @Container static LocalStackContainer localstack = new LocalStackContainer( - DockerImageName.parse("localstack/localstack:3.2.0")); + DockerImageName.parse("localstack/localstack:3.2.0")); + + static { + localstack.start(); + } + + private static final String[] BASE_PARAMS = {"spring.cloud.aws.sqs.region=eu-west-1", + "spring.cloud.aws.sqs.endpoint=" + localstack.getEndpoint(), + "spring.cloud.aws.credentials.access-key=noop", "spring.cloud.aws.credentials.secret-key=noop", + "spring.cloud.aws.region.static=eu-west-1"}; + + private static final AutoConfigurations BASE_CONFIGURATIONS = AutoConfigurations.of(RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, SqsAutoConfiguration.class, AwsAutoConfiguration.class, + ListenerConfiguration.class); + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withPropertyValues(BASE_PARAMS) + .withConfiguration(BASE_CONFIGURATIONS); + + private final ApplicationContextRunner applicationContextRunnerWithFailStrategy = new ApplicationContextRunner() + .withPropertyValues(Arrays.append(BASE_PARAMS, "spring.cloud.aws.sqs.queue-not-found-strategy=fail")) + .withConfiguration(BASE_CONFIGURATIONS); @SuppressWarnings("unchecked") @Test @@ -73,6 +93,25 @@ void sendsAndReceivesMessage() { }); } + @Test + void containerReceivesMessageWithFailQueueNotFoundStrategy() { + applicationContextRunnerWithFailStrategy.run(context -> + assertThatThrownBy(() -> context.getBean(SqsTemplate.class).send(to -> to.queue("QUEUE_DOES_NOT_EXISTS").payload(PAYLOAD))) + .isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class).cause() + .isInstanceOf(CompletionException.class).cause().isInstanceOf(QueueAttributesResolvingException.class) + .cause().isInstanceOf(QueueDoesNotExistException.class)); + } + + @Test + void templateReceivesMessageWithFailQueueNotFoundStrategy() { + applicationContextRunnerWithFailStrategy + .run(context -> + assertThatThrownBy(() -> context.getBean(SqsTemplate.class).receive("QUEUE_DOES_NOT_EXIST", String.class)) + .isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class).cause() + .isInstanceOf(CompletionException.class).cause().isInstanceOf(QueueAttributesResolvingException.class) + .cause().isInstanceOf(QueueDoesNotExistException.class)); + } + static class Listener { @Autowired diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java index 57a72f0ea..edb40fcef 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java @@ -32,6 +32,7 @@ import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; import io.awspring.cloud.sqs.listener.ContainerOptions; import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; +import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; @@ -60,6 +61,7 @@ * Tests for {@link SqsAutoConfiguration}. * * @author Tomaz Fernandes + * @author Wei Jiang */ class SqsAutoConfigurationTest { @@ -110,6 +112,18 @@ void withCustomEndpoint() { }); } + @Test + void withCustomQueueNotFoundStrategy() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.queue-not-found-strategy=fail").run(context -> { + assertThat(context).hasSingleBean(SqsProperties.class); + SqsProperties sqsProperties = context.getBean(SqsProperties.class); + assertThat(context).hasSingleBean(SqsAsyncClient.class); + assertThat(context).hasSingleBean(SqsTemplate.class); + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + assertThat(sqsProperties.getQueueNotFoundStrategy()).isEqualTo(QueueNotFoundStrategy.FAIL); + }); + } + // @formatter:off @Test void customSqsClientConfigurer() {