Skip to content

Commit

Permalink
Merge pull request #2421 from cescoffier/refactor-rabbitmq-cont
Browse files Browse the repository at this point in the history
Refactor the RabbitMQ connector (step 2)
  • Loading branch information
ozangunalp authored Dec 19, 2023
2 parents 77e8d1f + f48ecd2 commit a303391
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H
@Override
public Flow.Publisher<? extends Message<?>> getPublisher(final Config config) {
final RabbitMQConnectorIncomingConfiguration ic = new RabbitMQConnectorIncomingConfiguration(config);
IncomingRabbitMQChannel incoming = new IncomingRabbitMQChannel(
this, ic, failureHandlerFactories, clientOptions, credentialsProviders, configMaps);
IncomingRabbitMQChannel incoming = new IncomingRabbitMQChannel(this, ic);
this.incomings.add(incoming);
return incoming.getStream();
}
Expand All @@ -203,8 +202,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(final Config config) {
@Override
public Flow.Subscriber<? extends Message<?>> getSubscriber(final Config config) {
final RabbitMQConnectorOutgoingConfiguration oc = new RabbitMQConnectorOutgoingConfiguration(config);
OutgoingRabbitMQChannel outgoing = new OutgoingRabbitMQChannel(this, oc, clientOptions, credentialsProviders,
configMaps);
OutgoingRabbitMQChannel outgoing = new OutgoingRabbitMQChannel(this, oc);
outgoings.add(outgoing);
return outgoing.getSubscriber();
}
Expand Down Expand Up @@ -258,8 +256,11 @@ public Vertx vertx() {
return executionHolder.vertx();
}

public void addClient(String channel, RabbitMQClient client) {
clients.put(channel, client);
public void registerClient(String channel, RabbitMQClient client) {
RabbitMQClient old = clients.put(channel, client);
if (old != null) {
old.stopAndForget();
}
}

public void reportIncomingFailure(String channel, Throwable reason) {
Expand All @@ -271,4 +272,19 @@ public void reportIncomingFailure(String channel, Throwable reason) {
}
}

public Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories() {
return failureHandlerFactories;
}

public Instance<RabbitMQOptions> clientOptions() {
return clientOptions;
}

public Instance<CredentialsProvider> credentialsProviders() {
return credentialsProviders;
}

public Instance<Map<String, ?>> configMaps() {
return configMaps;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.declareExchangeIfNeeded;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.getExchangeName;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.parseArguments;
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.serverQueueName;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -21,7 +17,6 @@
import org.eclipse.microprofile.reactive.messaging.Message;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.impl.CredentialsProvider;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand All @@ -46,43 +41,34 @@
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQOptions;

public class IncomingRabbitMQChannel {

private final Instance<Map<String, ?>> configMaps;
private volatile RabbitMQOpenTelemetryInstrumenter instrumenter;

private final RabbitMQOpenTelemetryInstrumenter instrumenter;
private final AtomicReference<Flow.Subscription> subscription = new AtomicReference<>();
private final List<RabbitMQClient> clients = new CopyOnWriteArrayList<>();
private final RabbitMQConnectorIncomingConfiguration config;
private final Multi<? extends Message<?>> stream;
private final RabbitMQConnector connector;

private final List<RabbitMQConsumer> consumers = new CopyOnWriteArrayList<>();

public IncomingRabbitMQChannel(RabbitMQConnector connector,
RabbitMQConnectorIncomingConfiguration ic,
Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories,
Instance<RabbitMQOptions> clientOptions,
Instance<CredentialsProvider> credentialsProviders,
Instance<Map<String, ?>> configMaps) {
if (ic.getTracingEnabled() && instrumenter == null) {
RabbitMQConnectorIncomingConfiguration ic) {
if (ic.getTracingEnabled()) {
instrumenter = RabbitMQOpenTelemetryInstrumenter.createForConnector();
} else {
instrumenter = null;
}
this.config = ic;
this.connector = connector;
this.configMaps = configMaps;

final RabbitMQFailureHandler onNack = createFailureHandler(failureHandlerFactories, ic);
final RabbitMQFailureHandler onNack = createFailureHandler(connector.failureHandlerFactories(), ic);
final RabbitMQAckHandler onAck = createAckHandler(ic);

final Integer connectionCount = ic.getConnectionCount();

Multi<? extends Message<?>> multi = Multi.createFrom().range(0, connectionCount)
.onItem()
.transformToUniAndMerge(
connectionIdx -> createConsumer(connector, ic, clientOptions, credentialsProviders, connectionIdx))
connectionIdx -> createConsumer(connector, ic, connectionIdx))
.collect().asList()
.onItem()
.invoke(list -> clients.addAll(list.stream().map(t -> t.getItem1().client()).collect(Collectors.toList())))
Expand Down Expand Up @@ -139,11 +125,9 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder
}

private Uni<Tuple2<ClientHolder, RabbitMQConsumer>> createConsumer(RabbitMQConnector connector,
RabbitMQConnectorIncomingConfiguration ic, Instance<RabbitMQOptions> clientOptions,
Instance<CredentialsProvider> credentialsProviders, Integer connectionIdx) {
RabbitMQConnectorIncomingConfiguration ic, Integer connectionIdx) {
// Create a client
final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, ic, clientOptions,
credentialsProviders);
final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, ic);
client.getDelegate().addConnectionEstablishedCallback(promise -> {

Uni<Void> uni;
Expand All @@ -153,10 +137,11 @@ private Uni<Tuple2<ClientHolder, RabbitMQConsumer>> createConsumer(RabbitMQConne
uni = Uni.createFrom().nullItem();
}

Instance<Map<String, ?>> maps = connector.configMaps();
// Ensure we create the queues (and exchanges) from which messages will be read
uni
.call(() -> declareQueue(client, ic, configMaps))
.call(() -> configureDLQorDLX(client, ic, configMaps))
.call(() -> declareQueue(client, ic, maps))
.call(() -> RabbitMQClientHelper.configureDLQorDLX(client, ic, maps))
.subscribe().with(ignored -> promise.complete(), promise::fail);
});

Expand All @@ -167,10 +152,7 @@ private Uni<Tuple2<ClientHolder, RabbitMQConsumer>> createConsumer(RabbitMQConne
final ClientHolder holder = new ClientHolder(client, ic, connector.vertx(), root);
return holder.getOrEstablishConnection()
.invoke(() -> log.connectionEstablished(connectionIdx, ic.getChannel()))
.flatMap(connection -> createConsumer(ic, connection).map(consumer -> {
consumers.add(consumer);
return Tuple2.of(holder, consumer);
}));
.flatMap(connection -> createConsumer(ic, connection).map(consumer -> Tuple2.of(holder, consumer)));
}

private RabbitMQFailureHandler createFailureHandler(Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories,
Expand Down Expand Up @@ -201,7 +183,7 @@ private Uni<String> declareQueue(
final RabbitMQClient client,
final RabbitMQConnectorIncomingConfiguration ic,
final Instance<Map<String, ?>> configMaps) {
final String queueName = getQueueName(ic);
final String queueName = RabbitMQClientHelper.getQueueName(ic);

// Declare the queue (and its binding(s) to the exchange, and TTL) if we have been asked to do so
final JsonObject queueArgs = new JsonObject();
Expand Down Expand Up @@ -229,7 +211,7 @@ private Uni<String> declareQueue(
//x-delivery-limit
ic.getQueueXDeliveryLimit().ifPresent(deliveryLimit -> queueArgs.put("x-delivery-limit", deliveryLimit));

return declareExchangeIfNeeded(client, ic, configMaps)
return RabbitMQClientHelper.declareExchangeIfNeeded(client, ic, configMaps)
.flatMap(v -> {
if (ic.getQueueDeclare()) {
// Declare the queue.
Expand All @@ -246,7 +228,7 @@ private Uni<String> declareQueue(
return declare
.invoke(() -> log.queueEstablished(queueName))
.onFailure().invoke(ex -> log.unableToEstablishQueue(queueName, ex))
.flatMap(x -> establishBindings(client, ic))
.flatMap(x -> RabbitMQClientHelper.establishBindings(client, ic))
.replaceWith(queueName);
} else {
// Not declaring the queue, so validate its existence...
Expand All @@ -259,7 +241,7 @@ private Uni<String> declareQueue(
}

private Uni<RabbitMQConsumer> createConsumer(RabbitMQConnectorIncomingConfiguration ic, RabbitMQClient client) {
return client.basicConsumer(serverQueueName(getQueueName(ic)), new QueueOptions()
return client.basicConsumer(serverQueueName(RabbitMQClientHelper.getQueueName(ic)), new QueueOptions()
.setAutoAck(ic.getAutoAcknowledgement())
.setMaxInternalQueueSize(ic.getMaxIncomingInternalQueueSize())
.setKeepMostRecent(ic.getKeepMostRecent()));
Expand All @@ -272,7 +254,7 @@ private Multi<? extends Message<?>> getStreamOfMessages(
RabbitMQFailureHandler onNack,
RabbitMQAckHandler onAck) {

final String queueName = getQueueName(ic);
final String queueName = RabbitMQClientHelper.getQueueName(ic);
final boolean isTracingEnabled = ic.getTracingEnabled();
final String contentTypeOverride = ic.getContentTypeOverride().orElse(null);
log.receiverListeningAddress(queueName);
Expand All @@ -298,109 +280,4 @@ public void terminate() {
}
}

/**
* Establish a DLQ, possibly establishing a DLX too
*
* @param client the {@link RabbitMQClient}
* @param ic the {@link RabbitMQConnectorIncomingConfiguration}
* @return a {@link Uni<String>} containing the DLQ name
*/
static Uni<?> configureDLQorDLX(final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic,
final Instance<Map<String, ?>> configMaps) {
final String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", getQueueName(ic)));
final String deadLetterExchangeName = ic.getDeadLetterExchange();
final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(getQueueName(ic));

final JsonObject exchangeArgs = new JsonObject();
ic.getDeadLetterExchangeArguments().ifPresent(argsId -> {
Instance<Map<String, ?>> exchangeArguments = CDIUtils.getInstanceById(configMaps, argsId);
if (exchangeArguments.isResolvable()) {
Map<String, ?> argsMap = exchangeArguments.get();
argsMap.forEach(exchangeArgs::put);
}
});
// Declare the exchange if we have been asked to do so
final Uni<String> dlxFlow = Uni.createFrom()
.item(() -> ic.getAutoBindDlq() && ic.getDlxDeclare() ? null : deadLetterExchangeName)
.onItem().ifNull().switchTo(() -> client.exchangeDeclare(deadLetterExchangeName, ic.getDeadLetterExchangeType(),
true, false, exchangeArgs)
.onItem().invoke(() -> log.dlxEstablished(deadLetterExchangeName))
.onFailure().invoke(ex -> log.unableToEstablishDlx(deadLetterExchangeName, ex))
.onItem().transform(v -> deadLetterExchangeName));

// Declare the queue (and its binding to the exchange or DLQ type/mode) if we have been asked to do so
final JsonObject queueArgs = new JsonObject();
ic.getDeadLetterQueueArguments().ifPresent(argsId -> {
Instance<Map<String, ?>> queueArguments = CDIUtils.getInstanceById(configMaps, argsId);
if (queueArguments.isResolvable()) {
Map<String, ?> argsMap = queueArguments.get();
argsMap.forEach(queueArgs::put);
}
});
// x-dead-letter-exchange
ic.getDeadLetterDlx().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-exchange", deadLetterDlx));
// x-dead-letter-routing-key
ic.getDeadLetterDlxRoutingKey().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-routing-key", deadLetterDlx));
// x-queue-type
ic.getDeadLetterQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType));
// x-queue-mode
ic.getDeadLetterQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode));
// x-message-ttl
ic.getDeadLetterTtl().ifPresent(queueTtl -> {
if (queueTtl >= 0) {
queueArgs.put("x-message-ttl", queueTtl);
} else {
throw ex.illegalArgumentInvalidQueueTtl();
}
});
return dlxFlow
.onItem().transform(v -> Boolean.TRUE.equals(ic.getAutoBindDlq()) ? null : deadLetterQueueName)
.onItem().ifNull().switchTo(
() -> client
.queueDeclare(deadLetterQueueName, true, false, false, queueArgs)
.onItem().invoke(() -> log.queueEstablished(deadLetterQueueName))
.onFailure().invoke(ex -> log.unableToEstablishQueue(deadLetterQueueName, ex))
.onItem()
.call(v -> client.queueBind(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey))
.onItem()
.invoke(() -> log.deadLetterBindingEstablished(deadLetterQueueName, deadLetterExchangeName,
deadLetterRoutingKey))
.onFailure()
.invoke(ex -> log.unableToEstablishBinding(deadLetterQueueName, deadLetterExchangeName, ex))
.onItem().transform(v -> deadLetterQueueName));
}

/**
* Returns a stream that will create bindings from the queue to the exchange with each of the
* supplied routing keys.
*
* @param client the {@link RabbitMQClient} to use
* @param ic the incoming channel configuration
* @return a Uni with the list of routing keys
*/
static Uni<List<String>> establishBindings(
final RabbitMQClient client,
final RabbitMQConnectorIncomingConfiguration ic) {
final String exchangeName = getExchangeName(ic);
final String queueName = getQueueName(ic);
final List<String> routingKeys = Arrays.stream(ic.getRoutingKeys().split(","))
.map(String::trim).collect(Collectors.toList());
final Map<String, Object> arguments = parseArguments(ic.getArguments());

// Skip queue bindings if exchange name is default ("")
if (exchangeName.isEmpty()) {
return Uni.createFrom().item(Collections.emptyList());
}

return Multi.createFrom().iterable(routingKeys)
.call(routingKey -> client.queueBind(serverQueueName(queueName), exchangeName, routingKey, arguments))
.invoke(routingKey -> log.bindingEstablished(queueName, exchangeName, routingKey, arguments.toString()))
.onFailure().invoke(ex -> log.unableToEstablishBinding(queueName, exchangeName, ex))
.collect().asList();
}

static String getQueueName(final RabbitMQConnectorIncomingConfiguration config) {
return config.getQueueName().orElse(config.getChannel());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,10 @@
import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.declareExchangeIfNeeded;
import static java.time.Duration.ofSeconds;

import java.util.Map;
import java.util.concurrent.Flow;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import com.rabbitmq.client.impl.CredentialsProvider;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
Expand All @@ -21,7 +16,6 @@
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorOutgoingConfiguration;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQOptions;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;

public class OutgoingRabbitMQChannel {
Expand All @@ -31,16 +25,14 @@ public class OutgoingRabbitMQChannel {
private final ClientHolder holder;
private volatile RabbitMQPublisher publisher;

public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc,
Instance<RabbitMQOptions> clientOptions, Instance<CredentialsProvider> credentialsProviders,
Instance<Map<String, ?>> configMaps) {
public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc) {

this.config = oc;
// Create a client
final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc, clientOptions, credentialsProviders);
final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc);
client.getDelegate().addConnectionEstablishedCallback(promise -> {
// Ensure we create the exchange to which messages are to be sent
declareExchangeIfNeeded(client, oc, configMaps)
RabbitMQClientHelper.declareExchangeIfNeeded(client, oc, connector.configMaps())
.subscribe().with((ignored) -> promise.complete(), promise::fail);
});

Expand Down
Loading

0 comments on commit a303391

Please sign in to comment.