From 42da25f66db097e6e6176c2c698aa85dec69aeec Mon Sep 17 00:00:00 2001 From: Daniel Andres Pelaez Lopez Date: Sun, 25 Jun 2023 19:30:32 -0500 Subject: [PATCH] Ability to set SmartLifecycle.phase to SqsMessageListenerContainer/DefaultListenerContainerRegistry (#821) * Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface * Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface * Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface * Add phase to the DefaultListenerContainerRegistry * Improve documentation * Fix merge commit --------- Co-authored-by: Tomaz Fernandes <76525045+tomazfernandes@users.noreply.github.com> --- docs/src/main/asciidoc/sqs.adoc | 4 ++++ .../listener/AbstractMessageListenerContainer.java | 14 ++++++++++++++ .../listener/DefaultListenerContainerRegistry.java | 10 ++++++++++ .../sqs/listener/MessageListenerContainer.java | 2 ++ .../sqs/listener/SqsMessageListenerContainer.java | 12 +++++++++++- .../AbstractMessageListenerContainerTests.java | 2 ++ .../DefaultListenerContainerRegistryTests.java | 3 +++ .../listener/SqsMessageListenerContainerTests.java | 6 +++++- 8 files changed, 51 insertions(+), 2 deletions(-) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index dc5d1bf10..755996929 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -934,6 +934,8 @@ The `MessageListenerContainer` interface extends `SmartLifecycle`, which provide Containers created from `@SqsListener` annotations are registered in a `MessageListenerContainerRegistry` bean that is registered by the framework. The containers themselves are not Spring-managed beans, and the registry is responsible for managing these containers` lifecycle in application startup and shutdown. +NOTE: The `DefaultListenerContainerRegistry ` implementation provided by the framework allows the phase value to be set through the `setPhase` method. The default value is `MessageListenerContainer.DEFAULT_PHASE`. + At startup, the containers will make requests to `SQS` to retrieve the queues` urls for the provided queue names or ARNs, and for retrieving `QueueAttributes` if so configured. Providing queue urls instead of names and not requesting queue attributes can result in slightly better startup times since there's no need for such requests. @@ -962,6 +964,8 @@ MessageListenerContainer listenerContainer(SqsAsyncClient sqsAsyncClient } ---- +NOTE: The `SqsMessageListenerContainer.builder()` allows to specify the `SmartLifecycle.phase`, to override the default value defined in `MessageListenerContainer.DEFAULT_PHASE` + ===== Retrieving Containers from the Registry Containers can be retrieved by fetching the `MessageListenerContainer` bean from the container and using the `getListenerContainers` and `getContainerById` methods. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java index 5307a9483..9566fbb7a 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java @@ -69,6 +69,8 @@ public abstract class AbstractMessageListenerContainer acknowledgementResultCallback = new AsyncAcknowledgementResultCallback() { }; + private int phase = DEFAULT_PHASE; + /** * Create an instance with the provided {@link ContainerOptions} * @param containerOptions the options instance. @@ -162,6 +164,14 @@ public void setComponentFactories(Collection> co this.containerComponentFactories = containerComponentFactories; } + /** + * Set the phase for the SmartLifecycle for this container instance. + * @param phase the phase. + */ + public void setPhase(int phase) { + this.phase = phase; + } + /** * Returns the {@link ContainerOptions} instance for this container. Changed options will take effect on container * restart. @@ -252,6 +262,10 @@ public boolean isRunning() { return this.isRunning; } + public int getPhase() { + return this.phase; + } + @Override public boolean isAutoStartup() { return containerOptions.isAutoStartup(); diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java index df07f1413..04a2b288f 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java @@ -53,6 +53,8 @@ public class DefaultListenerContainerRegistry implements MessageListenerContaine private volatile boolean running = false; + private int phase = MessageListenerContainer.DEFAULT_PHASE; + @Override public void registerListenerContainer(MessageListenerContainer listenerContainer) { Assert.notNull(listenerContainer, "listenerContainer cannot be null"); @@ -100,4 +102,12 @@ public boolean isRunning() { return this.running; } + @Override + public int getPhase() { + return phase; + } + + public void setPhase(int phase) { + this.phase = phase; + } } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/MessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/MessageListenerContainer.java index 02bc862bc..aa0effcac 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/MessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/MessageListenerContainer.java @@ -28,6 +28,8 @@ */ public interface MessageListenerContainer extends SmartLifecycle { + int DEFAULT_PHASE = Integer.MAX_VALUE; + /** * Get the container id. * @return the id. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java index 2f79772c0..9bf78e84b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java @@ -31,6 +31,7 @@ import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.SmartLifecycle; import org.springframework.messaging.Message; import org.springframework.util.Assert; import software.amazon.awssdk.services.sqs.SqsAsyncClient; @@ -177,6 +178,8 @@ public static class Builder { private AcknowledgementResultCallback acknowledgementResultCallback; + private Integer phase; + public Builder id(String id) { this.id = id; return this; @@ -250,6 +253,11 @@ public Builder configure(Consumer options) { return this; } + public Builder phase(Integer phase) { + this.phase = phase; + return this; + } + // @formatter:off public SqsMessageListenerContainer build() { SqsMessageListenerContainer container = new SqsMessageListenerContainer<>(this.sqsAsyncClient); @@ -262,9 +270,11 @@ public SqsMessageListenerContainer build() { .acceptIfNotNull(this.acknowledgementResultCallback, container::setAcknowledgementResultCallback) .acceptIfNotNull(this.asyncAcknowledgementResultCallback, container::setAcknowledgementResultCallback) .acceptIfNotNull(this.containerComponentFactories, container::setComponentFactories) - .acceptIfNotEmpty(this.queueNames, container::setQueueNames); + .acceptIfNotEmpty(this.queueNames, container::setQueueNames) + .acceptIfNotNullOrElse(container::setPhase, this.phase, DEFAULT_PHASE); this.messageInterceptors.forEach(container::addMessageInterceptor); this.asyncMessageInterceptors.forEach(container::addMessageInterceptor); + container.configure(this.optionsConsumer); return container; } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java index 999b72a5e..f7e450286 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java @@ -73,6 +73,7 @@ void shouldAdaptBlockingComponents() { .isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class) .extracting("blockingMessageInterceptor").isEqualTo(interceptor); + assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE); } @Test @@ -101,6 +102,7 @@ void shouldSetAsyncComponents() { assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback); assertThat(container.getContainerComponentFactories()).containsExactlyElementsOf(componentFactories); assertThat(container.getMessageInterceptors()).containsExactly(interceptor); + assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE); } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java index f888dbbea..b05426ca5 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java @@ -39,6 +39,7 @@ void shouldRegisterListenerContainer() { given(container.getId()).willReturn(id); DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry(); registry.registerListenerContainer(container); + assertThat(registry.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE); } @Test @@ -47,9 +48,11 @@ void shouldGetListenerContainer() { String id = "test-container-id"; given(container.getId()).willReturn(id); DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry(); + registry.setPhase(2); registry.registerListenerContainer(container); MessageListenerContainer containerFromRegistry = registry.getContainerById(id); assertThat(containerFromRegistry).isEqualTo(container); + assertThat(registry.getPhase()).isEqualTo(2); } @Test diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java index 0dbc77fcf..b4bfaa204 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java @@ -61,11 +61,12 @@ void shouldCreateFromBuilderWithBlockingComponents() { List> componentFactories = Collections .singletonList(componentFactory); List queueNames = Arrays.asList("test-queue-name-1", "test-queue-name-2"); + Integer phase = 2; SqsMessageListenerContainer container = SqsMessageListenerContainer.builder().messageListener(listener) .sqsAsyncClient(client).errorHandler(errorHandler).componentFactories(componentFactories) .acknowledgementResultCallback(callback).messageInterceptor(interceptor1) - .messageInterceptor(interceptor2).queueNames(queueNames).build(); + .messageInterceptor(interceptor2).queueNames(queueNames).phase(phase).build(); assertThat(container.getMessageListener()) .isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class) @@ -90,6 +91,8 @@ void shouldCreateFromBuilderWithBlockingComponents() { assertThat(container).extracting("sqsAsyncClient").isEqualTo(client); assertThat(container.getQueueNames()).containsExactlyElementsOf(queueNames); + + assertThat(container.getPhase()).isEqualTo(phase); } @Test @@ -114,6 +117,7 @@ void shouldCreateFromBuilderWithAsyncComponents() { assertThat(container.getErrorHandler()).isEqualTo(errorHandler); assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback); assertThat(container.getMessageInterceptors()).containsExactly(interceptor1, interceptor2); + assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE); } @Test