diff --git a/CHANGELOG.md b/CHANGELOG.md index 5457634..d168217 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## 2.2.0 (4/24/2018) -- [Issue-5](https://github.com/salesforce/kafka-junit/issues/5) Updated to support Kafka versions 1.0.x and 1.1.x. Thanks [kasuri](https://github.com/kasuri)! +- [Issue-5](https://github.com/salesforce/kafka-junit/issues/5) Updated to support Kafka versions 1.0.x and 1.1.x. Thanks [kasuri](https://github.com/kasuri)! +- [Issue-4](https://github.com/salesforce/kafka-junit/issues/4) Fix server configuration to allow for transactional producers & consumers. ### Breaking Change This library now requires you to provide which version of Kafka you want to use. diff --git a/kafka-junit-core/pom.xml b/kafka-junit-core/pom.xml index 5d8e3e7..3a0979d 100644 --- a/kafka-junit-core/pom.xml +++ b/kafka-junit-core/pom.xml @@ -11,4 +11,16 @@ kafka-junit-core 2.2.0 + + + + + + + org.junit.jupiter + junit-jupiter-api + 5.1.1 + test + + \ No newline at end of file diff --git a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java index baa556a..2c7dec8 100644 --- a/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java @@ -43,6 +43,7 @@ import java.io.File; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -161,6 +162,8 @@ public void start() throws Exception { kafkaProperties.setProperty("offsets.topic.replication.factor", "1"); kafkaProperties.setProperty("offset.storage.replication.factor", "1"); kafkaProperties.setProperty("transaction.state.log.replication.factor", "1"); + kafkaProperties.setProperty("transaction.state.log.min.isr", "1"); + kafkaProperties.setProperty("transaction.state.log.num.partitions", "4"); kafkaProperties.setProperty("config.storage.replication.factor", "1"); kafkaProperties.setProperty("status.storage.replication.factor", "1"); kafkaProperties.setProperty("default.replication.factor", "1"); @@ -228,15 +231,39 @@ public KafkaProducer getKafkaProducer( final Class> keySerializer, final Class> valueSerializer) { + return getKafkaProducer(keySerializer, valueSerializer, new Properties()); + } + + /** + * Creates a kafka producer that is connected to our test server. + * @param Type of message key + * @param Type of message value + * @param keySerializer Class of serializer to be used for keys. + * @param valueSerializer Class of serializer to be used for values. + * @param config Additional producer configuration options to be set. + * @return KafkaProducer configured to produce into Test server. + */ + public KafkaProducer getKafkaProducer( + final Class> keySerializer, + final Class> valueSerializer, + final Properties config) { + // Build config - final Map kafkaProducerConfig = Maps.newHashMap(); + final Map kafkaProducerConfig = new HashMap<>(); kafkaProducerConfig.put("bootstrap.servers", getKafkaConnectString()); - kafkaProducerConfig.put("key.serializer", keySerializer); - kafkaProducerConfig.put("value.serializer", valueSerializer); kafkaProducerConfig.put("max.in.flight.requests.per.connection", 1); kafkaProducerConfig.put("retries", 5); kafkaProducerConfig.put("client.id", getClass().getSimpleName() + " Producer"); kafkaProducerConfig.put("batch.size", 0); + kafkaProducerConfig.put("key.serializer", keySerializer); + kafkaProducerConfig.put("value.serializer", valueSerializer); + + // Override config + if (config != null) { + for (Map.Entry entry: config.entrySet()) { + kafkaProducerConfig.put(entry.getKey().toString(), entry.getValue()); + } + } // Create and return Producer. return new KafkaProducer<>(kafkaProducerConfig); @@ -253,6 +280,22 @@ public KafkaProducer getKafkaProducer( public KafkaConsumer getKafkaConsumer( final Class> keyDeserializer, final Class> valueDeserializer) { + return getKafkaConsumer(keyDeserializer, valueDeserializer, new Properties()); + } + + /** + * Return Kafka Consumer configured to consume from internal Kafka Server. + * @param Type of message key + * @param Type of message value + * @param keyDeserializer Class of deserializer to be used for keys. + * @param valueDeserializer Class of deserializer to be used for values. + * @param config Additional consumer configuration options to be set. + * @return KafkaProducer configured to produce into Test server. + */ + public KafkaConsumer getKafkaConsumer( + final Class> keyDeserializer, + final Class> valueDeserializer, + final Properties config) { // Build config Map kafkaConsumerConfig = buildDefaultClientConfig(); @@ -260,6 +303,13 @@ public KafkaConsumer getKafkaConsumer( kafkaConsumerConfig.put("value.deserializer", valueDeserializer); kafkaConsumerConfig.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor"); + // Override config + if (config != null) { + for (Map.Entry entry: config.entrySet()) { + kafkaConsumerConfig.put(entry.getKey().toString(), entry.getValue()); + } + } + // Create and return Consumer. return new KafkaConsumer<>(kafkaConsumerConfig); } diff --git a/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java b/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java new file mode 100644 index 0000000..c344d77 --- /dev/null +++ b/kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.kafka.test; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Properties; + +/** + * Validation tests against KafkaTestServer class. + */ +public class KafkaTestServerTest { + /** + * Integration test validates that we can use transactional consumers and producers against the Test kafka instance. + */ + @Test + void testExactlyOnceTransaction() throws Exception { + // Define topic to test with. + final String theTopic = "transactional-topic" + System.currentTimeMillis(); + + // Create our test server instance. + try (final KafkaTestServer kafkaTestServer = new KafkaTestServer()) { + // Start it and create our topic. + kafkaTestServer.start(); + kafkaTestServer.createTopic(theTopic, 1); + + // Define override properties. + Properties config = new Properties(); + config.put("group.id", "test-consumer-group"); + config.put("enable.auto.commit", "false"); + config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + config.put("auto.offset.reset", "earliest"); + + try (final KafkaConsumer consumer + = kafkaTestServer.getKafkaConsumer(StringDeserializer.class, StringDeserializer.class, config)) { + consumer.subscribe(Collections.singletonList(theTopic)); + + // Setup the producer + config = new Properties(); + config.put("transactional.id", "MyRandomString" + System.currentTimeMillis()); + + try (final KafkaProducer producer + = kafkaTestServer.getKafkaProducer(StringSerializer.class, StringSerializer.class, config)) { + // Init transaction and begin + producer.initTransactions(); + producer.beginTransaction(); + + // Define our test message and key + final String theKey = "Here is the Key"; + final String theMsg = "Here is the message"; + final ProducerRecord r = new ProducerRecord<>(theTopic, theKey, theMsg); + + // Send and commit the record. + producer.send(r); + producer.commitTransaction(); + + // Use consumer to read the message + final ConsumerRecords records = consumer.poll(5000); + Assertions.assertFalse(records.isEmpty(), "Should not be empty!"); + Assertions.assertEquals(1, records.count(), "Should have a single record"); + for (final ConsumerRecord record : records) { + Assertions.assertEquals(theKey, record.key(), "Keys should match"); + Assertions.assertEquals(theMsg, record.value(), "Values should match"); + consumer.commitSync(); + } + } + } + } + } +} \ No newline at end of file