Skip to content

Commit

Permalink
Add a way to configure custom ChannelInterceptor for SNS integration (
Browse files Browse the repository at this point in the history
#1105)

Fixes #565
  • Loading branch information
sondemar authored Mar 29, 2024
1 parent daae273 commit e26752a
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.web.method.support.HandlerMethodArgumentResolver;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import software.amazon.awssdk.services.sns.SnsClient;
Expand All @@ -55,6 +56,7 @@
* @author Maciej Walkowiak
* @author Manuel Wessner
* @author Matej Nedic
* @author Mariusz Sondecki
*/
@AutoConfiguration
@ConditionalOnClass({ SnsClient.class, SnsTemplate.class })
Expand All @@ -75,12 +77,15 @@ public SnsClient snsClient(SnsProperties properties, AwsClientBuilderConfigurer
@ConditionalOnMissingBean(SnsOperations.class)
@Bean
public SnsTemplate snsTemplate(SnsClient snsClient, Optional<ObjectMapper> objectMapper,
Optional<TopicArnResolver> topicArnResolver) {
Optional<TopicArnResolver> topicArnResolver, ObjectProvider<ChannelInterceptor> interceptors) {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setSerializedPayloadClass(String.class);
objectMapper.ifPresent(converter::setObjectMapper);
return topicArnResolver.map(it -> new SnsTemplate(snsClient, it, converter))
SnsTemplate snsTemplate = topicArnResolver.map(it -> new SnsTemplate(snsClient, it, converter))
.orElseGet(() -> new SnsTemplate(snsClient, converter));
interceptors.forEach(snsTemplate::addChannelInterceptor);

return snsTemplate;
}

@ConditionalOnMissingBean(SnsSmsOperations.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import software.amazon.awssdk.arns.Arn;
Expand All @@ -53,6 +54,7 @@
* Tests for class {@link io.awspring.cloud.autoconfigure.sns.SnsAutoConfiguration}.
*
* @author Matej Nedic
* @author Mariusz Sondecki
*/
class SnsAutoConfigurationTest {

Expand Down Expand Up @@ -137,6 +139,12 @@ void bothTemplatesAndOperationsAreInjectable() {
});
}

@Test
void customChannelInterceptorCanBeConfigured() {
this.contextRunner.withUserConfiguration(CustomChannelInterceptorConfiguration.class)
.run(context -> assertThat(context).hasSingleBean(CustomChannelInterceptor.class));
}

@Configuration(proxyBeanMethods = false)
static class CustomTopicArnResolverConfiguration {

Expand Down Expand Up @@ -215,4 +223,15 @@ ApplicationRunner runner4(SnsSmsOperations snsSmsOperations) {
}
}

@Configuration(proxyBeanMethods = false)
static class CustomChannelInterceptorConfiguration {

@Bean
ChannelInterceptor customChannelInterceptor() {
return new CustomChannelInterceptor();
}
}

static class CustomChannelInterceptor implements ChannelInterceptor {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.core.DestinationResolvingMessageSendingOperations;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.util.Assert;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.services.sns.SnsClient;
Expand All @@ -40,13 +41,15 @@
*
* @author Alain Sahli
* @author Matej Nedic
* @author Mariusz Sondecki
* @since 1.0
*/
public class SnsTemplate extends AbstractMessageSendingTemplate<TopicMessageChannel>
implements DestinationResolvingMessageSendingOperations<TopicMessageChannel>, SnsOperations {

private final SnsClient snsClient;
private final TopicArnResolver topicArnResolver;
private final List<ChannelInterceptor> channelInterceptors = new ArrayList<>();

public SnsTemplate(SnsClient snsClient) {
this(snsClient, null);
Expand Down Expand Up @@ -110,6 +113,7 @@ public <T> void convertAndSend(String destination, T payload, @Nullable Map<Stri
* Convenience method that sends a notification with the given {@literal message} and {@literal subject} to the
* {@literal destination}. The {@literal subject} is sent as header as defined in the
* <a href="https://docs.aws.amazon.com/sns/latest/dg/json-formats.html">SNS message JSON formats</a>.
*
* @param destinationName The logical name of the destination
* @param message The message to send
* @param subject The subject to send
Expand All @@ -123,6 +127,7 @@ public void sendNotification(String destinationName, Object message, @Nullable S
* {@literal destination}. The {@literal subject} is sent as header as defined in the
* <a href="https://docs.aws.amazon.com/sns/latest/dg/json-formats.html">SNS message JSON formats</a>. The
* configured default destination will be used.
*
* @param message The message to send
* @param subject The subject to send
*/
Expand All @@ -131,14 +136,27 @@ public void sendNotification(Object message, @Nullable String subject) {
Collections.singletonMap(NOTIFICATION_SUBJECT_HEADER, subject));
}

/**
* Add a {@link ChannelInterceptor} to be used by {@link TopicMessageChannel} created with this template.
* Interceptors will be applied just after TopicMessageChannel creation.
*
* @param channelInterceptor the message interceptor instance.
*/
public void addChannelInterceptor(ChannelInterceptor channelInterceptor) {
Assert.notNull(channelInterceptor, "channelInterceptor cannot be null");
this.channelInterceptors.add(channelInterceptor);
}

@Override
public void sendNotification(String topic, SnsNotification<?> notification) {
this.convertAndSend(topic, notification.getPayload(), notification.getHeaders());
}

private TopicMessageChannel resolveMessageChannelByTopicName(String topicName) {
Arn topicArn = this.topicArnResolver.resolveTopicArn(topicName);
return new TopicMessageChannel(this.snsClient, topicArn);
TopicMessageChannel topicMessageChannel = new TopicMessageChannel(this.snsClient, topicArn);
channelInterceptors.forEach(topicMessageChannel::addInterceptor);
return topicMessageChannel;
}

private static CompositeMessageConverter initMessageConverter(@Nullable MessageConverter messageConverter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@

import static io.awspring.cloud.sns.Matchers.requestMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.awspring.cloud.sns.Person;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.GenericMessage;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
import software.amazon.awssdk.services.sns.model.CreateTopicResponse;
Expand All @@ -35,6 +41,7 @@
* Tests for {@link SnsTemplate}.
*
* @author Alain Sahli
* @author Mariusz Sondecki
*/
class SnsTemplateTest {
private static final String TOPIC_ARN = "arn:aws:sns:eu-west:123456789012:test";
Expand Down Expand Up @@ -137,4 +144,25 @@ void sendsSimpleSnsNotification() {
}));
}

@Test
void sendsMessageProcessedByInterceptor() {
// given
ChannelInterceptor interceptor = mock(ChannelInterceptor.class);
String originalMessage = "message content";
String processedMessage = originalMessage + " modified by interceptor";
snsTemplate.addChannelInterceptor(interceptor);
when(interceptor.preSend(any(Message.class), any(MessageChannel.class))).thenAnswer(invocation -> {
Object[] args = invocation.getArguments();
Message message = (Message) args[0];
return new GenericMessage<>(processedMessage, message.getHeaders());
});

// when
snsTemplate.sendNotification("topic name", originalMessage, "subject");

// then
verify(snsClient).publish(requestMatches(r -> assertThat(r.message()).isEqualTo(processedMessage)));
verify(interceptor).preSend(any(), any());
verify(interceptor).postSend(any(), any(), anyBoolean());
}
}

0 comments on commit e26752a

Please sign in to comment.