Kafka source integration test (#2891)
* Integration test cases

Signed-off-by: Ajeesh Gopalakrishnakurup <ajeesh.akd@gmail.com>

* Fix for the integration test cases

Signed-off-by: Ajeesh Gopalakrishnakurup <ajeesh.akd@gmail.com>

* Fix for the WhiteSource issue

Signed-off-by: Ajeesh Gopalakrishnakurup <ajeesh.akd@gmail.com>

* Fixes for the merge conflicts

Signed-off-by: Ajeesh Gopalakrishnakurup <ajeesh.akd@gmail.com>

---------

Signed-off-by: Ajeesh Gopalakrishnakurup <ajeesh.akd@gmail.com>
ajeeshakd authored Jun 27, 2023
1 parent 9ed1529 commit 8bb96dd
Showing 7 changed files with 644 additions and 0 deletions.
43 changes: 43 additions & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
@@ -16,10 +16,53 @@ dependencies {
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation 'io.confluent:kafka-avro-serializer:7.3.3'
implementation 'io.confluent:kafka-schema-registry-client:7.3.3'
implementation 'io.confluent:kafka-schema-registry:7.3.3:tests'
testImplementation 'org.mockito:mockito-inline:4.1.0'
testImplementation 'org.yaml:snakeyaml:2.0'
testImplementation testLibs.spring.test
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-plugins:blocking-buffer')
testImplementation 'org.apache.kafka:kafka_2.13:3.4.0'
testImplementation 'org.apache.kafka:kafka_2.13:3.4.0:test'
testImplementation 'org.apache.curator:curator-test:5.5.0'
testImplementation 'io.confluent:kafka-schema-registry:7.4.0'
testImplementation 'junit:junit:4.13.1'
testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test'
testImplementation 'org.apache.kafka:connect-json:3.4.0'
}

test {
useJUnitPlatform()
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
//resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

filter {
includeTestsMatching '*IT'
}
}
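
With this configuration, classes under src/integrationTest/java whose names end in IT are discovered by the integrationTest task, typically invoked as ./gradlew :data-prepper-plugins:kafka-plugins:integrationTest. A minimal sketch of a class the filter would match, assuming JUnit 5 is on the classpath (the task runs on the JUnit Platform); the class name is hypothetical:

package org.opensearch.dataprepper.plugins.kafka.source;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

// Hypothetical example: the trailing "IT" satisfies includeTestsMatching '*IT'.
class KafkaSourceSmokeIT {

    @Test
    void isDiscoveredByTheIntegrationTestTask() {
        assertTrue(true); // placeholder; real tests exercise the Kafka source
    }
}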

161 changes: 161 additions & 0 deletions data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedKafkaClusterSingleNode.java
@@ -0,0 +1,161 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.source;

import io.confluent.kafka.schemaregistry.RestApp;
import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityLevel;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import kafka.server.KafkaConfig$;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

/**
* Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance, 1 Kafka broker, and 1
* Confluent Schema Registry instance.
*/
public class EmbeddedKafkaClusterSingleNode extends ExternalResource {

private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaClusterSingleNode.class);
private static final int DEFAULT_BROKER_PORT = 0;
private static final String KAFKA_SCHEMAS_TOPIC = "_schemas";
private static final String AVRO_COMPATIBILITY_TYPE = AvroCompatibilityLevel.NONE.name;
private static final String KAFKASTORE_OPERATION_TIMEOUT_MS = "60000";
private static final String KAFKASTORE_DEBUG = "true";
private static final String KAFKASTORE_INIT_TIMEOUT = "90000";

private EmbeddedZooKeeperServer zookeeper;
private EmbeddedKafkaServer broker;
private RestApp schemaRegistry;
private final Properties brokerConfig;
private boolean running;

public EmbeddedKafkaClusterSingleNode() {
this(new Properties());
}

public EmbeddedKafkaClusterSingleNode(final Properties brokerConfig) {
this.brokerConfig = new Properties();
this.brokerConfig.put(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG, KAFKASTORE_OPERATION_TIMEOUT_MS);
this.brokerConfig.putAll(brokerConfig);
}

/**
* Creates and starts the cluster.
*/
public void start() throws Exception {
log.debug("Initiating embedded Kafka cluster startup");
log.debug("Starting a ZooKeeper instance...");
zookeeper = new EmbeddedZooKeeperServer();
log.debug("ZooKeeper instance is running at {}", zookeeper.connectString());

final Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig, zookeeper);
log.debug("Starting a Kafka instance on ...",
effectiveBrokerConfig.getProperty(KafkaConfig$.MODULE$.ZkConnectDoc()));
broker = new EmbeddedKafkaServer(effectiveBrokerConfig);
log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
broker.brokerList(), broker.zookeeperConnect());

final Properties schemaRegistryProps = new Properties();

schemaRegistryProps.put(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG, KAFKASTORE_OPERATION_TIMEOUT_MS);
schemaRegistryProps.put(SchemaRegistryConfig.DEBUG_CONFIG, KAFKASTORE_DEBUG);
schemaRegistryProps.put(SchemaRegistryConfig.KAFKASTORE_INIT_TIMEOUT_CONFIG, KAFKASTORE_INIT_TIMEOUT);
schemaRegistryProps.put(SchemaRegistryConfig.KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

schemaRegistry = new RestApp(0, zookeeperConnect(), KAFKA_SCHEMAS_TOPIC, AVRO_COMPATIBILITY_TYPE, schemaRegistryProps);
schemaRegistry.start();
running = true;
}

private Properties effectiveBrokerConfigFrom(final Properties brokerConfig, final EmbeddedZooKeeperServer zookeeper) {
final Properties effectiveConfig = new Properties();
effectiveConfig.putAll(brokerConfig);
effectiveConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zookeeper.connectString());
effectiveConfig.put(KafkaConfig$.MODULE$.ZkSessionTimeoutMsProp(), 30 * 1000);
effectiveConfig.put(KafkaConfig$.MODULE$.ZkConnectionTimeoutMsProp(), 60 * 1000);
effectiveConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
effectiveConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
effectiveConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
effectiveConfig.put(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
effectiveConfig.put(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 1);
effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
return effectiveConfig;
}

@Override
protected void before() throws Exception {
start();
}

@Override
protected void after() {
stop();
}

/**
* Stops the cluster.
*/
public void stop() {
log.info("Stopping Confluent");
try {
try {
if (schemaRegistry != null) {
schemaRegistry.stop();
}
} catch (final Exception fatal) {
throw new RuntimeException(fatal);
}
if (broker != null) {
broker.stop();
}
try {
if (zookeeper != null) {
zookeeper.stop();
}
} catch (final IOException fatal) {
throw new RuntimeException(fatal);
}
} finally {
running = false;
}
log.info("Confluent Stopped");
}

public String bootstrapServers() {
return broker.brokerList();
}

public String zookeeperConnect() {
return zookeeper.connectString();
}

public String schemaRegistryUrl() {
return schemaRegistry.restConnect;
}

public void createTopic(final String topic) {
createTopic(topic, 1, (short) 1, Collections.emptyMap());
}

public void createTopic(final String topic, final int partitions, final short replication) {
createTopic(topic, partitions, replication, Collections.emptyMap());
}

public void createTopic(final String topic,
final int partitions,
final short replication,
final Map<String, String> topicConfig) {
broker.createTopic(topic, partitions, replication, topicConfig);
}

}
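
A minimal sketch of how an integration test might wire in this cluster, assuming JUnit 4's @ClassRule (the class extends ExternalResource, so before() and after() start and stop the cluster automatically) and the vintage engine, since the Gradle task runs on the JUnit Platform; the test class and topic names are hypothetical:

package org.opensearch.dataprepper.plugins.kafka.source;

import org.junit.ClassRule;
import org.junit.Test;

public class KafkaClusterUsageExampleIT {

    // Started once before the first test method and stopped after the last one.
    @ClassRule
    public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode();

    @Test
    public void createsTopicOnEmbeddedBroker() {
        CLUSTER.createTopic("test-topic");
        // Point the Kafka source under test at CLUSTER.bootstrapServers() and,
        // for Avro payloads, at CLUSTER.schemaRegistryUrl().
    }
}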
142 changes: 142 additions & 0 deletions data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedKafkaServer.java
@@ -0,0 +1,142 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.source;


import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Time;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
* Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by
* default.
*
* Requires a running ZooKeeper instance to connect to. By default, it expects a ZooKeeper instance
* running at `127.0.0.1:2181`.
*/
public class EmbeddedKafkaServer {

private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaServer.class);

private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";

private final Properties effectiveConfig;
private final File logDir;
private final TemporaryFolder tmpFolder;
private final KafkaServer kafka;

public EmbeddedKafkaServer(final Properties config) throws IOException {
tmpFolder = new TemporaryFolder();
tmpFolder.create();
logDir = tmpFolder.newFolder();
effectiveConfig = effectiveConfigFrom(config);
final boolean loggingEnabled = true;

final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
log.info("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
logDir, zookeeperConnect());
kafka = TestUtils.createServer(kafkaConfig, Time.SYSTEM);
log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
brokerList(), zookeeperConnect());
}

private Properties effectiveConfigFrom(final Properties initialConfig) throws IOException {
final Properties effectiveConfig = new Properties();
effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 1);
effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);

effectiveConfig.putAll(initialConfig);
effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
return effectiveConfig;
}

public String brokerList() {
// Return the broker's client listener address rather than the ZooKeeper connect string.
return "127.0.0.1:" + kafka.boundPort(ListenerName.normalised("PLAINTEXT"));
}

public String zookeeperConnect() {
return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
}

public void stop() {
log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
brokerList(), zookeeperConnect());
kafka.shutdown();
kafka.awaitShutdown();
log.debug("Removing temp folder {} with logs.dir at {} ...", tmpFolder, logDir);
tmpFolder.delete();
log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
brokerList(), zookeeperConnect());
}

public void createTopic(final String topic) {
createTopic(topic, 1, (short) 1, Collections.emptyMap());
}

public void createTopic(final String topic, final int partitions, final short replication) {
createTopic(topic, partitions, replication, Collections.emptyMap());
}

public void createTopic(final String topic,
final int partitions,
final short replication,
final Map<String, String> topicConfig) {
log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
topic, partitions, replication, topicConfig);

final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());

try (final AdminClient adminClient = AdminClient.create(properties)) {
final NewTopic newTopic = new NewTopic(topic, partitions, replication);
newTopic.configs(topicConfig);
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
} catch (final InterruptedException | ExecutionException fatal) {
throw new RuntimeException(fatal);
}

}

public void deleteTopic(final String topic) {
log.debug("Deleting topic {}", topic);
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());

try (final AdminClient adminClient = AdminClient.create(properties)) {
adminClient.deleteTopics(Collections.singleton(topic)).all().get();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
} catch (final ExecutionException e) {
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
throw new RuntimeException(e);
}
}
}

KafkaServer kafkaServer() {
return kafka;
}
}
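
To drive the embedded broker from a test, a plain KafkaProducer from kafka-clients can publish against brokerList(). A minimal sketch, assuming the broker is running and the topic exists; the helper class and method names are hypothetical:

package org.opensearch.dataprepper.plugins.kafka.source;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

final class EmbeddedKafkaProducerExample {

    static void publishOne(final String bootstrapServers, final String topic) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer and releases its network threads.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topic, "key", "value"));
            producer.flush(); // ensure delivery before the test asserts on consumption
        }
    }
}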