From 3a34d48bd147e145db18855cd89bef6c9c361848 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 7 Dec 2023 23:01:43 +0100 Subject: [PATCH] Define OverrideConnectorConfig to use with nested channels with property overrides. --- .../kafka/fault/KafkaDeadLetterQueue.java | 56 +++-- .../KafkaDeadLetterSerializationHandler.java | 9 +- .../kafka/fault/KafkaDelayedRetryTopic.java | 87 ++++---- .../messaging/kafka/base/WeldTestBase.java | 2 + .../kafka/metrics/ObservationTest.java | 2 - .../impl/OverrideConnectorConfig.java | 192 ++++++++++++++++++ .../impl/OverrideConnectorConfigTest.java | 189 +++++++++++++++++ 7 files changed, 474 insertions(+), 63 deletions(-) create mode 100644 smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfig.java create mode 100644 smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfigTest.java diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java index b4c3802a2e..95d7631a7d 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java @@ -3,11 +3,11 @@ import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ; import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log; import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX; import java.nio.charset.StandardCharsets; import java.util.HashMap; @@ -15,10 +15,14 @@ import java.util.function.BiConsumer; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; +import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Metadata; import io.smallrye.common.annotation.Identifier; @@ -26,11 +30,14 @@ import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents; import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration; +import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration; import io.smallrye.reactive.messaging.kafka.KafkaConsumer; import io.smallrye.reactive.messaging.kafka.KafkaProducer; +import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler; import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; -import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner; import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer; +import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig; +import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig; import io.vertx.mutiny.core.Vertx; @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -65,6 +72,17 @@ public static class Factory implements KafkaFailureHandler.Factory { @Inject KafkaCDIEvents kafkaCDIEvents; + @Inject + @Any + Instance> serializationFailureHandlers; + + @Inject + @Any + Instance> producerInterceptors; + + @Inject + Instance rootConfig; + @Override public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, Vertx vertx, @@ -74,29 +92,27 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, String keyDeserializer = (String) deadQueueProducerConfig.remove(KEY_DESERIALIZER_CLASS_CONFIG); String valueDeserializer = (String) deadQueueProducerConfig.remove(VALUE_DESERIALIZER_CLASS_CONFIG); - // We need to remove consumer interceptor - deadQueueProducerConfig.remove(INTERCEPTOR_CLASSES_CONFIG); - - deadQueueProducerConfig.put(KEY_SERIALIZER_CLASS_CONFIG, - config.getDeadLetterQueueKeySerializer().orElse(getMirrorSerializer(keyDeserializer))); - deadQueueProducerConfig.put(VALUE_SERIALIZER_CLASS_CONFIG, - config.getDeadLetterQueueValueSerializer().orElse(getMirrorSerializer(valueDeserializer))); - deadQueueProducerConfig.put(CLIENT_ID_CONFIG, - config.getDeadLetterQueueProducerClientId() - .orElse("kafka-dead-letter-topic-producer-" + deadQueueProducerConfig.get(CLIENT_ID_CONFIG))); + String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG); + ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(), + config.getChannel(), "dead-letter-queue", Map.of( + KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer), + VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer), + CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId() + .orElse("kafka-dead-letter-topic-producer-" + consumerClientId), + "topic", c -> "dead-letter-topic-" + config.getChannel(), + "key-serialization-failure-handler", c -> "dlq-serialization", + "value-serialization-failure-handler", c -> "dlq-serialization", + "interceptor.classes", c -> "")); + KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(connectorConfig); - ConfigurationCleaner.cleanupProducerConfiguration(deadQueueProducerConfig); String deadQueueTopic = config.getDeadLetterQueueTopic().orElse("dead-letter-topic-" + config.getChannel()); - log.deadLetterConfig(deadQueueTopic, - (String) deadQueueProducerConfig.get(KEY_SERIALIZER_CLASS_CONFIG), - (String) deadQueueProducerConfig.get(VALUE_SERIALIZER_CLASS_CONFIG)); + log.deadLetterConfig(producerConfig.getTopic().orElse(null), producerConfig.getKeySerializer(), + producerConfig.getValueSerializer()); - var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>(); // fire producer event (e.g. bind metrics) - ReactiveKafkaProducer producer = new ReactiveKafkaProducer<>(deadQueueProducerConfig, - deadQueueTopic, 10000, false, null, dlqSerializationHandler, dlqSerializationHandler, - (p, c) -> kafkaCDIEvents.producer().fire(p)); + ReactiveKafkaProducer producer = new ReactiveKafkaProducer<>(producerConfig, + serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p)); return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, producer, reportFailure); } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterSerializationHandler.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterSerializationHandler.java index 27fbf2b799..381cb54670 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterSerializationHandler.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterSerializationHandler.java @@ -10,16 +10,21 @@ import java.util.Arrays; import java.util.Objects; +import jakarta.enterprise.context.ApplicationScoped; + import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import io.smallrye.common.annotation.Identifier; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler; -public class KafkaDeadLetterSerializationHandler implements SerializationFailureHandler { +@ApplicationScoped +@Identifier("dlq-serialization") +public class KafkaDeadLetterSerializationHandler implements SerializationFailureHandler { @Override - public byte[] decorateSerialization(Uni serialization, String topic, boolean isKey, String serializer, T data, + public byte[] decorateSerialization(Uni serialization, String topic, boolean isKey, String serializer, Object data, Headers headers) { // deserializer failure if (headers.lastHeader(DESERIALIZATION_FAILURE_DESERIALIZER) != null) { diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java index 4873b27a3c..5418a4456b 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java @@ -2,14 +2,13 @@ import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ; import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log; -import static io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.createDeserializationFailureHandler; import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; import static org.apache.kafka.clients.CommonClientConfigs.GROUP_ID_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -26,15 +25,18 @@ import java.util.stream.Stream; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Any; import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Metadata; import io.smallrye.common.annotation.Identifier; @@ -44,15 +46,17 @@ import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents; import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration; +import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration; import io.smallrye.reactive.messaging.kafka.KafkaConsumer; import io.smallrye.reactive.messaging.kafka.KafkaProducer; +import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler; import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; import io.smallrye.reactive.messaging.kafka.commit.ContextHolder; import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit; -import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner; import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer; import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer; -import io.smallrye.reactive.messaging.kafka.impl.RuntimeKafkaSourceConfiguration; +import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig; +import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig; import io.vertx.core.impl.VertxInternal; import io.vertx.mutiny.core.Vertx; @@ -82,7 +86,19 @@ public static class Factory implements KafkaFailureHandler.Factory { KafkaCDIEvents kafkaCDIEvents; @Inject - Instance> failureHandlers; + @Any + Instance> deserializationFailureHandlers; + + @Inject + @Any + Instance> serializationFailureHandlers; + + @Inject + @Any + Instance> producerInterceptors; + + @Inject + Instance rootConfig; @Override public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, @@ -93,20 +109,6 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, String keyDeserializer = (String) delayedRetryTopicProducerConfig.remove(KEY_DESERIALIZER_CLASS_CONFIG); String valueDeserializer = (String) delayedRetryTopicProducerConfig.remove(VALUE_DESERIALIZER_CLASS_CONFIG); - // We need to remove consumer interceptor - delayedRetryTopicProducerConfig.remove(INTERCEPTOR_CLASSES_CONFIG); - - delayedRetryTopicProducerConfig.put(KEY_SERIALIZER_CLASS_CONFIG, - config.getDeadLetterQueueKeySerializer().orElse(getMirrorSerializer(keyDeserializer))); - delayedRetryTopicProducerConfig.put(VALUE_SERIALIZER_CLASS_CONFIG, - config.getDeadLetterQueueValueSerializer().orElse(getMirrorSerializer(valueDeserializer))); - delayedRetryTopicProducerConfig.put(CLIENT_ID_CONFIG, - config.getDeadLetterQueueProducerClientId() - .orElse("kafka-delayed-retry-topic-producer-" - + delayedRetryTopicProducerConfig.get(CLIENT_ID_CONFIG))); - - ConfigurationCleaner.cleanupProducerConfiguration(delayedRetryTopicProducerConfig); - List retryTopics = config.getDelayedRetryTopicTopics() .map(topics -> Arrays.stream(topics.split(",")).collect(Collectors.toList())) .orElseGet(() -> Stream.of( @@ -117,30 +119,37 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, long retryTimeout = config.getDelayedRetryTopicTimeout(); String deadQueueTopic = config.getDeadLetterQueueTopic().orElse(null); + String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG); + ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(), + config.getChannel(), "dead-letter-queue", Map.of( + KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer), + VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer), + CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId() + .orElse("kafka-delayed-retry-topic-producer-" + consumerClientId), + "topic", c -> retryTopics.get(0), + "key-serialization-failure-handler", c -> "dlq-serialization", + "value-serialization-failure-handler", c -> "dlq-serialization", + "interceptor.classes", c -> "")); + KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(connectorConfig); + log.delayedRetryTopic(config.getChannel(), retryTopics, maxRetries, retryTimeout, deadQueueTopic); - var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>(); // fire producer event (e.g. bind metrics) - ReactiveKafkaProducer producer = new ReactiveKafkaProducer<>(delayedRetryTopicProducerConfig, - retryTopics.get(0), 10000, false, null, dlqSerializationHandler, dlqSerializationHandler, - (p, c) -> kafkaCDIEvents.producer().fire(p)); - - Map retryConsumerConfig = new HashMap<>(consumer.configuration()); - retryConsumerConfig.put(CLIENT_ID_CONFIG, - "kafka-delayed-retry-topic-" + retryConsumerConfig.get(CLIENT_ID_CONFIG)); - retryConsumerConfig.put(GROUP_ID_CONFIG, - "kafka-delayed-retry-topic-" + retryConsumerConfig.get(GROUP_ID_CONFIG)); - - ReactiveKafkaConsumer retryConsumer = new ReactiveKafkaConsumer<>(retryConsumerConfig, - createDeserializationFailureHandler(true, failureHandlers, config), - createDeserializationFailureHandler(false, failureHandlers, config), - RuntimeKafkaSourceConfiguration.buildFromConfiguration(config), - true, - config.getPollTimeout(), - config.getFailOnDeserializationFailure(), - c -> kafkaCDIEvents.consumer().fire(c), + ReactiveKafkaProducer producer = new ReactiveKafkaProducer<>(producerConfig, + serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p)); + + ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, + rootConfig.get(), config.getChannel(), "delayed-retry-topic.consumer", Map.of( + "lazy-client", c -> true, + CLIENT_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId, + GROUP_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId)); + KafkaConnectorIncomingConfiguration retryConfig = new KafkaConnectorIncomingConfiguration(retryConsumerConfig); + ReactiveKafkaConsumer retryConsumer = new ReactiveKafkaConsumer<>(retryConfig, + deserializationFailureHandlers, + retryConsumerConfig.getValue(GROUP_ID_CONFIG, String.class), -1, reportFailure, - ((VertxInternal) vertx.getDelegate()).createEventLoopContext()); + ((VertxInternal) vertx.getDelegate()).createEventLoopContext(), + c -> kafkaCDIEvents.consumer().fire(c)); return new KafkaDelayedRetryTopic(config.getChannel(), vertx, config, retryTopics, maxRetries, retryTimeout, deadQueueTopic, producer, retryConsumer, reportFailure); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java index a8d6ce83a5..23df09f64d 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java @@ -21,6 +21,7 @@ import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit; import io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit; import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue; +import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterSerializationHandler; import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop; import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler; import io.smallrye.reactive.messaging.kafka.fault.KafkaIgnoreFailure; @@ -98,6 +99,7 @@ public void initWeld() { weld.addBeanClass(KafkaFailStop.Factory.class); weld.addBeanClass(KafkaIgnoreFailure.Factory.class); weld.addBeanClass(KafkaDeadLetterQueue.Factory.class); + weld.addBeanClass(KafkaDeadLetterSerializationHandler.class); weld.addBeanClass(KafkaCDIEvents.class); weld.addBeanClass(KafkaConnector.class); weld.addBeanClass(KafkaClientServiceImpl.class); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/metrics/ObservationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/metrics/ObservationTest.java index 636dc00b69..db50083106 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/metrics/ObservationTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/metrics/ObservationTest.java @@ -232,7 +232,6 @@ public KafkaMessageObservation(String channel, Message message) { if (metadata.isPresent()) { Instant inst = metadata.get().getTimestamp(); recordTs = inst.toEpochMilli(); - System.out.println("record " + recordTs); } else { recordTs = 0L; } @@ -242,7 +241,6 @@ public KafkaMessageObservation(String channel, Message message) { public void onMessageAck(Message message) { super.onMessageAck(message); completedMs = System.currentTimeMillis(); - System.out.println("completed in " + completedMs); done = true; } } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfig.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfig.java new file mode 100644 index 0000000000..38b396c48f --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfig.java @@ -0,0 +1,192 @@ +package io.smallrye.reactive.messaging.providers.impl; + +import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.CHANNEL_NAME_ATTRIBUTE; +import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.CONNECTOR_PREFIX; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import org.eclipse.microprofile.config.Config; + +/** + * Represents a configuration class that allows overriding specific properties for a connector. + * If a nested-channel is defined, property lookups will be in the following order + *
    + *
  1. If nested-channel is given try [prefix].[channel].[nested-channel].[property-key]
  2. + *
  3. If an override function is given for a property-key, calls it converts the return value
  4. + *
  5. Calls the {@link ConnectorConfig} base function
  6. + *
+ * The function {@link #getOriginalValue(String, Class)} allows accessing the connector config without override. + * + * @see ConnectorConfig + */ +public class OverrideConnectorConfig extends ConnectorConfig { + + private final String nestedChannel; + private final Map> overrides; + + public OverrideConnectorConfig(String prefix, Config overall, String channel, + Map> overrides) { + this(prefix, overall, channel, null, overrides); + } + + public OverrideConnectorConfig(String prefix, Config overall, String channel, String nestedChannel) { + this(prefix, overall, channel, nestedChannel, new HashMap<>()); + } + + public OverrideConnectorConfig(String prefix, Config overall, String channel, String nestedChannel, + Map> overrides) { + super(prefix, overall, channel); + this.nestedChannel = nestedChannel; + this.overrides = overrides; + } + + protected String nestedChannelKey(String keyName) { + if (nestedChannel == null) { + return keyName; + } + return nestedChannel + "." + keyName; + } + + public Optional getOriginalValue(String propertyName, Class propertyType) { + return super.getOptionalValue(propertyName, propertyType); + } + + @Override + public T getValue(String propertyName, Class propertyType) { + if (nestedChannel != null) { + // First check if the nestedChannel channel configuration contains the desired attribute. + Optional maybeResult = super.getOptionalValue(nestedChannelKey(propertyName), propertyType); + if (maybeResult.isPresent()) { + return maybeResult.get(); + } + } + Function function = overrides.get(propertyName); + if (function != null) { + Object o = function.apply(this); + if (o != null) { + if (propertyType.isInstance(o)) { + return (T) o; + } + if (o instanceof String) { + return convert(((String) o), propertyType); + } + } + } + return super.getValue(propertyName, propertyType); + } + + @Override + public Optional getOptionalValue(String propertyName, Class propertyType) { + if (nestedChannel != null) { + // First check if the nestedChannel channel configuration contains the desired attribute. + Optional maybe = super.getOptionalValue(nestedChannelKey(propertyName), propertyType); + if (maybe.isPresent()) { + return maybe; + } + } + Function function = overrides.get(propertyName); + if (function != null) { + Object o = function.apply(this); + if (o != null) { + if (propertyType.isInstance(o)) { + return Optional.of((T) o); + } + if (o instanceof String) { + return convertOptional((String) o, propertyType); + } + return Optional.empty(); + } + } + return super.getOptionalValue(propertyName, propertyType); + } + + /** + * Gets the lists of config keys for the given connector. + * Note that the list contains property names from the config and env variables. + * It includes keys from the connector config and channel config. + * + * @return the list of keys + */ + @Override + public Iterable getPropertyNames() { + String prefix = channelPrefix; + String nestedPrefix = channelPrefix + nestedChannel + "."; + String prefixAlpha = toAlpha(prefix); + String nestedPrefixAlpha = toAlpha(nestedPrefix); + String prefixAlphaUpper = prefixAlpha.toUpperCase(); + String nestedPrefixAlphaUpper = nestedPrefixAlpha.toUpperCase(); + String connectorPrefix = CONNECTOR_PREFIX + connector + "."; + String connectorNestedPrefix = connectorPrefix + nestedChannel + "."; + String connectorPrefixAlpha = toAlpha(connectorPrefix); + String connectorNestedPrefixAlpha = toAlpha(connectorNestedPrefix); + String connectorPrefixAlphaUpper = connectorPrefixAlpha.toUpperCase(); + String connectorNestedPrefixAlphaUpper = connectorNestedPrefixAlpha.toUpperCase(); + + Set names = new HashSet<>(); + for (String name : overall.getPropertyNames()) { + if (name.startsWith(connectorNestedPrefix)) { + String computed = name.substring(connectorNestedPrefix.length()); + names.add(computed); + } else if (name.startsWith(connectorPrefix)) { + String computed = name.substring(connectorPrefix.length()); + names.add(computed); + } else if (name.startsWith(connectorNestedPrefixAlpha)) { + String computed = name.substring(connectorNestedPrefixAlpha.length()); + if (nameExists(connectorNestedPrefix + computed)) { + names.add(computed); + } + } else if (name.startsWith(connectorPrefixAlpha)) { + String computed = name.substring(connectorPrefixAlpha.length()); + if (nameExists(connectorPrefix + computed)) { + names.add(computed); + } + } else if (name.startsWith(connectorNestedPrefixAlphaUpper)) { + String computed = name.substring(connectorNestedPrefixAlphaUpper.length()); + if (nameExists(connectorNestedPrefix + computed)) { + names.add(computed); + } + } else if (name.startsWith(connectorPrefixAlphaUpper)) { + String computed = name.substring(connectorPrefixAlphaUpper.length()); + if (nameExists(connectorPrefix + computed)) { + names.add(computed); + } + } else if (name.startsWith(prefix)) { + String computed = name.substring(prefix.length()); + names.add(computed); + } else if (name.startsWith(nestedPrefix)) { + String computed = name.substring(nestedPrefix.length()); + names.add(computed); + } else if (name.startsWith(prefixAlpha)) { + String computed = name.substring(prefixAlpha.length()); + if (nameExists(prefix + computed)) { + names.add(computed); + } + } else if (name.startsWith(nestedPrefixAlpha)) { + String computed = name.substring(nestedPrefixAlpha.length()); + if (nameExists(nestedPrefix + computed)) { + names.add(computed); + } + } else if (name.startsWith(prefixAlphaUpper)) { + String computed = name.substring(prefixAlphaUpper.length()); + if (nameExists(prefix + computed)) { + names.add(computed); + } + } else if (name.startsWith(nestedPrefixAlphaUpper)) { + String computed = name.substring(nestedPrefixAlphaUpper.length()); + if (nameExists(nestedPrefix + computed)) { + names.add(computed); + } + } + } + + names.add(CHANNEL_NAME_ATTRIBUTE); + names.addAll(overrides.keySet()); + return names; + } + +} diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfigTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfigTest.java new file mode 100644 index 0000000000..a16f3e44a0 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfigTest.java @@ -0,0 +1,189 @@ +package io.smallrye.reactive.messaging.providers.impl; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.eclipse.microprofile.config.ConfigValue; +import org.eclipse.microprofile.config.spi.ConfigSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.config.SmallRyeConfig; +import io.smallrye.config.SmallRyeConfigBuilder; +import io.smallrye.reactive.messaging.test.common.config.SetEnvironmentVariable; + +@SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_ATTR", value = "new-value") +@SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_AT_TR", value = "another-value") +@SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_SOME_KEY", value = "another-value-from-connector") +@SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_SOME_OTHER_KEY", value = "another-value-other") +@SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_ATTR1", value = "should not be used") +@SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_ATTR3", value = "used") +@SetEnvironmentVariable(key = "mp_messaging_connector_some_connector_attr4", value = "used") +@SetEnvironmentVariable(key = "mp_messaging_connector_SOME_CONNECTOR_mixedcase", value = "used") +class OverrideConnectorConfigTest { + + private SmallRyeConfig overallConfig; + private OverrideConnectorConfig config; + private OverrideConnectorConfig config2; + + @BeforeEach + public void createTestConfig() { + Map cfg = new HashMap<>(); + cfg.put("mp.messaging.incoming.bar.connector", "some-connector"); + cfg.put("mp.messaging.incoming.bar.test", "test"); + cfg.put("mp.messaging.incoming.foo.connector", "some-connector"); + cfg.put("mp.messaging.incoming.foo.attr1", "value"); + cfg.put("mp.messaging.incoming.foo.attr2", "23"); + cfg.put("mp.messaging.incoming.foo.attr.2", "test"); + cfg.put("mp.messaging.incoming.foo.at-tr", "test"); + cfg.put("mp.messaging.incoming.foo.bar.qux", "value"); + cfg.put("mp.messaging.connector.some-connector.key", "value"); + cfg.put("mp.messaging.connector.some-connector.some-key", "should not be used"); + cfg.put("mp.messaging.connector.some-connector.bar.other-key", "another-value"); + cfg.put("MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_CAPSKEY", "should not be used"); + + SmallRyeConfigBuilder builder = new SmallRyeConfigBuilder(); + builder.addDefaultSources(); + builder.withSources(new ConfigSource() { + @Override + public Map getProperties() { + return cfg; + } + + @Override + public Set getPropertyNames() { + return cfg.keySet(); + } + + @Override + public int getOrdinal() { + return ConfigSource.DEFAULT_ORDINAL; + } + + @Override + public String getValue(String s) { + return cfg.get(s); + } + + @Override + public String getName() { + return "test"; + } + }); + overallConfig = builder.build(); + config = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", "bar"); + config2 = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", "bar", + Map.of("attr1", c -> "some-other-value", + "attr2", c -> c.getOriginalValue("attr2", Integer.class).map(i -> i + 10) + .orElse(10))); + } + + @Test + public void testGetConfigValue() { + ConfigValue attr1 = config.getConfigValue("attr1"); + assertThat(attr1.getName()).isEqualTo("mp.messaging.incoming.foo.attr1"); + assertThat(attr1.getValue()).isEqualTo("value"); + assertThat(attr1.getRawValue()).isEqualTo("value"); + assertThat(attr1.getSourceName()).isEqualTo("test"); + assertThat(attr1.getSourceOrdinal()).isEqualTo(ConfigSource.DEFAULT_ORDINAL); + + ConfigValue attr3 = config.getConfigValue("attr3"); + assertThat(attr3.getName()).isEqualTo("mp.messaging.connector.some-connector.attr3"); // The value looked up in the overall config + assertThat(attr3.getValue()).isEqualTo("used"); + assertThat(attr3.getRawValue()).isEqualTo("used"); + assertThat(attr3.getSourceOrdinal()).isEqualTo(300); // Env config source default ordinal + + ConfigValue channelName = config.getConfigValue("channel-name"); + assertThat(channelName.getName()).isEqualTo("channel-name"); + assertThat(channelName.getValue()).isEqualTo("foo"); + assertThat(channelName.getRawValue()).isEqualTo("foo"); + assertThat(channelName.getSourceName()).isEqualTo("ConnectorConfig internal"); + assertThat(channelName.getSourceOrdinal()).isEqualTo(0); + } + + @SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_ATTR", value = "new-value") + @SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_AT_TR", value = "another-value") + @SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_BAR_KEY", value = "some-other-value") + @SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_SOME_KEY", value = "another-value-from-connector") + @SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_SOME_OTHER_KEY", value = "another-value-other") + @SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_ATTR1", value = "should not be used") + @SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_ATTR3", value = "used") + @SetEnvironmentVariable(key = "mp_messaging_connector_some_connector_attr4", value = "used") + @SetEnvironmentVariable(key = "mp_messaging_connector_SOME_CONNECTOR_mixedcase", value = "used") + @Test + public void testPropertyNames() { + // Base config behaviour: + // Even though looking up both properties would return the value of the environment variable, + // both are included in the set returned from getPropertyNames. + assertThat(overallConfig.getPropertyNames()) + .contains("mp.messaging.incoming.foo.at-tr", + "MP_MESSAGING_INCOMING_FOO_AT_TR"); + + Iterable names = config.getPropertyNames(); + assertThat(names) + .containsExactlyInAnyOrder("connector", "ATTR1", "attr1", "attr2", "attr.2", "ATTR", "attr", "AT_TR", + "at-tr", "bar.key", "BAR_KEY", "bar.qux", "other-key", "key", + "SOME_KEY", "some-key", "SOME_OTHER_KEY", "ATTR3", "attr4", "channel-name"); + + assertThat(config.getOptionalValue("connector", String.class)).hasValue("some-connector"); + assertThat(config.getOptionalValue("attr1", String.class)).hasValue("value"); + assertThat(config.getOptionalValue("attr2", Integer.class)).hasValue(23); + assertThat(config.getOptionalValue("attr.2", String.class)).hasValue("test"); + assertThat(config.getOptionalValue("at-tr", String.class)).hasValue("another-value"); + assertThat(config.getOptionalValue("AT_TR", String.class)).hasValue("another-value"); + assertThat(config.getOptionalValue("some-key", String.class)).hasValue("another-value-from-connector"); + assertThat(config.getOptionalValue("SOME_KEY", String.class)).hasValue("another-value-from-connector"); + assertThat(config.getOptionalValue("key", String.class)).hasValue("some-other-value"); + assertThat(config.getOptionalValue("attr3", String.class)).hasValue("used"); + assertThat(config.getOptionalValue("ATTR3", String.class)).hasValue("used"); + assertThat(config.getOptionalValue("attr4", String.class)).hasValue("used"); + assertThat(config.getOptionalValue("attr1", String.class)).hasValue("value"); + assertThat(config.getOptionalValue("qux", String.class)).hasValue("value"); + assertThat(config.getOptionalValue("bar.qux", String.class)).hasValue("value"); + assertThat(config.getOptionalValue("bar.other-key", String.class)).hasValue("another-value"); + } + + @SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_ATTR", value = "new-value") + @SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_AT_TR", value = "another-value") + @SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_BAR_KEY", value = "some-other-value") + @SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_SOME_KEY", value = "another-value-from-connector") + @SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_SOME_OTHER_KEY", value = "another-value-other") + @SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_ATTR1", value = "should not be used") + @SetEnvironmentVariable(key = "MP_MESSAGING_CONNECTOR_SOME_CONNECTOR_ATTR3", value = "used") + @SetEnvironmentVariable(key = "mp_messaging_connector_some_connector_attr4", value = "used") + @SetEnvironmentVariable(key = "mp_messaging_connector_SOME_CONNECTOR_mixedcase", value = "used") + @Test + public void testPropertyNamesOverriden() { + // Base config behaviour: + // Even though looking up both properties would return the value of the environment variable, + // both are included in the set returned from getPropertyNames. + assertThat(overallConfig.getPropertyNames()) + .contains("mp.messaging.incoming.foo.at-tr", + "MP_MESSAGING_INCOMING_FOO_AT_TR"); + + Iterable names = config2.getPropertyNames(); + assertThat(names) + .containsExactlyInAnyOrder("connector", "ATTR1", "attr1", "attr2", "attr.2", "ATTR", "attr", "AT_TR", + "at-tr", "bar.key", "BAR_KEY", "bar.qux", "other-key", "key", + "SOME_KEY", "some-key", "SOME_OTHER_KEY", "ATTR3", "attr4", "channel-name"); + + assertThat(config2.getOptionalValue("connector", String.class)).hasValue("some-connector"); + assertThat(config2.getOptionalValue("attr1", String.class)).hasValue("some-other-value"); + assertThat(config2.getOptionalValue("attr2", Integer.class)).hasValue(33); + assertThat(config2.getOptionalValue("attr.2", String.class)).hasValue("test"); + assertThat(config2.getOptionalValue("at-tr", String.class)).hasValue("another-value"); + assertThat(config2.getOptionalValue("AT_TR", String.class)).hasValue("another-value"); + assertThat(config2.getOptionalValue("some-key", String.class)).hasValue("another-value-from-connector"); + assertThat(config2.getOptionalValue("SOME_KEY", String.class)).hasValue("another-value-from-connector"); + assertThat(config2.getOptionalValue("key", String.class)).hasValue("some-other-value"); + assertThat(config2.getOptionalValue("attr3", String.class)).hasValue("used"); + assertThat(config2.getOptionalValue("ATTR3", String.class)).hasValue("used"); + assertThat(config2.getOptionalValue("attr4", String.class)).hasValue("used"); + assertThat(config2.getOptionalValue("qux", String.class)).hasValue("value"); + assertThat(config2.getOptionalValue("bar.qux", String.class)).hasValue("value"); + assertThat(config2.getOptionalValue("bar.other-key", String.class)).hasValue("another-value"); + } +}