Description
Type: Bug
Component:
SQSDescribe the bug
When we have configured payloadTypeMapper and two SqsListners,
We send message with appropriate header to be able to consume in specific listener according to our type.
But InvocableHandlerMethod tried to adopt apply message to all listeners and fails to parse it, on those that can't.
As it applies to everyone, but it looks very strange, beacuse with have defined setPayloadTypeMapper.
SqsMessageDto1 to SqsMessageDto2 to different listner.
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'SqsMessageDto1': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (String)"SqsMessageDto1(sqsMessageDto1=.......line: 1, column: 25]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2961)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2002)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:802)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4761)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4667)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:232)
... 27 common frames omitted
Sample
`@Configuration
public class SqsConfig {
@Bean
public SqsMessagingMessageConverter messageConverter(ObjectMapper objectMapper) {
SqsMessagingMessageConverter messageConverter = new SqsMessagingMessageConverter();
messageConverter.setPayloadTypeHeader("someCustomHeader");
messageConverter.setObjectMapper(objectMapper);
messageConverter.setPayloadTypeMapper(message -> {
String type = message.getHeaders().get("someCustomHeader", String.class);
return SomeCustomHeaderEnum.byType(type).getEventClass(); // it will return SqsMessageDto1 or SqsMessageDto2
});
return messageConverter;
}
@Bean
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, SqsMessagingMessageConverter messageConverter) {
return SqsTemplate.builder()
.sqsAsyncClient(sqsAsyncClient)
.messageConverter(messageConverter)
.configure(options -> options.defaultQueue("queueName")
.queueNotFoundStrategy(QueueNotFoundStrategy.FAIL)
).build();
}
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient,
SqsMessagingMessageConverter messageConverter) {
return SqsMessageListenerContainerFactory.builder()
.configure(options -> {
options.messageConverter(messageConverter)
.queueNotFoundStrategy(QueueNotFoundStrategy.FAIL);
}).sqsAsyncClient(sqsAsyncClient)
.build();
}
}`
`
@service
public class SqsListener1 {
@SqsListener("${sqs.queue}")
public void process(@Payload SqsMessageDto1 message) {
/// logic
}
@Service
public class SqsListener2 {
@SqsListener("${sqs.queue}")
public void process(@Payload SqsMessageDto2 message) {
/// logic
}
`
Activity
tomazfernandes commentedon Jan 27, 2025
Hey @gorbachevsasha, if I understand correctly you want to be able to have messages from the same queue be received in different methods based on the payload type, which in turn is decided by a message header.
This integration does not support that at the moment, but I've opened an enhancement request for it: #1331
Please let me know if you'd like to submit a PR!