diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index bc3f00fa7..caad9d46c 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -812,6 +812,11 @@ See AWS documentation for more information. After that period, the framework will try to perform a partial acquire with the available permits, resulting in a poll for less than `maxMessagesPerPoll` messages, unless otherwise configured. See <>. +|<> +|Any valid `BackOffPolicy` implementation +|`ExponentialBackOffPolicy` +|The back off policy to be applied when a polling thread throws an error. The default is an exponential policy with a delay of `1s`, a multiplier of `2.0`, and a maximum of `10s`. + |`autoStartup` |true, false |true @@ -1569,6 +1574,11 @@ Represents the maximum amount of time the container will wait for `maxMessagesPe This wait is applied per queue and one queue has no interference in another in this regard. Defaults to 10 seconds. +===== pollBackOffPolicy +Since 3.2 it's possible to specify a `BackOffPolicy` which will be applied when a polling thread throws an exception. +The default policy is an exponential back off with a delay of 1000ms, a 2.0 multiplier, and a 10000ms maximum delay. +Note that in highly concurrent environments with many polling threads it may happen that a successful poll cancels the next scheduled backoff before it happens, and as such no back offs need to be executed. + ==== Default Polling Behavior By default, the framework starts all queues in `low throughput mode`, where it will perform one poll for messages at a time. When a poll returns at least one message, the queue enters a `high throughput mode` where it will try to fulfill `maxConcurrentMessages` messages by making (maxConcurrentMessages / maxMessagesPerPoll) parallel polls to the queue. diff --git a/spring-cloud-aws-sqs/pom.xml b/spring-cloud-aws-sqs/pom.xml index a8e522394..21ff3d809 100644 --- a/spring-cloud-aws-sqs/pom.xml +++ b/spring-cloud-aws-sqs/pom.xml @@ -30,6 +30,10 @@ org.springframework spring-context + + org.springframework.retry + spring-retry + com.fasterxml.jackson.core jackson-databind diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java index a51007026..81f4eb3f2 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java @@ -22,6 +22,8 @@ import java.time.Duration; import org.springframework.core.task.TaskExecutor; import org.springframework.lang.Nullable; +import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.backoff.BackOffPolicyBuilder; import org.springframework.util.Assert; /** @@ -41,6 +43,8 @@ public abstract class AbstractContainerOptions, private final Duration pollTimeout; + private final BackOffPolicy pollBackOffPolicy; + private final Duration maxDelayBetweenPolls; private final Duration listenerShutdownTimeout; @@ -75,6 +79,7 @@ protected AbstractContainerOptions(Builder builder) { this.maxMessagesPerPoll = builder.maxMessagesPerPoll; this.autoStartup = builder.autoStartup; this.pollTimeout = builder.pollTimeout; + this.pollBackOffPolicy = builder.pollBackOffPolicy; this.maxDelayBetweenPolls = builder.maxDelayBetweenPolls; this.listenerShutdownTimeout = builder.listenerShutdownTimeout; this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout; @@ -112,6 +117,11 @@ public Duration getPollTimeout() { return this.pollTimeout; } + @Override + public BackOffPolicy getPollBackOffPolicy() { + return this.pollBackOffPolicy; + } + @Override public Duration getMaxDelayBetweenPolls() { return this.maxDelayBetweenPolls; @@ -188,6 +198,14 @@ protected abstract static class Builder, private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(10); + private static final double DEFAULT_BACK_OFF_MULTIPLIER = 2.0; + + private static final int DEFAULT_BACK_OFF_DELAY = 1000; + + private static final int DEFAULT_BACK_OFF_MAX_DELAY = 10000; + + private static final BackOffPolicy DEFAULT_POLL_BACK_OFF_POLICY = buildDefaultBackOffPolicy(); + private static final Duration DEFAULT_SEMAPHORE_TIMEOUT = Duration.ofSeconds(10); private static final Duration DEFAULT_LISTENER_SHUTDOWN_TIMEOUT = Duration.ofSeconds(20); @@ -210,6 +228,8 @@ protected abstract static class Builder, private Duration pollTimeout = DEFAULT_POLL_TIMEOUT; + private BackOffPolicy pollBackOffPolicy = DEFAULT_POLL_BACK_OFF_POLICY; + private Duration maxDelayBetweenPolls = DEFAULT_SEMAPHORE_TIMEOUT; private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION; @@ -247,6 +267,7 @@ protected Builder(AbstractContainerOptions options) { this.maxMessagesPerPoll = options.maxMessagesPerPoll; this.autoStartup = options.autoStartup; this.pollTimeout = options.pollTimeout; + this.pollBackOffPolicy = options.pollBackOffPolicy; this.maxDelayBetweenPolls = options.maxDelayBetweenPolls; this.listenerShutdownTimeout = options.listenerShutdownTimeout; this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout; @@ -287,6 +308,13 @@ public B pollTimeout(Duration pollTimeout) { return self(); } + @Override + public B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) { + Assert.notNull(pollBackOffPolicy, "pollBackOffPolicy cannot be null"); + this.pollBackOffPolicy = pollBackOffPolicy; + return self(); + } + @Override public B maxDelayBetweenPolls(Duration maxDelayBetweenPolls) { Assert.notNull(maxDelayBetweenPolls, "semaphoreAcquireTimeout cannot be null"); @@ -377,6 +405,10 @@ private B self() { return (B) this; } + private static BackOffPolicy buildDefaultBackOffPolicy() { + return BackOffPolicyBuilder.newBuilder().multiplier(DEFAULT_BACK_OFF_MULTIPLIER) + .delay(DEFAULT_BACK_OFF_DELAY).maxDelay(DEFAULT_BACK_OFF_MAX_DELAY).build(); + } } } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java index a85509eaf..ad7313cf6 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java @@ -22,6 +22,7 @@ import java.util.Collection; import org.springframework.core.task.TaskExecutor; import org.springframework.lang.Nullable; +import org.springframework.retry.backoff.BackOffPolicy; /** * Contains the options to be used by the {@link MessageListenerContainer} at runtime. Note that after the object has @@ -73,6 +74,15 @@ public interface ContainerOptions, B extends Co */ Duration getPollTimeout(); + /** + * Return the {@link BackOffPolicy} to be applied when polling throws an exception. + * @return the timeout duration. + * @since 3.2 + */ + default BackOffPolicy getPollBackOffPolicy() { + throw new UnsupportedOperationException("Poll Back Off not supported by this ContainerOptions"); + } + /** * Return the {@link TaskExecutor} to be used by this container's components. It's shared by the * {@link io.awspring.cloud.sqs.listener.sink.MessageSink} and any blocking components the container might have. Due diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java index cd9cc832b..9d03b7964 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java @@ -20,6 +20,7 @@ import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import java.time.Duration; import org.springframework.core.task.TaskExecutor; +import org.springframework.retry.backoff.BackOffPolicy; /** * A builder for creating a {@link ContainerOptions} instance. @@ -74,6 +75,16 @@ public interface ContainerOptionsBuilder */ B pollTimeout(Duration pollTimeout); + /** + * Set the {@link BackOffPolicy} to use when polling throws an exception. + * @param pollBackOffPolicy the back off policy. + * @return this instance. + * @since 3.2 + */ + default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) { + throw new UnsupportedOperationException("Poll back off not supported by this container options builder"); + } + /** * Set the {@link ListenerMode} mode for this container. Default is {@link ListenerMode#SINGLE_MESSAGE} * diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java index abb5b9bad..34a4a3d12 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java @@ -33,10 +33,13 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.task.TaskExecutor; import org.springframework.messaging.Message; +import org.springframework.retry.backoff.BackOffContext; +import org.springframework.retry.backoff.BackOffPolicy; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -65,6 +68,10 @@ public abstract class AbstractPollingMessageSource extends AbstractMessage private Duration shutdownTimeout; + private BackOffPolicy pollBackOffPolicy; + + private AtomicReference pollBackOffContext = new AtomicReference<>(); + private TaskExecutor taskExecutor; private BatchAwareBackPressureHandler backPressureHandler; @@ -87,6 +94,7 @@ public abstract class AbstractPollingMessageSource extends AbstractMessage @Override protected void configureMessageSource(ContainerOptions containerOptions) { this.shutdownTimeout = containerOptions.getListenerShutdownTimeout(); + this.pollBackOffPolicy = containerOptions.getPollBackOffPolicy(); doConfigure(containerOptions); } @@ -163,6 +171,7 @@ public void start() { Assert.notNull(this.messageSink, "messageSink not set"); Assert.notNull(this.backPressureHandler, "backPressureHandler not set"); Assert.notNull(this.acknowledgmentProcessor, "acknowledgmentProcessor not set"); + Assert.notNull(this.pollBackOffPolicy, "pollBackOffPolicy not set"); logger.debug("Starting {} for queue {}", getClass().getSimpleName(), this.pollingEndpointName); this.running = true; ConfigUtils.INSTANCE @@ -194,6 +203,7 @@ private void pollAndEmitMessages() { if (!isRunning()) { continue; } + handlePollBackOff(); logger.trace("Requesting permits for queue {}", this.pollingEndpointName); final int acquiredPermits = this.backPressureHandler.requestBatch(); if (acquiredPermits == 0) { @@ -209,6 +219,7 @@ private void pollAndEmitMessages() { } // @formatter:off managePollingFuture(doPollForMessages(acquiredPermits)) + .thenApply(this::resetBackOffContext) .exceptionally(this::handlePollingException) .thenApply(msgs -> releaseUnusedPermits(acquiredPermits, msgs)) .thenApply(this::convertMessages) @@ -228,6 +239,16 @@ private void pollAndEmitMessages() { logger.debug("Execution thread stopped for queue {}", this.pollingEndpointName); } + private void handlePollBackOff() { + BackOffContext backOffContext = this.pollBackOffContext.get(); + if (backOffContext == null) { + return; + } + logger.trace("Back off context found, backing off"); + this.pollBackOffPolicy.backOff(backOffContext); + logger.trace("Resuming from back off"); + } + protected abstract CompletableFuture> doPollForMessages(int messagesToRequest); public Collection releaseUnusedPermits(int permits, Collection msgs) { @@ -273,10 +294,30 @@ private Void handleSinkException(Throwable t) { } private Collection handlePollingException(Throwable t) { - logger.error("Error polling for messages in queue {}", this.pollingEndpointName, t); + logger.error("Error polling for messages in queue {}.", this.pollingEndpointName, t); + if (this.pollBackOffContext.get() == null) { + logger.trace("Setting back off policy in queue {}", this.pollingEndpointName); + this.pollBackOffContext.set(createBackOffContext()); + } return Collections.emptyList(); } + private BackOffContext createBackOffContext() { + BackOffContext context = this.pollBackOffPolicy.start(null); + return context != null ? context : new NoOpsBackOffContext(); + } + + private static class NoOpsBackOffContext implements BackOffContext { + } + + private Collection resetBackOffContext(Collection messages) { + if (this.pollBackOffContext.get() != null) { + logger.trace("Polling successful, resetting back off context."); + this.pollBackOffContext.set(null); + } + return messages; + } + private CompletableFuture managePollingFuture(CompletableFuture pollingFuture) { this.pollingFutures.add(pollingFuture); pollingFuture.thenRun(() -> this.pollingFutures.remove(pollingFuture)); diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/ContainerOptionsTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/ContainerOptionsTests.java index 9c2546610..b3985b01f 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/ContainerOptionsTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/ContainerOptionsTests.java @@ -28,6 +28,9 @@ import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.springframework.core.task.TaskExecutor; +import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.backoff.BackOffPolicyBuilder; + import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; @@ -96,6 +99,13 @@ void shouldSetTaskExecutor() { assertThat(options.getComponentsTaskExecutor()).isEqualTo(executor); } + @Test + void shouldSetPollBackOffPolicyExecutor() { + BackOffPolicy pollBackOffPolicy = BackOffPolicyBuilder.newDefaultPolicy(); + SqsContainerOptions options = SqsContainerOptions.builder().pollBackOffPolicy(pollBackOffPolicy).build(); + assertThat(options.getPollBackOffPolicy()).isEqualTo(pollBackOffPolicy); + } + @Test void shouldSetQueueNotFoundStrategy() { SqsContainerOptions options = SqsContainerOptions.builder().queueNotFoundStrategy(QueueNotFoundStrategy.FAIL) @@ -118,6 +128,7 @@ private SqsContainerOptions createConfiguredOptions() { private SqsContainerOptionsBuilder createConfiguredBuilder() { return SqsContainerOptions.builder().acknowledgementShutdownTimeout(Duration.ofSeconds(7)) .messageVisibility(Duration.ofSeconds(11)) + .pollBackOffPolicy(BackOffPolicyBuilder.newBuilder().delay(1000).build()) .queueAttributeNames(Collections.singletonList(QueueAttributeName.QUEUE_ARN)) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.MESSAGE_GROUP_ID)) .messageAttributeNames(Collections.singletonList("my-attribute")) diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java index 40cb111c2..e22f31660 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java @@ -17,6 +17,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.InstanceOfAssertFactories.type; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import io.awspring.cloud.sqs.MessageExecutionThreadFactory; import io.awspring.cloud.sqs.listener.BackPressureMode; @@ -43,6 +47,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.task.TaskExecutor; +import org.springframework.retry.backoff.BackOffContext; +import org.springframework.retry.backoff.BackOffPolicy; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.test.util.ReflectionTestUtils; import software.amazon.awssdk.services.sqs.model.Message; @@ -227,6 +233,60 @@ else if (hasAcquired9.compareAndSet(false, true)) { assertThat(hasThrownError.get()).isFalse(); } + @Test + void shouldBackOffIfPollingThrowsAnError() { + + var testName = "shouldBackOffIfPollingThrowsAnError"; + + var backPressureHandler = SemaphoreBackPressureHandler.builder().acquireTimeout(Duration.ofMillis(200)) + .batchSize(10).totalPermits(40).throughputConfiguration(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) + .build(); + var currentPoll = new AtomicInteger(0); + var waitThirdPollLatch = new CountDownLatch(4); + + AbstractPollingMessageSource source = new AbstractPollingMessageSource<>() { + @Override + protected CompletableFuture> doPollForMessages(int messagesToRequest) { + waitThirdPollLatch.countDown(); + if (currentPoll.compareAndSet(0, 1)) { + logger.debug("First poll - returning empty list"); + return CompletableFuture.completedFuture(List.of()); + } + else if (currentPoll.compareAndSet(1, 2)) { + logger.debug("Second poll - returning error"); + return CompletableFuture.failedFuture(new RuntimeException("Expected exception on second poll")); + } + else if (currentPoll.compareAndSet(2, 3)) { + logger.debug("Third poll - returning error"); + return CompletableFuture.failedFuture(new RuntimeException("Expected exception on third poll")); + } + else { + logger.debug("Fourth poll - returning empty list"); + return CompletableFuture.completedFuture(List.of()); + } + } + }; + + var policy = mock(BackOffPolicy.class); + var backOffContext = mock(BackOffContext.class); + given(policy.start(null)).willReturn(backOffContext); + + source.setBackPressureHandler(backPressureHandler); + source.setMessageSink((msgs, context) -> CompletableFuture.completedFuture(null)); + source.setId(testName + " source"); + source.configure(SqsContainerOptions.builder().pollBackOffPolicy(policy).build()); + + source.setTaskExecutor(createTaskExecutor(testName)); + source.setAcknowledgementProcessor(getAcknowledgementProcessor()); + source.start(); + + doAwait(waitThirdPollLatch); + + then(policy).should().start(null); + then(policy).should(times(2)).backOff(backOffContext); + + } + private static boolean doAwait(CountDownLatch processingLatch) { try { return processingLatch.await(4, TimeUnit.SECONDS);