Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ retry connection tests #2419

Merged
merged 5 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion smallrye-reactive-messaging-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<artifactId>slf4j-reload4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -77,6 +77,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,28 @@ public ClientHolder(RabbitMQClient client,
Context root) {
this.client = client;
this.vertx = vertx;
this.connection = Uni.createFrom().voidItem()
.onItem().transformToUni(unused -> {
log.establishingConnection(configuration.getChannel());
return client.start()
.onSubscription().invoke(() -> {
connected.set(true);
log.connectionEstablished(configuration.getChannel());
})
.onItem().transform(ignored -> {
connectionHolder
.set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root));

// handle the case we are already disconnected.
if (!client.isConnected() || connectionHolder.get() == null) {
// Throwing the exception would trigger a retry.
connectionHolder.set(null);
throw ex.illegalStateConnectionDisconnected();
}
return client;
})
.onFailure().invoke(log::unableToConnectToBroker)
.onFailure().invoke(t -> {
connectionHolder.set(null);
log.unableToRecoverFromConnectionDisruption(t);
});
this.connection = Uni.createFrom().deferred(() -> client.start()
.onSubscription().invoke(() -> {
connected.set(true);
log.connectionEstablished(configuration.getChannel());
})
.onItem().transform(ignored -> {
connectionHolder
.set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root));

// handle the case we are already disconnected.
if (!client.isConnected() || connectionHolder.get() == null) {
// Throwing the exception would trigger a retry.
connectionHolder.set(null);
throw ex.illegalStateConnectionDisconnected();
}
return client;
})
.onFailure().invoke(log::unableToConnectToBroker)
.onFailure().invoke(t -> {
connectionHolder.set(null);
log.unableToRecoverFromConnectionDisruption(t);
}))
.memoize().until(() -> {
CurrentConnection connection = connectionHolder.get();
if (connection == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.declareExchangeIfNeeded;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.getExchangeName;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.parseArguments;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.serverQueueName;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQConsumerHelper.*;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -294,4 +298,109 @@ public void terminate() {
}
}

/**
* Establish a DLQ, possibly establishing a DLX too
*
* @param client the {@link RabbitMQClient}
* @param ic the {@link RabbitMQConnectorIncomingConfiguration}
* @return a {@link Uni<String>} containing the DLQ name
*/
static Uni<?> configureDLQorDLX(final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic,
final Instance<Map<String, ?>> configMaps) {
final String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", getQueueName(ic)));
final String deadLetterExchangeName = ic.getDeadLetterExchange();
final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(getQueueName(ic));

final JsonObject exchangeArgs = new JsonObject();
ic.getDeadLetterExchangeArguments().ifPresent(argsId -> {
Instance<Map<String, ?>> exchangeArguments = CDIUtils.getInstanceById(configMaps, argsId);
if (exchangeArguments.isResolvable()) {
Map<String, ?> argsMap = exchangeArguments.get();
argsMap.forEach(exchangeArgs::put);
}
});
// Declare the exchange if we have been asked to do so
final Uni<String> dlxFlow = Uni.createFrom()
.item(() -> ic.getAutoBindDlq() && ic.getDlxDeclare() ? null : deadLetterExchangeName)
.onItem().ifNull().switchTo(() -> client.exchangeDeclare(deadLetterExchangeName, ic.getDeadLetterExchangeType(),
true, false, exchangeArgs)
.onItem().invoke(() -> log.dlxEstablished(deadLetterExchangeName))
.onFailure().invoke(ex -> log.unableToEstablishDlx(deadLetterExchangeName, ex))
.onItem().transform(v -> deadLetterExchangeName));

// Declare the queue (and its binding to the exchange or DLQ type/mode) if we have been asked to do so
final JsonObject queueArgs = new JsonObject();
ic.getDeadLetterQueueArguments().ifPresent(argsId -> {
Instance<Map<String, ?>> queueArguments = CDIUtils.getInstanceById(configMaps, argsId);
if (queueArguments.isResolvable()) {
Map<String, ?> argsMap = queueArguments.get();
argsMap.forEach(queueArgs::put);
}
});
// x-dead-letter-exchange
ic.getDeadLetterDlx().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-exchange", deadLetterDlx));
// x-dead-letter-routing-key
ic.getDeadLetterDlxRoutingKey().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-routing-key", deadLetterDlx));
// x-queue-type
ic.getDeadLetterQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType));
// x-queue-mode
ic.getDeadLetterQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode));
// x-message-ttl
ic.getDeadLetterTtl().ifPresent(queueTtl -> {
if (queueTtl >= 0) {
queueArgs.put("x-message-ttl", queueTtl);
} else {
throw ex.illegalArgumentInvalidQueueTtl();
}
});
return dlxFlow
.onItem().transform(v -> Boolean.TRUE.equals(ic.getAutoBindDlq()) ? null : deadLetterQueueName)
.onItem().ifNull().switchTo(
() -> client
.queueDeclare(deadLetterQueueName, true, false, false, queueArgs)
.onItem().invoke(() -> log.queueEstablished(deadLetterQueueName))
.onFailure().invoke(ex -> log.unableToEstablishQueue(deadLetterQueueName, ex))
.onItem()
.call(v -> client.queueBind(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey))
.onItem()
.invoke(() -> log.deadLetterBindingEstablished(deadLetterQueueName, deadLetterExchangeName,
deadLetterRoutingKey))
.onFailure()
.invoke(ex -> log.unableToEstablishBinding(deadLetterQueueName, deadLetterExchangeName, ex))
.onItem().transform(v -> deadLetterQueueName));
}

/**
* Returns a stream that will create bindings from the queue to the exchange with each of the
* supplied routing keys.
*
* @param client the {@link RabbitMQClient} to use
* @param ic the incoming channel configuration
* @return a Uni with the list of routing keys
*/
static Uni<List<String>> establishBindings(
final RabbitMQClient client,
final RabbitMQConnectorIncomingConfiguration ic) {
final String exchangeName = getExchangeName(ic);
final String queueName = getQueueName(ic);
final List<String> routingKeys = Arrays.stream(ic.getRoutingKeys().split(","))
.map(String::trim).collect(Collectors.toList());
final Map<String, Object> arguments = parseArguments(ic.getArguments());

// Skip queue bindings if exchange name is default ("")
if (exchangeName.isEmpty()) {
return Uni.createFrom().item(Collections.emptyList());
}

return Multi.createFrom().iterable(routingKeys)
.call(routingKey -> client.queueBind(serverQueueName(queueName), exchangeName, routingKey, arguments))
.invoke(routingKey -> log.bindingEstablished(queueName, exchangeName, routingKey, arguments.toString()))
.onFailure().invoke(ex -> log.unableToEstablishBinding(queueName, exchangeName, ex))
.collect().asList();
}

static String getQueueName(final RabbitMQConnectorIncomingConfiguration config) {
return config.getQueueName().orElse(config.getChannel());
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.reactive.messaging.rabbitmq.internals;

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.declareExchangeIfNeeded;
import static java.time.Duration.ofSeconds;

import java.util.Map;
Expand Down Expand Up @@ -39,7 +40,7 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut
final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc, clientOptions, credentialsProviders);
client.getDelegate().addConnectionEstablishedCallback(promise -> {
// Ensure we create the exchange to which messages are to be sent
RabbitMQConsumerHelper.declareExchangeIfNeeded(client, oc, configMaps)
declareExchangeIfNeeded(client, oc, configMaps)
.subscribe().with((ignored) -> promise.complete(), promise::fail);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import com.rabbitmq.client.impl.DefaultCredentialsRefreshService;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorCommonConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
Expand Down Expand Up @@ -148,6 +151,10 @@ static RabbitMQOptions getClientOptions(Vertx vertx, RabbitMQConnectorCommonConf
return options;
}

public static String getExchangeName(final RabbitMQConnectorCommonConfiguration config) {
return config.getExchangeName().map(s -> "\"\"".equals(s) ? "" : s).orElse(config.getChannel());
}

public static String serverQueueName(String name) {
if (name.equals("(server.auto)")) {
return "";
Expand All @@ -170,4 +177,39 @@ public static Map<String, Object> parseArguments(
}
return argumentsBinding;
}

/**
* Uses a {@link RabbitMQClient} to ensure the required exchange is created.
*
* @param client the RabbitMQ client
* @param config the channel configuration
* @return a {@link Uni <String>} which yields the exchange name
*/
public static Uni<String> declareExchangeIfNeeded(
final RabbitMQClient client,
final RabbitMQConnectorCommonConfiguration config,
final Instance<Map<String, ?>> configMaps) {
final String exchangeName = getExchangeName(config);

JsonObject queueArgs = new JsonObject();
Instance<Map<String, ?>> queueArguments = CDIUtils.getInstanceById(configMaps, config.getExchangeArguments());
if (queueArguments.isResolvable()) {
Map<String, ?> argsMap = queueArguments.get();
argsMap.forEach(queueArgs::put);
}

// Declare the exchange if we have been asked to do so and only when exchange name is not default ("")
boolean declareExchange = config.getExchangeDeclare() && !exchangeName.isEmpty();
if (declareExchange) {
return client
.exchangeDeclare(exchangeName, config.getExchangeType(),
config.getExchangeDurable(), config.getExchangeAutoDelete(), queueArgs)
.replaceWith(exchangeName)
.invoke(() -> log.exchangeEstablished(exchangeName))
.onFailure().invoke(ex -> log.unableToEstablishExchange(exchangeName, ex));
} else {
return Uni.createFrom().item(exchangeName);
}
}

}
Loading
Loading