Skip to content

Commit

Permalink
Add MessageConverter to AutoConfiguration (#1194)
Browse files Browse the repository at this point in the history
* Add MessageConverter AutoConfiguration

* Polishing

Fixes #1145

---------

Co-authored-by: dongha kim <kimdongha15@naver.com>
  • Loading branch information
tomazfernandes and imsosleepy committed Aug 20, 2024
1 parent 79ef4e3 commit 8581ffa
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2022 the original author or authors.
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,14 +24,14 @@
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand All @@ -46,13 +46,15 @@
import org.springframework.context.annotation.Import;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
import software.amazon.awssdk.services.sqs.model.Message;

/**
* {@link EnableAutoConfiguration Auto-configuration} for SQS integration.
*
* @author Tomaz Fernandes
* @author Maciej Walkowiak
* @author Wei Jiang
* @author Dongha Kim
* @since 3.0
*/
@AutoConfiguration
Expand Down Expand Up @@ -80,10 +82,9 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder

@ConditionalOnMissingBean
@Bean
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider) {
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient);
objectMapperProvider
.ifAvailable(om -> builder.configureDefaultConverter(converter -> converter.setObjectMapper(om)));
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider, MessagingMessageConverter<Message> messageConverter) {
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient).messageConverter(messageConverter);
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om));
if (sqsProperties.getQueueNotFoundStrategy() != null) {
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
}
Expand All @@ -97,27 +98,34 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
ObjectProvider<ErrorHandler<Object>> errorHandler,
ObjectProvider<AsyncMessageInterceptor<Object>> asyncInterceptors,
ObjectProvider<MessageInterceptor<Object>> interceptors,
ObjectProvider<ObjectMapper> objectMapperProvider) {
ObjectProvider<ObjectMapper> objectMapperProvider,
MessagingMessageConverter<?> messagingMessageConverter) {

SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
factory.configure(this::configureContainerOptions);
factory.configure(this::configureProperties);
sqsAsyncClient.ifAvailable(factory::setSqsAsyncClient);
asyncErrorHandler.ifAvailable(factory::setErrorHandler);
errorHandler.ifAvailable(factory::setErrorHandler);
interceptors.forEach(factory::addMessageInterceptor);
asyncInterceptors.forEach(factory::addMessageInterceptor);
objectMapperProvider.ifAvailable(objectMapper -> setObjectMapper(factory, objectMapper));
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messagingMessageConverter, om));
factory.configure(options -> options.messageConverter(messagingMessageConverter));
return factory;
}

private void setObjectMapper(SqsMessageListenerContainerFactory<Object> factory, ObjectMapper objectMapper) {
// Object Mapper for early deserialization in MessageSource
var messageConverter = new SqsMessagingMessageConverter();
messageConverter.setObjectMapper(objectMapper);
factory.configure(options -> options.messageConverter(messageConverter));
private void setMapperToConverter(MessagingMessageConverter<?> messagingMessageConverter, ObjectMapper om) {
if (messagingMessageConverter instanceof SqsMessagingMessageConverter sqsConverter) {
sqsConverter.setObjectMapper(om);
}
}

@ConditionalOnMissingBean
@Bean
public MessagingMessageConverter<Message> messageConverter() {
return new SqsMessagingMessageConverter();
}

private void configureContainerOptions(SqsContainerOptionsBuilder options) {
private void configureProperties(SqsContainerOptionsBuilder options) {
PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull();
mapper.from(this.sqsProperties.getQueueNotFoundStrategy()).to(options::queueNotFoundStrategy);
mapper.from(this.sqsProperties.getListener().getMaxConcurrentMessages()).to(options::maxConcurrentMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@
import io.awspring.cloud.sqs.config.EndpointRegistrar;
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.AbstractContainerOptions;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
import io.awspring.cloud.sqs.listener.MessageListenerContainerRegistry;
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import java.net.URI;
import java.time.Duration;
Expand All @@ -56,6 +60,7 @@
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
import software.amazon.awssdk.services.sqs.model.Message;

/**
* Tests for {@link SqsAutoConfiguration}.
Expand All @@ -66,6 +71,7 @@
class SqsAutoConfigurationTest {

private static final String CUSTOM_OBJECT_MAPPER_BEAN_NAME = "customObjectMapper";
private static final String CUSTOM_MESSAGE_CONVERTER_BEAN_NAME = "customMessageConverter";

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues("spring.cloud.aws.region.static:eu-west-1")
Expand Down Expand Up @@ -219,6 +225,30 @@ void configuresObjectMapper() {
});
}

@Test
void configuresMessageConverter() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
.withUserConfiguration(ObjectMapperConfiguration.class, MessageConverterConfiguration.class)
.run(context -> {
SqsTemplate sqsTemplate = context.getBean("sqsTemplate", SqsTemplate.class);
SqsMessageListenerContainerFactory<?> factory = context.getBean("defaultSqsListenerContainerFactory", SqsMessageListenerContainerFactory.class);
ObjectMapper objectMapper = context.getBean(CUSTOM_OBJECT_MAPPER_BEAN_NAME, ObjectMapper.class);
SqsMessagingMessageConverter converter = context.getBean(CUSTOM_MESSAGE_CONVERTER_BEAN_NAME, SqsMessagingMessageConverter.class);
assertThat(converter.getPayloadMessageConverter())
.extracting("converters")
.asList()
.filteredOn(conv -> conv instanceof MappingJackson2MessageConverter)
.first()
.extracting("objectMapper")
.isEqualTo(objectMapper);
assertThat(sqsTemplate).extracting("messageConverter").isEqualTo(converter);
assertThat(factory)
.extracting("containerOptionsBuilder")
.extracting("messageConverter")
.isEqualTo(converter);
});
}

@Configuration(proxyBeanMethods = false)
static class CustomComponentsConfiguration {

Expand Down Expand Up @@ -246,6 +276,16 @@ ObjectMapper objectMapper() {

}

@Configuration(proxyBeanMethods = false)
static class MessageConverterConfiguration {

@Bean(name = CUSTOM_MESSAGE_CONVERTER_BEAN_NAME)
MessagingMessageConverter<Message> messageConverter() {
return new SqsMessagingMessageConverter();
}

}

@Configuration(proxyBeanMethods = false)
static class CustomAwsAsyncClientConfig {

Expand Down

0 comments on commit 8581ffa

Please sign in to comment.