diff --git a/smallrye-reactive-messaging-rabbitmq/pom.xml b/smallrye-reactive-messaging-rabbitmq/pom.xml index 776bb9db98..6c767b005c 100644 --- a/smallrye-reactive-messaging-rabbitmq/pom.xml +++ b/smallrye-reactive-messaging-rabbitmq/pom.xml @@ -63,7 +63,7 @@ org.slf4j - slf4j-simple + slf4j-reload4j test @@ -77,6 +77,11 @@ ${project.version} test + + org.testcontainers + toxiproxy + test + diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java index 020a334754..5c9ef286a2 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java @@ -32,32 +32,28 @@ public ClientHolder(RabbitMQClient client, Context root) { this.client = client; this.vertx = vertx; - this.connection = Uni.createFrom().voidItem() - .onItem().transformToUni(unused -> { - log.establishingConnection(configuration.getChannel()); - return client.start() - .onSubscription().invoke(() -> { - connected.set(true); - log.connectionEstablished(configuration.getChannel()); - }) - .onItem().transform(ignored -> { - connectionHolder - .set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root)); - - // handle the case we are already disconnected. - if (!client.isConnected() || connectionHolder.get() == null) { - // Throwing the exception would trigger a retry. - connectionHolder.set(null); - throw ex.illegalStateConnectionDisconnected(); - } - return client; - }) - .onFailure().invoke(log::unableToConnectToBroker) - .onFailure().invoke(t -> { - connectionHolder.set(null); - log.unableToRecoverFromConnectionDisruption(t); - }); + this.connection = Uni.createFrom().deferred(() -> client.start() + .onSubscription().invoke(() -> { + connected.set(true); + log.connectionEstablished(configuration.getChannel()); }) + .onItem().transform(ignored -> { + connectionHolder + .set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root)); + + // handle the case we are already disconnected. + if (!client.isConnected() || connectionHolder.get() == null) { + // Throwing the exception would trigger a retry. + connectionHolder.set(null); + throw ex.illegalStateConnectionDisconnected(); + } + return client; + }) + .onFailure().invoke(log::unableToConnectToBroker) + .onFailure().invoke(t -> { + connectionHolder.set(null); + log.unableToRecoverFromConnectionDisruption(t); + })) .memoize().until(() -> { CurrentConnection connection = connectionHolder.get(); if (connection == null) { diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index 64ef9390a4..c33539fca6 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -2,9 +2,13 @@ import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; +import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.declareExchangeIfNeeded; +import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.getExchangeName; +import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.parseArguments; import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.serverQueueName; -import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQConsumerHelper.*; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -294,4 +298,109 @@ public void terminate() { } } + /** + * Establish a DLQ, possibly establishing a DLX too + * + * @param client the {@link RabbitMQClient} + * @param ic the {@link RabbitMQConnectorIncomingConfiguration} + * @return a {@link Uni} containing the DLQ name + */ + static Uni configureDLQorDLX(final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic, + final Instance> configMaps) { + final String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", getQueueName(ic))); + final String deadLetterExchangeName = ic.getDeadLetterExchange(); + final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(getQueueName(ic)); + + final JsonObject exchangeArgs = new JsonObject(); + ic.getDeadLetterExchangeArguments().ifPresent(argsId -> { + Instance> exchangeArguments = CDIUtils.getInstanceById(configMaps, argsId); + if (exchangeArguments.isResolvable()) { + Map argsMap = exchangeArguments.get(); + argsMap.forEach(exchangeArgs::put); + } + }); + // Declare the exchange if we have been asked to do so + final Uni dlxFlow = Uni.createFrom() + .item(() -> ic.getAutoBindDlq() && ic.getDlxDeclare() ? null : deadLetterExchangeName) + .onItem().ifNull().switchTo(() -> client.exchangeDeclare(deadLetterExchangeName, ic.getDeadLetterExchangeType(), + true, false, exchangeArgs) + .onItem().invoke(() -> log.dlxEstablished(deadLetterExchangeName)) + .onFailure().invoke(ex -> log.unableToEstablishDlx(deadLetterExchangeName, ex)) + .onItem().transform(v -> deadLetterExchangeName)); + + // Declare the queue (and its binding to the exchange or DLQ type/mode) if we have been asked to do so + final JsonObject queueArgs = new JsonObject(); + ic.getDeadLetterQueueArguments().ifPresent(argsId -> { + Instance> queueArguments = CDIUtils.getInstanceById(configMaps, argsId); + if (queueArguments.isResolvable()) { + Map argsMap = queueArguments.get(); + argsMap.forEach(queueArgs::put); + } + }); + // x-dead-letter-exchange + ic.getDeadLetterDlx().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-exchange", deadLetterDlx)); + // x-dead-letter-routing-key + ic.getDeadLetterDlxRoutingKey().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-routing-key", deadLetterDlx)); + // x-queue-type + ic.getDeadLetterQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType)); + // x-queue-mode + ic.getDeadLetterQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode)); + // x-message-ttl + ic.getDeadLetterTtl().ifPresent(queueTtl -> { + if (queueTtl >= 0) { + queueArgs.put("x-message-ttl", queueTtl); + } else { + throw ex.illegalArgumentInvalidQueueTtl(); + } + }); + return dlxFlow + .onItem().transform(v -> Boolean.TRUE.equals(ic.getAutoBindDlq()) ? null : deadLetterQueueName) + .onItem().ifNull().switchTo( + () -> client + .queueDeclare(deadLetterQueueName, true, false, false, queueArgs) + .onItem().invoke(() -> log.queueEstablished(deadLetterQueueName)) + .onFailure().invoke(ex -> log.unableToEstablishQueue(deadLetterQueueName, ex)) + .onItem() + .call(v -> client.queueBind(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey)) + .onItem() + .invoke(() -> log.deadLetterBindingEstablished(deadLetterQueueName, deadLetterExchangeName, + deadLetterRoutingKey)) + .onFailure() + .invoke(ex -> log.unableToEstablishBinding(deadLetterQueueName, deadLetterExchangeName, ex)) + .onItem().transform(v -> deadLetterQueueName)); + } + + /** + * Returns a stream that will create bindings from the queue to the exchange with each of the + * supplied routing keys. + * + * @param client the {@link RabbitMQClient} to use + * @param ic the incoming channel configuration + * @return a Uni with the list of routing keys + */ + static Uni> establishBindings( + final RabbitMQClient client, + final RabbitMQConnectorIncomingConfiguration ic) { + final String exchangeName = getExchangeName(ic); + final String queueName = getQueueName(ic); + final List routingKeys = Arrays.stream(ic.getRoutingKeys().split(",")) + .map(String::trim).collect(Collectors.toList()); + final Map arguments = parseArguments(ic.getArguments()); + + // Skip queue bindings if exchange name is default ("") + if (exchangeName.isEmpty()) { + return Uni.createFrom().item(Collections.emptyList()); + } + + return Multi.createFrom().iterable(routingKeys) + .call(routingKey -> client.queueBind(serverQueueName(queueName), exchangeName, routingKey, arguments)) + .invoke(routingKey -> log.bindingEstablished(queueName, exchangeName, routingKey, arguments.toString())) + .onFailure().invoke(ex -> log.unableToEstablishBinding(queueName, exchangeName, ex)) + .collect().asList(); + } + + static String getQueueName(final RabbitMQConnectorIncomingConfiguration config) { + return config.getQueueName().orElse(config.getChannel()); + } + } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java index a1fdb4b68c..cc724b7440 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java @@ -1,6 +1,7 @@ package io.smallrye.reactive.messaging.rabbitmq.internals; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; +import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.declareExchangeIfNeeded; import static java.time.Duration.ofSeconds; import java.util.Map; @@ -39,7 +40,7 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc, clientOptions, credentialsProviders); client.getDelegate().addConnectionEstablishedCallback(promise -> { // Ensure we create the exchange to which messages are to be sent - RabbitMQConsumerHelper.declareExchangeIfNeeded(client, oc, configMaps) + declareExchangeIfNeeded(client, oc, configMaps) .subscribe().with((ignored) -> promise.complete(), promise::fail); }); diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java index d3f18775ba..da7021e6a3 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java @@ -18,10 +18,13 @@ import com.rabbitmq.client.impl.DefaultCredentialsRefreshService; import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorCommonConfiguration; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; +import io.vertx.core.json.JsonObject; import io.vertx.core.net.JksOptions; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.rabbitmq.RabbitMQClient; @@ -148,6 +151,10 @@ static RabbitMQOptions getClientOptions(Vertx vertx, RabbitMQConnectorCommonConf return options; } + public static String getExchangeName(final RabbitMQConnectorCommonConfiguration config) { + return config.getExchangeName().map(s -> "\"\"".equals(s) ? "" : s).orElse(config.getChannel()); + } + public static String serverQueueName(String name) { if (name.equals("(server.auto)")) { return ""; @@ -170,4 +177,39 @@ public static Map parseArguments( } return argumentsBinding; } + + /** + * Uses a {@link RabbitMQClient} to ensure the required exchange is created. + * + * @param client the RabbitMQ client + * @param config the channel configuration + * @return a {@link Uni } which yields the exchange name + */ + public static Uni declareExchangeIfNeeded( + final RabbitMQClient client, + final RabbitMQConnectorCommonConfiguration config, + final Instance> configMaps) { + final String exchangeName = getExchangeName(config); + + JsonObject queueArgs = new JsonObject(); + Instance> queueArguments = CDIUtils.getInstanceById(configMaps, config.getExchangeArguments()); + if (queueArguments.isResolvable()) { + Map argsMap = queueArguments.get(); + argsMap.forEach(queueArgs::put); + } + + // Declare the exchange if we have been asked to do so and only when exchange name is not default ("") + boolean declareExchange = config.getExchangeDeclare() && !exchangeName.isEmpty(); + if (declareExchange) { + return client + .exchangeDeclare(exchangeName, config.getExchangeType(), + config.getExchangeDurable(), config.getExchangeAutoDelete(), queueArgs) + .replaceWith(exchangeName) + .invoke(() -> log.exchangeEstablished(exchangeName)) + .onFailure().invoke(ex -> log.unableToEstablishExchange(exchangeName, ex)); + } else { + return Uni.createFrom().item(exchangeName); + } + } + } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQConsumerHelper.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQConsumerHelper.java deleted file mode 100644 index 1a7b55ca66..0000000000 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQConsumerHelper.java +++ /dev/null @@ -1,168 +0,0 @@ -package io.smallrye.reactive.messaging.rabbitmq.internals; - -import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; -import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; -import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.parseArguments; -import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.serverQueueName; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import jakarta.enterprise.inject.Instance; - -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; -import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorCommonConfiguration; -import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; -import io.vertx.core.json.JsonObject; -import io.vertx.mutiny.rabbitmq.RabbitMQClient; - -public class RabbitMQConsumerHelper { - - /** - * Uses a {@link RabbitMQClient} to ensure the required exchange is created. - * - * @param client the RabbitMQ client - * @param config the channel configuration - * @return a {@link Uni } which yields the exchange name - */ - static Uni declareExchangeIfNeeded( - final RabbitMQClient client, - final RabbitMQConnectorCommonConfiguration config, - final Instance> configMaps) { - final String exchangeName = getExchangeName(config); - - JsonObject queueArgs = new JsonObject(); - Instance> queueArguments = CDIUtils.getInstanceById(configMaps, config.getExchangeArguments()); - if (queueArguments.isResolvable()) { - Map argsMap = queueArguments.get(); - argsMap.forEach(queueArgs::put); - } - - // Declare the exchange if we have been asked to do so and only when exchange name is not default ("") - boolean declareExchange = config.getExchangeDeclare() && !exchangeName.isEmpty(); - if (declareExchange) { - return client - .exchangeDeclare(exchangeName, config.getExchangeType(), - config.getExchangeDurable(), config.getExchangeAutoDelete(), queueArgs) - .replaceWith(exchangeName) - .invoke(() -> log.exchangeEstablished(exchangeName)) - .onFailure().invoke(ex -> log.unableToEstablishExchange(exchangeName, ex)); - } else { - return Uni.createFrom().item(exchangeName); - } - } - - public static String getExchangeName(final RabbitMQConnectorCommonConfiguration config) { - return config.getExchangeName().map(s -> "\"\"".equals(s) ? "" : s).orElse(config.getChannel()); - } - - /** - * Establish a DLQ, possibly establishing a DLX too - * - * @param client the {@link RabbitMQClient} - * @param ic the {@link RabbitMQConnectorIncomingConfiguration} - * @return a {@link Uni} containing the DLQ name - */ - static Uni configureDLQorDLX(final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic, - final Instance> configMaps) { - final String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", getQueueName(ic))); - final String deadLetterExchangeName = ic.getDeadLetterExchange(); - final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(getQueueName(ic)); - - final JsonObject exchangeArgs = new JsonObject(); - ic.getDeadLetterExchangeArguments().ifPresent(argsId -> { - Instance> exchangeArguments = CDIUtils.getInstanceById(configMaps, argsId); - if (exchangeArguments.isResolvable()) { - Map argsMap = exchangeArguments.get(); - argsMap.forEach(exchangeArgs::put); - } - }); - // Declare the exchange if we have been asked to do so - final Uni dlxFlow = Uni.createFrom() - .item(() -> ic.getAutoBindDlq() && ic.getDlxDeclare() ? null : deadLetterExchangeName) - .onItem().ifNull().switchTo(() -> client.exchangeDeclare(deadLetterExchangeName, ic.getDeadLetterExchangeType(), - true, false, exchangeArgs) - .onItem().invoke(() -> log.dlxEstablished(deadLetterExchangeName)) - .onFailure().invoke(ex -> log.unableToEstablishDlx(deadLetterExchangeName, ex)) - .onItem().transform(v -> deadLetterExchangeName)); - - // Declare the queue (and its binding to the exchange or DLQ type/mode) if we have been asked to do so - final JsonObject queueArgs = new JsonObject(); - ic.getDeadLetterQueueArguments().ifPresent(argsId -> { - Instance> queueArguments = CDIUtils.getInstanceById(configMaps, argsId); - if (queueArguments.isResolvable()) { - Map argsMap = queueArguments.get(); - argsMap.forEach(queueArgs::put); - } - }); - // x-dead-letter-exchange - ic.getDeadLetterDlx().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-exchange", deadLetterDlx)); - // x-dead-letter-routing-key - ic.getDeadLetterDlxRoutingKey().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-routing-key", deadLetterDlx)); - // x-queue-type - ic.getDeadLetterQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType)); - // x-queue-mode - ic.getDeadLetterQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode)); - // x-message-ttl - ic.getDeadLetterTtl().ifPresent(queueTtl -> { - if (queueTtl >= 0) { - queueArgs.put("x-message-ttl", queueTtl); - } else { - throw ex.illegalArgumentInvalidQueueTtl(); - } - }); - return dlxFlow - .onItem().transform(v -> Boolean.TRUE.equals(ic.getAutoBindDlq()) ? null : deadLetterQueueName) - .onItem().ifNull().switchTo( - () -> client - .queueDeclare(deadLetterQueueName, true, false, false, queueArgs) - .onItem().invoke(() -> log.queueEstablished(deadLetterQueueName)) - .onFailure().invoke(ex -> log.unableToEstablishQueue(deadLetterQueueName, ex)) - .onItem() - .call(v -> client.queueBind(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey)) - .onItem() - .invoke(() -> log.deadLetterBindingEstablished(deadLetterQueueName, deadLetterExchangeName, - deadLetterRoutingKey)) - .onFailure() - .invoke(ex -> log.unableToEstablishBinding(deadLetterQueueName, deadLetterExchangeName, ex)) - .onItem().transform(v -> deadLetterQueueName)); - } - - /** - * Returns a stream that will create bindings from the queue to the exchange with each of the - * supplied routing keys. - * - * @param client the {@link RabbitMQClient} to use - * @param ic the incoming channel configuration - * @return a Uni with the list of routing keys - */ - static Uni> establishBindings( - final RabbitMQClient client, - final RabbitMQConnectorIncomingConfiguration ic) { - final String exchangeName = getExchangeName(ic); - final String queueName = getQueueName(ic); - final List routingKeys = Arrays.stream(ic.getRoutingKeys().split(",")) - .map(String::trim).collect(Collectors.toList()); - final Map arguments = parseArguments(ic.getArguments()); - - // Skip queue bindings if exchange name is default ("") - if (exchangeName.isEmpty()) { - return Uni.createFrom().item(Collections.emptyList()); - } - - return Multi.createFrom().iterable(routingKeys) - .call(routingKey -> client.queueBind(serverQueueName(queueName), exchangeName, routingKey, arguments)) - .invoke(routingKey -> log.bindingEstablished(queueName, exchangeName, routingKey, arguments.toString())) - .onFailure().invoke(ex -> log.unableToEstablishBinding(queueName, exchangeName, ex)) - .collect().asList(); - } - - public static String getQueueName(final RabbitMQConnectorIncomingConfiguration config) { - return config.getQueueName().orElse(config.getChannel()); - } -} diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java index 817c6967f0..f5bec95dce 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java @@ -1,7 +1,7 @@ package io.smallrye.reactive.messaging.rabbitmq.internals; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; -import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQConsumerHelper.getExchangeName; +import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.getExchangeName; import static java.time.Duration.ofSeconds; import java.util.Optional; @@ -155,9 +155,8 @@ public void onNext(Message message) { .onItem().transform(m -> Tuple2.of(sender, m)); } catch (Exception e) { // Message can't be sent - nacking and skipping. - message.nack(e); RabbitMQLogging.log.serializationFailure(configuration.getChannel(), e); - return Uni.createFrom().nullItem(); + return Uni.createFrom().completionStage(message.nack(e)).map(unused -> null); } }) .subscribe().with( diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerTestBase.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerTestBase.java index a967cfef57..498a630dd8 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerTestBase.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerTestBase.java @@ -13,6 +13,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile; @@ -33,6 +34,8 @@ public class RabbitMQBrokerTestBase { private static final GenericContainer RABBIT = new GenericContainer<>( DockerImageName.parse("rabbitmq:3-management")) .withExposedPorts(5672, 15672) + .withNetworkAliases("rabbitmq") + .withNetwork(Network.SHARED) .withLogConsumer(of -> LOGGER.info(of.getUtf8String())) .waitingFor(Wait.forLogMessage(".*Server startup complete.*\\n", 1)) .withCopyFileToContainer(MountableFile.forClasspathResource("rabbitmq/enabled_plugins"), diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java new file mode 100644 index 0000000000..f1655e3d66 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQReconnectionTest.java @@ -0,0 +1,211 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; +import org.jboss.weld.environment.se.Weld; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.ToxiproxyContainer; +import org.testcontainers.utility.DockerImageName; + +import eu.rekawek.toxiproxy.Proxy; +import eu.rekawek.toxiproxy.ToxiproxyClient; +import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class RabbitMQReconnectionTest extends RabbitMQBrokerTestBase { + + private WeldContainer container; + + Weld weld = new Weld(); + + @AfterEach + public void cleanup() { + if (container != null) { + container.select(RabbitMQConnector.class, ConnectorLiteral.of(RabbitMQConnector.CONNECTOR_NAME)).get() + .terminate(null); + container.shutdown(); + } + + MapBasedConfig.cleanup(); + SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig()); + } + + private Proxy createContainerProxy(ToxiproxyContainer toxiproxy, int toxiPort) { + try { + // Create toxiproxy client + ToxiproxyClient client = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort()); + // Create toxiproxy + String upstream = "rabbitmq:5672"; + return client.createProxy(upstream, "0.0.0.0:" + toxiPort, upstream); + } catch (IOException e) { + throw new RuntimeException("Proxy could not be created", e); + } + } + + @Test + void testSendingMessagesToRabbitMQ_connection_fails() { + final String exchangeName = "exchg1"; + final String routingKey = "normal"; + + List received = new CopyOnWriteArrayList<>(); + usage.consumeIntegers(exchangeName, routingKey, received::add); + try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") + .asCompatibleSubstituteFor("shopify/toxiproxy")) + .withNetworkAliases("toxiproxy")) { + toxiproxy.withNetwork(Network.SHARED); + toxiproxy.start(); + await().until(toxiproxy::isRunning); + + List exposedPorts = toxiproxy.getExposedPorts(); + int toxiPort = exposedPorts.get(exposedPorts.size() - 1); + Proxy proxy = createContainerProxy(toxiproxy, toxiPort); + int exposedPort = toxiproxy.getMappedPort(toxiPort); + proxy.disable(); + + weld.addBeanClass(ProducingBean.class); + + new MapBasedConfig() + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", false) + .put("mp.messaging.outgoing.sink.default-routing-key", routingKey) + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", toxiproxy.getHost()) + .put("mp.messaging.outgoing.sink.port", exposedPort) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-interval", 1) + .write(); + + container = weld.initialize(); + + await().pollDelay(3, SECONDS).until(() -> !isRabbitMQConnectorAlive(container)); + proxy.enable(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + await().untilAsserted(() -> assertThat(received).hasSize(10)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + void testSendingMessagesToRabbitMQ_connection_fails_after_connection() { + final String exchangeName = "exchg1"; + final String routingKey = "normal"; + + List received = new CopyOnWriteArrayList<>(); + usage.consumeIntegers(exchangeName, routingKey, received::add); + try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") + .asCompatibleSubstituteFor("shopify/toxiproxy")) + .withNetworkAliases("toxiproxy")) { + toxiproxy.withNetwork(Network.SHARED); + toxiproxy.start(); + await().until(toxiproxy::isRunning); + + List exposedPorts = toxiproxy.getExposedPorts(); + int toxiPort = exposedPorts.get(exposedPorts.size() - 1); + Proxy proxy = createContainerProxy(toxiproxy, toxiPort); + int exposedPort = toxiproxy.getMappedPort(toxiPort); + + weld.addBeanClass(ProducingBean.class); + + new MapBasedConfig() + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", false) + .put("mp.messaging.outgoing.sink.default-routing-key", routingKey) + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", toxiproxy.getHost()) + .put("mp.messaging.outgoing.sink.port", exposedPort) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-interval", 1) + .write(); + + container = weld.initialize(); + + await().pollDelay(3, SECONDS).until(() -> isRabbitMQConnectorAvailable(container)); + proxy.disable(); + await().pollDelay(3, SECONDS).until(() -> !isRabbitMQConnectorAvailable(container)); + proxy.enable(); + + await().untilAsserted(() -> assertThat(received).hasSize(10)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Verifies that messages can be received from RabbitMQ. + */ + @Test + void testReceivingMessagesFromRabbitMQ_connection_fails() { + final String exchangeName = "exchg2"; + final String queueName = "q2"; + final String routingKey = "xyzzy"; + try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest") + .asCompatibleSubstituteFor("shopify/toxiproxy")) + .withNetworkAliases("toxiproxy")) { + toxiproxy.withNetwork(Network.SHARED); + toxiproxy.start(); + await().until(toxiproxy::isRunning); + + List exposedPorts = toxiproxy.getExposedPorts(); + int toxiPort = exposedPorts.get(exposedPorts.size() - 1); + Proxy proxy = createContainerProxy(toxiproxy, toxiPort); + int exposedPort = toxiproxy.getMappedPort(toxiPort); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.durable", false) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.durable", true) + .put("mp.messaging.incoming.data.queue.routing-keys", routingKey) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", toxiproxy.getHost()) + .put("mp.messaging.incoming.data.port", exposedPort) + .put("mp.messaging.incoming.data.tracing-enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-interval", 1) + .write(); + + weld.addBeanClass(ConsumptionBean.class); + + container = weld.initialize(); + ConsumptionBean bean = container.getBeanManager().createInstance().select(ConsumptionBean.class).get(); + + await().until(() -> isRabbitMQConnectorAvailable(container)); + + List list = bean.getResults(); + assertThat(list).isEmpty(); + + AtomicInteger counter = new AtomicInteger(); + usage.produceTenIntegers(exchangeName, queueName, routingKey, counter::getAndIncrement); + + proxy.disable(); + await().pollDelay(3, SECONDS).until(() -> !isRabbitMQConnectorAvailable(container)); + proxy.enable(); + + await().atMost(1, TimeUnit.MINUTES).until(() -> list.size() >= 10); + assertThat(list).contains(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/resources/log4j.properties b/smallrye-reactive-messaging-rabbitmq/src/test/resources/log4j.properties new file mode 100644 index 0000000000..48d3a6d2d4 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/resources/log4j.properties @@ -0,0 +1,7 @@ +log4j.rootLogger=INFO, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p [%c] - %m%n + +log4j.logger.io.vertx.rabbitmq=INFO