From 05b1032b1cfd7c6c90370f75c9f6927ed1e9836c Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Wed, 20 Dec 2023 09:04:09 +0100 Subject: [PATCH] Add a test verifying the acknowledgement --- .../messaging/rabbitmq/AckChainTest.java | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/AckChainTest.java diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/AckChainTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/AckChainTest.java new file mode 100644 index 0000000000..2bc6307ac5 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/AckChainTest.java @@ -0,0 +1,73 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.*; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +/** + * Reproduce #1966 + */ +public class AckChainTest extends WeldTestBase { + + @Test + void test() { + addBeans(MyApp.class); + runApplication(new MapBasedConfig() + .with("mp.messaging.outgoing.outgoing-no-ack.connector", "smallrye-rabbitmq") + .with("mp.messaging.outgoing.outgoing-no-ack.exchange.name", "DemoNoAck") + .with("mp.messaging.outgoing.outgoing-no-ack.type", "topic") + .with("mp.messaging.outgoing.outgoing-no-ack.declare", "true") + .with("mp.messaging.incoming.incoming-no-ack.connector", "smallrye-rabbitmq") + .with("mp.messaging.incoming.incoming-no-ack.exchange.name", "DemoNoAck") + .with("mp.messaging.incoming.incoming-no-ack.queue.name", "queue.no.ack") + .with("mp.messaging.incoming.incoming-no-ack.queue.declare", "true") + .with("mp.messaging.incoming.incoming-no-ack.routing.keys", "no.ack")); + + usage.produce("DemoNoAck", "queue.no.ack", "no.ack", 1, () -> "payload"); + MyApp app = container.select(MyApp.class).get(); + + Awaitility.await().until(() -> app.acked()); + } + + @ApplicationScoped + public static class MyApp { + @Inject + @Channel("outgoing-no-ack") + Emitter emitter; + + AtomicBoolean acked = new AtomicBoolean(false); + + @Incoming("incoming-no-ack") + CompletableFuture consume(String msg) { + CompletableFuture future = new CompletableFuture<>(); + Metadata metadata = Metadata.of(OutgoingRabbitMQMetadata.builder() + .withRoutingKey("other.queue") + .withContentType("text/plain") + .build()); + Message output = Message.of("payload").withMetadata(metadata) + .withAck(() -> { + future.complete(null); + acked.set(true); + return CompletableFuture.completedFuture(null); + }) + .withNack(t -> { + future.completeExceptionally(t); + return CompletableFuture.completedFuture(null); + }); + emitter.send(output); + return future; + } + + public boolean acked() { + return acked.get(); + } + } +}