Commit v0.0.6 (#5)
Added code to automatically handle topic partition count increases without requiring a restart
dhoard authored Feb 16, 2023
1 parent d41a184 commit b6b941f
Showing 6 changed files with 99 additions and 34 deletions.
5 changes: 4 additions & 1 deletion README.md
@@ -17,6 +17,9 @@ mvn clean package

You need to create a unique test topic per application instance, with enough partitions to span all brokers.

+- Topic partition count increases are handled automatically (see the AdminClient sketch after this file's diff)
+- It may take up to 60000 ms for the new topic partition count to be reflected

### Self-Managed Kafka

The configuration file `test.properties` in https://github.com/dhoard/k-synthetic-test/blob/main/configuration/test.properties should be self-explanatory.
@@ -81,7 +84,7 @@ Copy `configuration/test.properties` and edit to match your environment
Run

```shell
-java -jar target/k-synthetic-test-0.0.5.jar configuration/test.properties
+java -jar target/k-synthetic-test-0.0.6.jar configuration/test.properties
```

**NOTES**
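For context on the README note above: a partition-count increase is typically made with the Kafka `AdminClient`. The sketch below is not part of this commit; the topic name, partition count, bootstrap server, and class name are placeholders.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

import java.util.Map;
import java.util.Properties;

public class IncreasePartitions {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092"); // placeholder

        try (AdminClient adminClient = AdminClient.create(properties)) {
            // grow the (placeholder) test topic to 12 partitions; a running
            // k-synthetic-test instance should begin probing the new
            // partitions within the 60000 ms window noted in the README
            adminClient.createPartitions(
                    Map.of("k-synthetic-test", NewPartitions.increaseTo(12)))
                    .all()
                    .get();
        }
    }
}
```

This is the scenario the new `assignPartitions()` logic in `RecordConsumer` and `RecordProducer` reacts to.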
6 changes: 3 additions & 3 deletions configuration/README.md
@@ -16,7 +16,7 @@ Configuration scenario
Usage

```shell
-java -jar target/k-synthetic-test-0.0.5.jar configuration/test.properties
+java -jar target/k-synthetic-test-0.0.6.jar configuration/test.properties
```

---
@@ -36,7 +36,7 @@ Configuration scenario
java \
-Djavax.net.ssl.keyStore=configuration/keystore.pkcs12 \
-Djavax.net.ssl.keyStorePassword=changeit \
-  -jar target/k-synthetic-test-0.0.5.jar configuration/test.ssl.properties
+  -jar target/k-synthetic-test-0.0.6.jar configuration/test.ssl.properties
```

---
@@ -56,5 +56,5 @@ Configuration scenario
java \
-Djavax.net.ssl.keyStore=configuration/keystore.pkcs12 \
-Djavax.net.ssl.keyStorePassword=changeit \
-  -jar target/k-synthetic-test-0.0.5.jar configuration/test.confluent-cloud.properties
+  -jar target/k-synthetic-test-0.0.6.jar configuration/test.confluent-cloud.properties
```
8 changes: 4 additions & 4 deletions pom.xml
@@ -6,7 +6,7 @@

<groupId>com.github.dhoard</groupId>
<artifactId>k-synthetic-test</artifactId>
-<version>0.0.5</version>
+<version>0.0.6</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -78,17 +78,17 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
-<version>1.7.30</version>
+<version>2.0.6</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
-<version>1.2.3</version>
+<version>1.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
-<version>3.3.2</version>
+<version>3.4.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
KSyntheticTest.java
@@ -58,9 +58,7 @@ public KSyntheticTest() {

countDownLatch = new CountDownLatch(1);

-Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-    countDownLatch.countDown();
-}));
+Runtime.getRuntime().addShutdownHook(new Thread(() -> countDownLatch.countDown()));
}

/**
@@ -190,6 +188,7 @@ public boolean checkCredentials(String username, String password) {
// to prevent "These configurations X were supplied but are not used yet" warnings

Configuration consumerConfiguration = configuration.copy();
consumerConfiguration.put("metadata.max.age.ms", "60000");
consumerConfiguration.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfiguration.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfiguration.remove("acks");
@@ -201,6 +200,7 @@ public boolean checkCredentials(String username, String password) {
messageConsumer.start();

Configuration producerConfiguration = configuration.copy();
producerConfiguration.put("metadata.max.age.ms", "60000");
producerConfiguration.remove("key.deserializer");
producerConfiguration.remove("value.deserializer");
producerConfiguration.remove("session.timeout.ms");
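The `metadata.max.age.ms` entries added above are what back the README's 60000 ms figure: the Kafka clients refresh topic metadata at least that often (the client default is 300000 ms), so a partition added after startup becomes visible within roughly a minute. A minimal sketch of the configuration split, assuming plain `java.util.Properties` in place of the project's `Configuration` class (whose `copy()` plays the role of `putAll` here); the bootstrap server is a placeholder:

```java
import java.util.Properties;

public class ClientConfiguration {

    public static void main(String[] args) {
        // shared base configuration (placeholder bootstrap server)
        Properties base = new Properties();
        base.put("bootstrap.servers", "localhost:9092");

        // consumer copy: cap metadata age at 60 s so partition-count
        // changes are observed without a restart
        Properties consumerProperties = new Properties();
        consumerProperties.putAll(base);
        consumerProperties.put("metadata.max.age.ms", "60000");
        consumerProperties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // producer copy gets the same metadata refresh bound
        Properties producerProperties = new Properties();
        producerProperties.putAll(base);
        producerProperties.put("metadata.max.age.ms", "60000");

        System.out.println(consumerProperties);
        System.out.println(producerProperties);
    }
}
```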
RecordConsumer.java
@@ -24,9 +24,14 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
@@ -46,6 +51,8 @@ public class RecordConsumer {
private CountDownLatch countDownLatch;
private KafkaConsumer<String, String> kafkaConsumer;

+private Timer assignPartitionsTimer;

/**
* Constructor
*
@@ -68,23 +75,21 @@ public void start() throws InterruptedException, ExecutionException {

kafkaConsumer = new KafkaConsumer<>(properties);

-// Manual partition assignment
-
-List<TopicPartition> topicPartitionList = new ArrayList<>();
-
-List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
-for (PartitionInfo partitionInfo : partitionInfoList) {
-    topicPartitionList.add(new TopicPartition(topic, partitionInfo.partition()));
-}
-
-kafkaConsumer.assign(topicPartitionList);
-kafkaConsumer.seekToEnd(kafkaConsumer.assignment());
+assignPartitions();

countDownLatch = new CountDownLatch(2);

thread = new Thread(this::poll);
thread.start();

+assignPartitionsTimer = new Timer("assignment", true);
+assignPartitionsTimer.scheduleAtFixedRate(new TimerTask() {
+    @Override
+    public void run() {
+        assignPartitions();
+    }
+}, 0, 10000);

LOGGER.info("consumer started");
}
}
@@ -104,6 +109,9 @@ public void close() {
// DO NOTHING
}

+assignPartitionsTimer.cancel();
+assignPartitionsTimer = null;

kafkaConsumer.close();
kafkaConsumer = null;

@@ -112,6 +120,26 @@ public void close() {
}
}
}
+private void assignPartitions() {
+    LOGGER.debug("assignPartitions()");
+
+    synchronized (kafkaConsumer) {
+        Set<TopicPartition> topicPartitionSet = new TreeSet<>(Comparator.comparingInt(TopicPartition::partition));
+
+        List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
+        for (PartitionInfo partitionInfo : partitionInfoList) {
+            topicPartitionSet.add(new TopicPartition(topic, partitionInfo.partition()));
+        }
+
+        Set<TopicPartition> existingTopicPartitionSet = kafkaConsumer.assignment();
+
+        if (!Objects.equals(topicPartitionSet, existingTopicPartitionSet)) {
+            LOGGER.debug("reassigning consumer partitions");
+            kafkaConsumer.assign(topicPartitionSet);
+            kafkaConsumer.seekToEnd(kafkaConsumer.assignment());
+        }
+    }
+}

/**
* Method to poll for records
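Condensed into standalone form, the consumer-side pattern this diff introduces looks like the sketch below (topic name, bootstrap server, and class name are placeholders). The shared monitor is the important detail: `KafkaConsumer` is not thread-safe, so the poll loop and the reassignment timer must never touch it concurrently.

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;

public class ReassigningConsumer {

    public static void main(String[] args) {
        String topic = "k-synthetic-test"; // placeholder
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092"); // placeholder
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("metadata.max.age.ms", "60000");
        properties.put("enable.auto.commit", "false"); // no offsets needed; we always seek to the end

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // refresh the manual assignment every 10 s; partitionsFor() sees new
        // partitions once the client metadata has been refreshed
        new Timer("assignment", true).scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                synchronized (consumer) {
                    Set<TopicPartition> latest = new HashSet<>();
                    for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) {
                        latest.add(new TopicPartition(topic, partitionInfo.partition()));
                    }
                    if (!latest.equals(consumer.assignment())) {
                        consumer.assign(latest);
                        consumer.seekToEnd(consumer.assignment());
                    }
                }
            }
        }, 0, 10000);

        while (true) {
            ConsumerRecords<String, String> records;
            synchronized (consumer) {
                records = consumer.poll(Duration.ofMillis(100));
            }
            records.forEach(r -> System.out.println(r.partition() + ": " + r.value()));
        }
    }
}
```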
RecordProducer.java
Expand Up @@ -19,13 +19,18 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.TreeSet;

/**
* Class to produce records
@@ -39,8 +44,10 @@ public class RecordProducer {
private final long periodMs;
private final String topic;
private KafkaProducer<String, String> kafkaProducer;
-private List<PartitionInfo> partitionInfoList;
+private Set<TopicPartition> topicPartitionSet;
private Timer produceTimer;
+private Timer assignPartitionsTimer;


/**
* Constructor
@@ -64,8 +71,9 @@ public void start() {
if (produceTimer == null) {
LOGGER.info("starting producer");

+topicPartitionSet = new TreeSet<>(Comparator.comparingInt(TopicPartition::partition));

kafkaProducer = new KafkaProducer<>(properties);
-partitionInfoList = kafkaProducer.partitionsFor(topic);

produceTimer = new Timer("producer", true);
produceTimer.scheduleAtFixedRate(new TimerTask() {
@@ -75,6 +83,14 @@ public void run() {
}
}, delayMs, periodMs);

+assignPartitionsTimer = new Timer("assignment", true);
+assignPartitionsTimer.scheduleAtFixedRate(new TimerTask() {
+    @Override
+    public void run() {
+        assignPartitions();
+    }
+}, 0, 10000);

LOGGER.info("producer started");
}
}
@@ -96,24 +112,42 @@ public void close() {
}
}

+private void assignPartitions() {
+    LOGGER.debug("assignPartitions()");
+
+    synchronized (this) { // same monitor as produce(), which also reads topicPartitionSet
+        Set<TopicPartition> newTopicPartitionSet = new TreeSet<>(Comparator.comparingInt(TopicPartition::partition));
+
+        List<PartitionInfo> partitionInfoList = kafkaProducer.partitionsFor(topic);
+        for (PartitionInfo partitionInfo : partitionInfoList) {
+            newTopicPartitionSet.add(new TopicPartition(topic, partitionInfo.partition()));
+        }
+
+        if (!Objects.equals(topicPartitionSet, newTopicPartitionSet)) {
+            LOGGER.debug("reassigning producer partitions");
+            topicPartitionSet = newTopicPartitionSet;
+        }
+    }
+}

/**
* Method to produce records
*/
private void produce() {
LOGGER.debug("produce()");

try {
-for (PartitionInfo partitionInfo : partitionInfoList) {
-    long nowMs = System.currentTimeMillis();
+synchronized (this) {
+    for (TopicPartition topicPartition : topicPartitionSet) {
+        long nowMs = System.currentTimeMillis();

-ProducerRecord<String, String> producerRecord =
-        new ProducerRecord<>(
-                topic,
-                partitionInfo.partition(),
-                null,
-                String.valueOf(nowMs));
+        ProducerRecord<String, String> producerRecord =
+                new ProducerRecord<>(
+                        topic,
+                        topicPartition.partition(),
+                        null,
+                        String.valueOf(nowMs));

-synchronized (this) {
kafkaProducer.send(producerRecord, (recordMetadata, e) -> {
if (e != null) {
LOGGER.error("Exception producing record", e);
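The producer side never calls `assign()`; it only needs current metadata, and it pins each record to an explicit partition so every partition is exercised on each pass. A standalone sketch under the same placeholder assumptions (`partitionsFor()` reflects metadata that is at most `metadata.max.age.ms` old):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

import java.util.Properties;

public class ProbeAllPartitions {

    public static void main(String[] args) {
        String topic = "k-synthetic-test"; // placeholder
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092"); // placeholder
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("metadata.max.age.ms", "60000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // partitionsFor() reflects metadata no older than metadata.max.age.ms,
            // so a partition added after startup is probed on a later pass
            for (PartitionInfo partitionInfo : producer.partitionsFor(topic)) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        topic,
                        partitionInfo.partition(), // pin the record to an explicit partition
                        null,
                        String.valueOf(System.currentTimeMillis()));
                producer.send(record, (recordMetadata, e) -> {
                    if (e != null) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
}
```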
