Skip to content

Commit 01f0a3c

Browse files
authored
Add SQSListener argument resolver to extract the subject of an SNS me… (#1318)
Introduce @SnsNotificationSubject in SQS
1 parent 3c26212 commit 01f0a3c

File tree

9 files changed

+280
-33
lines changed

9 files changed

+280
-33
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,16 @@ public void listen(@SnsNotificationMessage List<Pojo> pojos) {
639639
}
640640
----
641641

642+
Since 3.3.1 you can also retrieve the subject of the SNS message through the `@SnsNotificationSubject` annotation.
643+
644+
[source, java]
645+
----
646+
@SqsListener("my-queue")
647+
public void listen(@SnsNotificationSubject String subject, @SnsNotificationMessage Pojo pojo) {
648+
System.out.println("received message for subject %s: %s".formatted(subject, pojo));
649+
}
650+
----
651+
642652
===== Specifying a MessageListenerContainerFactory
643653
A `MessageListenerContainerFactory` can be specified through the `factory` property.
644654
Such factory will then be used to create the container for the annotated method.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2013-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.annotation;
17+
18+
import java.lang.annotation.ElementType;
19+
import java.lang.annotation.Retention;
20+
import java.lang.annotation.RetentionPolicy;
21+
import java.lang.annotation.Target;
22+
23+
/**
24+
* Annotation that is used to map SNS notification subject on an SQS Queue to a variable that is annotated. Used in
25+
* Controllers method for handling/receiving SQS notifications.
26+
*
27+
* @author Alexander Nebel
28+
* @since 3.3.1
29+
*/
30+
@Retention(RetentionPolicy.RUNTIME)
31+
@Target(ElementType.PARAMETER)
32+
public @interface SnsNotificationSubject {
33+
34+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.awspring.cloud.sqs.listener.SqsHeaders;
2323
import io.awspring.cloud.sqs.support.resolver.BatchVisibilityHandlerMethodArgumentResolver;
2424
import io.awspring.cloud.sqs.support.resolver.NotificationMessageArgumentResolver;
25+
import io.awspring.cloud.sqs.support.resolver.NotificationSubjectArgumentResolver;
2526
import io.awspring.cloud.sqs.support.resolver.QueueAttributesMethodArgumentResolver;
2627
import io.awspring.cloud.sqs.support.resolver.SqsMessageMethodArgumentResolver;
2728
import io.awspring.cloud.sqs.support.resolver.VisibilityHandlerMethodArgumentResolver;
@@ -84,6 +85,7 @@ protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentReso
8485
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>(createAdditionalArgumentResolvers());
8586
if (objectMapper != null) {
8687
argumentResolvers.add(new NotificationMessageArgumentResolver(messageConverter, objectMapper));
88+
argumentResolvers.add(new NotificationSubjectArgumentResolver(objectMapper));
8789
}
8890
return argumentResolvers;
8991
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.support.converter;
17+
18+
import com.fasterxml.jackson.databind.JsonNode;
19+
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import org.springframework.messaging.converter.MessageConversionException;
21+
22+
/**
23+
* @author Michael Sosa
24+
* @author Alexander Nebel
25+
* @since 3.3.1
26+
*/
27+
public class SnsJsonNode {
28+
private final String jsonString;
29+
private final JsonNode jsonNode;
30+
31+
public SnsJsonNode(ObjectMapper jsonMapper, String jsonString) {
32+
try {
33+
this.jsonString = jsonString;
34+
jsonNode = jsonMapper.readTree(jsonString);
35+
}
36+
catch (Exception e) {
37+
throw new MessageConversionException("Could not read JSON", e);
38+
}
39+
validate();
40+
}
41+
42+
void validate() throws MessageConversionException {
43+
if (!jsonNode.has("Type")) {
44+
throw new MessageConversionException("Payload: '" + jsonString + "' does not contain a Type attribute",
45+
null);
46+
}
47+
48+
if (!"Notification".equals(jsonNode.get("Type").asText())) {
49+
throw new MessageConversionException("Payload: '" + jsonString + "' is not a valid notification", null);
50+
}
51+
52+
if (!jsonNode.has("Message")) {
53+
throw new MessageConversionException("Payload: '" + jsonString + "' does not contain a message", null);
54+
}
55+
}
56+
57+
public String getMessageAsString() {
58+
return jsonNode.get("Message").asText();
59+
}
60+
61+
public String getSubjectAsString() {
62+
if (!jsonNode.has("Subject")) {
63+
throw new MessageConversionException("Payload: '" + jsonString + "' does not contain a subject", null);
64+
}
65+
return jsonNode.get("Subject").asText();
66+
}
67+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SnsMessageConverter.java

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package io.awspring.cloud.sqs.support.converter;
1717

18-
import com.fasterxml.jackson.databind.JsonNode;
1918
import com.fasterxml.jackson.databind.ObjectMapper;
2019
import java.lang.reflect.ParameterizedType;
2120
import java.lang.reflect.Type;
@@ -26,7 +25,6 @@
2625
import org.springframework.lang.Nullable;
2726
import org.springframework.messaging.Message;
2827
import org.springframework.messaging.MessageHeaders;
29-
import org.springframework.messaging.converter.MessageConversionException;
3028
import org.springframework.messaging.converter.MessageConverter;
3129
import org.springframework.messaging.converter.SmartMessageConverter;
3230
import org.springframework.messaging.support.GenericMessage;
@@ -81,35 +79,13 @@ private Object fromGenericMessages(List<GenericMessage<?>> messages, Class<?> ta
8179

8280
private Object fromGenericMessage(GenericMessage<?> message, Class<?> targetClass,
8381
@Nullable Object conversionHint) {
84-
JsonNode jsonNode;
85-
try {
86-
jsonNode = this.jsonMapper.readTree(message.getPayload().toString());
87-
}
88-
catch (Exception e) {
89-
throw new MessageConversionException("Could not read JSON", e);
90-
}
91-
if (!jsonNode.has("Type")) {
92-
throw new MessageConversionException(
93-
"Payload: '" + message.getPayload() + "' does not contain a Type attribute", null);
94-
}
95-
96-
if (!"Notification".equals(jsonNode.get("Type").asText())) {
97-
throw new MessageConversionException("Payload: '" + message.getPayload() + "' is not a valid notification",
98-
null);
99-
}
100-
101-
if (!jsonNode.has("Message")) {
102-
throw new MessageConversionException("Payload: '" + message.getPayload() + "' does not contain a message",
103-
null);
104-
}
82+
var snsJsonNode = new SnsJsonNode(jsonMapper, message.getPayload().toString());
10583

106-
String messagePayload = jsonNode.get("Message").asText();
84+
String messagePayload = snsJsonNode.getMessageAsString();
10785
GenericMessage<String> genericMessage = new GenericMessage<>(messagePayload);
108-
Object convertedMessage = (payloadConverter instanceof SmartMessageConverter)
109-
? ((SmartMessageConverter) this.payloadConverter).fromMessage(genericMessage, targetClass,
110-
conversionHint)
111-
: this.payloadConverter.fromMessage(genericMessage, targetClass);
112-
return convertedMessage;
86+
return payloadConverter instanceof SmartMessageConverter smartMessageConverter
87+
? smartMessageConverter.fromMessage(genericMessage, targetClass, conversionHint)
88+
: payloadConverter.fromMessage(genericMessage, targetClass);
11389
}
11490

11591
@Override
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2013-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.support.converter;
17+
18+
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import java.util.List;
20+
import org.springframework.messaging.Message;
21+
import org.springframework.messaging.MessageHeaders;
22+
import org.springframework.messaging.converter.MessageConversionException;
23+
import org.springframework.messaging.converter.MessageConverter;
24+
import org.springframework.util.Assert;
25+
import org.springframework.util.ClassUtils;
26+
27+
/**
28+
* @author Alexander Nebel
29+
* @since 3.3.1
30+
*/
31+
public class SnsSubjectConverter implements MessageConverter {
32+
33+
private final ObjectMapper objectMapper;
34+
35+
public SnsSubjectConverter(ObjectMapper objectMapper) {
36+
Assert.notNull(objectMapper, "jsonMapper must not be null");
37+
this.objectMapper = objectMapper;
38+
}
39+
40+
@Override
41+
public Object fromMessage(Message<?> message, Class<?> targetClass) {
42+
Assert.notNull(message, "message must not be null");
43+
Assert.notNull(targetClass, "target class must not be null");
44+
45+
Object payload = message.getPayload();
46+
47+
if (!ClassUtils.isAssignable(targetClass, String.class)) {
48+
throw new MessageConversionException("Subject can only be injected into String assignable Types", null);
49+
}
50+
if (payload instanceof List) {
51+
throw new MessageConversionException("Conversion of List is not supported", null);
52+
}
53+
54+
var snsJsonNode = new SnsJsonNode(objectMapper, message.getPayload().toString());
55+
return snsJsonNode.getSubjectAsString();
56+
}
57+
58+
@Override
59+
public Message<?> toMessage(Object payload, MessageHeaders headers) {
60+
throw new UnsupportedOperationException(
61+
"This converter only supports reading a SNS notification and not writing them");
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2013-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.support.resolver;
17+
18+
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import io.awspring.cloud.sqs.annotation.SnsNotificationSubject;
20+
import io.awspring.cloud.sqs.support.converter.SnsSubjectConverter;
21+
import java.lang.reflect.Executable;
22+
import java.util.Optional;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
import org.springframework.core.MethodParameter;
26+
import org.springframework.messaging.Message;
27+
import org.springframework.messaging.converter.MessageConverter;
28+
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
29+
import org.springframework.util.ClassUtils;
30+
31+
/**
32+
* @author Alexander Nebel
33+
* @since 3.3.1
34+
*/
35+
public class NotificationSubjectArgumentResolver implements HandlerMethodArgumentResolver {
36+
37+
private static final Logger logger = LoggerFactory.getLogger(NotificationSubjectArgumentResolver.class);
38+
39+
private final MessageConverter converter;
40+
41+
public NotificationSubjectArgumentResolver(ObjectMapper jsonMapper) {
42+
this.converter = new SnsSubjectConverter(jsonMapper);
43+
}
44+
45+
@Override
46+
public boolean supportsParameter(MethodParameter parameter) {
47+
if (parameter.hasParameterAnnotation(SnsNotificationSubject.class)) {
48+
if (ClassUtils.isAssignable(parameter.getParameterType(), String.class)) {
49+
return true;
50+
}
51+
if (logger.isWarnEnabled()) {
52+
logger.warn(
53+
"Notification subject can only be injected into String assignable Types - No injection happening for {}#{}",
54+
parameter.getDeclaringClass().getName(), getMethodName(parameter));
55+
}
56+
}
57+
return false;
58+
}
59+
60+
@Override
61+
public Object resolveArgument(MethodParameter par, Message<?> msg) {
62+
return converter.fromMessage(msg, par.getParameterType());
63+
}
64+
65+
private String getMethodName(MethodParameter parameter) {
66+
var method = parameter.getMethod();
67+
var constructor = parameter.getConstructor();
68+
return Optional.ofNullable(method != null ? method : constructor).map(Executable::getName)
69+
.orElse("<Method name not resolvable>");
70+
}
71+
}

0 commit comments

Comments
 (0)