diff --git a/pom.xml b/pom.xml
index 28df4ad93b..6a39cc3310 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
2.1.1
1.7.36
+ 2.0.9
5.9
0.105.0
@@ -310,6 +311,11 @@
slf4j-log4j12
${slf4j-log4j12.version}
+
+ org.slf4j
+ slf4j-reload4j
+ ${slf4j-reload4j.version}
+
org.slf4j
slf4j-simple
diff --git a/smallrye-reactive-messaging-kafka-test-companion/pom.xml b/smallrye-reactive-messaging-kafka-test-companion/pom.xml
index 5593a33614..a614daf795 100644
--- a/smallrye-reactive-messaging-kafka-test-companion/pom.xml
+++ b/smallrye-reactive-messaging-kafka-test-companion/pom.xml
@@ -67,10 +67,10 @@
true
-
+
org.slf4j
- slf4j-log4j12
+ slf4j-reload4j
test
diff --git a/smallrye-reactive-messaging-kafka/pom.xml b/smallrye-reactive-messaging-kafka/pom.xml
index 3e810053b4..1c084dae65 100644
--- a/smallrye-reactive-messaging-kafka/pom.xml
+++ b/smallrye-reactive-messaging-kafka/pom.xml
@@ -130,10 +130,10 @@
test
-
+
org.slf4j
- slf4j-log4j12
+ slf4j-reload4j
test
diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java
index b4c3802a2e..56f00332bc 100644
--- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java
+++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java
@@ -8,6 +8,7 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@@ -15,10 +16,14 @@
import java.util.function.BiConsumer;
import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Any;
+import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
+import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import io.smallrye.common.annotation.Identifier;
@@ -26,11 +31,14 @@
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
+import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
+import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
-import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
+import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
+import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
import io.vertx.mutiny.core.Vertx;
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -65,6 +73,17 @@ public static class Factory implements KafkaFailureHandler.Factory {
@Inject
KafkaCDIEvents kafkaCDIEvents;
+ @Inject
+ @Any
+ Instance> serializationFailureHandlers;
+
+ @Inject
+ @Any
+ Instance> producerInterceptors;
+
+ @Inject
+ Instance rootConfig;
+
@Override
public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
Vertx vertx,
@@ -74,29 +93,27 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
String keyDeserializer = (String) deadQueueProducerConfig.remove(KEY_DESERIALIZER_CLASS_CONFIG);
String valueDeserializer = (String) deadQueueProducerConfig.remove(VALUE_DESERIALIZER_CLASS_CONFIG);
- // We need to remove consumer interceptor
- deadQueueProducerConfig.remove(INTERCEPTOR_CLASSES_CONFIG);
-
- deadQueueProducerConfig.put(KEY_SERIALIZER_CLASS_CONFIG,
- config.getDeadLetterQueueKeySerializer().orElse(getMirrorSerializer(keyDeserializer)));
- deadQueueProducerConfig.put(VALUE_SERIALIZER_CLASS_CONFIG,
- config.getDeadLetterQueueValueSerializer().orElse(getMirrorSerializer(valueDeserializer)));
- deadQueueProducerConfig.put(CLIENT_ID_CONFIG,
- config.getDeadLetterQueueProducerClientId()
- .orElse("kafka-dead-letter-topic-producer-" + deadQueueProducerConfig.get(CLIENT_ID_CONFIG)));
+ String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
+ ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
+ config.getChannel(), "dead-letter-queue", Map.of(
+ KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
+ VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
+ CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
+ .orElse("kafka-dead-letter-topic-producer-" + consumerClientId),
+ "topic", c -> "dead-letter-topic-" + config.getChannel(),
+ "key-serialization-failure-handler", c -> "dlq-serialization",
+ "value-serialization-failure-handler", c -> "dlq-serialization",
+ INTERCEPTOR_CLASSES_CONFIG, c -> ""));
+ KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(connectorConfig);
- ConfigurationCleaner.cleanupProducerConfiguration(deadQueueProducerConfig);
String deadQueueTopic = config.getDeadLetterQueueTopic().orElse("dead-letter-topic-" + config.getChannel());
- log.deadLetterConfig(deadQueueTopic,
- (String) deadQueueProducerConfig.get(KEY_SERIALIZER_CLASS_CONFIG),
- (String) deadQueueProducerConfig.get(VALUE_SERIALIZER_CLASS_CONFIG));
+ log.deadLetterConfig(producerConfig.getTopic().orElse(null), producerConfig.getKeySerializer(),
+ producerConfig.getValueSerializer());
- var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>();
// fire producer event (e.g. bind metrics)
- ReactiveKafkaProducer