Skip to content

Commit

Permalink
Merge pull request #2406 from ozangunalp/kafka_config_override
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored Dec 12, 2023
2 parents 0418394 + 3dfeebf commit f277c46
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 67 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@

<nats-embedded.version>2.1.1</nats-embedded.version>
<slf4j-log4j12.version>1.7.36</slf4j-log4j12.version>
<slf4j-reload4j.version>2.0.9</slf4j-reload4j.version>
<opencsv.version>5.9</opencsv.version>
<strimzi-test-container.version>0.105.0</strimzi-test-container.version>

Expand Down Expand Up @@ -310,6 +311,11 @@
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-log4j12.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>${slf4j-reload4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions smallrye-reactive-messaging-kafka-test-companion/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@
<optional>true</optional>
</dependency>

<!-- Ensure log4j1 log backend with slf4j -->
<!-- Ensure log4j log backend with slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<artifactId>slf4j-reload4j</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
4 changes: 2 additions & 2 deletions smallrye-reactive-messaging-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@
<scope>test</scope>
</dependency>

<!-- Ensure log4j1 log backend with slf4j -->
<!-- Ensure log4j log backend with slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<artifactId>slf4j-reload4j</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,37 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
import io.vertx.mutiny.core.Vertx;

@SuppressWarnings({ "rawtypes", "unchecked" })
Expand Down Expand Up @@ -65,6 +73,17 @@ public static class Factory implements KafkaFailureHandler.Factory {
@Inject
KafkaCDIEvents kafkaCDIEvents;

@Inject
@Any
Instance<SerializationFailureHandler<?>> serializationFailureHandlers;

@Inject
@Any
Instance<ProducerInterceptor<?, ?>> producerInterceptors;

@Inject
Instance<Config> rootConfig;

@Override
public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
Vertx vertx,
Expand All @@ -74,29 +93,27 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
String keyDeserializer = (String) deadQueueProducerConfig.remove(KEY_DESERIALIZER_CLASS_CONFIG);
String valueDeserializer = (String) deadQueueProducerConfig.remove(VALUE_DESERIALIZER_CLASS_CONFIG);

// We need to remove consumer interceptor
deadQueueProducerConfig.remove(INTERCEPTOR_CLASSES_CONFIG);

deadQueueProducerConfig.put(KEY_SERIALIZER_CLASS_CONFIG,
config.getDeadLetterQueueKeySerializer().orElse(getMirrorSerializer(keyDeserializer)));
deadQueueProducerConfig.put(VALUE_SERIALIZER_CLASS_CONFIG,
config.getDeadLetterQueueValueSerializer().orElse(getMirrorSerializer(valueDeserializer)));
deadQueueProducerConfig.put(CLIENT_ID_CONFIG,
config.getDeadLetterQueueProducerClientId()
.orElse("kafka-dead-letter-topic-producer-" + deadQueueProducerConfig.get(CLIENT_ID_CONFIG)));
String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
config.getChannel(), "dead-letter-queue", Map.of(
KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
.orElse("kafka-dead-letter-topic-producer-" + consumerClientId),
"topic", c -> "dead-letter-topic-" + config.getChannel(),
"key-serialization-failure-handler", c -> "dlq-serialization",
"value-serialization-failure-handler", c -> "dlq-serialization",
INTERCEPTOR_CLASSES_CONFIG, c -> ""));
KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(connectorConfig);

ConfigurationCleaner.cleanupProducerConfiguration(deadQueueProducerConfig);
String deadQueueTopic = config.getDeadLetterQueueTopic().orElse("dead-letter-topic-" + config.getChannel());

log.deadLetterConfig(deadQueueTopic,
(String) deadQueueProducerConfig.get(KEY_SERIALIZER_CLASS_CONFIG),
(String) deadQueueProducerConfig.get(VALUE_SERIALIZER_CLASS_CONFIG));
log.deadLetterConfig(producerConfig.getTopic().orElse(null), producerConfig.getKeySerializer(),
producerConfig.getValueSerializer());

var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>();
// fire producer event (e.g. bind metrics)
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(deadQueueProducerConfig,
deadQueueTopic, 10000, false, null, dlqSerializationHandler, dlqSerializationHandler,
(p, c) -> kafkaCDIEvents.producer().fire(p));
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(producerConfig,
serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p));

return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, producer, reportFailure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@
import java.util.Arrays;
import java.util.Objects;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;

public class KafkaDeadLetterSerializationHandler<T> implements SerializationFailureHandler<T> {
@ApplicationScoped
@Identifier("dlq-serialization")
public class KafkaDeadLetterSerializationHandler implements SerializationFailureHandler<Object> {

@Override
public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey, String serializer, T data,
public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey, String serializer, Object data,
Headers headers) {
// deserializer failure
if (headers.lastHeader(DESERIALIZATION_FAILURE_DESERIALIZER) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
import static io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.createDeserializationFailureHandler;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
Expand All @@ -26,15 +26,18 @@
import java.util.stream.Stream;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
Expand All @@ -44,15 +47,17 @@
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit;
import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
import io.smallrye.reactive.messaging.kafka.impl.RuntimeKafkaSourceConfiguration;
import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;

Expand Down Expand Up @@ -82,7 +87,19 @@ public static class Factory implements KafkaFailureHandler.Factory {
KafkaCDIEvents kafkaCDIEvents;

@Inject
Instance<DeserializationFailureHandler<?>> failureHandlers;
@Any
Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers;

@Inject
@Any
Instance<SerializationFailureHandler<?>> serializationFailureHandlers;

@Inject
@Any
Instance<ProducerInterceptor<?, ?>> producerInterceptors;

@Inject
Instance<Config> rootConfig;

@Override
public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
Expand All @@ -93,20 +110,6 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
String keyDeserializer = (String) delayedRetryTopicProducerConfig.remove(KEY_DESERIALIZER_CLASS_CONFIG);
String valueDeserializer = (String) delayedRetryTopicProducerConfig.remove(VALUE_DESERIALIZER_CLASS_CONFIG);

// We need to remove consumer interceptor
delayedRetryTopicProducerConfig.remove(INTERCEPTOR_CLASSES_CONFIG);

delayedRetryTopicProducerConfig.put(KEY_SERIALIZER_CLASS_CONFIG,
config.getDeadLetterQueueKeySerializer().orElse(getMirrorSerializer(keyDeserializer)));
delayedRetryTopicProducerConfig.put(VALUE_SERIALIZER_CLASS_CONFIG,
config.getDeadLetterQueueValueSerializer().orElse(getMirrorSerializer(valueDeserializer)));
delayedRetryTopicProducerConfig.put(CLIENT_ID_CONFIG,
config.getDeadLetterQueueProducerClientId()
.orElse("kafka-delayed-retry-topic-producer-"
+ delayedRetryTopicProducerConfig.get(CLIENT_ID_CONFIG)));

ConfigurationCleaner.cleanupProducerConfiguration(delayedRetryTopicProducerConfig);

List<String> retryTopics = config.getDelayedRetryTopicTopics()
.map(topics -> Arrays.stream(topics.split(",")).collect(Collectors.toList()))
.orElseGet(() -> Stream.of(
Expand All @@ -117,30 +120,37 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
long retryTimeout = config.getDelayedRetryTopicTimeout();
String deadQueueTopic = config.getDeadLetterQueueTopic().orElse(null);

String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
config.getChannel(), "dead-letter-queue", Map.of(
KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
.orElse("kafka-delayed-retry-topic-producer-" + consumerClientId),
"topic", c -> retryTopics.get(0),
"key-serialization-failure-handler", c -> "dlq-serialization",
"value-serialization-failure-handler", c -> "dlq-serialization",
INTERCEPTOR_CLASSES_CONFIG, c -> ""));
KafkaConnectorOutgoingConfiguration producerConfig = new KafkaConnectorOutgoingConfiguration(connectorConfig);

log.delayedRetryTopic(config.getChannel(), retryTopics, maxRetries, retryTimeout, deadQueueTopic);

var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>();
// fire producer event (e.g. bind metrics)
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(delayedRetryTopicProducerConfig,
retryTopics.get(0), 10000, false, null, dlqSerializationHandler, dlqSerializationHandler,
(p, c) -> kafkaCDIEvents.producer().fire(p));

Map<String, Object> retryConsumerConfig = new HashMap<>(consumer.configuration());
retryConsumerConfig.put(CLIENT_ID_CONFIG,
"kafka-delayed-retry-topic-" + retryConsumerConfig.get(CLIENT_ID_CONFIG));
retryConsumerConfig.put(GROUP_ID_CONFIG,
"kafka-delayed-retry-topic-" + retryConsumerConfig.get(GROUP_ID_CONFIG));

ReactiveKafkaConsumer<Object, Object> retryConsumer = new ReactiveKafkaConsumer<>(retryConsumerConfig,
createDeserializationFailureHandler(true, failureHandlers, config),
createDeserializationFailureHandler(false, failureHandlers, config),
RuntimeKafkaSourceConfiguration.buildFromConfiguration(config),
true,
config.getPollTimeout(),
config.getFailOnDeserializationFailure(),
c -> kafkaCDIEvents.consumer().fire(c),
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(producerConfig,
serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p));

ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX,
rootConfig.get(), config.getChannel(), "delayed-retry-topic.consumer", Map.of(
"lazy-client", c -> true,
CLIENT_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId,
GROUP_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId));
KafkaConnectorIncomingConfiguration retryConfig = new KafkaConnectorIncomingConfiguration(retryConsumerConfig);
ReactiveKafkaConsumer<Object, Object> retryConsumer = new ReactiveKafkaConsumer<>(retryConfig,
deserializationFailureHandlers,
retryConsumerConfig.getValue(GROUP_ID_CONFIG, String.class), -1,
reportFailure,
((VertxInternal) vertx.getDelegate()).createEventLoopContext());
((VertxInternal) vertx.getDelegate()).createEventLoopContext(),
c -> kafkaCDIEvents.consumer().fire(c));

return new KafkaDelayedRetryTopic(config.getChannel(), vertx, config, retryTopics, maxRetries, retryTimeout,
deadQueueTopic, producer, retryConsumer, reportFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit;
import io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterSerializationHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaIgnoreFailure;
Expand Down Expand Up @@ -98,6 +99,7 @@ public void initWeld() {
weld.addBeanClass(KafkaFailStop.Factory.class);
weld.addBeanClass(KafkaIgnoreFailure.Factory.class);
weld.addBeanClass(KafkaDeadLetterQueue.Factory.class);
weld.addBeanClass(KafkaDeadLetterSerializationHandler.class);
weld.addBeanClass(KafkaCDIEvents.class);
weld.addBeanClass(KafkaConnector.class);
weld.addBeanClass(KafkaClientServiceImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ public KafkaMessageObservation(String channel, Message<?> message) {
if (metadata.isPresent()) {
Instant inst = metadata.get().getTimestamp();
recordTs = inst.toEpochMilli();
System.out.println("record " + recordTs);
} else {
recordTs = 0L;
}
Expand All @@ -242,7 +241,6 @@ public KafkaMessageObservation(String channel, Message<?> message) {
public void onMessageAck(Message<?> message) {
super.onMessageAck(message);
completedMs = System.currentTimeMillis();
System.out.println("completed in " + completedMs);
done = true;
}
}
Expand Down
Loading

0 comments on commit f277c46

Please sign in to comment.