Skip to content

Commit

Permalink
Add a test verifying back pressure for RabbitMQ consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier committed Dec 20, 2023
1 parent a303391 commit 4891f83
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ public interface RabbitMQLogging extends BasicLogger {
@Message(id = 17030, value = "Unable to establish dlx `%s`")
void unableToEstablishDlx(String deadLetterExchangeName, @Cause Throwable ex);

@LogMessage(level = Logger.Level.INFO)
@LogMessage(level = Level.DEBUG)
@Message(id = 17033, value = "A message sent to channel `%s` has been ack'd")
void ackMessage(String channel);

@LogMessage(level = Logger.Level.INFO)
@LogMessage(level = Level.DEBUG)
@Message(id = 17034, value = "A message sent to channel `%s` has not been explicitly ack'd as auto-ack is enabled")
void ackAutoMessage(String channel);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.smallrye.reactive.messaging.rabbitmq;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.providers.extension.HealthCenter;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class ConsumerBackPressure extends WeldTestBase {

private MapBasedConfig getBaseConfig() {
return new MapBasedConfig()
.with("mp.messaging.outgoing.to-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.outgoing.to-rabbitmq.exchange.name", exchange)

.with("mp.messaging.incoming.from-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queue)
.with("mp.messaging.incoming.from-rabbitmq.queue.durable", true)

.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchange);
}

@Test
void testConsumerBackpressure() {
addBeans(Publisher.class, Subscriber.class);
runApplication(getBaseConfig());

Subscriber subscriber = container.getBeanManager().createInstance().select(Subscriber.class).get();
Publisher publisher = container.getBeanManager().createInstance().select(Publisher.class).get();

AssertSubscriber<Object> consumer = AssertSubscriber.create(0);
subscriber.getMulti().subscribe().withSubscriber(consumer);

HealthCenter health = container.getBeanManager().createInstance().select(HealthCenter.class).get();
assertThat(health.getLiveness().isOk()).isTrue();

publisher.generate();

AtomicInteger i = new AtomicInteger(1);
while(consumer.getItems().size() < 10000) {
consumer.request(100);
await().until(() -> consumer.getItems().size() == 100 * i.get());
i.getAndIncrement();
}
await().until(() -> consumer.getItems().size() == 10000);

}


@ApplicationScoped
public static class Publisher {

@Inject @Channel("to-rabbitmq")
MutinyEmitter<String> emitter;

public void generate() {
for (int i = 0; i < 10000; i++) {
emitter.sendAndAwait(Integer.toString(i));
}
}

}

@ApplicationScoped
public static class Subscriber {

@Inject @Channel("from-rabbitmq") Multi<String> multi;

public Multi<String> getMulti() {
return multi;
}
}

}

0 comments on commit 4891f83

Please sign in to comment.