diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index ca52a6f15e..45d4b29afa 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -180,8 +180,7 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H @Override public Flow.Publisher> getPublisher(final Config config) { final RabbitMQConnectorIncomingConfiguration ic = new RabbitMQConnectorIncomingConfiguration(config); - IncomingRabbitMQChannel incoming = new IncomingRabbitMQChannel( - this, ic, failureHandlerFactories, clientOptions, credentialsProviders, configMaps); + IncomingRabbitMQChannel incoming = new IncomingRabbitMQChannel(this, ic); this.incomings.add(incoming); return incoming.getStream(); } @@ -203,8 +202,7 @@ public Flow.Publisher> getPublisher(final Config config) { @Override public Flow.Subscriber> getSubscriber(final Config config) { final RabbitMQConnectorOutgoingConfiguration oc = new RabbitMQConnectorOutgoingConfiguration(config); - OutgoingRabbitMQChannel outgoing = new OutgoingRabbitMQChannel(this, oc, clientOptions, credentialsProviders, - configMaps); + OutgoingRabbitMQChannel outgoing = new OutgoingRabbitMQChannel(this, oc); outgoings.add(outgoing); return outgoing.getSubscriber(); } @@ -258,8 +256,11 @@ public Vertx vertx() { return executionHolder.vertx(); } - public void addClient(String channel, RabbitMQClient client) { - clients.put(channel, client); + public void registerClient(String channel, RabbitMQClient client) { + RabbitMQClient old = clients.put(channel, client); + if (old != null) { + old.stopAndForget(); + } } public void reportIncomingFailure(String channel, Throwable reason) { @@ -271,4 +272,19 @@ public void reportIncomingFailure(String channel, Throwable reason) { } } + public Instance failureHandlerFactories() { + return failureHandlerFactories; + } + + public Instance clientOptions() { + return clientOptions; + } + + public Instance credentialsProviders() { + return credentialsProviders; + } + + public Instance> configMaps() { + return configMaps; + } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/ChannelStatus.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/ChannelStatus.java deleted file mode 100644 index df8ad2395f..0000000000 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/ChannelStatus.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.smallrye.reactive.messaging.rabbitmq.internals; - -public enum ChannelStatus { - CONNECTED, - NOT_CONNECTED, - INITIALISING; -} diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index c33539fca6..3e0b0a4532 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -3,12 +3,8 @@ import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.declareExchangeIfNeeded; -import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.getExchangeName; -import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.parseArguments; import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.serverQueueName; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -21,7 +17,6 @@ import org.eclipse.microprofile.reactive.messaging.Message; import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.impl.CredentialsProvider; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -46,43 +41,34 @@ import io.vertx.mutiny.rabbitmq.RabbitMQClient; import io.vertx.mutiny.rabbitmq.RabbitMQConsumer; import io.vertx.rabbitmq.QueueOptions; -import io.vertx.rabbitmq.RabbitMQOptions; public class IncomingRabbitMQChannel { - private final Instance> configMaps; - private volatile RabbitMQOpenTelemetryInstrumenter instrumenter; - + private final RabbitMQOpenTelemetryInstrumenter instrumenter; private final AtomicReference subscription = new AtomicReference<>(); private final List clients = new CopyOnWriteArrayList<>(); private final RabbitMQConnectorIncomingConfiguration config; private final Multi> stream; private final RabbitMQConnector connector; - private final List consumers = new CopyOnWriteArrayList<>(); - public IncomingRabbitMQChannel(RabbitMQConnector connector, - RabbitMQConnectorIncomingConfiguration ic, - Instance failureHandlerFactories, - Instance clientOptions, - Instance credentialsProviders, - Instance> configMaps) { - if (ic.getTracingEnabled() && instrumenter == null) { + RabbitMQConnectorIncomingConfiguration ic) { + if (ic.getTracingEnabled()) { instrumenter = RabbitMQOpenTelemetryInstrumenter.createForConnector(); + } else { + instrumenter = null; } this.config = ic; this.connector = connector; - this.configMaps = configMaps; - final RabbitMQFailureHandler onNack = createFailureHandler(failureHandlerFactories, ic); + final RabbitMQFailureHandler onNack = createFailureHandler(connector.failureHandlerFactories(), ic); final RabbitMQAckHandler onAck = createAckHandler(ic); final Integer connectionCount = ic.getConnectionCount(); - Multi> multi = Multi.createFrom().range(0, connectionCount) .onItem() .transformToUniAndMerge( - connectionIdx -> createConsumer(connector, ic, clientOptions, credentialsProviders, connectionIdx)) + connectionIdx -> createConsumer(connector, ic, connectionIdx)) .collect().asList() .onItem() .invoke(list -> clients.addAll(list.stream().map(t -> t.getItem1().client()).collect(Collectors.toList()))) @@ -139,11 +125,9 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder } private Uni> createConsumer(RabbitMQConnector connector, - RabbitMQConnectorIncomingConfiguration ic, Instance clientOptions, - Instance credentialsProviders, Integer connectionIdx) { + RabbitMQConnectorIncomingConfiguration ic, Integer connectionIdx) { // Create a client - final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, ic, clientOptions, - credentialsProviders); + final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, ic); client.getDelegate().addConnectionEstablishedCallback(promise -> { Uni uni; @@ -153,10 +137,11 @@ private Uni> createConsumer(RabbitMQConne uni = Uni.createFrom().nullItem(); } + Instance> maps = connector.configMaps(); // Ensure we create the queues (and exchanges) from which messages will be read uni - .call(() -> declareQueue(client, ic, configMaps)) - .call(() -> configureDLQorDLX(client, ic, configMaps)) + .call(() -> declareQueue(client, ic, maps)) + .call(() -> RabbitMQClientHelper.configureDLQorDLX(client, ic, maps)) .subscribe().with(ignored -> promise.complete(), promise::fail); }); @@ -167,10 +152,7 @@ private Uni> createConsumer(RabbitMQConne final ClientHolder holder = new ClientHolder(client, ic, connector.vertx(), root); return holder.getOrEstablishConnection() .invoke(() -> log.connectionEstablished(connectionIdx, ic.getChannel())) - .flatMap(connection -> createConsumer(ic, connection).map(consumer -> { - consumers.add(consumer); - return Tuple2.of(holder, consumer); - })); + .flatMap(connection -> createConsumer(ic, connection).map(consumer -> Tuple2.of(holder, consumer))); } private RabbitMQFailureHandler createFailureHandler(Instance failureHandlerFactories, @@ -201,7 +183,7 @@ private Uni declareQueue( final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic, final Instance> configMaps) { - final String queueName = getQueueName(ic); + final String queueName = RabbitMQClientHelper.getQueueName(ic); // Declare the queue (and its binding(s) to the exchange, and TTL) if we have been asked to do so final JsonObject queueArgs = new JsonObject(); @@ -229,7 +211,7 @@ private Uni declareQueue( //x-delivery-limit ic.getQueueXDeliveryLimit().ifPresent(deliveryLimit -> queueArgs.put("x-delivery-limit", deliveryLimit)); - return declareExchangeIfNeeded(client, ic, configMaps) + return RabbitMQClientHelper.declareExchangeIfNeeded(client, ic, configMaps) .flatMap(v -> { if (ic.getQueueDeclare()) { // Declare the queue. @@ -246,7 +228,7 @@ private Uni declareQueue( return declare .invoke(() -> log.queueEstablished(queueName)) .onFailure().invoke(ex -> log.unableToEstablishQueue(queueName, ex)) - .flatMap(x -> establishBindings(client, ic)) + .flatMap(x -> RabbitMQClientHelper.establishBindings(client, ic)) .replaceWith(queueName); } else { // Not declaring the queue, so validate its existence... @@ -259,7 +241,7 @@ private Uni declareQueue( } private Uni createConsumer(RabbitMQConnectorIncomingConfiguration ic, RabbitMQClient client) { - return client.basicConsumer(serverQueueName(getQueueName(ic)), new QueueOptions() + return client.basicConsumer(serverQueueName(RabbitMQClientHelper.getQueueName(ic)), new QueueOptions() .setAutoAck(ic.getAutoAcknowledgement()) .setMaxInternalQueueSize(ic.getMaxIncomingInternalQueueSize()) .setKeepMostRecent(ic.getKeepMostRecent())); @@ -272,7 +254,7 @@ private Multi> getStreamOfMessages( RabbitMQFailureHandler onNack, RabbitMQAckHandler onAck) { - final String queueName = getQueueName(ic); + final String queueName = RabbitMQClientHelper.getQueueName(ic); final boolean isTracingEnabled = ic.getTracingEnabled(); final String contentTypeOverride = ic.getContentTypeOverride().orElse(null); log.receiverListeningAddress(queueName); @@ -298,109 +280,4 @@ public void terminate() { } } - /** - * Establish a DLQ, possibly establishing a DLX too - * - * @param client the {@link RabbitMQClient} - * @param ic the {@link RabbitMQConnectorIncomingConfiguration} - * @return a {@link Uni} containing the DLQ name - */ - static Uni configureDLQorDLX(final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic, - final Instance> configMaps) { - final String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", getQueueName(ic))); - final String deadLetterExchangeName = ic.getDeadLetterExchange(); - final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(getQueueName(ic)); - - final JsonObject exchangeArgs = new JsonObject(); - ic.getDeadLetterExchangeArguments().ifPresent(argsId -> { - Instance> exchangeArguments = CDIUtils.getInstanceById(configMaps, argsId); - if (exchangeArguments.isResolvable()) { - Map argsMap = exchangeArguments.get(); - argsMap.forEach(exchangeArgs::put); - } - }); - // Declare the exchange if we have been asked to do so - final Uni dlxFlow = Uni.createFrom() - .item(() -> ic.getAutoBindDlq() && ic.getDlxDeclare() ? null : deadLetterExchangeName) - .onItem().ifNull().switchTo(() -> client.exchangeDeclare(deadLetterExchangeName, ic.getDeadLetterExchangeType(), - true, false, exchangeArgs) - .onItem().invoke(() -> log.dlxEstablished(deadLetterExchangeName)) - .onFailure().invoke(ex -> log.unableToEstablishDlx(deadLetterExchangeName, ex)) - .onItem().transform(v -> deadLetterExchangeName)); - - // Declare the queue (and its binding to the exchange or DLQ type/mode) if we have been asked to do so - final JsonObject queueArgs = new JsonObject(); - ic.getDeadLetterQueueArguments().ifPresent(argsId -> { - Instance> queueArguments = CDIUtils.getInstanceById(configMaps, argsId); - if (queueArguments.isResolvable()) { - Map argsMap = queueArguments.get(); - argsMap.forEach(queueArgs::put); - } - }); - // x-dead-letter-exchange - ic.getDeadLetterDlx().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-exchange", deadLetterDlx)); - // x-dead-letter-routing-key - ic.getDeadLetterDlxRoutingKey().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-routing-key", deadLetterDlx)); - // x-queue-type - ic.getDeadLetterQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType)); - // x-queue-mode - ic.getDeadLetterQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode)); - // x-message-ttl - ic.getDeadLetterTtl().ifPresent(queueTtl -> { - if (queueTtl >= 0) { - queueArgs.put("x-message-ttl", queueTtl); - } else { - throw ex.illegalArgumentInvalidQueueTtl(); - } - }); - return dlxFlow - .onItem().transform(v -> Boolean.TRUE.equals(ic.getAutoBindDlq()) ? null : deadLetterQueueName) - .onItem().ifNull().switchTo( - () -> client - .queueDeclare(deadLetterQueueName, true, false, false, queueArgs) - .onItem().invoke(() -> log.queueEstablished(deadLetterQueueName)) - .onFailure().invoke(ex -> log.unableToEstablishQueue(deadLetterQueueName, ex)) - .onItem() - .call(v -> client.queueBind(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey)) - .onItem() - .invoke(() -> log.deadLetterBindingEstablished(deadLetterQueueName, deadLetterExchangeName, - deadLetterRoutingKey)) - .onFailure() - .invoke(ex -> log.unableToEstablishBinding(deadLetterQueueName, deadLetterExchangeName, ex)) - .onItem().transform(v -> deadLetterQueueName)); - } - - /** - * Returns a stream that will create bindings from the queue to the exchange with each of the - * supplied routing keys. - * - * @param client the {@link RabbitMQClient} to use - * @param ic the incoming channel configuration - * @return a Uni with the list of routing keys - */ - static Uni> establishBindings( - final RabbitMQClient client, - final RabbitMQConnectorIncomingConfiguration ic) { - final String exchangeName = getExchangeName(ic); - final String queueName = getQueueName(ic); - final List routingKeys = Arrays.stream(ic.getRoutingKeys().split(",")) - .map(String::trim).collect(Collectors.toList()); - final Map arguments = parseArguments(ic.getArguments()); - - // Skip queue bindings if exchange name is default ("") - if (exchangeName.isEmpty()) { - return Uni.createFrom().item(Collections.emptyList()); - } - - return Multi.createFrom().iterable(routingKeys) - .call(routingKey -> client.queueBind(serverQueueName(queueName), exchangeName, routingKey, arguments)) - .invoke(routingKey -> log.bindingEstablished(queueName, exchangeName, routingKey, arguments.toString())) - .onFailure().invoke(ex -> log.unableToEstablishBinding(queueName, exchangeName, ex)) - .collect().asList(); - } - - static String getQueueName(final RabbitMQConnectorIncomingConfiguration config) { - return config.getQueueName().orElse(config.getChannel()); - } - } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java index cc724b7440..50fdadbcba 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java @@ -4,15 +4,10 @@ import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.declareExchangeIfNeeded; import static java.time.Duration.ofSeconds; -import java.util.Map; import java.util.concurrent.Flow; -import jakarta.enterprise.inject.Instance; - import org.eclipse.microprofile.reactive.messaging.Message; -import com.rabbitmq.client.impl.CredentialsProvider; - import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; @@ -21,7 +16,6 @@ import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorOutgoingConfiguration; import io.vertx.mutiny.rabbitmq.RabbitMQClient; import io.vertx.mutiny.rabbitmq.RabbitMQPublisher; -import io.vertx.rabbitmq.RabbitMQOptions; import io.vertx.rabbitmq.RabbitMQPublisherOptions; public class OutgoingRabbitMQChannel { @@ -31,16 +25,14 @@ public class OutgoingRabbitMQChannel { private final ClientHolder holder; private volatile RabbitMQPublisher publisher; - public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc, - Instance clientOptions, Instance credentialsProviders, - Instance> configMaps) { + public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc) { this.config = oc; // Create a client - final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc, clientOptions, credentialsProviders); + final RabbitMQClient client = RabbitMQClientHelper.createClient(connector, oc); client.getDelegate().addConnectionEstablishedCallback(promise -> { // Ensure we create the exchange to which messages are to be sent - declareExchangeIfNeeded(client, oc, configMaps) + RabbitMQClientHelper.declareExchangeIfNeeded(client, oc, connector.configMaps()) .subscribe().with((ignored) -> promise.complete(), promise::fail); }); diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java index da7021e6a3..a92ea206a0 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQClientHelper.java @@ -9,6 +9,7 @@ import java.time.Duration; import java.util.*; +import java.util.stream.Collectors; import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.literal.NamedLiteral; @@ -18,6 +19,7 @@ import com.rabbitmq.client.impl.DefaultCredentialsRefreshService; import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; @@ -39,22 +41,21 @@ private RabbitMQClientHelper() { // avoid direct instantiation. } - static RabbitMQClient createClient(RabbitMQConnector connector, RabbitMQConnectorCommonConfiguration config, - Instance optionsInstances, Instance credentialsProviderInstances) { + static RabbitMQClient createClient(RabbitMQConnector connector, RabbitMQConnectorCommonConfiguration config) { Optional clientOptionsName = config.getClientOptionsName(); Vertx vertx = connector.vertx(); RabbitMQOptions options; try { if (clientOptionsName.isPresent()) { - options = getClientOptionsFromBean(optionsInstances, clientOptionsName.get()); + options = getClientOptionsFromBean(connector.clientOptions(), clientOptionsName.get()); } else { - options = getClientOptions(vertx, config, credentialsProviderInstances); + options = getClientOptions(vertx, config, connector.credentialsProviders()); } if (DEFAULT_METRICS_NAME.equals(options.getMetricsName())) { options.setMetricsName("rabbitmq|" + config.getChannel()); } RabbitMQClient client = RabbitMQClient.create(vertx, options); - connector.addClient(config.getChannel(), client); + connector.registerClient(config.getChannel(), client); return client; } catch (Exception e) { log.unableToCreateClient(e); @@ -151,10 +152,6 @@ static RabbitMQOptions getClientOptions(Vertx vertx, RabbitMQConnectorCommonConf return options; } - public static String getExchangeName(final RabbitMQConnectorCommonConfiguration config) { - return config.getExchangeName().map(s -> "\"\"".equals(s) ? "" : s).orElse(config.getChannel()); - } - public static String serverQueueName(String name) { if (name.equals("(server.auto)")) { return ""; @@ -212,4 +209,112 @@ public static Uni declareExchangeIfNeeded( } } + public static String getExchangeName(final RabbitMQConnectorCommonConfiguration config) { + return config.getExchangeName().map(s -> "\"\"".equals(s) ? "" : s).orElse(config.getChannel()); + } + + /** + * Establish a DLQ, possibly establishing a DLX too + * + * @param client the {@link RabbitMQClient} + * @param ic the {@link RabbitMQConnectorIncomingConfiguration} + * @return a {@link Uni} containing the DLQ name + */ + static Uni configureDLQorDLX(final RabbitMQClient client, final RabbitMQConnectorIncomingConfiguration ic, + final Instance> configMaps) { + final String deadLetterQueueName = ic.getDeadLetterQueueName().orElse(String.format("%s.dlq", getQueueName(ic))); + final String deadLetterExchangeName = ic.getDeadLetterExchange(); + final String deadLetterRoutingKey = ic.getDeadLetterRoutingKey().orElse(getQueueName(ic)); + + final JsonObject exchangeArgs = new JsonObject(); + ic.getDeadLetterExchangeArguments().ifPresent(argsId -> { + Instance> exchangeArguments = CDIUtils.getInstanceById(configMaps, argsId); + if (exchangeArguments.isResolvable()) { + Map argsMap = exchangeArguments.get(); + argsMap.forEach(exchangeArgs::put); + } + }); + // Declare the exchange if we have been asked to do so + final Uni dlxFlow = Uni.createFrom() + .item(() -> ic.getAutoBindDlq() && ic.getDlxDeclare() ? null : deadLetterExchangeName) + .onItem().ifNull().switchTo(() -> client.exchangeDeclare(deadLetterExchangeName, ic.getDeadLetterExchangeType(), + true, false, exchangeArgs) + .onItem().invoke(() -> log.dlxEstablished(deadLetterExchangeName)) + .onFailure().invoke(ex -> log.unableToEstablishDlx(deadLetterExchangeName, ex)) + .onItem().transform(v -> deadLetterExchangeName)); + + // Declare the queue (and its binding to the exchange or DLQ type/mode) if we have been asked to do so + final JsonObject queueArgs = new JsonObject(); + ic.getDeadLetterQueueArguments().ifPresent(argsId -> { + Instance> queueArguments = CDIUtils.getInstanceById(configMaps, argsId); + if (queueArguments.isResolvable()) { + Map argsMap = queueArguments.get(); + argsMap.forEach(queueArgs::put); + } + }); + // x-dead-letter-exchange + ic.getDeadLetterDlx().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-exchange", deadLetterDlx)); + // x-dead-letter-routing-key + ic.getDeadLetterDlxRoutingKey().ifPresent(deadLetterDlx -> queueArgs.put("x-dead-letter-routing-key", deadLetterDlx)); + // x-queue-type + ic.getDeadLetterQueueType().ifPresent(queueType -> queueArgs.put("x-queue-type", queueType)); + // x-queue-mode + ic.getDeadLetterQueueMode().ifPresent(queueMode -> queueArgs.put("x-queue-mode", queueMode)); + // x-message-ttl + ic.getDeadLetterTtl().ifPresent(queueTtl -> { + if (queueTtl >= 0) { + queueArgs.put("x-message-ttl", queueTtl); + } else { + throw ex.illegalArgumentInvalidQueueTtl(); + } + }); + return dlxFlow + .onItem().transform(v -> Boolean.TRUE.equals(ic.getAutoBindDlq()) ? null : deadLetterQueueName) + .onItem().ifNull().switchTo( + () -> client + .queueDeclare(deadLetterQueueName, true, false, false, queueArgs) + .onItem().invoke(() -> log.queueEstablished(deadLetterQueueName)) + .onFailure().invoke(ex -> log.unableToEstablishQueue(deadLetterQueueName, ex)) + .onItem() + .call(v -> client.queueBind(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey)) + .onItem() + .invoke(() -> log.deadLetterBindingEstablished(deadLetterQueueName, deadLetterExchangeName, + deadLetterRoutingKey)) + .onFailure() + .invoke(ex -> log.unableToEstablishBinding(deadLetterQueueName, deadLetterExchangeName, ex)) + .onItem().transform(v -> deadLetterQueueName)); + } + + /** + * Returns a stream that will create bindings from the queue to the exchange with each of the + * supplied routing keys. + * + * @param client the {@link RabbitMQClient} to use + * @param ic the incoming channel configuration + * @return a Uni with the list of routing keys + */ + static Uni> establishBindings( + final RabbitMQClient client, + final RabbitMQConnectorIncomingConfiguration ic) { + final String exchangeName = getExchangeName(ic); + final String queueName = getQueueName(ic); + final List routingKeys = Arrays.stream(ic.getRoutingKeys().split(",")) + .map(String::trim).collect(Collectors.toList()); + final Map arguments = parseArguments(ic.getArguments()); + + // Skip queue bindings if exchange name is default ("") + if (exchangeName.isEmpty()) { + return Uni.createFrom().item(Collections.emptyList()); + } + + return Multi.createFrom().iterable(routingKeys) + .call(routingKey -> client.queueBind(serverQueueName(queueName), exchangeName, routingKey, arguments)) + .invoke(routingKey -> log.bindingEstablished(queueName, exchangeName, routingKey, arguments.toString())) + .onFailure().invoke(ex -> log.unableToEstablishBinding(queueName, exchangeName, ex)) + .collect().asList(); + } + + public static String getQueueName(final RabbitMQConnectorIncomingConfiguration config) { + return config.getQueueName().orElse(config.getChannel()); + } }