Skip to content

Commit

Permalink
Merge pull request #2419 from ozangunalp/rabbitmq_reconnect_tests
Browse files Browse the repository at this point in the history
RabbitMQ retry connection tests
  • Loading branch information
ozangunalp authored Dec 19, 2023
2 parents f9234b7 + 46042f0 commit 77e8d1f
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 199 deletions.
7 changes: 6 additions & 1 deletion smallrye-reactive-messaging-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<artifactId>slf4j-reload4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -77,6 +77,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,28 @@ public ClientHolder(RabbitMQClient client,
Context root) {
this.client = client;
this.vertx = vertx;
this.connection = Uni.createFrom().voidItem()
.onItem().transformToUni(unused -> {
log.establishingConnection(configuration.getChannel());
return client.start()
.onSubscription().invoke(() -> {
connected.set(true);
log.connectionEstablished(configuration.getChannel());
})
.onItem().transform(ignored -> {
connectionHolder
.set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root));

// handle the case we are already disconnected.
if (!client.isConnected() || connectionHolder.get() == null) {
// Throwing the exception would trigger a retry.
connectionHolder.set(null);
throw ex.illegalStateConnectionDisconnected();
}
return client;
})
.onFailure().invoke(log::unableToConnectToBroker)
.onFailure().invoke(t -> {
connectionHolder.set(null);
log.unableToRecoverFromConnectionDisruption(t);
});
this.connection = Uni.createFrom().deferred(() -> client.start()
.onSubscription().invoke(() -> {
connected.set(true);
log.connectionEstablished(configuration.getChannel());
})
.onItem().transform(ignored -> {
connectionHolder
.set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root));

// handle the case we are already disconnected.
if (!client.isConnected() || connectionHolder.get() == null) {
// Throwing the exception would trigger a retry.
connectionHolder.set(null);
throw ex.illegalStateConnectionDisconnected();
}
return client;
})
.onFailure().invoke(log::unableToConnectToBroker)
.onFailure().invoke(t -> {
connectionHolder.set(null);
log.unableToRecoverFromConnectionDisruption(t);
}))
.memoize().until(() -> {
CurrentConnection connection = connectionHolder.get();
if (connection == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.declareExchangeIfNeeded;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.getExchangeName;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.parseArguments;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.serverQueueName;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQConsumerHelper.*;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -294,4 +298,109 @@ public void terminate() {
}
}

/**
* Establish a DLQ, possibly establishing a DLX too
*
* @param client the {@link RabbitMQClient}
* @param ic the {@link RabbitMQConnectorIncomingConfiguration}
* @return a {@link Uni<String>} containing the DLQ name
*/
static Uni<?> configureDLQorDLX(final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic,
final Instance<Map<String, ?>> configMaps) {
final String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", getQueueName(ic)));
final String deadLetterExchangeName = ic.getDeadLetterExchange();
final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(getQueueName(ic));

final JsonObject exchangeArgs = new JsonObject();
ic.getDeadLetterExchangeArguments().ifPresent(argsId -> {
Instance<Map<String, ?>> exchangeArguments = CDIUtils.getInstanceById(configMaps, argsId);
if (exchangeArguments.isResolvable()) {
Map<String, ?> argsMap = exchangeArguments.get();
argsMap.forEach(exchangeArgs::put);
}
});
// Declare the exchange if we have been asked to do so
final Uni<String> dlxFlow = Uni.createFrom()
.item(() -> ic.getAutoBindDlq() && ic.getDlxDeclare() ? null : deadLetterExchangeName)
.onItem().ifNull().switchTo(() -> client.exchangeDeclare(deadLetterExchangeName, ic.getDeadLetterExchangeType(),
true, false, exchangeArgs)
.onItem().invoke(() -> log.dlxEstablished(deadLetterExchangeName))
.onFailure().invoke(ex -> log.unableToEstablishDlx(deadLetterExchangeName, ex))
.onItem().transform(v -> deadLetterExchangeName));

// Declare the queue (and its binding to the exchange or DLQ type/mode) if we have been asked to do so
final JsonObject queueArgs = new JsonObject();
ic.getDeadLetterQueueArguments().ifPresent(argsId -> {
Instance<Map<String, ?>> queueArguments = CDIUtils.getInstanceById(configMaps, argsId);
if (queueArguments.isResolvable()) {
Map<String, ?> argsMap = queueArguments.get();
argsMap.forEach(queueArgs::put);
}
});
// x-dead-letter-exchange
ic.getDeadLetterDlx().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-exchange", deadLetterDlx));
// x-dead-letter-routing-key
ic.getDeadLetterDlxRoutingKey().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-routing-key", deadLetterDlx));
// x-queue-type
ic.getDeadLetterQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType));
// x-queue-mode
ic.getDeadLetterQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode));
// x-message-ttl
ic.getDeadLetterTtl().ifPresent(queueTtl -> {
if (queueTtl >= 0) {
queueArgs.put("x-message-ttl", queueTtl);
} else {
throw ex.illegalArgumentInvalidQueueTtl();
}
});
return dlxFlow
.onItem().transform(v -> Boolean.TRUE.equals(ic.getAutoBindDlq()) ? null : deadLetterQueueName)
.onItem().ifNull().switchTo(
() -> client
.queueDeclare(deadLetterQueueName, true, false, false, queueArgs)
.onItem().invoke(() -> log.queueEstablished(deadLetterQueueName))
.onFailure().invoke(ex -> log.unableToEstablishQueue(deadLetterQueueName, ex))
.onItem()
.call(v -> client.queueBind(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey))
.onItem()
.invoke(() -> log.deadLetterBindingEstablished(deadLetterQueueName, deadLetterExchangeName,
deadLetterRoutingKey))
.onFailure()
.invoke(ex -> log.unableToEstablishBinding(deadLetterQueueName, deadLetterExchangeName, ex))
.onItem().transform(v -> deadLetterQueueName));
}

/**
* Returns a stream that will create bindings from the queue to the exchange with each of the
* supplied routing keys.
*
* @param client the {@link RabbitMQClient} to use
* @param ic the incoming channel configuration
* @return a Uni with the list of routing keys
*/
static Uni<List<String>> establishBindings(
final RabbitMQClient client,
final RabbitMQConnectorIncomingConfiguration ic) {
final String exchangeName = getExchangeName(ic);
final String queueName = getQueueName(ic);
final List<String> routingKeys = Arrays.stream(ic.getRoutingKeys().split(","))
.map(String::trim).collect(Collectors.toList());
final Map<String, Object> arguments = parseArguments(ic.getArguments());

// Skip queue bindings if exchange name is default ("")
if (exchangeName.isEmpty()) {
return Uni.createFrom().item(Collections.emptyList());
}

return Multi.createFrom().iterable(routingKeys)
.call(routingKey -> client.queueBind(serverQueueName(queueName), exchangeName, routingKey, arguments))
.invoke(routingKey -> log.bindingEstablished(queueName, exchangeName, routingKey, arguments.toString()))
.onFailure().invoke(ex -> log.unableToEstablishBinding(queueName, exchangeName, ex))
.collect().asList();
}

static String getQueueName(final RabbitMQConnectorIncomingConfiguration config) {
return config.getQueueName().orElse(config.getChannel());
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.reactive.messaging.rabbitmq.internals;

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.declareExchangeIfNeeded;
import static java.time.Duration.ofSeconds;

import java.util.Map;
Expand Down Expand Up @@ -39,7 +40,7 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut
final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc, clientOptions, credentialsProviders);
client.getDelegate().addConnectionEstablishedCallback(promise -> {
// Ensure we create the exchange to which messages are to be sent
RabbitMQConsumerHelper.declareExchangeIfNeeded(client, oc, configMaps)
declareExchangeIfNeeded(client, oc, configMaps)
.subscribe().with((ignored) -> promise.complete(), promise::fail);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import com.rabbitmq.client.impl.DefaultCredentialsRefreshService;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorCommonConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
Expand Down Expand Up @@ -148,6 +151,10 @@ static RabbitMQOptions getClientOptions(Vertx vertx, RabbitMQConnectorCommonConf
return options;
}

public static String getExchangeName(final RabbitMQConnectorCommonConfiguration config) {
return config.getExchangeName().map(s -> "\"\"".equals(s) ? "" : s).orElse(config.getChannel());
}

public static String serverQueueName(String name) {
if (name.equals("(server.auto)")) {
return "";
Expand All @@ -170,4 +177,39 @@ public static Map<String, Object> parseArguments(
}
return argumentsBinding;
}

/**
* Uses a {@link RabbitMQClient} to ensure the required exchange is created.
*
* @param client the RabbitMQ client
* @param config the channel configuration
* @return a {@link Uni <String>} which yields the exchange name
*/
public static Uni<String> declareExchangeIfNeeded(
final RabbitMQClient client,
final RabbitMQConnectorCommonConfiguration config,
final Instance<Map<String, ?>> configMaps) {
final String exchangeName = getExchangeName(config);

JsonObject queueArgs = new JsonObject();
Instance<Map<String, ?>> queueArguments = CDIUtils.getInstanceById(configMaps, config.getExchangeArguments());
if (queueArguments.isResolvable()) {
Map<String, ?> argsMap = queueArguments.get();
argsMap.forEach(queueArgs::put);
}

// Declare the exchange if we have been asked to do so and only when exchange name is not default ("")
boolean declareExchange = config.getExchangeDeclare() && !exchangeName.isEmpty();
if (declareExchange) {
return client
.exchangeDeclare(exchangeName, config.getExchangeType(),
config.getExchangeDurable(), config.getExchangeAutoDelete(), queueArgs)
.replaceWith(exchangeName)
.invoke(() -> log.exchangeEstablished(exchangeName))
.onFailure().invoke(ex -> log.unableToEstablishExchange(exchangeName, ex));
} else {
return Uni.createFrom().item(exchangeName);
}
}

}
Loading

0 comments on commit 77e8d1f

Please sign in to comment.