diff --git a/README.md b/README.md
index 2cd4450..a5453aa 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,9 @@ mvn clean package
You need to create a unique test topic per application instance with enough partitions to span all brokers
+- Topic partition count increases are handled automatically
+- It may take up to 60000 ms (the `metadata.max.age.ms` value set by the application) for a new topic partition count to be reflected, as illustrated below
+
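For example, to grow the partition count of an already-created test topic you might use Kafka's `AdminClient`. This is only an illustrative sketch, not part of this change; the topic name and bootstrap address are placeholders for whatever your `test.properties` configures.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreaseTestTopicPartitions {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(properties)) {
            // grow the (placeholder) test topic to 8 partitions; running instances
            // pick the change up automatically within metadata.max.age.ms (60000 ms)
            admin.createPartitions(Map.of("k-synthetic-test", NewPartitions.increaseTo(8)))
                    .all()
                    .get();
        }
    }
}
```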
### Self-Managed Kafka
The configuration `test.properties` in https://github.com/dhoard/k-synthetic-test/blob/main/configuration/test.properties should be self-explanatory
@@ -81,7 +84,7 @@ Copy `configuration/test.properties` and edit to match your environment
Run
```shell
-java -jar target/k-synthetic-test-0.0.5.jar configuration/test.properties
+java -jar target/k-synthetic-test-0.0.6.jar configuration/test.properties
```
**NOTES**
diff --git a/configuration/README.md b/configuration/README.md
index 1e0ffc3..a64459b 100644
--- a/configuration/README.md
+++ b/configuration/README.md
@@ -16,7 +16,7 @@ Configuration scenario
Usage
```shell
-java -jar target/k-synthetic-test-0.0.5.jar configuration/test.properties
+java -jar target/k-synthetic-test-0.0.6.jar configuration/test.properties
```
---
@@ -36,7 +36,7 @@ Configuration scenario
java \
-Djavax.net.ssl.keyStore=configuration/keystore.pkcs12 \
-Djavax.net.ssl.keyStorePassword=changeit \
- -jar target/k-synthetic-test-0.0.5.jar configuration/test.ssl.properties
+ -jar target/k-synthetic-test-0.0.6.jar configuration/test.ssl.properties
```
---
@@ -56,5 +56,5 @@ Configuration scenario
java \
-Djavax.net.ssl.keyStore=configuration/keystore.pkcs12 \
-Djavax.net.ssl.keyStorePassword=changeit \
- -jar target/k-synthetic-test-0.0.5.jar configuration/test.confluent-cloud.properties
+ -jar target/k-synthetic-test-0.0.6.jar configuration/test.confluent-cloud.properties
```
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 19ef59d..b7d395d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.github.dhoard</groupId>
<artifactId>k-synthetic-test</artifactId>
- <version>0.0.5</version>
+ <version>0.0.6</version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -78,17 +78,17 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.7.30</version>
+ <version>2.0.6</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
- <version>1.2.3</version>
+ <version>1.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>3.3.2</version>
+ <version>3.4.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
diff --git a/src/main/java/com/github/dhoard/k/synthetic/test/KSyntheticTest.java b/src/main/java/com/github/dhoard/k/synthetic/test/KSyntheticTest.java
index 47386cf..93164b9 100644
--- a/src/main/java/com/github/dhoard/k/synthetic/test/KSyntheticTest.java
+++ b/src/main/java/com/github/dhoard/k/synthetic/test/KSyntheticTest.java
@@ -58,9 +58,7 @@ public KSyntheticTest() {
countDownLatch = new CountDownLatch(1);
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- countDownLatch.countDown();
- }));
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> countDownLatch.countDown()));
}
/**
@@ -190,6 +188,7 @@ public boolean checkCredentials(String username, String password) {
// to prevent "These configurations X were supplied but are not used yet" warnings
Configuration consumerConfiguration = configuration.copy();
+ consumerConfiguration.put("metadata.max.age.ms", "60000");
consumerConfiguration.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfiguration.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfiguration.remove("acks");
@@ -201,6 +200,7 @@ public boolean checkCredentials(String username, String password) {
messageConsumer.start();
Configuration producerConfiguration = configuration.copy();
+ producerConfiguration.put("metadata.max.age.ms", "60000");
producerConfiguration.remove("key.deserializer");
producerConfiguration.remove("value.deserializer");
producerConfiguration.remove("session.timeout.ms");
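The 60000 ms figure quoted in the README comes from the `metadata.max.age.ms` override added above: `partitionsFor()` answers from the client's cached cluster metadata, and that cache is only forced to refresh once it is older than `metadata.max.age.ms`. A standalone sketch (hypothetical bootstrap address and topic name, not part of this change) that just watches the partition count:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PartitionCountWatcher {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");  // placeholder
        properties.put("metadata.max.age.ms", "60000");         // force a metadata refresh at least once a minute
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            while (true) {
                // partitionsFor() is served from cached metadata; the count changes
                // only after the next refresh picks up newly added partitions
                System.out.println("partitions = " + kafkaConsumer.partitionsFor("k-synthetic-test").size());
                Thread.sleep(10000);
            }
        }
    }
}
```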
diff --git a/src/main/java/com/github/dhoard/k/synthetic/test/RecordConsumer.java b/src/main/java/com/github/dhoard/k/synthetic/test/RecordConsumer.java
index 7ed45af..2dbb3a4 100644
--- a/src/main/java/com/github/dhoard/k/synthetic/test/RecordConsumer.java
+++ b/src/main/java/com/github/dhoard/k/synthetic/test/RecordConsumer.java
@@ -24,9 +24,14 @@
import org.slf4j.LoggerFactory;
import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
@@ -46,6 +51,8 @@ public class RecordConsumer {
private CountDownLatch countDownLatch;
private KafkaConsumer<String, String> kafkaConsumer;
+ private Timer assignPartitionsTimer;
+
/**
* Constructor
*
@@ -68,23 +75,21 @@ public void start() throws InterruptedException, ExecutionException {
kafkaConsumer = new KafkaConsumer<>(properties);
- // Manual partition assignment
-
- List<TopicPartition> topicPartitionList = new ArrayList<>();
-
- List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
- for (PartitionInfo partitionInfo : partitionInfoList) {
- topicPartitionList.add(new TopicPartition(topic, partitionInfo.partition()));
- }
-
- kafkaConsumer.assign(topicPartitionList);
- kafkaConsumer.seekToEnd(kafkaConsumer.assignment());
+ assignPartitions();
countDownLatch = new CountDownLatch(2);
thread = new Thread(this::poll);
thread.start();
+ assignPartitionsTimer = new Timer("assignment", true);
+ assignPartitionsTimer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ assignPartitions();
+ }
+ }, 0, 10000);
+
LOGGER.info("consumer started");
}
}
@@ -104,6 +109,9 @@ public void close() {
// DO NOTHING
}
+ assignPartitionsTimer.cancel();
+ assignPartitionsTimer = null;
+
kafkaConsumer.close();
kafkaConsumer = null;
@@ -112,6 +120,26 @@ public void close() {
}
}
}
+ private void assignPartitions() {
+ LOGGER.debug("assignPartitions()");
+
+ synchronized (kafkaConsumer) {
+ Set<TopicPartition> topicPartitionSet = new TreeSet<>(Comparator.comparingInt(TopicPartition::partition));
+
+ List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ topicPartitionSet.add(new TopicPartition(topic, partitionInfo.partition()));
+ }
+
+ Set<TopicPartition> existingTopicPartitionSet = kafkaConsumer.assignment();
+
+ if (!Objects.equals(topicPartitionSet, existingTopicPartitionSet)) {
+ LOGGER.debug("reassigning consumer partitions");
+ kafkaConsumer.assign(topicPartitionSet);
+ kafkaConsumer.seekToEnd(kafkaConsumer.assignment());
+ }
+ }
+ }
/**
* Method to poll for records
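For reference, here is the reassignment pattern this change introduces, condensed into a standalone sketch. The bootstrap address, topic name, and intervals are placeholders, and this is not the project's actual class; the real `RecordConsumer` hands records to a callback and handles shutdown via a `CountDownLatch`.

```java
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ReassigningConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092"); // placeholder
        properties.put("metadata.max.age.ms", "60000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String topic = "k-synthetic-test"; // placeholder
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // every 10 seconds, compare the topic's current partitions with the manual
        // assignment and re-assign (seeking to the end) if they differ
        Timer timer = new Timer("assignment", true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                synchronized (kafkaConsumer) {
                    Set<TopicPartition> topicPartitionSet =
                            new TreeSet<>(Comparator.comparingInt(TopicPartition::partition));
                    List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
                    for (PartitionInfo partitionInfo : partitionInfoList) {
                        topicPartitionSet.add(new TopicPartition(topic, partitionInfo.partition()));
                    }
                    if (!topicPartitionSet.equals(kafkaConsumer.assignment())) {
                        kafkaConsumer.assign(topicPartitionSet);
                        kafkaConsumer.seekToEnd(kafkaConsumer.assignment());
                    }
                }
            }
        }, 0, 10000);

        // poll loop, synchronized on the consumer because KafkaConsumer is not thread-safe
        while (true) {
            synchronized (kafkaConsumer) {
                kafkaConsumer.poll(Duration.ofMillis(1000))
                        .forEach(record -> System.out.println(record.value()));
            }
        }
    }
}
```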
diff --git a/src/main/java/com/github/dhoard/k/synthetic/test/RecordProducer.java b/src/main/java/com/github/dhoard/k/synthetic/test/RecordProducer.java
index f927940..25f7a54 100644
--- a/src/main/java/com/github/dhoard/k/synthetic/test/RecordProducer.java
+++ b/src/main/java/com/github/dhoard/k/synthetic/test/RecordProducer.java
@@ -19,13 +19,18 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.TreeSet;
/**
* Class to produce records
@@ -39,8 +44,10 @@ public class RecordProducer {
private final long periodMs;
private final String topic;
private KafkaProducer<String, String> kafkaProducer;
- private List<PartitionInfo> partitionInfoList;
+ private Set<TopicPartition> topicPartitionSet;
private Timer produceTimer;
+ private Timer assignPartitionsTimer;
+
/**
* Constructor
@@ -64,8 +71,9 @@ public void start() {
if (produceTimer == null) {
LOGGER.info("starting producer");
+ topicPartitionSet = new TreeSet<>(Comparator.comparingInt(TopicPartition::partition));
+
kafkaProducer = new KafkaProducer<>(properties);
- partitionInfoList = kafkaProducer.partitionsFor(topic);
produceTimer = new Timer("producer", true);
produceTimer.scheduleAtFixedRate(new TimerTask() {
@@ -75,6 +83,14 @@ public void run() {
}
}, delayMs, periodMs);
+ assignPartitionsTimer = new Timer("assignment", true);
+ assignPartitionsTimer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ assignPartitions();
+ }
+ }, 0, 10000);
+
LOGGER.info("producer started");
}
}
@@ -96,6 +112,24 @@ public void close() {
}
}
+ private void assignPartitions() {
+ LOGGER.debug("assignPartitions()");
+
+ synchronized (this) {
+ Set<TopicPartition> newTopicPartitionSet = new TreeSet<>(Comparator.comparingInt(TopicPartition::partition));
+
+ List<PartitionInfo> partitionInfoList = kafkaProducer.partitionsFor(topic);
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ newTopicPartitionSet.add(new TopicPartition(topic, partitionInfo.partition()));
+ }
+
+ if (!Objects.equals(topicPartitionSet, newTopicPartitionSet)) {
+ LOGGER.debug("reassigning producer partitions");
+ topicPartitionSet = newTopicPartitionSet;
+ }
+ }
+ }
+
/**
* Method to produce records
*/
@@ -103,17 +137,17 @@ private void produce() {
LOGGER.debug("produce()");
try {
- for (PartitionInfo partitionInfo : partitionInfoList) {
- long nowMs = System.currentTimeMillis();
+ synchronized (this) {
+ for (TopicPartition topicPartition : topicPartitionSet) {
+ long nowMs = System.currentTimeMillis();
- ProducerRecord<String, String> producerRecord =
- new ProducerRecord<>(
- topic,
- partitionInfo.partition(),
- null,
- String.valueOf(nowMs));
+ ProducerRecord<String, String> producerRecord =
+ new ProducerRecord<>(
+ topic,
+ topicPartition.partition(),
+ null,
+ String.valueOf(nowMs));
- synchronized (this) {
kafkaProducer.send(producerRecord, (recordMetadata, e) -> {
if (e != null) {
LOGGER.error("Exception producing record", e);