From 6496484cefa1409aca45afbba4b0f421c7f8fa52 Mon Sep 17 00:00:00 2001 From: Matthias Maeller Date: Mon, 10 Jul 2023 09:52:12 +0200 Subject: [PATCH] Fixes gh-697 Enhance SqsAutoConfiguration to use an available ObjectMapper for SqsContainerOptions. --- .../sqs/SqsAutoConfiguration.java | 25 ++-- .../sqs/SqsAutoConfigurationTest.java | 115 +++++++++++++----- 2 files changed, 103 insertions(+), 37 deletions(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index 1f6496fb9..48024e8c1 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -30,6 +30,7 @@ import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.awspring.cloud.sqs.operations.SqsTemplateBuilder; +import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfigureAfter; @@ -52,10 +53,10 @@ * @since 3.0 */ @AutoConfiguration -@ConditionalOnClass({ SqsAsyncClient.class, SqsBootstrapConfiguration.class }) +@ConditionalOnClass({SqsAsyncClient.class, SqsBootstrapConfiguration.class}) @EnableConfigurationProperties(SqsProperties.class) @Import(SqsBootstrapConfiguration.class) -@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) +@AutoConfigureAfter({CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class}) @ConditionalOnProperty(name = "spring.cloud.aws.sqs.enabled", havingValue = "true", matchIfMissing = true) public class SqsAutoConfiguration { @@ -68,9 +69,9 @@ public SqsAutoConfiguration(SqsProperties sqsProperties) { @ConditionalOnMissingBean @Bean public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilderConfigurer, - ObjectProvider> configurer) { + ObjectProvider> configurer) { return awsClientBuilderConfigurer - .configure(SqsAsyncClient.builder(), this.sqsProperties, configurer.getIfAvailable()).build(); + .configure(SqsAsyncClient.builder(), this.sqsProperties, configurer.getIfAvailable()).build(); } @ConditionalOnMissingBean @@ -78,17 +79,18 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider objectMapperProvider) { SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient); objectMapperProvider - .ifAvailable(om -> builder.configureDefaultConverter(converter -> converter.setObjectMapper(om))); + .ifAvailable(om -> builder.configureDefaultConverter(converter -> converter.setObjectMapper(om))); return builder.build(); } @ConditionalOnMissingBean @Bean public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory( - ObjectProvider sqsAsyncClient, ObjectProvider> asyncErrorHandler, - ObjectProvider> errorHandler, - ObjectProvider> asyncInterceptors, - ObjectProvider> interceptors) { + ObjectProvider sqsAsyncClient, ObjectProvider> asyncErrorHandler, + ObjectProvider> errorHandler, + ObjectProvider> asyncInterceptors, + ObjectProvider> interceptors, + ObjectProvider objectMapperProvider) { SqsMessageListenerContainerFactory factory = new SqsMessageListenerContainerFactory<>(); factory.configure(this::configureContainerOptions); @@ -97,6 +99,11 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac errorHandler.ifAvailable(factory::setErrorHandler); interceptors.forEach(factory::addMessageInterceptor); asyncInterceptors.forEach(factory::addMessageInterceptor); + objectMapperProvider.ifAvailable(objectMapper -> { + var messageConverter = new SqsMessagingMessageConverter(); + messageConverter.setObjectMapper(objectMapper); + factory.configure(options -> options.messageConverter(messageConverter)); + }); return factory; } diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java index 2fc84e180..677aaee52 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.InstanceOfAssertFactories.type; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.awspring.cloud.autoconfigure.ConfiguredAwsClient; import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; import io.awspring.cloud.autoconfigure.core.AwsClientCustomizer; @@ -34,6 +35,7 @@ import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; +import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import java.net.URI; import java.time.Duration; import java.util.Map; @@ -44,6 +46,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkClientOption; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; @@ -61,21 +65,21 @@ class SqsAutoConfigurationTest { private static final String CUSTOM_OBJECT_MAPPER_BEAN_NAME = "customObjectMapper"; private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withPropertyValues("spring.cloud.aws.region.static:eu-west-1") - .withConfiguration(AutoConfigurations.of(RegionProviderAutoConfiguration.class, - CredentialsProviderAutoConfiguration.class, SqsAutoConfiguration.class, - AwsAutoConfiguration.class)); + .withPropertyValues("spring.cloud.aws.region.static:eu-west-1") + .withConfiguration(AutoConfigurations.of(RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, SqsAutoConfiguration.class, + AwsAutoConfiguration.class)); @Test void sqsAutoConfigurationIsDisabled() { this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:false") - .run(context -> assertThat(context).doesNotHaveBean(SqsAsyncClient.class)); + .run(context -> assertThat(context).doesNotHaveBean(SqsAsyncClient.class)); } @Test void sqsAutoConfigurationIsDisabledWhenSqsModuleIsNotInClassPath() { this.contextRunner.withClassLoader(new FilteredClassLoader(SqsBootstrapConfiguration.class)) - .run(context -> assertThat(context).doesNotHaveBean(SqsAsyncClient.class)); + .run(context -> assertThat(context).doesNotHaveBean(SqsAsyncClient.class)); } @Test @@ -123,44 +127,99 @@ void customSqsClientConfigurer() { }); }); } + // @formatter:on @Test void configuresFactoryComponentsAndOptions() { this.contextRunner - .withPropertyValues("spring.cloud.aws.sqs.enabled:true", - "spring.cloud.aws.sqs.listener.max-concurrent-messages:19", - "spring.cloud.aws.sqs.listener.max-messages-per-poll:8", - "spring.cloud.aws.sqs.listener.poll-timeout:6s") - .withUserConfiguration(CustomComponentsConfiguration.class).run(context -> { - assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); - SqsMessageListenerContainerFactory factory = context - .getBean(SqsMessageListenerContainerFactory.class); - assertThat(factory) - .hasFieldOrProperty("errorHandler") - .extracting("asyncMessageInterceptors").asList().isNotEmpty(); - assertThat(factory) - .extracting("containerOptionsBuilder") + .withPropertyValues("spring.cloud.aws.sqs.enabled:true", + "spring.cloud.aws.sqs.listener.max-concurrent-messages:19", + "spring.cloud.aws.sqs.listener.max-messages-per-poll:8", + "spring.cloud.aws.sqs.listener.poll-timeout:6s") + .withUserConfiguration(CustomComponentsConfiguration.class, ObjectMapperConfiguration.class) + .run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + var factory = context.getBean(SqsMessageListenerContainerFactory.class); + assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors") + .asList().isNotEmpty(); + assertThat(factory).extracting("containerOptionsBuilder") .asInstanceOf(type(ContainerOptionsBuilder.class)) .extracting(ContainerOptionsBuilder::build) .isInstanceOfSatisfying(ContainerOptions.class, options -> { assertThat(options.getMaxConcurrentMessages()).isEqualTo(19); assertThat(options.getMaxMessagesPerPoll()).isEqualTo(8); assertThat(options.getPollTimeout()).isEqualTo(Duration.ofSeconds(6)); + assertThat(options.getMessageConverter()).isInstanceOfSatisfying( + SqsMessagingMessageConverter.class, + messageConverter -> assertThat(messageConverter.getPayloadMessageConverter()) + .isInstanceOfSatisfying(CompositeMessageConverter.class, + compositeMessageConverter -> assertThat( + compositeMessageConverter.getConverters()).element(1) + .isInstanceOfSatisfying( + MappingJackson2MessageConverter.class, + jackson2MessageConverter -> assertThat( + jackson2MessageConverter + .getObjectMapper()) + .isInstanceOfSatisfying( + ObjectMapper.class, + objectMapper -> { + assertThat( + objectMapper) + .isEqualTo( + context.getBean( + ObjectMapper.class)); + assertThat( + objectMapper + .getRegisteredModuleIds()) + .contains( + "jackson-datatype-jsr310"); + })))); }); + }); + } + + @Test + void configuresFactoryComponentsAndOptionsWithDefaults() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true").run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + var factory = context.getBean(SqsMessageListenerContainerFactory.class); + assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors").asList() + .isEmpty(); + assertThat(factory).extracting("containerOptionsBuilder").asInstanceOf(type(ContainerOptionsBuilder.class)) + .extracting(ContainerOptionsBuilder::build) + .isInstanceOfSatisfying(ContainerOptions.class, options -> { + assertThat(options.getMaxConcurrentMessages()).isEqualTo(10); + assertThat(options.getMaxMessagesPerPoll()).isEqualTo(10); + assertThat(options.getPollTimeout()).isEqualTo(Duration.ofSeconds(10)); + assertThat(options.getMessageConverter()).isInstanceOfSatisfying( + SqsMessagingMessageConverter.class, + messageConverter -> assertThat(messageConverter.getPayloadMessageConverter()) + .isInstanceOfSatisfying(CompositeMessageConverter.class, + compositeMessageConverter -> assertThat(compositeMessageConverter + .getConverters()).element(1).isInstanceOfSatisfying( + MappingJackson2MessageConverter.class, + jackson2MessageConverter -> assertThat( + jackson2MessageConverter.getObjectMapper()) + .isInstanceOfSatisfying( + ObjectMapper.class, + objectMapper -> assertThat( + objectMapper + .getRegisteredModuleIds()) + .isEmpty())))); }); + }); } - // @formatter:on @Test void configuresObjectMapper() { this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") - .withUserConfiguration(ObjectMapperConfiguration.class).run(context -> { - SqsListenerAnnotationBeanPostProcessor bpp = context - .getBean(SqsListenerAnnotationBeanPostProcessor.class); - ObjectMapper objectMapper = context.getBean(CUSTOM_OBJECT_MAPPER_BEAN_NAME, ObjectMapper.class); - assertThat(bpp).extracting("endpointRegistrar").asInstanceOf(type(EndpointRegistrar.class)) - .extracting(EndpointRegistrar::getObjectMapper).isEqualTo(objectMapper); - }); + .withUserConfiguration(ObjectMapperConfiguration.class).run(context -> { + SqsListenerAnnotationBeanPostProcessor bpp = context + .getBean(SqsListenerAnnotationBeanPostProcessor.class); + ObjectMapper objectMapper = context.getBean(CUSTOM_OBJECT_MAPPER_BEAN_NAME, ObjectMapper.class); + assertThat(bpp).extracting("endpointRegistrar").asInstanceOf(type(EndpointRegistrar.class)) + .extracting(EndpointRegistrar::getObjectMapper).isEqualTo(objectMapper); + }); } @Configuration(proxyBeanMethods = false) @@ -185,7 +244,7 @@ static class ObjectMapperConfiguration { @Bean(name = CUSTOM_OBJECT_MAPPER_BEAN_NAME) ObjectMapper objectMapper() { - return new ObjectMapper(); + return new ObjectMapper().registerModule(new JavaTimeModule()); } }