From 42895cd57a13c6c73d6109d384b1b990629bd817 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Wed, 20 Dec 2023 08:19:21 +0100 Subject: [PATCH] Add a test verifying back pressure for RabbitMQ consumption --- .../rabbitmq/i18n/RabbitMQLogging.java | 4 +- .../rabbitmq/ConsumerBackPressure.java | 88 +++++++++++++++++++ 2 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ConsumerBackPressure.java diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQLogging.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQLogging.java index b93dadc7a3..e909b800b2 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQLogging.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/i18n/RabbitMQLogging.java @@ -102,11 +102,11 @@ public interface RabbitMQLogging extends BasicLogger { @Message(id = 17030, value = "Unable to establish dlx `%s`") void unableToEstablishDlx(String deadLetterExchangeName, @Cause Throwable ex); - @LogMessage(level = Logger.Level.INFO) + @LogMessage(level = Level.DEBUG) @Message(id = 17033, value = "A message sent to channel `%s` has been ack'd") void ackMessage(String channel); - @LogMessage(level = Logger.Level.INFO) + @LogMessage(level = Level.DEBUG) @Message(id = 17034, value = "A message sent to channel `%s` has not been explicitly ack'd as auto-ack is enabled") void ackAutoMessage(String channel); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ConsumerBackPressure.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ConsumerBackPressure.java new file mode 100644 index 0000000000..d650979eb4 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/ConsumerBackPressure.java @@ -0,0 +1,88 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; +import io.smallrye.reactive.messaging.MutinyEmitter; +import io.smallrye.reactive.messaging.providers.extension.HealthCenter; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class ConsumerBackPressure extends WeldTestBase { + + private MapBasedConfig getBaseConfig() { + return new MapBasedConfig() + .with("mp.messaging.outgoing.to-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.to-rabbitmq.exchange.name", exchange) + + .with("mp.messaging.incoming.from-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.from-rabbitmq.queue.name", queue) + .with("mp.messaging.incoming.from-rabbitmq.queue.durable", true) + + .with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchange); + } + + @Test + void testConsumerBackpressure() { + addBeans(Publisher.class, Subscriber.class); + runApplication(getBaseConfig()); + + Subscriber subscriber = container.getBeanManager().createInstance().select(Subscriber.class).get(); + Publisher publisher = container.getBeanManager().createInstance().select(Publisher.class).get(); + + AssertSubscriber consumer = AssertSubscriber.create(0); + subscriber.getMulti().subscribe().withSubscriber(consumer); + + HealthCenter health = container.getBeanManager().createInstance().select(HealthCenter.class).get(); + assertThat(health.getLiveness().isOk()).isTrue(); + + publisher.generate(); + + AtomicInteger i = new AtomicInteger(1); + while(consumer.getItems().size() < 10000) { + consumer.request(100); + await().until(() -> consumer.getItems().size() == 100 * i.get()); + i.getAndIncrement(); + } + await().until(() -> consumer.getItems().size() == 10000); + + } + + + @ApplicationScoped + public static class Publisher { + + @Inject @Channel("to-rabbitmq") + MutinyEmitter emitter; + + public void generate() { + for (int i = 0; i < 10000; i++) { + emitter.sendAndAwait(Integer.toString(i)); + } + } + + } + + @ApplicationScoped + public static class Subscriber { + + @Inject @Channel("from-rabbitmq") Multi multi; + + public Multi getMulti() { + return multi; + } + } + +}