From 8532300c8bd50406c8c55c1e04f7d656a0151812 Mon Sep 17 00:00:00 2001 From: Daniel Pelaez Date: Wed, 17 May 2023 19:38:23 -0500 Subject: [PATCH 1/6] Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface --- .../AbstractMessageListenerContainer.java | 15 +++++++++++++++ .../sqs/listener/SqsMessageListenerContainer.java | 12 +++++++++++- .../SqsMessageListenerContainerTests.java | 7 ++++++- 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java index 5375407d0..fcccc27b0 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java @@ -69,6 +69,8 @@ public abstract class AbstractMessageListenerContainer acknowledgementResultCallback = new AsyncAcknowledgementResultCallback() { }; + private int phase; + /** * Create an instance with the provided {@link ContainerOptions} * @param containerOptions the options instance. @@ -162,6 +164,14 @@ public void setComponentFactories(Collection> co this.containerComponentFactories = containerComponentFactories; } + /** + * Set the phase for the SmartLifecycle for this container instance. + * @param phase the phase. + */ + public void setPhase(int phase) { + this.phase = phase; + } + /** * Returns the {@link ContainerOptions} instance for this container. Changed options will take effect on container * restart. @@ -252,6 +262,11 @@ public boolean isRunning() { return this.isRunning; } + @Override + public int getPhase() { + return this.phase; + } + @Override public void start() { if (this.isRunning) { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java index 2f79772c0..5b2bfd950 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java @@ -31,6 +31,7 @@ import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.SmartLifecycle; import org.springframework.messaging.Message; import org.springframework.util.Assert; import software.amazon.awssdk.services.sqs.SqsAsyncClient; @@ -177,6 +178,8 @@ public static class Builder { private AcknowledgementResultCallback acknowledgementResultCallback; + private Integer phase; + public Builder id(String id) { this.id = id; return this; @@ -250,6 +253,11 @@ public Builder configure(Consumer options) { return this; } + public Builder phase(Integer phase) { + this.phase = phase; + return this; + } + // @formatter:off public SqsMessageListenerContainer build() { SqsMessageListenerContainer container = new SqsMessageListenerContainer<>(this.sqsAsyncClient); @@ -262,9 +270,11 @@ public SqsMessageListenerContainer build() { .acceptIfNotNull(this.acknowledgementResultCallback, container::setAcknowledgementResultCallback) .acceptIfNotNull(this.asyncAcknowledgementResultCallback, container::setAcknowledgementResultCallback) .acceptIfNotNull(this.containerComponentFactories, container::setComponentFactories) - .acceptIfNotEmpty(this.queueNames, container::setQueueNames); + .acceptIfNotEmpty(this.queueNames, container::setQueueNames) + .acceptIfNotNullOrElse(container::setPhase, this.phase, SmartLifecycle.DEFAULT_PHASE); this.messageInterceptors.forEach(container::addMessageInterceptor); this.asyncMessageInterceptors.forEach(container::addMessageInterceptor); + container.configure(this.optionsConsumer); return container; } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java index 0dbc77fcf..5e0237301 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java @@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.junit.jupiter.api.Test; +import org.springframework.context.SmartLifecycle; import org.springframework.core.task.SimpleAsyncTaskExecutor; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; @@ -61,11 +62,12 @@ void shouldCreateFromBuilderWithBlockingComponents() { List> componentFactories = Collections .singletonList(componentFactory); List queueNames = Arrays.asList("test-queue-name-1", "test-queue-name-2"); + Integer phase = 2; SqsMessageListenerContainer container = SqsMessageListenerContainer.builder().messageListener(listener) .sqsAsyncClient(client).errorHandler(errorHandler).componentFactories(componentFactories) .acknowledgementResultCallback(callback).messageInterceptor(interceptor1) - .messageInterceptor(interceptor2).queueNames(queueNames).build(); + .messageInterceptor(interceptor2).queueNames(queueNames).phase(phase).build(); assertThat(container.getMessageListener()) .isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class) @@ -90,6 +92,8 @@ void shouldCreateFromBuilderWithBlockingComponents() { assertThat(container).extracting("sqsAsyncClient").isEqualTo(client); assertThat(container.getQueueNames()).containsExactlyElementsOf(queueNames); + + assertThat(container.getPhase()).isEqualTo(phase); } @Test @@ -114,6 +118,7 @@ void shouldCreateFromBuilderWithAsyncComponents() { assertThat(container.getErrorHandler()).isEqualTo(errorHandler); assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback); assertThat(container.getMessageInterceptors()).containsExactly(interceptor1, interceptor2); + assertThat(container.getPhase()).isEqualTo(SmartLifecycle.DEFAULT_PHASE); } @Test From 9840be7fa480dcb22ccea6f7a180b874a0bcd0db Mon Sep 17 00:00:00 2001 From: Daniel Pelaez Date: Thu, 18 May 2023 08:08:06 -0500 Subject: [PATCH 2/6] Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface --- .../cloud/sqs/listener/AbstractMessageListenerContainer.java | 2 +- .../cloud/sqs/listener/SqsMessageListenerContainer.java | 2 +- .../sqs/listener/AbstractMessageListenerContainerTests.java | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java index fcccc27b0..d32b12f82 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java @@ -69,7 +69,7 @@ public abstract class AbstractMessageListenerContainer acknowledgementResultCallback = new AsyncAcknowledgementResultCallback() { }; - private int phase; + private int phase = DEFAULT_PHASE; /** * Create an instance with the provided {@link ContainerOptions} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java index 5b2bfd950..9bf78e84b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java @@ -271,7 +271,7 @@ public SqsMessageListenerContainer build() { .acceptIfNotNull(this.asyncAcknowledgementResultCallback, container::setAcknowledgementResultCallback) .acceptIfNotNull(this.containerComponentFactories, container::setComponentFactories) .acceptIfNotEmpty(this.queueNames, container::setQueueNames) - .acceptIfNotNullOrElse(container::setPhase, this.phase, SmartLifecycle.DEFAULT_PHASE); + .acceptIfNotNullOrElse(container::setPhase, this.phase, DEFAULT_PHASE); this.messageInterceptors.forEach(container::addMessageInterceptor); this.asyncMessageInterceptors.forEach(container::addMessageInterceptor); diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java index 999b72a5e..ff5355446 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import org.junit.jupiter.api.Test; +import org.springframework.context.SmartLifecycle; /** * Tests for {@link AbstractMessageListenerContainer}. @@ -73,6 +74,7 @@ void shouldAdaptBlockingComponents() { .isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class) .extracting("blockingMessageInterceptor").isEqualTo(interceptor); + assertThat(container.getPhase()).isEqualTo(SmartLifecycle.DEFAULT_PHASE); } @Test @@ -101,6 +103,7 @@ void shouldSetAsyncComponents() { assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback); assertThat(container.getContainerComponentFactories()).containsExactlyElementsOf(componentFactories); assertThat(container.getMessageInterceptors()).containsExactly(interceptor); + assertThat(container.getPhase()).isEqualTo(SmartLifecycle.DEFAULT_PHASE); } From 1afc5e8f2e1de13edad6541caa77604536bae136 Mon Sep 17 00:00:00 2001 From: Daniel Pelaez Date: Tue, 23 May 2023 09:25:31 -0500 Subject: [PATCH 3/6] Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface --- docs/src/main/asciidoc/sqs.adoc | 2 ++ .../cloud/sqs/listener/MessageListenerContainer.java | 2 ++ .../sqs/listener/AbstractMessageListenerContainerTests.java | 5 ++--- .../cloud/sqs/listener/SqsMessageListenerContainerTests.java | 3 +-- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index e7f5d80a8..9fa9d209d 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -955,6 +955,8 @@ MessageListenerContainer listenerContainer(SqsAsyncClient sqsAsyncClient } ---- +NOTE: You can also specify the `SmartLifecycle.phase`, using `SqsMessageListenerContainer.builder()`, if you are interested on overriding the default value defined in `MessageListenerContainer.DEFAULT_PHASE` + ===== Retrieving Containers from the Registry Containers can be retrieved by fetching the `MessageListenerContainer` bean from the container and using the `getListenerContainers` and `getContainerById` methods. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/MessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/MessageListenerContainer.java index 02bc862bc..aa0effcac 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/MessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/MessageListenerContainer.java @@ -28,6 +28,8 @@ */ public interface MessageListenerContainer extends SmartLifecycle { + int DEFAULT_PHASE = Integer.MAX_VALUE; + /** * Get the container id. * @return the id. diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java index ff5355446..f7e450286 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.List; import org.junit.jupiter.api.Test; -import org.springframework.context.SmartLifecycle; /** * Tests for {@link AbstractMessageListenerContainer}. @@ -74,7 +73,7 @@ void shouldAdaptBlockingComponents() { .isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class) .extracting("blockingMessageInterceptor").isEqualTo(interceptor); - assertThat(container.getPhase()).isEqualTo(SmartLifecycle.DEFAULT_PHASE); + assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE); } @Test @@ -103,7 +102,7 @@ void shouldSetAsyncComponents() { assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback); assertThat(container.getContainerComponentFactories()).containsExactlyElementsOf(componentFactories); assertThat(container.getMessageInterceptors()).containsExactly(interceptor); - assertThat(container.getPhase()).isEqualTo(SmartLifecycle.DEFAULT_PHASE); + assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE); } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java index 5e0237301..b4bfaa204 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java @@ -36,7 +36,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.junit.jupiter.api.Test; -import org.springframework.context.SmartLifecycle; import org.springframework.core.task.SimpleAsyncTaskExecutor; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; @@ -118,7 +117,7 @@ void shouldCreateFromBuilderWithAsyncComponents() { assertThat(container.getErrorHandler()).isEqualTo(errorHandler); assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback); assertThat(container.getMessageInterceptors()).containsExactly(interceptor1, interceptor2); - assertThat(container.getPhase()).isEqualTo(SmartLifecycle.DEFAULT_PHASE); + assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE); } @Test From e3285825a849ecd4aabf62bd318b2252f2e37ee4 Mon Sep 17 00:00:00 2001 From: Daniel Pelaez Date: Tue, 23 May 2023 10:00:50 -0500 Subject: [PATCH 4/6] Add phase to the DefaultListenerContainerRegistry --- docs/src/main/asciidoc/sqs.adoc | 2 ++ .../sqs/listener/DefaultListenerContainerRegistry.java | 10 ++++++++++ .../DefaultListenerContainerRegistryTests.java | 3 +++ 3 files changed, 15 insertions(+) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index 9fa9d209d..9cabd5e39 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -927,6 +927,8 @@ The `MessageListenerContainer` interface extends `SmartLifecycle`, which provide Containers created from `@SqsListener` annotations are registered in a `MessageListenerContainerRegistry` bean that is registered by the framework. The containers themselves are not Spring-managed beans, and the registry is responsible for managing these containers` lifecycle in application startup and shutdown. +NOTE: The framework offers the `DefaultListenerContainerRegistry` implementation, where you can override the `SmartLifecycle.phase`. By default, the `phase` value is `MessageListenerContainer.DEFAULT_PHASE` + At startup, the containers will make requests to `SQS` to retrieve the queues` urls for the provided queue names or ARNs, and for retrieving `QueueAttributes` if so configured. Providing queue urls instead of names and not requesting queue attributes can result in slightly better startup times since there's no need for such requests. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java index 4b3051406..10c0c1d2b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java @@ -50,6 +50,8 @@ public class DefaultListenerContainerRegistry implements MessageListenerContaine private volatile boolean running = false; + private int phase = MessageListenerContainer.DEFAULT_PHASE; + @Override public void registerListenerContainer(MessageListenerContainer listenerContainer) { Assert.notNull(listenerContainer, "listenerContainer cannot be null"); @@ -95,4 +97,12 @@ public boolean isRunning() { return this.running; } + @Override + public int getPhase() { + return phase; + } + + public void setPhase(int phase) { + this.phase = phase; + } } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java index e4e7c34e3..9c986be0c 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java @@ -38,6 +38,7 @@ void shouldRegisterListenerContainer() { given(container.getId()).willReturn(id); DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry(); registry.registerListenerContainer(container); + assertThat(registry.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE); } @Test @@ -46,9 +47,11 @@ void shouldGetListenerContainer() { String id = "test-container-id"; given(container.getId()).willReturn(id); DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry(); + registry.setPhase(2); registry.registerListenerContainer(container); MessageListenerContainer containerFromRegistry = registry.getContainerById(id); assertThat(containerFromRegistry).isEqualTo(container); + assertThat(registry.getPhase()).isEqualTo(2); } @Test From 8090066653009e81453fd8847ba7398df9f300d5 Mon Sep 17 00:00:00 2001 From: Daniel Pelaez Date: Fri, 16 Jun 2023 07:30:51 -0500 Subject: [PATCH 5/6] Improve documentation --- docs/src/main/asciidoc/sqs.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index 9cabd5e39..6825ac973 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -927,7 +927,7 @@ The `MessageListenerContainer` interface extends `SmartLifecycle`, which provide Containers created from `@SqsListener` annotations are registered in a `MessageListenerContainerRegistry` bean that is registered by the framework. The containers themselves are not Spring-managed beans, and the registry is responsible for managing these containers` lifecycle in application startup and shutdown. -NOTE: The framework offers the `DefaultListenerContainerRegistry` implementation, where you can override the `SmartLifecycle.phase`. By default, the `phase` value is `MessageListenerContainer.DEFAULT_PHASE` +NOTE: The `DefaultListenerContainerRegistry ` implementation provided by the framework allows the phase value to be set through the `setPhase` method. The default value is `MessageListenerContainer.DEFAULT_PHASE`. At startup, the containers will make requests to `SQS` to retrieve the queues` urls for the provided queue names or ARNs, and for retrieving `QueueAttributes` if so configured. Providing queue urls instead of names and not requesting queue attributes can result in slightly better startup times since there's no need for such requests. @@ -957,7 +957,7 @@ MessageListenerContainer listenerContainer(SqsAsyncClient sqsAsyncClient } ---- -NOTE: You can also specify the `SmartLifecycle.phase`, using `SqsMessageListenerContainer.builder()`, if you are interested on overriding the default value defined in `MessageListenerContainer.DEFAULT_PHASE` +NOTE: The `SqsMessageListenerContainer.builder()` allows to specify the `SmartLifecycle.phase`, to override the default value defined in `MessageListenerContainer.DEFAULT_PHASE` ===== Retrieving Containers from the Registry From e19a8032c740f617a3af182c713912b7f6847b5c Mon Sep 17 00:00:00 2001 From: Tomaz Fernandes <76525045+tomazfernandes@users.noreply.github.com> Date: Sun, 25 Jun 2023 21:21:47 -0300 Subject: [PATCH 6/6] Fix merge commit --- .../cloud/sqs/listener/AbstractMessageListenerContainer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java index dd53d8a6f..9566fbb7a 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java @@ -264,6 +264,7 @@ public boolean isRunning() { public int getPhase() { return this.phase; + } @Override public boolean isAutoStartup() {