Skip to content

Commit

Permalink
Add BackOff when SQS Poll throws an exception (#1008)
Browse files Browse the repository at this point in the history
Fixes #714

Adds support for a Spring Retry BackOffPolicy to be executed when an exception is thrown by a polling thread.

Adds the configuration to ContainerOptions with a exponential default of 1000ms / 2.0 / 10000ms.
  • Loading branch information
tomazfernandes committed Feb 2, 2024
1 parent ddbdb8d commit 49d7ef4
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 1 deletion.
10 changes: 10 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,11 @@ See AWS documentation for more information.
After that period, the framework will try to perform a partial acquire with the available permits, resulting in a poll for less than `maxMessagesPerPoll` messages, unless otherwise configured.
See <<Message Processing Throughput>>.

|<<pollBackOffPolicy>>
|Any valid `BackOffPolicy` implementation
|`ExponentialBackOffPolicy`
|The back off policy to be applied when a polling thread throws an error. The default is an exponential policy with a delay of `1s`, a multiplier of `2.0`, and a maximum of `10s`.

|`autoStartup`
|true, false
|true
Expand Down Expand Up @@ -1569,6 +1574,11 @@ Represents the maximum amount of time the container will wait for `maxMessagesPe
This wait is applied per queue and one queue has no interference in another in this regard.
Defaults to 10 seconds.

===== pollBackOffPolicy
Since 3.2 it's possible to specify a `BackOffPolicy` which will be applied when a polling thread throws an exception.
The default policy is an exponential back off with a delay of 1000ms, a 2.0 multiplier, and a 10000ms maximum delay.
Note that in highly concurrent environments with many polling threads it may happen that a successful poll cancels the next scheduled backoff before it happens, and as such no back offs need to be executed.

==== Default Polling Behavior
By default, the framework starts all queues in `low throughput mode`, where it will perform one poll for messages at a time.
When a poll returns at least one message, the queue enters a `high throughput mode` where it will try to fulfill `maxConcurrentMessages` messages by making (maxConcurrentMessages / maxMessagesPerPoll) parallel polls to the queue.
Expand Down
4 changes: 4 additions & 0 deletions spring-cloud-aws-sqs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.time.Duration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.BackOffPolicyBuilder;
import org.springframework.util.Assert;

/**
Expand All @@ -41,6 +43,8 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,

private final Duration pollTimeout;

private final BackOffPolicy pollBackOffPolicy;

private final Duration maxDelayBetweenPolls;

private final Duration listenerShutdownTimeout;
Expand Down Expand Up @@ -75,6 +79,7 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
this.maxMessagesPerPoll = builder.maxMessagesPerPoll;
this.autoStartup = builder.autoStartup;
this.pollTimeout = builder.pollTimeout;
this.pollBackOffPolicy = builder.pollBackOffPolicy;
this.maxDelayBetweenPolls = builder.maxDelayBetweenPolls;
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout;
Expand Down Expand Up @@ -112,6 +117,11 @@ public Duration getPollTimeout() {
return this.pollTimeout;
}

@Override
public BackOffPolicy getPollBackOffPolicy() {
return this.pollBackOffPolicy;
}

@Override
public Duration getMaxDelayBetweenPolls() {
return this.maxDelayBetweenPolls;
Expand Down Expand Up @@ -188,6 +198,14 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(10);

private static final double DEFAULT_BACK_OFF_MULTIPLIER = 2.0;

private static final int DEFAULT_BACK_OFF_DELAY = 1000;

private static final int DEFAULT_BACK_OFF_MAX_DELAY = 10000;

private static final BackOffPolicy DEFAULT_POLL_BACK_OFF_POLICY = buildDefaultBackOffPolicy();

private static final Duration DEFAULT_SEMAPHORE_TIMEOUT = Duration.ofSeconds(10);

private static final Duration DEFAULT_LISTENER_SHUTDOWN_TIMEOUT = Duration.ofSeconds(20);
Expand All @@ -210,6 +228,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;

private BackOffPolicy pollBackOffPolicy = DEFAULT_POLL_BACK_OFF_POLICY;

private Duration maxDelayBetweenPolls = DEFAULT_SEMAPHORE_TIMEOUT;

private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION;
Expand Down Expand Up @@ -247,6 +267,7 @@ protected Builder(AbstractContainerOptions<?, ?> options) {
this.maxMessagesPerPoll = options.maxMessagesPerPoll;
this.autoStartup = options.autoStartup;
this.pollTimeout = options.pollTimeout;
this.pollBackOffPolicy = options.pollBackOffPolicy;
this.maxDelayBetweenPolls = options.maxDelayBetweenPolls;
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout;
Expand Down Expand Up @@ -287,6 +308,13 @@ public B pollTimeout(Duration pollTimeout) {
return self();
}

@Override
public B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
Assert.notNull(pollBackOffPolicy, "pollBackOffPolicy cannot be null");
this.pollBackOffPolicy = pollBackOffPolicy;
return self();
}

@Override
public B maxDelayBetweenPolls(Duration maxDelayBetweenPolls) {
Assert.notNull(maxDelayBetweenPolls, "semaphoreAcquireTimeout cannot be null");
Expand Down Expand Up @@ -377,6 +405,10 @@ private B self() {
return (B) this;
}

private static BackOffPolicy buildDefaultBackOffPolicy() {
return BackOffPolicyBuilder.newBuilder().multiplier(DEFAULT_BACK_OFF_MULTIPLIER)
.delay(DEFAULT_BACK_OFF_DELAY).maxDelay(DEFAULT_BACK_OFF_MAX_DELAY).build();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.retry.backoff.BackOffPolicy;

/**
* Contains the options to be used by the {@link MessageListenerContainer} at runtime. Note that after the object has
Expand Down Expand Up @@ -73,6 +74,15 @@ public interface ContainerOptions<O extends ContainerOptions<O, B>, B extends Co
*/
Duration getPollTimeout();

/**
* Return the {@link BackOffPolicy} to be applied when polling throws an exception.
* @return the timeout duration.
* @since 3.2
*/
default BackOffPolicy getPollBackOffPolicy() {
throw new UnsupportedOperationException("Poll Back Off not supported by this ContainerOptions");
}

/**
* Return the {@link TaskExecutor} to be used by this container's components. It's shared by the
* {@link io.awspring.cloud.sqs.listener.sink.MessageSink} and any blocking components the container might have. Due
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import java.time.Duration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.backoff.BackOffPolicy;

/**
* A builder for creating a {@link ContainerOptions} instance.
Expand Down Expand Up @@ -74,6 +75,16 @@ public interface ContainerOptionsBuilder<B extends ContainerOptionsBuilder<B, O>
*/
B pollTimeout(Duration pollTimeout);

/**
* Set the {@link BackOffPolicy} to use when polling throws an exception.
* @param pollBackOffPolicy the back off policy.
* @return this instance.
* @since 3.2
*/
default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
throw new UnsupportedOperationException("Poll back off not supported by this container options builder");
}

/**
* Set the {@link ListenerMode} mode for this container. Default is {@link ListenerMode#SINGLE_MESSAGE}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.retry.backoff.BackOffContext;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

Expand Down Expand Up @@ -65,6 +68,10 @@ public abstract class AbstractPollingMessageSource<T, S> extends AbstractMessage

private Duration shutdownTimeout;

private BackOffPolicy pollBackOffPolicy;

private AtomicReference<BackOffContext> pollBackOffContext = new AtomicReference<>();

private TaskExecutor taskExecutor;

private BatchAwareBackPressureHandler backPressureHandler;
Expand All @@ -87,6 +94,7 @@ public abstract class AbstractPollingMessageSource<T, S> extends AbstractMessage
@Override
protected void configureMessageSource(ContainerOptions<?, ?> containerOptions) {
this.shutdownTimeout = containerOptions.getListenerShutdownTimeout();
this.pollBackOffPolicy = containerOptions.getPollBackOffPolicy();
doConfigure(containerOptions);
}

Expand Down Expand Up @@ -163,6 +171,7 @@ public void start() {
Assert.notNull(this.messageSink, "messageSink not set");
Assert.notNull(this.backPressureHandler, "backPressureHandler not set");
Assert.notNull(this.acknowledgmentProcessor, "acknowledgmentProcessor not set");
Assert.notNull(this.pollBackOffPolicy, "pollBackOffPolicy not set");
logger.debug("Starting {} for queue {}", getClass().getSimpleName(), this.pollingEndpointName);
this.running = true;
ConfigUtils.INSTANCE
Expand Down Expand Up @@ -194,6 +203,7 @@ private void pollAndEmitMessages() {
if (!isRunning()) {
continue;
}
handlePollBackOff();
logger.trace("Requesting permits for queue {}", this.pollingEndpointName);
final int acquiredPermits = this.backPressureHandler.requestBatch();
if (acquiredPermits == 0) {
Expand All @@ -209,6 +219,7 @@ private void pollAndEmitMessages() {
}
// @formatter:off
managePollingFuture(doPollForMessages(acquiredPermits))
.thenApply(this::resetBackOffContext)
.exceptionally(this::handlePollingException)
.thenApply(msgs -> releaseUnusedPermits(acquiredPermits, msgs))
.thenApply(this::convertMessages)
Expand All @@ -228,6 +239,16 @@ private void pollAndEmitMessages() {
logger.debug("Execution thread stopped for queue {}", this.pollingEndpointName);
}

private void handlePollBackOff() {
BackOffContext backOffContext = this.pollBackOffContext.get();
if (backOffContext == null) {
return;
}
logger.trace("Back off context found, backing off");
this.pollBackOffPolicy.backOff(backOffContext);
logger.trace("Resuming from back off");
}

protected abstract CompletableFuture<Collection<S>> doPollForMessages(int messagesToRequest);

public Collection<S> releaseUnusedPermits(int permits, Collection<S> msgs) {
Expand Down Expand Up @@ -273,10 +294,30 @@ private Void handleSinkException(Throwable t) {
}

private Collection<S> handlePollingException(Throwable t) {
logger.error("Error polling for messages in queue {}", this.pollingEndpointName, t);
logger.error("Error polling for messages in queue {}.", this.pollingEndpointName, t);
if (this.pollBackOffContext.get() == null) {
logger.trace("Setting back off policy in queue {}", this.pollingEndpointName);
this.pollBackOffContext.set(createBackOffContext());
}
return Collections.emptyList();
}

private BackOffContext createBackOffContext() {
BackOffContext context = this.pollBackOffPolicy.start(null);
return context != null ? context : new NoOpsBackOffContext();
}

private static class NoOpsBackOffContext implements BackOffContext {
}

private Collection<S> resetBackOffContext(Collection<S> messages) {
if (this.pollBackOffContext.get() != null) {
logger.trace("Polling successful, resetting back off context.");
this.pollBackOffContext.set(null);
}
return messages;
}

private <F> CompletableFuture<F> managePollingFuture(CompletableFuture<F> pollingFuture) {
this.pollingFutures.add(pollingFuture);
pollingFuture.thenRun(() -> this.pollingFutures.remove(pollingFuture));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.BackOffPolicyBuilder;

import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

Expand Down Expand Up @@ -96,6 +99,13 @@ void shouldSetTaskExecutor() {
assertThat(options.getComponentsTaskExecutor()).isEqualTo(executor);
}

@Test
void shouldSetPollBackOffPolicyExecutor() {
BackOffPolicy pollBackOffPolicy = BackOffPolicyBuilder.newDefaultPolicy();
SqsContainerOptions options = SqsContainerOptions.builder().pollBackOffPolicy(pollBackOffPolicy).build();
assertThat(options.getPollBackOffPolicy()).isEqualTo(pollBackOffPolicy);
}

@Test
void shouldSetQueueNotFoundStrategy() {
SqsContainerOptions options = SqsContainerOptions.builder().queueNotFoundStrategy(QueueNotFoundStrategy.FAIL)
Expand All @@ -118,6 +128,7 @@ private SqsContainerOptions createConfiguredOptions() {
private SqsContainerOptionsBuilder createConfiguredBuilder() {
return SqsContainerOptions.builder().acknowledgementShutdownTimeout(Duration.ofSeconds(7))
.messageVisibility(Duration.ofSeconds(11))
.pollBackOffPolicy(BackOffPolicyBuilder.newBuilder().delay(1000).build())
.queueAttributeNames(Collections.singletonList(QueueAttributeName.QUEUE_ARN))
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.MESSAGE_GROUP_ID))
.messageAttributeNames(Collections.singletonList("my-attribute"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.InstanceOfAssertFactories.type;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import io.awspring.cloud.sqs.MessageExecutionThreadFactory;
import io.awspring.cloud.sqs.listener.BackPressureMode;
Expand All @@ -43,6 +47,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.backoff.BackOffContext;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.util.ReflectionTestUtils;
import software.amazon.awssdk.services.sqs.model.Message;
Expand Down Expand Up @@ -227,6 +233,60 @@ else if (hasAcquired9.compareAndSet(false, true)) {
assertThat(hasThrownError.get()).isFalse();
}

@Test
void shouldBackOffIfPollingThrowsAnError() {

var testName = "shouldBackOffIfPollingThrowsAnError";

var backPressureHandler = SemaphoreBackPressureHandler.builder().acquireTimeout(Duration.ofMillis(200))
.batchSize(10).totalPermits(40).throughputConfiguration(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES)
.build();
var currentPoll = new AtomicInteger(0);
var waitThirdPollLatch = new CountDownLatch(4);

AbstractPollingMessageSource<Object, Message> source = new AbstractPollingMessageSource<>() {
@Override
protected CompletableFuture<Collection<Message>> doPollForMessages(int messagesToRequest) {
waitThirdPollLatch.countDown();
if (currentPoll.compareAndSet(0, 1)) {
logger.debug("First poll - returning empty list");
return CompletableFuture.completedFuture(List.of());
}
else if (currentPoll.compareAndSet(1, 2)) {
logger.debug("Second poll - returning error");
return CompletableFuture.failedFuture(new RuntimeException("Expected exception on second poll"));
}
else if (currentPoll.compareAndSet(2, 3)) {
logger.debug("Third poll - returning error");
return CompletableFuture.failedFuture(new RuntimeException("Expected exception on third poll"));
}
else {
logger.debug("Fourth poll - returning empty list");
return CompletableFuture.completedFuture(List.of());
}
}
};

var policy = mock(BackOffPolicy.class);
var backOffContext = mock(BackOffContext.class);
given(policy.start(null)).willReturn(backOffContext);

source.setBackPressureHandler(backPressureHandler);
source.setMessageSink((msgs, context) -> CompletableFuture.completedFuture(null));
source.setId(testName + " source");
source.configure(SqsContainerOptions.builder().pollBackOffPolicy(policy).build());

source.setTaskExecutor(createTaskExecutor(testName));
source.setAcknowledgementProcessor(getAcknowledgementProcessor());
source.start();

doAwait(waitThirdPollLatch);

then(policy).should().start(null);
then(policy).should(times(2)).backOff(backOffContext);

}

private static boolean doAwait(CountDownLatch processingLatch) {
try {
return processingLatch.await(4, TimeUnit.SECONDS);
Expand Down

0 comments on commit 49d7ef4

Please sign in to comment.