From 8581ffa46b07cd66e4bc0c3679b48b3b75555f9c Mon Sep 17 00:00:00 2001 From: Tomaz Fernandes <76525045+tomazfernandes@users.noreply.github.com> Date: Mon, 19 Aug 2024 22:45:51 -0300 Subject: [PATCH] Add MessageConverter to AutoConfiguration (#1194) * Add MessageConverter AutoConfiguration * Polishing Fixes #1145 --------- Co-authored-by: dongha kim --- .../sqs/SqsAutoConfiguration.java | 38 +++++++++++------- .../sqs/SqsAutoConfigurationTest.java | 40 +++++++++++++++++++ 2 files changed, 63 insertions(+), 15 deletions(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index a8c25161e..8cefc01a8 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2022 the original author or authors. + * Copyright 2013-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,6 @@ import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration; import io.awspring.cloud.sqs.config.SqsListenerConfigurer; import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; -import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; @@ -32,6 +31,7 @@ import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.awspring.cloud.sqs.operations.SqsTemplateBuilder; +import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -46,6 +46,7 @@ import org.springframework.context.annotation.Import; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder; +import software.amazon.awssdk.services.sqs.model.Message; /** * {@link EnableAutoConfiguration Auto-configuration} for SQS integration. @@ -53,6 +54,7 @@ * @author Tomaz Fernandes * @author Maciej Walkowiak * @author Wei Jiang + * @author Dongha Kim * @since 3.0 */ @AutoConfiguration @@ -80,10 +82,9 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder @ConditionalOnMissingBean @Bean - public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider objectMapperProvider) { - SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient); - objectMapperProvider - .ifAvailable(om -> builder.configureDefaultConverter(converter -> converter.setObjectMapper(om))); + public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider objectMapperProvider, MessagingMessageConverter messageConverter) { + SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient).messageConverter(messageConverter); + objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om)); if (sqsProperties.getQueueNotFoundStrategy() != null) { builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy())); } @@ -97,27 +98,34 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac ObjectProvider> errorHandler, ObjectProvider> asyncInterceptors, ObjectProvider> interceptors, - ObjectProvider objectMapperProvider) { + ObjectProvider objectMapperProvider, + MessagingMessageConverter messagingMessageConverter) { SqsMessageListenerContainerFactory factory = new SqsMessageListenerContainerFactory<>(); - factory.configure(this::configureContainerOptions); + factory.configure(this::configureProperties); sqsAsyncClient.ifAvailable(factory::setSqsAsyncClient); asyncErrorHandler.ifAvailable(factory::setErrorHandler); errorHandler.ifAvailable(factory::setErrorHandler); interceptors.forEach(factory::addMessageInterceptor); asyncInterceptors.forEach(factory::addMessageInterceptor); - objectMapperProvider.ifAvailable(objectMapper -> setObjectMapper(factory, objectMapper)); + objectMapperProvider.ifAvailable(om -> setMapperToConverter(messagingMessageConverter, om)); + factory.configure(options -> options.messageConverter(messagingMessageConverter)); return factory; } - private void setObjectMapper(SqsMessageListenerContainerFactory factory, ObjectMapper objectMapper) { - // Object Mapper for early deserialization in MessageSource - var messageConverter = new SqsMessagingMessageConverter(); - messageConverter.setObjectMapper(objectMapper); - factory.configure(options -> options.messageConverter(messageConverter)); + private void setMapperToConverter(MessagingMessageConverter messagingMessageConverter, ObjectMapper om) { + if (messagingMessageConverter instanceof SqsMessagingMessageConverter sqsConverter) { + sqsConverter.setObjectMapper(om); + } + } + + @ConditionalOnMissingBean + @Bean + public MessagingMessageConverter messageConverter() { + return new SqsMessagingMessageConverter(); } - private void configureContainerOptions(SqsContainerOptionsBuilder options) { + private void configureProperties(SqsContainerOptionsBuilder options) { PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull(); mapper.from(this.sqsProperties.getQueueNotFoundStrategy()).to(options::queueNotFoundStrategy); mapper.from(this.sqsProperties.getListener().getMaxConcurrentMessages()).to(options::maxConcurrentMessages); diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java index edb40fcef..41ea6dc98 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java @@ -30,12 +30,16 @@ import io.awspring.cloud.sqs.config.EndpointRegistrar; import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration; import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; +import io.awspring.cloud.sqs.listener.AbstractContainerOptions; import io.awspring.cloud.sqs.listener.ContainerOptions; import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; +import io.awspring.cloud.sqs.listener.MessageListenerContainerRegistry; +import io.awspring.cloud.sqs.listener.SqsContainerOptions; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; +import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import java.net.URI; import java.time.Duration; @@ -56,6 +60,7 @@ import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder; +import software.amazon.awssdk.services.sqs.model.Message; /** * Tests for {@link SqsAutoConfiguration}. @@ -66,6 +71,7 @@ class SqsAutoConfigurationTest { private static final String CUSTOM_OBJECT_MAPPER_BEAN_NAME = "customObjectMapper"; + private static final String CUSTOM_MESSAGE_CONVERTER_BEAN_NAME = "customMessageConverter"; private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() .withPropertyValues("spring.cloud.aws.region.static:eu-west-1") @@ -219,6 +225,30 @@ void configuresObjectMapper() { }); } + @Test + void configuresMessageConverter() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") + .withUserConfiguration(ObjectMapperConfiguration.class, MessageConverterConfiguration.class) + .run(context -> { + SqsTemplate sqsTemplate = context.getBean("sqsTemplate", SqsTemplate.class); + SqsMessageListenerContainerFactory factory = context.getBean("defaultSqsListenerContainerFactory", SqsMessageListenerContainerFactory.class); + ObjectMapper objectMapper = context.getBean(CUSTOM_OBJECT_MAPPER_BEAN_NAME, ObjectMapper.class); + SqsMessagingMessageConverter converter = context.getBean(CUSTOM_MESSAGE_CONVERTER_BEAN_NAME, SqsMessagingMessageConverter.class); + assertThat(converter.getPayloadMessageConverter()) + .extracting("converters") + .asList() + .filteredOn(conv -> conv instanceof MappingJackson2MessageConverter) + .first() + .extracting("objectMapper") + .isEqualTo(objectMapper); + assertThat(sqsTemplate).extracting("messageConverter").isEqualTo(converter); + assertThat(factory) + .extracting("containerOptionsBuilder") + .extracting("messageConverter") + .isEqualTo(converter); + }); + } + @Configuration(proxyBeanMethods = false) static class CustomComponentsConfiguration { @@ -246,6 +276,16 @@ ObjectMapper objectMapper() { } + @Configuration(proxyBeanMethods = false) + static class MessageConverterConfiguration { + + @Bean(name = CUSTOM_MESSAGE_CONVERTER_BEAN_NAME) + MessagingMessageConverter messageConverter() { + return new SqsMessagingMessageConverter(); + } + + } + @Configuration(proxyBeanMethods = false) static class CustomAwsAsyncClientConfig {