Skip to content

Commit

Permalink
Ability to set SmartLifecycle.phase to SqsMessageListenerContainer/De…
Browse files Browse the repository at this point in the history
…faultListenerContainerRegistry (#821)

* Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface

* Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface

* Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface

* Add phase to the DefaultListenerContainerRegistry

* Improve documentation

* Fix merge commit

---------

Co-authored-by: Tomaz Fernandes <76525045+tomazfernandes@users.noreply.github.com>
  • Loading branch information
estigma88 and tomazfernandes committed Jun 26, 2023
1 parent 0bd7be9 commit 42da25f
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,8 @@ The `MessageListenerContainer` interface extends `SmartLifecycle`, which provide
Containers created from `@SqsListener` annotations are registered in a `MessageListenerContainerRegistry` bean that is registered by the framework.
The containers themselves are not Spring-managed beans, and the registry is responsible for managing these containers` lifecycle in application startup and shutdown.

NOTE: The `DefaultListenerContainerRegistry ` implementation provided by the framework allows the phase value to be set through the `setPhase` method. The default value is `MessageListenerContainer.DEFAULT_PHASE`.

At startup, the containers will make requests to `SQS` to retrieve the queues` urls for the provided queue names or ARNs, and for retrieving `QueueAttributes` if so configured.
Providing queue urls instead of names and not requesting queue attributes can result in slightly better startup times since there's no need for such requests.

Expand Down Expand Up @@ -962,6 +964,8 @@ MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient
}
----

NOTE: The `SqsMessageListenerContainer.builder()` allows to specify the `SmartLifecycle.phase`, to override the default value defined in `MessageListenerContainer.DEFAULT_PHASE`

===== Retrieving Containers from the Registry

Containers can be retrieved by fetching the `MessageListenerContainer` bean from the container and using the `getListenerContainers` and `getContainerById` methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public abstract class AbstractMessageListenerContainer<T, O extends ContainerOpt
private AsyncAcknowledgementResultCallback<T> acknowledgementResultCallback = new AsyncAcknowledgementResultCallback<T>() {
};

private int phase = DEFAULT_PHASE;

/**
* Create an instance with the provided {@link ContainerOptions}
* @param containerOptions the options instance.
Expand Down Expand Up @@ -162,6 +164,14 @@ public void setComponentFactories(Collection<ContainerComponentFactory<T, O>> co
this.containerComponentFactories = containerComponentFactories;
}

/**
* Set the phase for the SmartLifecycle for this container instance.
* @param phase the phase.
*/
public void setPhase(int phase) {
this.phase = phase;
}

/**
* Returns the {@link ContainerOptions} instance for this container. Changed options will take effect on container
* restart.
Expand Down Expand Up @@ -252,6 +262,10 @@ public boolean isRunning() {
return this.isRunning;
}

public int getPhase() {
return this.phase;
}

@Override
public boolean isAutoStartup() {
return containerOptions.isAutoStartup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class DefaultListenerContainerRegistry implements MessageListenerContaine

private volatile boolean running = false;

private int phase = MessageListenerContainer.DEFAULT_PHASE;

@Override
public void registerListenerContainer(MessageListenerContainer<?> listenerContainer) {
Assert.notNull(listenerContainer, "listenerContainer cannot be null");
Expand Down Expand Up @@ -100,4 +102,12 @@ public boolean isRunning() {
return this.running;
}

@Override
public int getPhase() {
return phase;
}

public void setPhase(int phase) {
this.phase = phase;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
*/
public interface MessageListenerContainer<T> extends SmartLifecycle {

int DEFAULT_PHASE = Integer.MAX_VALUE;

/**
* Get the container id.
* @return the id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
Expand Down Expand Up @@ -177,6 +178,8 @@ public static class Builder<T> {

private AcknowledgementResultCallback<T> acknowledgementResultCallback;

private Integer phase;

public Builder<T> id(String id) {
this.id = id;
return this;
Expand Down Expand Up @@ -250,6 +253,11 @@ public Builder<T> configure(Consumer<SqsContainerOptionsBuilder> options) {
return this;
}

public Builder<T> phase(Integer phase) {
this.phase = phase;
return this;
}

// @formatter:off
public SqsMessageListenerContainer<T> build() {
SqsMessageListenerContainer<T> container = new SqsMessageListenerContainer<>(this.sqsAsyncClient);
Expand All @@ -262,9 +270,11 @@ public SqsMessageListenerContainer<T> build() {
.acceptIfNotNull(this.acknowledgementResultCallback, container::setAcknowledgementResultCallback)
.acceptIfNotNull(this.asyncAcknowledgementResultCallback, container::setAcknowledgementResultCallback)
.acceptIfNotNull(this.containerComponentFactories, container::setComponentFactories)
.acceptIfNotEmpty(this.queueNames, container::setQueueNames);
.acceptIfNotEmpty(this.queueNames, container::setQueueNames)
.acceptIfNotNullOrElse(container::setPhase, this.phase, DEFAULT_PHASE);
this.messageInterceptors.forEach(container::addMessageInterceptor);
this.asyncMessageInterceptors.forEach(container::addMessageInterceptor);

container.configure(this.optionsConsumer);
return container;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ void shouldAdaptBlockingComponents() {
.isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class)
.extracting("blockingMessageInterceptor").isEqualTo(interceptor);

assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);
}

@Test
Expand Down Expand Up @@ -101,6 +102,7 @@ void shouldSetAsyncComponents() {
assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback);
assertThat(container.getContainerComponentFactories()).containsExactlyElementsOf(componentFactories);
assertThat(container.getMessageInterceptors()).containsExactly(interceptor);
assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void shouldRegisterListenerContainer() {
given(container.getId()).willReturn(id);
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
registry.registerListenerContainer(container);
assertThat(registry.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);
}

@Test
Expand All @@ -47,9 +48,11 @@ void shouldGetListenerContainer() {
String id = "test-container-id";
given(container.getId()).willReturn(id);
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
registry.setPhase(2);
registry.registerListenerContainer(container);
MessageListenerContainer<?> containerFromRegistry = registry.getContainerById(id);
assertThat(containerFromRegistry).isEqualTo(container);
assertThat(registry.getPhase()).isEqualTo(2);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ void shouldCreateFromBuilderWithBlockingComponents() {
List<ContainerComponentFactory<Object, SqsContainerOptions>> componentFactories = Collections
.singletonList(componentFactory);
List<String> queueNames = Arrays.asList("test-queue-name-1", "test-queue-name-2");
Integer phase = 2;

SqsMessageListenerContainer<Object> container = SqsMessageListenerContainer.builder().messageListener(listener)
.sqsAsyncClient(client).errorHandler(errorHandler).componentFactories(componentFactories)
.acknowledgementResultCallback(callback).messageInterceptor(interceptor1)
.messageInterceptor(interceptor2).queueNames(queueNames).build();
.messageInterceptor(interceptor2).queueNames(queueNames).phase(phase).build();

assertThat(container.getMessageListener())
.isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class)
Expand All @@ -90,6 +91,8 @@ void shouldCreateFromBuilderWithBlockingComponents() {
assertThat(container).extracting("sqsAsyncClient").isEqualTo(client);

assertThat(container.getQueueNames()).containsExactlyElementsOf(queueNames);

assertThat(container.getPhase()).isEqualTo(phase);
}

@Test
Expand All @@ -114,6 +117,7 @@ void shouldCreateFromBuilderWithAsyncComponents() {
assertThat(container.getErrorHandler()).isEqualTo(errorHandler);
assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback);
assertThat(container.getMessageInterceptors()).containsExactly(interceptor1, interceptor2);
assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);
}

@Test
Expand Down

0 comments on commit 42da25f

Please sign in to comment.