The Aggregator processor enables an application to aggregate incoming messages into groups and release them to an output destination.
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
Change kafka to rabbit if you want to run it against RabbitMQ.
If an input payload is a byte[] and the content-type header is JSON, the JsonBytesToMap function tries to deserialize the payload to a Map for better data representation on the output of the aggregator function.
Such a Map representation also makes it easy to access the payload content from the SpEL expressions mentioned below.
Otherwise (including on a deserialization error), the input payload is left as is, and it is up to the target application configuration to convert it into the desired form.
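
The fallback behavior described above can be sketched roughly as follows. This is not the actual JsonBytesToMap source: the class name JsonBytesFallbackSketch, the convert method, and the pluggable jsonParser argument are all hypothetical, and the toy parser in main only stands in for a real JSON library such as Jackson. The sketch assumes that a content-type value containing "json" counts as JSON.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of the "try to deserialize, else leave as is" rule.
class JsonBytesFallbackSketch {

    /**
     * Returns a Map when the payload is a byte[] with a JSON content type and
     * parsing succeeds; in every other case returns the payload unchanged.
     */
    static Object convert(Object payload, String contentType,
                          Function<byte[], Map<String, Object>> jsonParser) {
        // Assumption: any content type mentioning "json" is treated as JSON.
        boolean isJson = contentType != null
                && contentType.toLowerCase(Locale.ROOT).contains("json");
        if (payload instanceof byte[] && isJson) {
            try {
                return jsonParser.apply((byte[]) payload);
            } catch (RuntimeException ex) {
                // Deserialization error: fall through and keep the original payload.
                return payload;
            }
        }
        // Not a byte[] or not JSON: leave the payload as is.
        return payload;
    }

    public static void main(String[] args) {
        // Toy stand-in for a real JSON parser; it only checks for a leading '{'.
        Function<byte[], Map<String, Object>> toyParser = bytes -> {
            String text = new String(bytes, StandardCharsets.UTF_8).trim();
            if (!text.startsWith("{")) {
                throw new IllegalArgumentException("not a JSON object");
            }
            return Map.<String, Object>of("raw", text);
        };

        byte[] good = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] broken = "oops".getBytes(StandardCharsets.UTF_8);

        Object parsed = convert(good, "application/json", toyParser);
        Object untouched = convert(broken, "application/json", toyParser);

        System.out.println("parsed is a Map: " + (parsed instanceof Map));
        System.out.println("broken payload kept as byte[]: " + (untouched == broken));
    }
}
```

In a real application the parser argument would typically delegate to the JSON library already on the classpath. The point of the sketch is only the ordering of the checks: type and content type first, then a guarded parse that never drops the original payload.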