From a8e458643ae61de1b21333efd7ce6a2437a2d96b Mon Sep 17 00:00:00 2001 From: Doug Hoard Date: Thu, 23 Feb 2023 08:30:56 -0500 Subject: [PATCH] v0.0.7 (#6) Added the ability to use a single test topic --- README.md | 37 +++-- configuration/README.md | 6 +- configuration/test.confluent-cloud.properties | 23 +-- configuration/test.properties | 23 +-- configuration/test.ssl.properties | 23 +-- pom.xml | 3 +- .../k/synthetic/test/ExpiringGauge.java | 6 +- .../k/synthetic/test/KSyntheticTest.java | 140 ++++++++++-------- .../k/synthetic/test/RecordConsumer.java | 24 +-- .../k/synthetic/test/RecordProducer.java | 23 ++- .../dhoard/k/synthetic/test/StringHeader.java | 53 +++++++ 11 files changed, 188 insertions(+), 173 deletions(-) create mode 100644 src/main/java/com/github/dhoard/k/synthetic/test/StringHeader.java diff --git a/README.md b/README.md index 308b18c..11c355f 100644 --- a/README.md +++ b/README.md @@ -15,15 +15,22 @@ mvn clean package ## Kafka Topic Configuration -You need to create a unique test topic per application instance with enough partitions to span all brokers +Create a topic for the application to use with enough partitions to span all brokers + +- a topic can be shared with multiple application instances + - topic partition count increases are automatically handled -- It may take up to 60000 ms to reflect the new topic partition count + + +- It may take up to 60000 ms to reflect the topic partition count changes ### Self-Managed Kafka The configuration `test.properties` in https://github.com/dhoard/k-synthetic-test/blob/main/configuration/test.properties should be self-explanatory +- the example `test.properties` documents configuration values + ### Confluent Cloud **Step 1** @@ -61,16 +68,8 @@ kcat -b ${CCLOUD_BROKERS} -L \ **Notes** -- This application uses manual partition assignment - - dynamic Kafka partition increases are currently not handle - - -- Example topic name is `k-synthetic-test-` - - where `` matches the `id` in your test 
properties - - -- Example retention time is `300,000` ms (5 minutes) - - old messages provide no value, so are skipped +- Suggested retention time is `300,000` ms (5 minutes) + - old records provide no value, so are skipped ## Run **Step 1** Copy `configuration/test.properties` and edit to match your environment +- Configuration value `id` should be unique per application instance + **Step 2** Run ```shell -java -jar target/k-synthetic-test-0.0.6.jar configuration/test.properties +java -jar target/k-synthetic-test-0.0.7.jar configuration/test.properties ``` **NOTES** @@ -98,7 +99,12 @@ java -jar target/k-synthetic-test-0.0.6.jar configuration/test.properties ## Metrics -Access Prometheus metrics using `http://:` +Access Prometheus metrics using `http://:` + +- Configuration value `http.server.address` determines the IP address to service Prometheus metrics requests + +- To bind to all IP addresses use `0.0.0.0` + Example URL (based on `test.properties`: @@ -118,7 +124,8 @@ k_synthetic_test_round_trip_time{id="source-10.0.0.1",bootstrap_servers="cp-1:90 **Notes** -- A test message is sent to every partition based on the configured `period.ms` value + +- A record is sent to every partition based on the configured `period.ms` value - A negative value indicates that a metric hasn't been updated within the configured `metric.expiration.period.ms` value diff --git a/configuration/README.md b/configuration/README.md index a64459b..2133630 100644 --- a/configuration/README.md +++ b/configuration/README.md @@ -16,7 +16,7 @@ Configuration scenario Usage ```shell -java -jar target/k-synthetic-test-0.0.6.jar configuration/test.properties +java -jar target/k-synthetic-test-0.0.7.jar configuration/test.properties ``` --- @@ -36,7 +36,7 @@ Configuration scenario java \ -Djavax.net.ssl.keyStore=configuration/keystore.pkcs12 \ -Djavax.net.ssl.keyStorePassword=changeit \ - -jar target/k-synthetic-test-0.0.6.jar configuration/test.ssl.properties 
+ -jar target/k-synthetic-test-0.0.7.jar configuration/test.ssl.properties ``` --- @@ -56,5 +56,5 @@ Configuration scenario java \ -Djavax.net.ssl.keyStore=configuration/keystore.pkcs12 \ -Djavax.net.ssl.keyStorePassword=changeit \ - -jar target/k-synthetic-test-0.0.6.jar configuration/test.confluent-cloud.properties + -jar target/k-synthetic-test-0.0.7.jar configuration/test.confluent-cloud.properties ``` \ No newline at end of file diff --git a/configuration/test.confluent-cloud.properties b/configuration/test.confluent-cloud.properties index 2f74f59..e22f494 100644 --- a/configuration/test.confluent-cloud.properties +++ b/configuration/test.confluent-cloud.properties @@ -1,4 +1,4 @@ -# Test instance id +# Application instance id id=source-10.0.0.1 # Delay period before starting production of test messages @@ -7,7 +7,6 @@ delay.ms=0 # Producer period for test messages period.ms=1000 -# # Metric expiration TTL (expire data "value = -1.0" if not update within TTL period) # # This should be a multiple of "period.ms" @@ -43,28 +42,14 @@ http.server.basic.authentication.password=secret # HTTPS support (optional) # -# If enabled, Java keystore / truststore system properties +# If enabled, Java keystore system properties # must be defined along with the certificate alias # http.server.ssl.enabled=true http.server.ssl.certificate.alias=localhost -# -# Topic for test messages -# -# The most common naming pattern would be "k-synthetic-test-" -# -# Example: -# -# id=source-10.0.0.1 -# topic=k-synthetic-test-source-10.0.0.1 -# -# Example 2: -# -# id=us-east1.project-1.network-1.ip-1.2.3.4 -# topic=k-synthetic-test-us-east1.project-1.network-1.ip-1.2.3.4 -# -topic=k-synthetic-test-source-10.0.0.1 +# Kafka topic for test messages +topic=k-synthetic-test # Confluent Cloud properties bootstrap.servers= diff --git a/configuration/test.properties b/configuration/test.properties index ab6ea0e..58b319c 100644 --- a/configuration/test.properties +++ b/configuration/test.properties @@ 
-5,7 +5,7 @@ # HTTP authentication disabled # HTTPS disabled -# Test instance id +# Application instance id id=source-10.0.0.1 # Delay period before starting production of test messages @@ -14,7 +14,6 @@ delay.ms=0 # Producer period for test messages period.ms=1000 -# # Metric expiration TTL (expire data "value = -1.0" if not update within TTL period) # # This should be a multiple of "period.ms" @@ -50,28 +49,14 @@ http.server.basic.authentication.password=secret # HTTPS support (optional) # -# If enabled, Java keystore / truststore system properties +# If enabled, Java keystore system properties # must be defined along with the certificate alias # http.server.ssl.enabled=false http.server.ssl.certificate.alias=localhost -# -# Topic for test messages -# -# The most common naming pattern would be "k-synthetic-test-" -# -# Example: -# -# id=source-10.0.0.1 -# topic=k-synthetic-test-source-10.0.0.1 -# -# Example 2: -# -# id=us-east1.project-1.network-1.ip-1.2.3.4 -# topic=k-synthetic-test-us-east1.project-1.network-1.ip-1.2.3.4 -# -topic=k-synthetic-test-source-10.0.0.1 +# Kafka topic for test messages +topic=k-synthetic-test # Kafka properties bootstrap.servers=cp-1:9092,cp-2:9092,cp-3:9092 \ No newline at end of file diff --git a/configuration/test.ssl.properties b/configuration/test.ssl.properties index 344f103..9605907 100644 --- a/configuration/test.ssl.properties +++ b/configuration/test.ssl.properties @@ -5,7 +5,7 @@ # HTTP authentication enabled # HTTPS enabled -# Test instance id +# Application instance id id=source-10.0.0.1 # Delay period before starting production of test messages @@ -14,7 +14,6 @@ delay.ms=0 # Producer period for test messages period.ms=1000 -# # Metric expiration TTL (expire data "value = -1.0" if not update within TTL period) # # This should be a multiple of "period.ms" @@ -50,28 +49,14 @@ http.server.basic.authentication.password=secret # HTTPS support (optional) # -# If enabled, Java keystore / truststore system properties +# If 
enabled, Java keystore system properties # must be defined along with the certificate alias # http.server.ssl.enabled=true http.server.ssl.certificate.alias=localhost -# -# Topic for test messages -# -# The most common naming pattern would be "k-synthetic-test-" -# -# Example: -# -# id=source-10.0.0.1 -# topic=k-synthetic-test-source-10.0.0.1 -# -# Example 2: -# -# id=us-east1.project-1.network-1.ip-1.2.3.4 -# topic=k-synthetic-test-us-east1.project-1.network-1.ip-1.2.3.4 -# -topic=k-synthetic-test-source-10.0.0.1 +# Kafka topic for test messages +topic=k-synthetic-test # Kafka properties bootstrap.servers=cp-1:9092,cp-2:9092,cp-3:9092 \ No newline at end of file diff --git a/pom.xml b/pom.xml index b7d395d..9ed3922 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.dhoard k-synthetic-test - 0.0.6 + 0.0.7 UTF-8 @@ -55,6 +55,7 @@ *:* + module-info.class META-INF/MANIFEST.MF diff --git a/src/main/java/com/github/dhoard/k/synthetic/test/ExpiringGauge.java b/src/main/java/com/github/dhoard/k/synthetic/test/ExpiringGauge.java index 84ccc44..920cef6 100644 --- a/src/main/java/com/github/dhoard/k/synthetic/test/ExpiringGauge.java +++ b/src/main/java/com/github/dhoard/k/synthetic/test/ExpiringGauge.java @@ -16,11 +16,7 @@ package com.github.dhoard.k.synthetic.test; -import io.prometheus.client.Collector; -import io.prometheus.client.DoubleAdder; -import io.prometheus.client.GaugeMetricFamily; -import io.prometheus.client.SimpleCollector; -import io.prometheus.client.Summary; +import io.prometheus.client.*; import java.io.Closeable; import java.util.ArrayList; diff --git a/src/main/java/com/github/dhoard/k/synthetic/test/KSyntheticTest.java b/src/main/java/com/github/dhoard/k/synthetic/test/KSyntheticTest.java index 93164b9..63ff51f 100644 --- a/src/main/java/com/github/dhoard/k/synthetic/test/KSyntheticTest.java +++ b/src/main/java/com/github/dhoard/k/synthetic/test/KSyntheticTest.java @@ -23,18 +23,19 @@ import io.prometheus.client.exporter.HTTPServer; import 
nl.altindag.ssl.SSLFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.header.Header; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; +import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; /** * Class to implement a synthetic Kafka performance test */ -public class KSyntheticTest implements Consumer> { +public class KSyntheticTest implements Consumer> { private static final Logger LOGGER = LoggerFactory.getLogger(KSyntheticTest.class); @@ -42,13 +43,8 @@ public class KSyntheticTest implements Consumer> private String id; private String topic; private String bootstrapServers; - private String httpServerAddress; - private int httpServerPort; private boolean logResponses; - private RecordProducer messageProducer; - private RecordConsumer messageConsumer; private ExpiringGauge roundTripTimeExpiringGauge; - private HTTPServer httpServer; /** * Constructor @@ -58,7 +54,7 @@ public KSyntheticTest() { countDownLatch = new CountDownLatch(1); - Runtime.getRuntime().addShutdownHook(new Thread(() -> countDownLatch.countDown())); + Runtime.getRuntime().addShutdownHook(new Thread(countDownLatch::countDown)); } /** @@ -99,13 +95,13 @@ private void run(String filename) throws Exception { logResponses = configuration.asBoolean("log.responses", false); LOGGER.info(String.format("log.responses [%b]", logResponses)); - httpServerAddress = configuration.asString("http.server.address"); + String httpServerAddress = configuration.asString("http.server.address"); if (!InetAddresses.isUriInetAddress(httpServerAddress) && !InternetDomainName.isValid(httpServerAddress)) { throw new ConfigurationException("property \"http.server.address\" doesn't appear to be an IP address or host name"); } LOGGER.info(String.format("http.server.address [%s]", httpServerAddress)); - httpServerPort 
= configuration.asInt("http.server.port"); + int httpServerPort = configuration.asInt("http.server.port"); if (httpServerPort < 1 || httpServerPort > 65535) { throw new ConfigurationException("property \"http.server.port\" must be >= 1 and <= 65535"); } @@ -167,7 +163,7 @@ public boolean checkCredentials(String username, String password) { .ttl(metricExpirationPeriodMs) .register(); - httpServer = httpServerBuilder.build(); + HTTPServer httpServer = httpServerBuilder.build(); // Remove general test properties @@ -187,76 +183,93 @@ public boolean checkCredentials(String username, String password) { // Create specific producer and consumer configuration with a subset of properties // to prevent "These configurations X were supplied but are not used yet" warnings - Configuration consumerConfiguration = configuration.copy(); - consumerConfiguration.put("metadata.max.age.ms", "60000"); - consumerConfiguration.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - consumerConfiguration.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - consumerConfiguration.remove("acks"); - consumerConfiguration.remove("linger.ms"); - consumerConfiguration.remove("key.serializer"); - consumerConfiguration.remove("value.serializer"); - - messageConsumer = new RecordConsumer(consumerConfiguration.toProperties(), this); - messageConsumer.start(); - - Configuration producerConfiguration = configuration.copy(); - producerConfiguration.put("metadata.max.age.ms", "60000"); - producerConfiguration.remove("key.deserializer"); - producerConfiguration.remove("value.deserializer"); - producerConfiguration.remove("session.timeout.ms"); - producerConfiguration.put("batch.size", "0"); - producerConfiguration.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - producerConfiguration.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - if 
(!producerConfiguration.containsKey("acks")) { - producerConfiguration.put("acks", "all"); + Configuration recordConsumerConfiguration = configuration.copy(); + recordConsumerConfiguration.put("metadata.max.age.ms", "60000"); + recordConsumerConfiguration.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + recordConsumerConfiguration.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + recordConsumerConfiguration.remove("acks"); + recordConsumerConfiguration.remove("linger.ms"); + recordConsumerConfiguration.remove("key.serializer"); + recordConsumerConfiguration.remove("value.serializer"); + + RecordConsumer recordConsumer = new RecordConsumer(recordConsumerConfiguration, this); + recordConsumer.start(); + + Configuration recordProducerConfiguration = configuration.copy(); + recordProducerConfiguration.put("metadata.max.age.ms", "60000"); + recordProducerConfiguration.remove("key.deserializer"); + recordProducerConfiguration.remove("value.deserializer"); + recordProducerConfiguration.remove("session.timeout.ms"); + recordProducerConfiguration.put("batch.size", "0"); + recordProducerConfiguration.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + recordProducerConfiguration.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + if (!recordProducerConfiguration.containsKey("acks")) { + recordProducerConfiguration.put("acks", "all"); } - if (!producerConfiguration.containsKey("linger.ms")) { - producerConfiguration.put("linger.ms", "0"); + if (!recordProducerConfiguration.containsKey("linger.ms")) { + recordProducerConfiguration.put("linger.ms", "0"); } - messageProducer = new RecordProducer(producerConfiguration.toProperties(), delayMs, periodMs); - messageProducer.start(); + RecordProducer recordProducer = new RecordProducer(id, delayMs, periodMs, recordProducerConfiguration); + recordProducer.start(); LOGGER.info("running"); 
countDownLatch.await(); httpServer.close(); - messageProducer.close(); - messageConsumer.close(); + recordProducer.close(); + recordConsumer.close(); } /** - * Method to process messages + * Method to accept a ConsumerRecord * - * @param consumerRecords + * @param consumerRecord */ - public void accept(ConsumerRecords consumerRecords) { - for (ConsumerRecord consumerRecord : consumerRecords) { - long messageTimestampMs = Long.parseLong(consumerRecord.value()); - long nowMs = System.currentTimeMillis(); - long elapsedTimeMs = nowMs - messageTimestampMs; - String partition = String.valueOf(consumerRecord.partition()); - - roundTripTimeExpiringGauge - .labels( + public void accept(ConsumerRecord consumerRecord) { + Header[] headers = consumerRecord.headers().toArray(); + for (Header header : headers) { + if ("id".equals(header.key())) { + String value = new String(header.value(), StandardCharsets.UTF_8); + if (id.equals(value)) { + process(consumerRecord); + } + + break; + } + } + } + + /** + * Method to process a ConsumerRecord + * + * @param consumerRecord + */ + private void process(ConsumerRecord consumerRecord) { + long recordValueTimestampMs = Long.parseLong(consumerRecord.value()); + long nowMs = System.currentTimeMillis(); + long elapsedTimeMs = nowMs - recordValueTimestampMs; + String partition = String.valueOf(consumerRecord.partition()); + + roundTripTimeExpiringGauge + .labels( + id, + bootstrapServers, + topic, + partition) + .set(elapsedTimeMs); + + if (logResponses) { + LOGGER.info( + String.format( + "id [%s] bootstrap.servers [%s] topic [%s] partition [%d] round trip time [%d] ms", id, bootstrapServers, topic, - partition) - .set(elapsedTimeMs); - - if (logResponses) { - LOGGER.info( - String.format( - "id [%s] bootstrap.servers [%s] topic [%s] partition [%d] round trip time [%d] ms", - id, - bootstrapServers, - topic, - consumerRecord.partition(), elapsedTimeMs)); - } + consumerRecord.partition(), elapsedTimeMs)); } } @@ -287,7 +300,6 @@ public 
static void main(String[] args) throws Exception { */ private void banner(String string) { String line = String.format("%0" + string.length() + "d", 0).replace('0', '-'); - LOGGER.info(line); LOGGER.info(string); LOGGER.info(line); diff --git a/src/main/java/com/github/dhoard/k/synthetic/test/RecordConsumer.java b/src/main/java/com/github/dhoard/k/synthetic/test/RecordConsumer.java index 2dbb3a4..4d4b5d3 100644 --- a/src/main/java/com/github/dhoard/k/synthetic/test/RecordConsumer.java +++ b/src/main/java/com/github/dhoard/k/synthetic/test/RecordConsumer.java @@ -16,7 +16,7 @@ package com.github.dhoard.k.synthetic.test; -import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -24,16 +24,8 @@ import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.Comparator; -import java.util.List; -import java.util.Objects; -import java.util.Properties; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.TreeSet; +import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.function.Consumer; /** @@ -45,7 +37,7 @@ public class RecordConsumer { private final Properties properties; private final String topic; - private final Consumer> consumer; + private final Consumer> consumer; private Thread thread; private CountDownLatch countDownLatch; @@ -56,11 +48,11 @@ public class RecordConsumer { /** * Constructor * - * @param properties + * @param configuration * @param consumer */ - public RecordConsumer(Properties properties, Consumer> consumer) { - this.properties = properties; + public RecordConsumer(Configuration configuration, Consumer> consumer) { + this.properties = configuration.toProperties(); this.consumer = consumer; this.topic = (String) 
properties.remove("topic"); } @@ -68,7 +60,7 @@ public RecordConsumer(Properties properties, Consumer { if (e != null) { LOGGER.error("Exception producing record", e); @@ -159,4 +157,5 @@ private void produce() { LOGGER.error("Exception producing record", t); } } + } diff --git a/src/main/java/com/github/dhoard/k/synthetic/test/StringHeader.java b/src/main/java/com/github/dhoard/k/synthetic/test/StringHeader.java new file mode 100644 index 0000000..90d18bb --- /dev/null +++ b/src/main/java/com/github/dhoard/k/synthetic/test/StringHeader.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 Douglas Hoard + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.dhoard.k.synthetic.test; + +import org.apache.kafka.common.header.Header; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +/** + * Class to implement a String based Kafka Header + */ +public final class StringHeader implements Header { + + private final String key; + private final byte[] value; + + private StringHeader(String key, String value) { + this.key = key; + this.value = value.getBytes(StandardCharsets.UTF_8); + } + + @Override + public String key() { + return key; + } + + @Override + public byte[] value() { + return value; + } + + public static StringHeader of(String key, String value) { + Objects.requireNonNull(key); + Objects.requireNonNull(value); + + return new StringHeader(key, value); + } +}