Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to set SmartLifecycle.phase to SqsMessageListenerContainer/DefaultListenerContainerRegistry #821

Merged
4 changes: 4 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,8 @@ The `MessageListenerContainer` interface extends `SmartLifecycle`, which provide
Containers created from `@SqsListener` annotations are registered in a `MessageListenerContainerRegistry` bean that is registered by the framework.
The containers themselves are not Spring-managed beans, and the registry is responsible for managing these containers` lifecycle in application startup and shutdown.

NOTE: The `DefaultListenerContainerRegistry ` implementation provided by the framework allows the phase value to be set through the `setPhase` method. The default value is `MessageListenerContainer.DEFAULT_PHASE`.

At startup, the containers will make requests to `SQS` to retrieve the queues` urls for the provided queue names or ARNs, and for retrieving `QueueAttributes` if so configured.
Providing queue urls instead of names and not requesting queue attributes can result in slightly better startup times since there's no need for such requests.

Expand Down Expand Up @@ -962,6 +964,8 @@ MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient
}
----

NOTE: The `SqsMessageListenerContainer.builder()` allows to specify the `SmartLifecycle.phase`, to override the default value defined in `MessageListenerContainer.DEFAULT_PHASE`

===== Retrieving Containers from the Registry

Containers can be retrieved by fetching the `MessageListenerContainer` bean from the container and using the `getListenerContainers` and `getContainerById` methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public abstract class AbstractMessageListenerContainer<T, O extends ContainerOpt
private AsyncAcknowledgementResultCallback<T> acknowledgementResultCallback = new AsyncAcknowledgementResultCallback<T>() {
};

private int phase = DEFAULT_PHASE;
estigma88 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create an instance with the provided {@link ContainerOptions}
* @param containerOptions the options instance.
Expand Down Expand Up @@ -162,6 +164,14 @@ public void setComponentFactories(Collection<ContainerComponentFactory<T, O>> co
this.containerComponentFactories = containerComponentFactories;
}

/**
* Set the phase for the SmartLifecycle for this container instance.
* @param phase the phase.
*/
public void setPhase(int phase) {
this.phase = phase;
}

/**
* Returns the {@link ContainerOptions} instance for this container. Changed options will take effect on container
* restart.
Expand Down Expand Up @@ -252,6 +262,10 @@ public boolean isRunning() {
return this.isRunning;
}

public int getPhase() {
return this.phase;
}

@Override
public boolean isAutoStartup() {
return containerOptions.isAutoStartup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class DefaultListenerContainerRegistry implements MessageListenerContaine

private volatile boolean running = false;

private int phase = MessageListenerContainer.DEFAULT_PHASE;

@Override
public void registerListenerContainer(MessageListenerContainer<?> listenerContainer) {
Assert.notNull(listenerContainer, "listenerContainer cannot be null");
Expand Down Expand Up @@ -100,4 +102,12 @@ public boolean isRunning() {
return this.running;
}

@Override
public int getPhase() {
return phase;
}

public void setPhase(int phase) {
this.phase = phase;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
*/
public interface MessageListenerContainer<T> extends SmartLifecycle {

int DEFAULT_PHASE = Integer.MAX_VALUE;

/**
* Get the container id.
* @return the id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
Expand Down Expand Up @@ -177,6 +178,8 @@ public static class Builder<T> {

private AcknowledgementResultCallback<T> acknowledgementResultCallback;

private Integer phase;

public Builder<T> id(String id) {
this.id = id;
return this;
Expand Down Expand Up @@ -250,6 +253,11 @@ public Builder<T> configure(Consumer<SqsContainerOptionsBuilder> options) {
return this;
}

public Builder<T> phase(Integer phase) {
this.phase = phase;
return this;
}

// @formatter:off
public SqsMessageListenerContainer<T> build() {
SqsMessageListenerContainer<T> container = new SqsMessageListenerContainer<>(this.sqsAsyncClient);
Expand All @@ -262,9 +270,11 @@ public SqsMessageListenerContainer<T> build() {
.acceptIfNotNull(this.acknowledgementResultCallback, container::setAcknowledgementResultCallback)
.acceptIfNotNull(this.asyncAcknowledgementResultCallback, container::setAcknowledgementResultCallback)
.acceptIfNotNull(this.containerComponentFactories, container::setComponentFactories)
.acceptIfNotEmpty(this.queueNames, container::setQueueNames);
.acceptIfNotEmpty(this.queueNames, container::setQueueNames)
.acceptIfNotNullOrElse(container::setPhase, this.phase, DEFAULT_PHASE);
this.messageInterceptors.forEach(container::addMessageInterceptor);
this.asyncMessageInterceptors.forEach(container::addMessageInterceptor);

container.configure(this.optionsConsumer);
return container;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ void shouldAdaptBlockingComponents() {
.isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class)
.extracting("blockingMessageInterceptor").isEqualTo(interceptor);

assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);
}

@Test
Expand Down Expand Up @@ -101,6 +102,7 @@ void shouldSetAsyncComponents() {
assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback);
assertThat(container.getContainerComponentFactories()).containsExactlyElementsOf(componentFactories);
assertThat(container.getMessageInterceptors()).containsExactly(interceptor);
assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void shouldRegisterListenerContainer() {
given(container.getId()).willReturn(id);
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
registry.registerListenerContainer(container);
assertThat(registry.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);
}

@Test
Expand All @@ -47,9 +48,11 @@ void shouldGetListenerContainer() {
String id = "test-container-id";
given(container.getId()).willReturn(id);
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
registry.setPhase(2);
registry.registerListenerContainer(container);
MessageListenerContainer<?> containerFromRegistry = registry.getContainerById(id);
assertThat(containerFromRegistry).isEqualTo(container);
assertThat(registry.getPhase()).isEqualTo(2);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ void shouldCreateFromBuilderWithBlockingComponents() {
List<ContainerComponentFactory<Object, SqsContainerOptions>> componentFactories = Collections
.singletonList(componentFactory);
List<String> queueNames = Arrays.asList("test-queue-name-1", "test-queue-name-2");
Integer phase = 2;

SqsMessageListenerContainer<Object> container = SqsMessageListenerContainer.builder().messageListener(listener)
.sqsAsyncClient(client).errorHandler(errorHandler).componentFactories(componentFactories)
.acknowledgementResultCallback(callback).messageInterceptor(interceptor1)
.messageInterceptor(interceptor2).queueNames(queueNames).build();
.messageInterceptor(interceptor2).queueNames(queueNames).phase(phase).build();

assertThat(container.getMessageListener())
.isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class)
Expand All @@ -90,6 +91,8 @@ void shouldCreateFromBuilderWithBlockingComponents() {
assertThat(container).extracting("sqsAsyncClient").isEqualTo(client);

assertThat(container.getQueueNames()).containsExactlyElementsOf(queueNames);

assertThat(container.getPhase()).isEqualTo(phase);
}

@Test
Expand All @@ -114,6 +117,7 @@ void shouldCreateFromBuilderWithAsyncComponents() {
assertThat(container.getErrorHandler()).isEqualTo(errorHandler);
assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback);
assertThat(container.getMessageInterceptors()).containsExactly(interceptor1, interceptor2);
assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);
}

@Test
Expand Down