From 2acc9e867c4a2ae4973513c79f9748927f7fdf6c Mon Sep 17 00:00:00 2001 From: John Kelly Date: Thu, 5 Oct 2023 11:18:40 -0500 Subject: [PATCH] Fixes gh-697 Enhance SqsAutoConfiguration to use an available ObjectMapper for SqsContainerOptions. --- .../sqs/SqsAutoConfiguration.java | 9 +++- .../sqs/SqsAutoConfigurationTest.java | 48 +++++++++++++++++-- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index 1f6496fb9..bf66b2a76 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -30,6 +30,7 @@ import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.awspring.cloud.sqs.operations.SqsTemplateBuilder; +import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfigureAfter; @@ -88,7 +89,8 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac ObjectProvider sqsAsyncClient, ObjectProvider> asyncErrorHandler, ObjectProvider> errorHandler, ObjectProvider> asyncInterceptors, - ObjectProvider> interceptors) { + ObjectProvider> interceptors, + ObjectProvider objectMapperProvider) { SqsMessageListenerContainerFactory factory = new SqsMessageListenerContainerFactory<>(); factory.configure(this::configureContainerOptions); @@ -97,6 +99,11 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac errorHandler.ifAvailable(factory::setErrorHandler); interceptors.forEach(factory::addMessageInterceptor); asyncInterceptors.forEach(factory::addMessageInterceptor); + objectMapperProvider.ifAvailable(objectMapper -> { + var messageConverter = new SqsMessagingMessageConverter(); + messageConverter.setObjectMapper(objectMapper); + factory.configure(options -> options.messageConverter(messageConverter)); + }); return factory; } diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java index 2fc84e180..1de7ec63b 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.InstanceOfAssertFactories.type; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.awspring.cloud.autoconfigure.ConfiguredAwsClient; import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; import io.awspring.cloud.autoconfigure.core.AwsClientCustomizer; @@ -34,8 +35,10 @@ import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; +import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import java.net.URI; import java.time.Duration; +import java.util.List; import java.util.Map; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -44,6 +47,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkClientOption; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; @@ -131,7 +136,7 @@ void configuresFactoryComponentsAndOptions() { "spring.cloud.aws.sqs.listener.max-concurrent-messages:19", "spring.cloud.aws.sqs.listener.max-messages-per-poll:8", "spring.cloud.aws.sqs.listener.poll-timeout:6s") - .withUserConfiguration(CustomComponentsConfiguration.class).run(context -> { + .withUserConfiguration(CustomComponentsConfiguration.class, ObjectMapperConfiguration.class).run(context -> { assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); SqsMessageListenerContainerFactory factory = context .getBean(SqsMessageListenerContainerFactory.class); @@ -146,9 +151,46 @@ void configuresFactoryComponentsAndOptions() { assertThat(options.getMaxConcurrentMessages()).isEqualTo(19); assertThat(options.getMaxMessagesPerPoll()).isEqualTo(8); assertThat(options.getPollTimeout()).isEqualTo(Duration.ofSeconds(6)); - }); + }) + .extracting("messageConverter") + .asInstanceOf(type(SqsMessagingMessageConverter.class)) + .extracting("payloadMessageConverter") + .asInstanceOf(type(CompositeMessageConverter.class)) + .extracting(CompositeMessageConverter::getConverters) + .isInstanceOfSatisfying(List.class, converters -> + assertThat(converters.get(1)).isInstanceOfSatisfying( + MappingJackson2MessageConverter.class, + jackson2MessageConverter -> + assertThat(jackson2MessageConverter.getObjectMapper().getRegisteredModuleIds()).contains("jackson-datatype-jsr310"))); }); } + + @Test + void configuresFactoryComponentsAndOptionsWithDefaults() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true").run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + var factory = context.getBean(SqsMessageListenerContainerFactory.class); + assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors").asList() + .isEmpty(); + assertThat(factory).extracting("containerOptionsBuilder").asInstanceOf(type(ContainerOptionsBuilder.class)) + .extracting(ContainerOptionsBuilder::build) + .isInstanceOfSatisfying(ContainerOptions.class, options -> { + assertThat(options.getMaxConcurrentMessages()).isEqualTo(10); + assertThat(options.getMaxMessagesPerPoll()).isEqualTo(10); + assertThat(options.getPollTimeout()).isEqualTo(Duration.ofSeconds(10)); + }) + .extracting("messageConverter") + .asInstanceOf(type(SqsMessagingMessageConverter.class)) + .extracting("payloadMessageConverter") + .asInstanceOf(type(CompositeMessageConverter.class)) + .extracting(CompositeMessageConverter::getConverters) + .isInstanceOfSatisfying(List.class, converters -> + assertThat(converters.get(1)).isInstanceOfSatisfying( + MappingJackson2MessageConverter.class, + jackson2MessageConverter -> + assertThat(jackson2MessageConverter.getObjectMapper().getRegisteredModuleIds()).isEmpty())); + }); + } // @formatter:on @Test @@ -185,7 +227,7 @@ static class ObjectMapperConfiguration { @Bean(name = CUSTOM_OBJECT_MAPPER_BEAN_NAME) ObjectMapper objectMapper() { - return new ObjectMapper(); + return new ObjectMapper().registerModule(new JavaTimeModule()); } }