diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f24ea4..feefefc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,11 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). -## 1.0.0 (11/09/2017) +## 2.0.0 (4/10/2018) +- Created new modules to support both JUnit4 and JUnit 5. + +## 1.0.0 (09/11/2017) - Initial release! - Based off of Kafka Server and Kafka-Clients version 0.11.0.1 +- Built for JUnit 4.x diff --git a/LICENSE.txt b/LICENSE.txt index 3be6b95..ab2ca1e 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1,4 +1,4 @@ -Copyright (c) 2017, Salesforce.com, Inc. +Copyright (c) 2017-2018, Salesforce.com, Inc. All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the diff --git a/README.md b/README.md index 84d8bf1..91c121d 100644 --- a/README.md +++ b/README.md @@ -5,91 +5,13 @@ This library wraps Kafka Test Server and allows you to easily create and run tests against a "real" kafka server running within your tests, no more needing to stand up an external kafka cluster! -## Usage & Examples - -Include this in your project with scope test. - -``` - - com.salesforce.kafka.test - kafka-junit - 1.0.0 - test - -``` - -### KafkaTestServer - -A great example of how to use this can be found within our tests! Check out [KafkaTestServerTest.java](src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java) - -Add the following to your JUnit test file and it will handle automatically starting and stopping the embedded Kafka -instance for you. - -```java - /** - * We have a single embedded kafka server that gets started when this test class is initialized. - * - * It's automatically started before any methods are run via the @ClassRule annotation. - * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. - */ - @ClassRule - public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource(); -``` - -SharedKafkaTestResource has two accessors that you can make use of in your tests to interact with the service. - -```java - /** - * @return Shared Kafka Test server instance. - */ - public KafkaTestServer getKafkaTestServer(); - - /** - * @return Instance of KafkaTestUtils configured and ready to go. - */ - public KafkaTestUtils getKafkaTestUtils(); -``` - -### KafkaTestUtils - -Often times you'll end up rebuilding the same patterns around producing and consuming data from this internal -kafka server. We've tried to collect some of these within [KafkaTestUtils](src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java)! - -For usage and examples, check out it's test at [KafkaTestUtilsTest](src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java). - -### Zookeeper Test Server - -**Note** Since Kafka depends on Zookeeper, you get this for *free* if you use the SharedKafkaTestResource, you do not, and should not, use - both of these together within the same Test class. - -If you need to run tests against an **only** embedded Zookeeper server and not all of Kafka, we have you covered as well. Add the following - to your JUnit test file -and it will handle automatically start and stopping the embedded Zookeeper instance for you. - -```java - /** - * We have a single embedded zookeeper server that gets started when this test class is initialized. - * - * It's automatically started before any methods are run via the @ClassRule annotation. - * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. - */ - @ClassRule - public static final SharedZookeeperTestResource sharedZookeeperTestResource = new SharedZookeeperTestResource(); -``` - -SharedZookeeperTestResource has the following accessors that you can make use of in your tests to interact with the Zookeeper instance. - -```java - /** - * @return Shared Zookeeper test server instance. - */ - public TestingServer getZookeeperTestServer(); - - /** - * @return Connection string to connect to the Zookeeper instance. - */ - public String getZookeeperConnectString(); -``` +## Using Kafka-JUnit with JUnit 4. + +Please review [Kafka-JUnit4 Readme](kafka-junit4/) for instructions. + +## Using Kafka-JUnit with JUnit 5. + +Please review [Kafka-JUnit5 Readme](kafka-junit5/) for instructions. ## Changelog diff --git a/kafka-junit-core/pom.xml b/kafka-junit-core/pom.xml new file mode 100644 index 0000000..13b400e --- /dev/null +++ b/kafka-junit-core/pom.xml @@ -0,0 +1,16 @@ + + + + kafka-junit + com.salesforce.kafka.test + 2.0.0 + + 4.0.0 + + kafka-junit-core + 1.0.0 + + + \ No newline at end of file diff --git a/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java similarity index 99% rename from src/main/java/com/salesforce/kafka/test/KafkaTestServer.java rename to kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java index 60a9271..6c87080 100644 --- a/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2017, Salesforce.com, Inc. + * Copyright (c) 2017-2018, Salesforce.com, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the diff --git a/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java similarity index 99% rename from src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java rename to kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java index 42e6429..ea5dab0 100644 --- a/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2017, Salesforce.com, Inc. + * Copyright (c) 2017-2018, Salesforce.com, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the diff --git a/src/main/java/com/salesforce/kafka/test/ProducedKafkaRecord.java b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/ProducedKafkaRecord.java similarity index 98% rename from src/main/java/com/salesforce/kafka/test/ProducedKafkaRecord.java rename to kafka-junit-core/src/main/java/com/salesforce/kafka/test/ProducedKafkaRecord.java index 1fd9e87..93a9922 100644 --- a/src/main/java/com/salesforce/kafka/test/ProducedKafkaRecord.java +++ b/kafka-junit-core/src/main/java/com/salesforce/kafka/test/ProducedKafkaRecord.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2017, Salesforce.com, Inc. + * Copyright (c) 2017-2018, Salesforce.com, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the diff --git a/kafka-junit4/README.md b/kafka-junit4/README.md new file mode 100644 index 0000000..5146864 --- /dev/null +++ b/kafka-junit4/README.md @@ -0,0 +1,96 @@ +# Kafka-JUnit4 + +This library wraps Kafka Test Server and allows you to easily create and run tests against +a "real" kafka server running within your tests, no more needing to stand up an external kafka cluster! + +Kafka-JUnit4 is built on-top of **JUnit 4** as a SharedResource using the **@ClassRule** annotation. + +For usage with JUnit5 or more project information please review top level [README](../README.md). + +## Using Kafka-JUnit with JUnit 4. + +### Usage & Examples + +Include this in your project with scope test. + +``` + + com.salesforce.kafka.test + kafka-junit4 + 1.0.0 + test + +``` + +#### KafkaTestServer + +A great example of how to use this can be found within our tests! Check out [KafkaTestServerTest.java](src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java) + +Add the following to your JUnit test file and it will handle automatically starting and stopping the embedded Kafka +instance for you. + +```java + /** + * We have a single embedded kafka server that gets started when this test class is initialized. + * + * It's automatically started before any methods are run via the @ClassRule annotation. + * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. + */ + @ClassRule + public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource(); +``` + +SharedKafkaTestResource has two accessors that you can make use of in your tests to interact with the service. + +```java + /** + * @return Shared Kafka Test server instance. + */ + public KafkaTestServer getKafkaTestServer(); + + /** + * @return Instance of KafkaTestUtils configured and ready to go. + */ + public KafkaTestUtils getKafkaTestUtils(); +``` + +#### KafkaTestUtils + +Often times you'll end up rebuilding the same patterns around producing and consuming data from this internal +kafka server. We've tried to collect some of these within [KafkaTestUtils](src/main/java/com/salesforce/kafka/test/KafkaTestUtils.java)! + +For usage and examples, check out it's test at [KafkaTestUtilsTest](src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java). + +#### Zookeeper Test Server + +**Note** Since Kafka depends on Zookeeper, you get this for *free* if you use the SharedKafkaTestResource, you do not, and should not, use + both of these together within the same Test class. + +If you need to run tests against an **only** embedded Zookeeper server and not all of Kafka, we have you covered as well. Add the following + to your JUnit test file +and it will handle automatically start and stopping the embedded Zookeeper instance for you. + +```java + /** + * We have a single embedded zookeeper server that gets started when this test class is initialized. + * + * It's automatically started before any methods are run via the @ClassRule annotation. + * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. + */ + @ClassRule + public static final SharedZookeeperTestResource sharedZookeeperTestResource = new SharedZookeeperTestResource(); +``` + +SharedZookeeperTestResource has the following accessors that you can make use of in your tests to interact with the Zookeeper instance. + +```java + /** + * @return Shared Zookeeper test server instance. + */ + public TestingServer getZookeeperTestServer(); + + /** + * @return Connection string to connect to the Zookeeper instance. + */ + public String getZookeeperConnectString(); +``` \ No newline at end of file diff --git a/kafka-junit4/pom.xml b/kafka-junit4/pom.xml new file mode 100644 index 0000000..4c79669 --- /dev/null +++ b/kafka-junit4/pom.xml @@ -0,0 +1,115 @@ + + + + + + kafka-junit + com.salesforce.kafka.test + 2.0.0 + + 4.0.0 + + kafka-junit4 + 1.0.0 + + + + UTF-8 + + + 4.12 + + + + + + com.salesforce.kafka.test + kafka-junit-core + 1.0.0 + + + + + junit + junit + ${junit.version} + + + + + src/main/java + src/test/java + + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + + + + + + make-assembly + package + + single + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + + org.apache.maven.surefire + surefire-junit47 + 2.19 + + + + -Xmx512M + ${skipTests} + plain + + + + + \ No newline at end of file diff --git a/src/main/java/com/salesforce/kafka/test/junit/SharedKafkaTestResource.java b/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResource.java similarity index 98% rename from src/main/java/com/salesforce/kafka/test/junit/SharedKafkaTestResource.java rename to kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResource.java index 7ba10be..5c3dfa0 100644 --- a/src/main/java/com/salesforce/kafka/test/junit/SharedKafkaTestResource.java +++ b/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedKafkaTestResource.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2017, Salesforce.com, Inc. + * Copyright (c) 2017-2018, Salesforce.com, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the @@ -23,10 +23,10 @@ * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.salesforce.kafka.test.junit; +package com.salesforce.kafka.test.junit4; -import com.salesforce.kafka.test.KafkaTestUtils; import com.salesforce.kafka.test.KafkaTestServer; +import com.salesforce.kafka.test.KafkaTestUtils; import org.apache.curator.test.TestingServer; import org.junit.rules.ExternalResource; import org.slf4j.Logger; diff --git a/src/main/java/com/salesforce/kafka/test/junit/SharedZookeeperTestResource.java b/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedZookeeperTestResource.java similarity index 97% rename from src/main/java/com/salesforce/kafka/test/junit/SharedZookeeperTestResource.java rename to kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedZookeeperTestResource.java index 2555a34..e6a1548 100644 --- a/src/main/java/com/salesforce/kafka/test/junit/SharedZookeeperTestResource.java +++ b/kafka-junit4/src/main/java/com/salesforce/kafka/test/junit4/SharedZookeeperTestResource.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2017, Salesforce.com, Inc. + * Copyright (c) 2017-2018, Salesforce.com, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the @@ -23,7 +23,7 @@ * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.salesforce.kafka.test.junit; +package com.salesforce.kafka.test.junit4; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingServer; @@ -98,4 +98,4 @@ public TestingServer getZookeeperTestServer() { public String getZookeeperConnectString() { return zookeeperTestServer.getConnectString(); } -} +} \ No newline at end of file diff --git a/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestServerTest.java similarity index 97% rename from src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java rename to kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestServerTest.java index 876a1a9..817b535 100644 --- a/src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java +++ b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestServerTest.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2017, Salesforce.com, Inc. + * Copyright (c) 2017-2018, Salesforce.com, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the @@ -23,10 +23,10 @@ * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.salesforce.kafka.test; +package com.salesforce.kafka.test.junit4; import com.google.common.collect.Lists; -import com.salesforce.kafka.test.junit.SharedKafkaTestResource; +import com.salesforce.kafka.test.KafkaTestServer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; diff --git a/src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestUtilsTest.java similarity index 96% rename from src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java rename to kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestUtilsTest.java index d522c56..b5954be 100644 --- a/src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java +++ b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/KafkaTestUtilsTest.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2017, Salesforce.com, Inc. + * Copyright (c) 2017-2018, Salesforce.com, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the @@ -23,10 +23,12 @@ * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.salesforce.kafka.test; +package com.salesforce.kafka.test.junit4; import com.google.common.base.Charsets; -import com.salesforce.kafka.test.junit.SharedKafkaTestResource; +import com.salesforce.kafka.test.KafkaTestServer; +import com.salesforce.kafka.test.KafkaTestUtils; +import com.salesforce.kafka.test.ProducedKafkaRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Before; import org.junit.ClassRule; diff --git a/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/ZookeeperTestServerTest.java b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/ZookeeperTestServerTest.java new file mode 100644 index 0000000..017da1f --- /dev/null +++ b/kafka-junit4/src/test/java/com/salesforce/kafka/test/junit4/ZookeeperTestServerTest.java @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.kafka.test.junit4; + +import org.apache.curator.test.TestingServer; +import org.junit.ClassRule; +import org.junit.Test; + +import static junit.framework.TestCase.assertNotNull; +import static junit.framework.TestCase.assertTrue; + +/** + * Test of Zookeeper Test instance. + * + * This also serves as an example of how to use this library! + */ +public class ZookeeperTestServerTest { + /** + * We have a single embedded kafka server that gets started when this test class is initialized. + * + * It's automatically started before any methods are run via the @ClassRule annotation. + * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. + */ + @ClassRule + public static final SharedZookeeperTestResource sharedZookeeperTestResource = new SharedZookeeperTestResource(); + + /** + * Validates that we receive a sane looking ZK connection string. + */ + @Test + public void testGetZookeeperConnectString() { + final String actualConnectStr = sharedZookeeperTestResource.getZookeeperConnectString(); + + // Validate + assertNotNull("Should have non-null connect string", actualConnectStr); + assertTrue("Should start with 127.0.0.1", actualConnectStr.startsWith("127.0.0.1:")); + } + + /** + * Validates that we receive a sane looking ZK connection string. + */ + @Test + public void testZookeeperServer() { + final TestingServer zkTestServer = sharedZookeeperTestResource.getZookeeperTestServer(); + + // Validate + assertNotNull("Should have non-null instance", zkTestServer); + } +} diff --git a/kafka-junit5/README.md b/kafka-junit5/README.md new file mode 100644 index 0000000..b80c53f --- /dev/null +++ b/kafka-junit5/README.md @@ -0,0 +1,115 @@ +# Kafka-JUnit5 + +This library wraps Kafka Test Server and allows you to easily create and run tests against +a "real" kafka server running within your tests, no more needing to stand up an external kafka cluster! + +Kafka-JUnit5 is built on-top of **JUnit 5** as an Extension using the **@ExtendWith** annotation. + +For usage with JUnit4 or more project information please review top level [README](../README.md). + +## Using Kafka-JUnit with JUnit 5. + +### Usage & Examples + +Include this in your project's POM with scope test. + +``` + + com.salesforce.kafka.test + kafka-junit5 + 1.0.0 + test + +``` + +#### KafkaTestServer + +A great example of how to use this can be found within our tests! Check out [KafkaTestServerTest.java](src/test/java/com/salesforce/kafka/test/KafkaTestServerTest.java) + +Annotate your JUnit test class with `@ExtendWith(KafkaResourceExtension.class)` and add the appropriate constructor. The JUnit5 extension will handle automatically starting and stopping the embedded Kafka +instance for you. + +```java +@ExtendWith(KafkaResourceExtension.class) +public class MyTestClass { + /** + * We have a single embedded kafka server that gets started when this test class is initialized. + * + * It's automatically started before any methods are run via the @ExtendWith annotation. + * It's automatically stopped after all of the tests are completed via the @ExtendWith annotation. + * This instance is passed to this class's constructor via the @ExtendWith annotation. + */ + private final SharedKafkaTestResource sharedKafkaTestResource; + + /** + * Constructor where KafkaResourceExtension provides the sharedKafkaTestResource object. + * @param sharedKafkaTestResource Provided by KafkaResourceExtension. + */ + public MyTestClass(final SharedKafkaTestResource sharedKafkaTestResource) { + this.sharedKafkaTestResource = sharedKafkaTestResource; + } +``` + +[SharedKafkaTestResource](kafka-junit5/src/main/java/test/junit/SharedKafkaTestResource.java) instance has two accessors that you can make use of in your tests to interact with the service. + +```java + /** + * @return Shared Kafka Test server instance. + */ + public KafkaTestServer getKafkaTestServer(); + + /** + * @return Instance of KafkaTestUtils configured and ready to go. + */ + public KafkaTestUtils getKafkaTestUtils(); +``` + +#### KafkaTestUtils + +Often times you'll end up rebuilding the same patterns around producing and consuming data from this internal +kafka server. We've tried to collect some of these within [KafkaTestUtils](kafka-junit5/src/main/java/test/KafkaTestUtils.java)! + +For usage and examples, check out it's test at [KafkaTestUtilsTest](src/test/java/com/salesforce/kafka/test/KafkaTestUtilsTest.java). + +#### Zookeeper Test Server + +**Note** Since Kafka depends on Zookeeper, you get this for *free* if you use the [KafkaResourceExtension](kafka-junit5/src/main/java/test/junit/KafkaResourceExtension.java), you do not, and should not, use +both of these together within the same Test class. + +If you need to run tests against an **only** embedded Zookeeper server and not all of Kafka, we have you covered as well. Add the following annotation to your JUnit test class + and it will handle automatically start and stopping the embedded Zookeeper instance for you. + +```java +@ExtendWith(ZookeeperResourceExtension.class) +public class MyTestClass { + /** + * We have a single embedded zookeeper server that gets started when this test class is initialized. + * + * It's automatically started before any methods are run via the @ExtendWith annotation. + * It's automatically stopped after all of the tests are completed via the @ExtendWith annotation. + * This instance is passed to this class's constructor via the @ExtendWith annotation. + */ + private final SharedZookeeperTestResource sharedZookeeperTestResource; + + /** + * Constructor where KafkaResourceExtension provides the sharedKafkaTestResource object. + * @param sharedZookeeperTestResource Provided by ZookeeperResourceExtension. + */ + public MyTestClass(final SharedZookeeperTestResource sharedZookeeperTestResource) { + this.sharedZookeeperTestResource = sharedZookeeperTestResource; + } +``` + +[SharedZookeeperTestResource](kafka-junit5/src/main/java/test/junit/SharedZookeeperTestResource.java) has the following accessors that you can make use of in your tests to interact with the Zookeeper instance. + +```java + /** + * @return Shared Zookeeper test server instance. + */ + public TestingServer getZookeeperTestServer(); + + /** + * @return Connection string to connect to the Zookeeper instance. + */ + public String getZookeeperConnectString(); +``` \ No newline at end of file diff --git a/kafka-junit5/pom.xml b/kafka-junit5/pom.xml new file mode 100644 index 0000000..fc69629 --- /dev/null +++ b/kafka-junit5/pom.xml @@ -0,0 +1,119 @@ + + + + + kafka-junit + com.salesforce.kafka.test + 2.0.0 + + 4.0.0 + + kafka-junit5 + 1.0.0 + + + + UTF-8 + + + 5.1.1 + + + + + + com.salesforce.kafka.test + kafka-junit-core + 1.0.0 + + + + + org.junit.jupiter + junit-jupiter-engine + ${junit.version} + + + + + src/main/java + src/test/java + + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + + + + + + make-assembly + package + + single + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + + org.junit.platform + junit-platform-surefire-provider + 1.1.1 + + + org.junit.jupiter + junit-jupiter-engine + ${junit.version} + + + + -Xmx512M + ${skipTests} + plain + + + + + \ No newline at end of file diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java new file mode 100644 index 0000000..08d5ebd --- /dev/null +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/KafkaResourceExtension.java @@ -0,0 +1,116 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.kafka.test.junit5; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JUnit 5 extension to provide an internal test kafka server to be shared across test cases within the same test class. + * + * Annotate your test class with: + * @ExtendWith(KafkaResourceExtension.class) + * + * Add a constructor parameter to your test class: + * + * public YourTestClass(SharedKafkaTestResource sharedKafkaTestResource) { + * // Save reference to test resource. + * this.sharedKafkaTestResource = sharedKafkaTestResource; + * } + * + * Within your test case methods: + * this.sharedKafkaTestResource.getKafkaTestServer()... + */ +public class KafkaResourceExtension implements BeforeAllCallback, AfterAllCallback, ParameterResolver { + private static final Logger logger = LoggerFactory.getLogger(KafkaResourceExtension.class); + + /** + * Shared Kafka Test Server Resource. + */ + private SharedKafkaTestResource kafkaTestResource = new SharedKafkaTestResource(); + + /** + * Here we stand up an internal test kafka and zookeeper service. + * Once for all tests that use this shared resource. + */ + @Override + public void beforeAll(ExtensionContext context) throws Exception { + logger.info("Starting kafka test server"); + + // Start kafka test server + kafkaTestResource + .getKafkaTestServer() + .start(); + } + + /** + * Here we shut down the internal test kafka and zookeeper services. + */ + @Override + public void afterAll(ExtensionContext context) throws Exception { + logger.info("Shutting down kafka test server"); + + // Close out kafka test server if needed + try { + kafkaTestResource + .getKafkaTestServer() + .shutdown(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + kafkaTestResource = null; + } + + @Override + public boolean supportsParameter( + final ParameterContext parameterContext, + final ExtensionContext extensionContext) throws ParameterResolutionException { + return parameterContext + .getParameter() + .getType() + .equals(SharedKafkaTestResource.class); + } + + @Override + public Object resolveParameter( + final ParameterContext parameterContext, + final ExtensionContext extensionContext) throws ParameterResolutionException { + final Class parameterType = parameterContext + .getParameter() + .getType(); + + if (parameterType.equals(SharedKafkaTestResource.class)) { + return kafkaTestResource; + } + return null; + } +} diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResource.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResource.java new file mode 100644 index 0000000..10e42da --- /dev/null +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedKafkaTestResource.java @@ -0,0 +1,83 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.kafka.test.junit5; + +import com.salesforce.kafka.test.KafkaTestServer; +import com.salesforce.kafka.test.KafkaTestUtils; +import org.apache.curator.test.TestingServer; + +/** + * Shared Kafka Test Resource instance. Contains references to internal Kafka and Zookeeper server instances. + */ +public class SharedKafkaTestResource { + /** + * Our internal Kafka Test Server instance. + */ + private final KafkaTestServer kafkaTestServer = new KafkaTestServer(); + + /** + * Cached instance of KafkaTestUtils. + */ + private KafkaTestUtils kafkaTestUtils = null; + + /** + * @return Shared Kafka Test server instance. + */ + public KafkaTestServer getKafkaTestServer() { + return kafkaTestServer; + } + + /** + * @return Instance of KafkaTestUtils configured and ready to go. + */ + public KafkaTestUtils getKafkaTestUtils() { + if (kafkaTestUtils == null) { + kafkaTestUtils = new KafkaTestUtils(getKafkaTestServer()); + } + return kafkaTestUtils; + } + + /** + * @return Shared Zookeeper test server instance. + */ + public TestingServer getZookeeperTestServer() { + return getKafkaTestServer().getZookeeperServer(); + } + + /** + * @return Connection string to connect to the Zookeeper instance. + */ + public String getZookeeperConnectString() { + return getZookeeperTestServer().getConnectString(); + } + + /** + * @return The proper connect string to use for Kafka. + */ + public String getKafkaConnectString() { + return getKafkaTestServer().getKafkaConnectString(); + } +} diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java new file mode 100644 index 0000000..7469197 --- /dev/null +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/SharedZookeeperTestResource.java @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.kafka.test.junit5; + +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingServer; + +/** + * Shared Zookeeper Test Resource instance. Contains references to internal Zookeeper server instances. + */ +public class SharedZookeeperTestResource { + /** + * Our internal Zookeeper test server instance. + */ + private TestingServer zookeeperTestServer = null; + + /** + * @return Shared Zookeeper test server instance. + */ + public TestingServer getZookeeperTestServer() { + if (zookeeperTestServer == null) { + // Setup zookeeper test server + final InstanceSpec zkInstanceSpec = new InstanceSpec(null, -1, -1, -1, true, -1, -1, 1000); + try { + zookeeperTestServer = new TestingServer(zkInstanceSpec, true); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + return zookeeperTestServer; + } + + /** + * @return Connection string to connect to the Zookeeper instance. + */ + public String getZookeeperConnectString() { + return getZookeeperTestServer().getConnectString(); + } +} diff --git a/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java new file mode 100644 index 0000000..0b18697 --- /dev/null +++ b/kafka-junit5/src/main/java/com/salesforce/kafka/test/junit5/ZookeeperResourceExtension.java @@ -0,0 +1,124 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.kafka.test.junit5; + +import org.apache.curator.test.TestingServer; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * JUnit 5 extension to provide an internal test zookeeper server to be shared across test cases within the same test class. + * + * Annotate your test class with: + * @ExtendWith(ZookeeperResourceExtension.class) + * + * Add a constructor parameter to your test class: + * + * public YourTestClass(SharedZookeeperTestResource sharedZookeeperTestResource) { + * // Save reference to test resource. + * this.sharedZookeeperTestResource = sharedZookeeperTestResource; + * } + * + * Within your test case methods: + * this.sharedZookeeperTestResource.getZookeeperTestServer()... + * this.sharedZookeeperTestResource.getZookeeperConnectString()... + */ +public class ZookeeperResourceExtension implements BeforeAllCallback, AfterAllCallback, ParameterResolver { + private static final Logger logger = LoggerFactory.getLogger(ZookeeperResourceExtension.class); + + private SharedZookeeperTestResource zookeeperTestResource = null; + + /** + * Here we shut down the internal test zookeeper service. + */ + @Override + public void afterAll(ExtensionContext context) throws Exception { + logger.info("Shutting down zookeeper test server"); + + // If we don't have an instance + if (zookeeperTestResource == null) { + // Nothing to close. + return; + } + + try { + final TestingServer testingServer = zookeeperTestResource.getZookeeperTestServer(); + testingServer.stop(); + testingServer.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // null out reference + zookeeperTestResource = null; + } + + /** + * Here we stand up an internal test zookeeper service. + * Once for all tests that use this shared resource. + */ + @Override + public void beforeAll(ExtensionContext context) throws Exception { + logger.info("Starting Zookeeper test server"); + if (zookeeperTestResource != null) { + throw new IllegalStateException("Unknown State! Zookeeper test server already exists!"); + } + // Setup zookeeper test server + zookeeperTestResource = new SharedZookeeperTestResource(); + } + + @Override + public boolean supportsParameter( + final ParameterContext parameterContext, + final ExtensionContext extensionContext) throws ParameterResolutionException { + return parameterContext + .getParameter() + .getType() + .equals(SharedZookeeperTestResource.class); + } + + @Override + public Object resolveParameter( + final ParameterContext parameterContext, + final ExtensionContext extensionContext) throws ParameterResolutionException { + final Class parameterType = parameterContext + .getParameter() + .getType(); + + if (parameterType.equals(SharedZookeeperTestResource.class)) { + return zookeeperTestResource; + } + return null; + } +} diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestServerTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestServerTest.java new file mode 100644 index 0000000..45cbc67 --- /dev/null +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestServerTest.java @@ -0,0 +1,174 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.kafka.test.junit5; + +import com.google.common.collect.Lists; +import com.salesforce.kafka.test.KafkaTestServer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Clock; +import java.util.List; +import java.util.concurrent.Future; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test of KafkaTestServer. + * + * This also serves as an example of how to use this library! + */ +@ExtendWith(KafkaResourceExtension.class) +public class KafkaTestServerTest { + private static final Logger logger = LoggerFactory.getLogger(KafkaTestServerTest.class); + + /** + * We have a single embedded kafka server that gets started when this test class is initialized. + * + * It's automatically started before any methods are run via the @ExtendWith annotation. + * It's automatically stopped after all of the tests are completed via the @ExtendWith annotation. + * This instance is passed to this class's constructor via the @ExtendWith annotation. + */ + private final SharedKafkaTestResource sharedKafkaTestResource; + + /** + * Constructor where KafkaResourceExtension provides the sharedKafkaTestResource object. + * @param sharedKafkaTestResource Provided by KafkaResourceExtension. + */ + public KafkaTestServerTest(final SharedKafkaTestResource sharedKafkaTestResource) { + this.sharedKafkaTestResource = sharedKafkaTestResource; + } + + /** + * Before every test, we generate a random topic name and create it within the embedded kafka server. + * Each test can then be segmented run any tests against its own topic. + */ + private String topicName; + + /** + * This happens once before every test method. + * Create a new empty namespace with randomly generated name. + */ + @BeforeEach + void beforeTest() { + // Generate topic name + topicName = getClass().getSimpleName() + Clock.systemUTC().millis(); + + // Create topic with a single partition, + // NOTE: This will create partition id 0, because partitions are indexed at 0 :) + getKafkaTestServer().createTopic(topicName, 1); + } + + /** + * Test that KafkaServer works as expected! + * + * This also serves as a decent example of how to use the producer and consumer. + */ + @Test + void testProducerAndConsumer() throws Exception { + final int partitionId = 0; + + // Define our message + final String expectedKey = "my-key"; + final String expectedValue = "my test message"; + + // Define the record we want to produce + ProducerRecord producerRecord = new ProducerRecord<>(topicName, partitionId, expectedKey, expectedValue); + + // Create a new producer + KafkaProducer producer = getKafkaTestServer().getKafkaProducer(StringSerializer.class, StringSerializer.class); + + // Produce it & wait for it to complete. + Future future = producer.send(producerRecord); + producer.flush(); + while (!future.isDone()) { + Thread.sleep(500L); + } + logger.info("Produce completed"); + + // Close producer! + producer.close(); + + KafkaConsumer kafkaConsumer = + getKafkaTestServer().getKafkaConsumer(StringDeserializer.class, StringDeserializer.class); + + final List topicPartitionList = Lists.newArrayList(); + for (final PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topicName)) { + topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + kafkaConsumer.assign(topicPartitionList); + kafkaConsumer.seekToBeginning(topicPartitionList); + + // Pull records from kafka, keep polling until we get nothing back + ConsumerRecords records; + do { + records = kafkaConsumer.poll(2000L); + logger.info("Found {} records in kafka", records.count()); + for (ConsumerRecord record: records) { + // Validate + assertEquals(expectedKey, record.key(), "Key matches expected"); + assertEquals(expectedValue, record.value(), "value matches expected"); + } + } + while (!records.isEmpty()); + + // close consumer + kafkaConsumer.close(); + } + + /** + * Test if we create a topic more than once, no errors occur. + */ + @Test + void testCreatingTopicMultipleTimes() { + final String myTopic = "myTopic"; + for (int creationCounter = 0; creationCounter < 5; creationCounter++) { + getKafkaTestServer().createTopic(myTopic); + } + assertTrue(true, "Made it here!"); + } + + /** + * Simple accessor. + */ + private KafkaTestServer getKafkaTestServer() { + return sharedKafkaTestResource.getKafkaTestServer(); + } +} \ No newline at end of file diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java new file mode 100644 index 0000000..4da7225 --- /dev/null +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/KafkaTestUtilsTest.java @@ -0,0 +1,148 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.kafka.test.junit5; + +import com.google.common.base.Charsets; +import com.salesforce.kafka.test.KafkaTestServer; +import com.salesforce.kafka.test.KafkaTestUtils; +import com.salesforce.kafka.test.ProducedKafkaRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Clock; +import java.util.Iterator; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Serves both as a test for the Utilities, but also as a good example of how to use them. + */ +@ExtendWith(KafkaResourceExtension.class) +public class KafkaTestUtilsTest { + private static final Logger logger = LoggerFactory.getLogger(KafkaTestUtilsTest.class); + + /** + * We have a single embedded kafka server that gets started when this test class is initialized. + * + * It's automatically started before any methods are run via the @ClassRule annotation. + * It's automatically stopped after all of the tests are completed via the @ClassRule annotation. + */ + private final SharedKafkaTestResource sharedKafkaTestResource; + + /** + * Before every test, we generate a random topic name and create it within the embedded kafka server. + * Each test can then be segmented run any tests against its own topic. + */ + private String topicName; + + /** + * Constructor where KafkaResourceExtension provides the sharedKafkaTestResource object. + * @param sharedKafkaTestResource Provided by KafkaResourceExtension. + */ + public KafkaTestUtilsTest(SharedKafkaTestResource sharedKafkaTestResource) { + this.sharedKafkaTestResource = sharedKafkaTestResource; + } + + /** + * This happens once before every test method. + * Create a new empty namespace with randomly generated name. + */ + @BeforeEach + void beforeTest() { + // Generate topic name + topicName = getClass().getSimpleName() + Clock.systemUTC().millis(); + + // Create topic with 3 partitions, + // NOTE: This will create partition ids 0 thru 2, because partitions are indexed at 0 :) + getKafkaTestServer().createTopic(topicName, 3); + } + + /** + * Simple smoke test and example of how to use this Utility class. + */ + @Test + void testProducerAndConsumerUtils() { + final int numberOfRecords = 10; + final int partitionId = 2; + + // Create our utility class + final KafkaTestUtils kafkaTestUtils = new KafkaTestUtils(getKafkaTestServer()); + + // Produce some random records + final List> producedRecordsList = + kafkaTestUtils.produceRecords(numberOfRecords, topicName, partitionId); + + // You can get details about what get produced + for (ProducedKafkaRecord producedKafkaRecord: producedRecordsList) { + final String key = new String(producedKafkaRecord.getKey(), Charsets.UTF_8); + final String value = new String(producedKafkaRecord.getValue(), Charsets.UTF_8); + final String topic = producedKafkaRecord.getTopic(); + final int partition = producedKafkaRecord.getPartition(); + final long offset = producedKafkaRecord.getOffset(); + + // for debugging + logger.info("Produced into topic:{} partition:{} offset:{} key:{} value:{}", topic, partition, offset, key, value); + } + + // Now we'll try to consume these records. + final List> consumerRecords = kafkaTestUtils.consumeAllRecordsFromTopic(topicName); + + // Validate + assertEquals(numberOfRecords, consumerRecords.size(), "Should have 10 records"); + + final Iterator> consumerRecordIterator = consumerRecords.iterator(); + final Iterator> producedKafkaRecordIterator = producedRecordsList.iterator(); + + while (consumerRecordIterator.hasNext()) { + ConsumerRecord consumerRecord = consumerRecordIterator.next(); + ProducedKafkaRecord producedKafkaRecord = producedKafkaRecordIterator.next(); + + final String expectedKey = new String(producedKafkaRecord.getKey(), Charsets.UTF_8); + final String expectedValue = new String(producedKafkaRecord.getValue(), Charsets.UTF_8); + final String actualKey = new String(consumerRecord.key(), Charsets.UTF_8); + final String actualValue = new String(consumerRecord.value(), Charsets.UTF_8); + + // Make sure they match + assertEquals(producedKafkaRecord.getTopic(), consumerRecord.topic(), "Has correct topic"); + assertEquals(producedKafkaRecord.getPartition(), consumerRecord.partition(), "Has correct partition"); + assertEquals(producedKafkaRecord.getOffset(), consumerRecord.offset(), "Has correct offset"); + assertEquals(expectedKey, actualKey, "Has correct key"); + assertEquals(expectedValue, actualValue, "Has correct value"); + } + } + + /** + * Simple accessor. + */ + private KafkaTestServer getKafkaTestServer() { + return sharedKafkaTestResource.getKafkaTestServer(); + } +} \ No newline at end of file diff --git a/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperTestServerTest.java b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperTestServerTest.java new file mode 100644 index 0000000..e25438c --- /dev/null +++ b/kafka-junit5/src/test/java/com/salesforce/kafka/test/junit5/ZookeeperTestServerTest.java @@ -0,0 +1,81 @@ +/** + * Copyright (c) 2017-2018, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the + * following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this list of conditions and the following + * disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the distribution. + * + * * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.kafka.test.junit5; + +import org.apache.curator.test.TestingServer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test of Zookeeper Test instance. + * + * This also serves as an example of how to use this library! + */ +@ExtendWith(ZookeeperResourceExtension.class) +public class ZookeeperTestServerTest { + /** + * We have a single embedded zookeeper server that gets started when this test class is initialized. + * + * It's automatically started before any methods are run via the @ExtendWith annotation. + * It's automatically stopped after all of the tests are completed via the @ExtendWith annotation. + * This instance is passed to this class's constructor via the @ExtendWith annotation. + */ + private final SharedZookeeperTestResource sharedZookeeperTestResource; + + /** + * Constructor where KafkaResourceExtension provides the sharedKafkaTestResource object. + * @param sharedZookeeperTestResource Provided by ZookeeperResourceExtension. + */ + public ZookeeperTestServerTest(final SharedZookeeperTestResource sharedZookeeperTestResource) { + this.sharedZookeeperTestResource = sharedZookeeperTestResource; + } + + /** + * Validates that we receive a sane looking ZK connection string. + */ + @Test + void testGetZookeeperConnectString() { + final String actualConnectStr = sharedZookeeperTestResource.getZookeeperConnectString(); + + // Validate + assertNotNull(actualConnectStr, "Should have non-null connect string"); + assertTrue(actualConnectStr.startsWith("127.0.0.1:"), "Should start with 127.0.0.1"); + } + + /** + * Validates that we receive a sane looking ZK connection string. + */ + @Test + void testZookeeperServer() { + final TestingServer zkTestServer = sharedZookeeperTestResource.getZookeeperTestServer(); + + // Validate + assertNotNull(zkTestServer, "Should have non-null instance"); + } +} diff --git a/pom.xml b/pom.xml index d019f92..19debff 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,7 @@ + + kafka-junit-core + kafka-junit4 + kafka-junit5 + + + pom kafka-junit This library wraps Kafka's embedded test cluster, allowing you to more easily create and run integration tests using JUnit against a "real" kafka server running within the context of your tests. 2017 @@ -83,8 +90,8 @@ 2.12.0 - - 4.12 + + 1.7.25 false @@ -125,23 +132,16 @@ ${curatorTestVersion} - + - junit - junit - ${junit.version} + org.slf4j + slf4j-simple + ${slf4j.version} + test - src/main/java - src/test/java - - - src/test/resources/ - - - org.apache.maven.plugins @@ -165,48 +165,6 @@ 1.8 - - - maven-assembly-plugin - - - jar-with-dependencies - - - - - - - - - - make-assembly - package - - single - - - - - - - - org.apache.maven.plugins - maven-surefire-plugin - 2.19.1 - - - org.apache.maven.surefire - surefire-junit47 - 2.19 - - - - -Xmx512M - ${skipTests} - plain - - @@ -222,8 +180,6 @@ **.yaml **.xml script/** - src/test/resources/** - src/main/resources/** @@ -307,7 +263,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.0.0-M1 + 3.0.0 public @@ -348,7 +304,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.0.0-M1 + 3.0.0 build-javadocs @@ -372,8 +328,7 @@ sign - ${gpg.keyname} - ${gpg.keyname} + KafkaJUnitReleaseKey diff --git a/script/header.txt b/script/header.txt index df95feb..f7298de 100644 --- a/script/header.txt +++ b/script/header.txt @@ -1,5 +1,5 @@ /** - * Copyright (c) 2017, Salesforce.com, Inc. + * Copyright (c) 2017-2018, Salesforce.com, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the