Commit

[ISSUE-4] (#10)
* Bungled release 2.1, let's go to 2.2

* Update readme with release instructions

* Update readme with release instructions

* Initial check-in to resolve Issue-4

* Use JUnit5

* Code cleanup, fix style violations

* update changelog

* [ISSUE-4] add missing javadoc params
Crim authored Apr 24, 2018
1 parent 30536ad commit 7c944bf
Showing 4 changed files with 170 additions and 4 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -3,7 +3,8 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 2.2.0 (4/24/2018)
- [Issue-5](https://github.com/salesforce/kafka-junit/issues/5) Updated to support Kafka versions 1.0.x and 1.1.x. Thanks [kasuri](https://github.com/kasuri)!
- [Issue-4](https://github.com/salesforce/kafka-junit/issues/4) Fix server configuration to allow for transactional producers & consumers.

### Breaking Change
This library now requires you to declare which version of Kafka you want to run against, in practice by adding the matching org.apache.kafka dependency to your own project.
12 changes: 12 additions & 0 deletions kafka-junit-core/pom.xml
@@ -11,4 +11,16 @@

<artifactId>kafka-junit-core</artifactId>
<version>2.2.0</version>

<!-- Module Dependencies -->
<dependencies>

<!-- Use JUnit5 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.1.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
56 changes: 53 additions & 3 deletions kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java
@@ -43,6 +43,7 @@

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@@ -161,6 +162,8 @@ public void start() throws Exception {
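// Single-broker overrides: Kafka's defaults assume a multi-node cluster (replication factor 3, min ISR 2),
// so every internal topic must be configured for a single replica or transactional producers cannot initialize.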
kafkaProperties.setProperty("offsets.topic.replication.factor", "1");
kafkaProperties.setProperty("offset.storage.replication.factor", "1");
kafkaProperties.setProperty("transaction.state.log.replication.factor", "1");
kafkaProperties.setProperty("transaction.state.log.min.isr", "1");
kafkaProperties.setProperty("transaction.state.log.num.partitions", "4");
kafkaProperties.setProperty("config.storage.replication.factor", "1");
kafkaProperties.setProperty("status.storage.replication.factor", "1");
kafkaProperties.setProperty("default.replication.factor", "1");
@@ -228,15 +231,39 @@ public <K, V> KafkaProducer<K, V> getKafkaProducer(
final Class<? extends Serializer<K>> keySerializer,
final Class<? extends Serializer<V>> valueSerializer) {

return getKafkaProducer(keySerializer, valueSerializer, new Properties());
}

/**
* Creates a kafka producer that is connected to our test server.
* @param <K> Type of message key
* @param <V> Type of message value
* @param keySerializer Class of serializer to be used for keys.
* @param valueSerializer Class of serializer to be used for values.
* @param config Additional producer configuration options to be set.
* @return KafkaProducer configured to produce into Test server.
*/
public <K, V> KafkaProducer<K, V> getKafkaProducer(
final Class<? extends Serializer<K>> keySerializer,
final Class<? extends Serializer<V>> valueSerializer,
final Properties config) {

// Build config
final Map<String, Object> kafkaProducerConfig = new HashMap<>();
kafkaProducerConfig.put("bootstrap.servers", getKafkaConnectString());
kafkaProducerConfig.put("key.serializer", keySerializer);
kafkaProducerConfig.put("value.serializer", valueSerializer);
kafkaProducerConfig.put("max.in.flight.requests.per.connection", 1);
kafkaProducerConfig.put("retries", 5);
kafkaProducerConfig.put("client.id", getClass().getSimpleName() + " Producer");
kafkaProducerConfig.put("batch.size", 0);
kafkaProducerConfig.put("key.serializer", keySerializer);
kafkaProducerConfig.put("value.serializer", valueSerializer);

// Override config
if (config != null) {
for (Map.Entry<Object, Object> entry: config.entrySet()) {
kafkaProducerConfig.put(entry.getKey().toString(), entry.getValue());
}
}

// Create and return Producer.
return new KafkaProducer<>(kafkaProducerConfig);
@@ -253,13 +280,36 @@ public <K, V> KafkaProducer<K, V> getKafkaProducer(
public <K, V> KafkaConsumer<K, V> getKafkaConsumer(
final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends Deserializer<V>> valueDeserializer) {
return getKafkaConsumer(keyDeserializer, valueDeserializer, new Properties());
}

/**
* Return Kafka Consumer configured to consume from internal Kafka Server.
* @param <K> Type of message key
* @param <V> Type of message value
* @param keyDeserializer Class of deserializer to be used for keys.
* @param valueDeserializer Class of deserializer to be used for values.
* @param config Additional consumer configuration options to be set.
* @return KafkaConsumer configured to consume from the Test server.
*/
public <K, V> KafkaConsumer<K, V> getKafkaConsumer(
final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends Deserializer<V>> valueDeserializer,
final Properties config) {

// Build config
Map<String, Object> kafkaConsumerConfig = buildDefaultClientConfig();
kafkaConsumerConfig.put("key.deserializer", keyDeserializer);
kafkaConsumerConfig.put("value.deserializer", valueDeserializer);
kafkaConsumerConfig.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

// Override config
if (config != null) {
for (Map.Entry<Object, Object> entry: config.entrySet()) {
kafkaConsumerConfig.put(entry.getKey().toString(), entry.getValue());
}
}

// Create and return Consumer.
return new KafkaConsumer<>(kafkaConsumerConfig);
}
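The two new overloads layer caller-supplied settings on top of the test defaults. A minimal usage sketch, assuming a started KafkaTestServer named server and the same imports as the test below; the property names and values here are illustrative overrides, not requirements of the API:

    // Producer with an idempotence override layered on the defaults.
    final Properties producerProps = new Properties();
    producerProps.put("enable.idempotence", "true");
    try (final KafkaProducer<String, String> producer
        = server.getKafkaProducer(StringSerializer.class, StringSerializer.class, producerProps)) {
        producer.send(new ProducerRecord<>("example-topic", "key", "value"));
    }

    // Consumer restricted to committed records.
    final Properties consumerProps = new Properties();
    consumerProps.put("group.id", "example-group");
    consumerProps.put("isolation.level", "read_committed");
    try (final KafkaConsumer<String, String> consumer
        = server.getKafkaConsumer(StringDeserializer.class, StringDeserializer.class, consumerProps)) {
        consumer.subscribe(Collections.singletonList("example-topic"));
    }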
103 changes: 103 additions & 0 deletions kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java
@@ -0,0 +1,103 @@
/**
* Copyright (c) 2017-2018, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
* following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this list of conditions and the following
* disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
*
* * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.kafka.test;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Properties;

/**
* Validation tests against KafkaTestServer class.
*/
public class KafkaTestServerTest {
/**
* Integration test validating that transactional consumers and producers work against the test Kafka instance.
*/
@Test
void testExactlyOnceTransaction() throws Exception {
// Define topic to test with.
final String theTopic = "transactional-topic" + System.currentTimeMillis();

// Create our test server instance.
try (final KafkaTestServer kafkaTestServer = new KafkaTestServer()) {
// Start it and create our topic.
kafkaTestServer.start();
kafkaTestServer.createTopic(theTopic, 1);

// Define override properties.
Properties config = new Properties();
config.put("group.id", "test-consumer-group");
config.put("enable.auto.commit", "false");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
config.put("auto.offset.reset", "earliest");

try (final KafkaConsumer<String, String> consumer
= kafkaTestServer.getKafkaConsumer(StringDeserializer.class, StringDeserializer.class, config)) {
consumer.subscribe(Collections.singletonList(theTopic));

// Setup the producer
config = new Properties();
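// Setting a transactional.id enables transactions on the producer; the timestamp suffix keeps it unique across runs.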
config.put("transactional.id", "MyRandomString" + System.currentTimeMillis());

try (final KafkaProducer<String, String> producer
= kafkaTestServer.getKafkaProducer(StringSerializer.class, StringSerializer.class, config)) {
// Init transaction and begin
producer.initTransactions();
producer.beginTransaction();

// Define our test message and key
final String theKey = "Here is the Key";
final String theMsg = "Here is the message";
final ProducerRecord<String, String> r = new ProducerRecord<>(theTopic, theKey, theMsg);

// Send and commit the record.
producer.send(r);
producer.commitTransaction();

// Use consumer to read the message
final ConsumerRecords<String, String> records = consumer.poll(5000);
Assertions.assertFalse(records.isEmpty(), "Should not be empty!");
Assertions.assertEquals(1, records.count(), "Should have a single record");
for (final ConsumerRecord<String, String> record : records) {
Assertions.assertEquals(theKey, record.key(), "Keys should match");
Assertions.assertEquals(theMsg, record.value(), "Values should match");
consumer.commitSync();
}
}
}
}
}
}
