Skip to content

Commit

Permalink
Merge pull request #2244 from ozangunalp/message_composability
Browse files Browse the repository at this point in the history
Message metadata propagation with ack and nack
  • Loading branch information
ozangunalp authored Dec 4, 2023
2 parents 1518570 + 86664d4 commit 94a78a5
Show file tree
Hide file tree
Showing 76 changed files with 3,594 additions and 335 deletions.
25 changes: 25 additions & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,31 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>process-test-resources</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
<artifactId>microprofile-reactive-messaging-api</artifactId>
</artifactItem>
</artifactItems>
<stripVersion>true</stripVersion>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>coverage</id>
Expand Down
116 changes: 114 additions & 2 deletions api/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,119 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"code": "java.annotation.removed",
"old": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack()",
"new": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack()",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"

},
{
"code": "java.annotation.removed",
"old": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable)",
"new": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable)",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"code": "java.annotation.attributeValueChanged",
"old": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>)",
"new": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>)",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"code": "java.annotation.attributeValueChanged",
"old": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>)",
"new": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>)",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.removed",
"old": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack() @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"new": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack() @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.removed",
"old": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"new": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.attributeValueChanged",
"old": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"new": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.attributeValueChanged",
"old": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"new": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.removed",
"old": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack() @ io.smallrye.reactive.messaging.TargetedMessages",
"new": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack() @ io.smallrye.reactive.messaging.TargetedMessages",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.removed",
"old": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable) @ io.smallrye.reactive.messaging.TargetedMessages",
"new": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable) @ io.smallrye.reactive.messaging.TargetedMessages",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.attributeValueChanged",
"old": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages",
"new": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.attributeValueChanged",
"old": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages",
"new": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand All @@ -46,4 +158,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
} ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.smallrye.reactive.messaging;

import jakarta.enterprise.inject.spi.Prioritized;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Experimental;

/**
* Interceptor for incoming messages on connector channels.
* <p>
* To register an outgoing interceptor, expose a managed bean, implementing this interface,
* and qualified with {@code @Identifier} with the targeted channel name.
* <p>
* Only one interceptor is allowed to be bound for interception per incoming channel.
* When multiple interceptors are available, implementation should override the {@link #getPriority()} method.
*/
@Experimental("Smallrye-only feature")
public interface IncomingInterceptor extends Prioritized {

@Override
default int getPriority() {
return -1;
}

/**
* Called after message received
*
* @param message received message
* @return the message to dispatch for consumer methods, possibly mutated
*/
default Message<?> afterMessageReceive(Message<?> message) {
return message;
}

/**
* Called after message acknowledgment
*
* @param message acknowledged message
*/
void onMessageAck(Message<?> message);

/**
* Called after message negative-acknowledgement
*
* @param message message to negative-acknowledge
* @param failure failure
*/
void onMessageNack(Message<?> message, Throwable failure);
}
Loading

0 comments on commit 94a78a5

Please sign in to comment.