diff --git a/CHANGELOG.md b/CHANGELOG.md index 43f1af6..883a53a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.31.0] - 2024-09-09 +### Added +- Added a postprocessor to the `TkmsKafkaProducerProvider` to allow features like tracing attached to the Kafka Producer. + ## [0.30.1] - 2024-08-08 ### Changed - MeterFilter's applied by the library are no longer explicitly applied and are instead diff --git a/gradle.properties b/gradle.properties index fe6a7f0..229d380 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.30.1 +version=0.31.0 diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java index 380197c..0621942 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java @@ -16,7 +16,6 @@ import com.transferwise.kafka.tkms.api.ITkmsMessageInterceptors; import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; -import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.config.TkmsProperties; @@ -43,7 +42,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableObject; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -81,8 +79,6 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS @Autowired private ITkmsMessageInterceptors messageIntereceptors; @Autowired - private ITkmsKafkaProducerPostProcessor tkmsKafkaProducerPostProcessor; - @Autowired private SharedReentrantLockBuilderFactory lockBuilderFactory; @Autowired private ITkmsInterrupterService tkmsInterrupterService; diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java index 3402902..dbddf13 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java @@ -9,12 +9,6 @@ public interface ITkmsKafkaProducerProvider { Producer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition); - default void addPostProcessor(ITkmsKafkaProducerPostProcessor postProcessor) { - } - - default void removePostProcessors() { - } - void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); void closeKafkaProducerForTopicValidation(TkmsShardPartition tkmsShardPartition); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index 193e4a2..1137878 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.util.Assert; @Slf4j public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, GracefulShutdownStrategy { @@ -43,18 +42,9 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr private Map, ProducerEntry> producers = new ConcurrentHashMap<>(); + @Autowired private List postProcessors = new ArrayList<>(); - @Override - public void addPostProcessor(ITkmsKafkaProducerPostProcessor postProcessor) { - Assert.notNull(postProcessor, "'postProcessor' cannot be null"); - this.postProcessors.add(postProcessor); - } - - @Override - public void removePostProcessors() { - this.postProcessors.clear(); - } @Override public Producer getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java index 756c7b2..d1fa8d5 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java @@ -10,35 +10,28 @@ import com.transferwise.kafka.tkms.api.TkmsMessage; import com.transferwise.kafka.tkms.test.BaseIntTest; import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector; +import com.transferwise.kafka.tkms.test.TestMessagesInterceptor; import com.transferwise.kafka.tkms.test.TestProperties; import java.nio.charset.StandardCharsets; import java.util.stream.StreamSupport; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; import org.springframework.beans.factory.annotation.Autowired; -@TestInstance(TestInstance.Lifecycle.PER_CLASS) class MessagePostProcessingTest extends BaseIntTest { + @Autowired + private TestMessagesInterceptor testMessagesInterceptor; @Autowired private TransactionalKafkaMessageSender transactionalKafkaMessageSender; - @Autowired private TestProperties testProperties; - @Autowired private ITransactionsHelper transactionsHelper; - @BeforeEach - void setupTest() { - tkmsSentMessagesCollector.clear(); - } - @AfterEach void cleanupTest() { - tkmsSentMessagesCollector.clear(); + testMessagesInterceptor.setBeforeSendingToKafkaFunction(null); } @Test diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java index 6288500..b101a5b 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java @@ -1,7 +1,6 @@ package com.transferwise.kafka.tkms.test; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor; -import com.transferwise.kafka.tkms.config.TkmsKafkaProducerProvider; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -11,20 +10,15 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor, InitializingBean { +public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor { public static final byte[] TEST_MESSAGE = "Testing ProducerPostProcessing".getBytes(StandardCharsets.UTF_8); private ProxyInvocationHandler handler; - @Autowired - TkmsKafkaProducerProvider tkmsKafkaProducerProvider; - @SuppressWarnings("unchecked") @Override public Producer apply(Producer producer) { @@ -36,11 +30,6 @@ public Producer apply(Producer producer) { handler); } - @Override - public void afterPropertiesSet() throws Exception { - tkmsKafkaProducerProvider.addPostProcessor(this); - } - private static class ProxyInvocationHandler implements InvocationHandler { private final Producer producer;