diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle
index f20e4928a8..9baba37795 100644
--- a/data-prepper-plugins/kafka-plugins/build.gradle
+++ b/data-prepper-plugins/kafka-plugins/build.gradle
@@ -9,6 +9,7 @@ plugins {
 
 dependencies {
     implementation project(':data-prepper-api')
+    implementation project(':data-prepper-plugins:buffer-common')
     implementation 'org.apache.kafka:kafka-clients:3.4.0'
     implementation 'org.apache.avro:avro:1.11.0'
     implementation 'com.fasterxml.jackson.core:jackson-databind'
diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java
index 1fab0d7ac1..cc777b25df 100644
--- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java
@@ -25,6 +25,7 @@
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
@@ -49,7 +50,7 @@ public class JSONConsumerIT {
 
     private KafkaSourceConfig kafkaSourceConfig;
     private KafkaSource kafkaSource;
-    private Buffer<Record<Object>> buffer;
+    private Buffer<Record<Event>> buffer;
 
     @ClassRule
     public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode();
@@ -111,4 +112,4 @@ private void produceTestMessages() throws JsonProcessingException {
             throw new RuntimeException(e);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java
index a2f10eeba7..a5118e64c5 100644
--- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java
@@ -20,6 +20,7 @@
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
@@ -44,7 +45,7 @@ public class PlainTextConsumerIT {
 
     private KafkaSourceConfig kafkaSourceConfig;
     private KafkaSource kafkaSource;
-    private Buffer<Record<Object>> buffer;
+    private Buffer<Record<Event>> buffer;
 
     @ClassRule
     public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode();
@@ -101,4 +102,4 @@ private void produceTestMessages() {
             throw new RuntimeException(e);
         }
     }
-}
\ No newline at end of file
+}
diff --git
a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java
index 4a42649fae..b2536170cb 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java
@@ -21,7 +21,7 @@ public class TopicConfig {
     private static final Duration SESSION_TIMEOUT = Duration.ofSeconds(45);
     private static final int MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
     private static final String AUTO_OFFSET_RESET = "earliest";
-    private static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(1);
+    static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5);
     private static final Duration MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
     private static final Duration BUFFER_DEFAULT_TIMEOUT = Duration.ofSeconds(5);
     private static final Duration MAX_RETRY_DELAY = Duration.ofSeconds(1);
@@ -30,7 +30,7 @@ public class TopicConfig {
     private static final Long FETCH_MIN_BYTES = 1L;
     private static final Duration RETRY_BACKOFF = Duration.ofSeconds(100);
     private static final Duration MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
-    private static final Long CONSUMER_MAX_POLL_RECORDS = 500L;
+    private static final Integer CONSUMER_MAX_POLL_RECORDS = 500;
     private static final Integer NUM_OF_WORKERS = 10;
     private static final Duration HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(3);
 
@@ -54,10 +54,10 @@ public class TopicConfig {
 
     @Size(min = 1)
     private Duration maxRetryDelay = MAX_RETRY_DELAY;
 
-    @JsonProperty("autocommit")
-    private String autoCommit = AUTO_COMMIT;
+    @JsonProperty("auto_commit")
+    private Boolean autoCommit = false;
 
-    @JsonProperty("autocommit_interval")
+    @JsonProperty("auto_commit_interval")
     @Size(min = 1)
     private Duration autoCommitInterval = AUTOCOMMIT_INTERVAL;
@@ -100,7 +100,7 @@ public class TopicConfig {
     private Duration maxPollInterval = MAX_POLL_INTERVAL;
 
     @JsonProperty("consumer_max_poll_records")
-    private Long consumerMaxPollRecords = CONSUMER_MAX_POLL_RECORDS;
+    private Integer consumerMaxPollRecords = CONSUMER_MAX_POLL_RECORDS;
 
     @JsonProperty("heart_beat_interval")
     @Size(min = 1)
@@ -118,7 +118,7 @@ public void setMaxRetryAttempts(Integer maxRetryAttempts) {
         this.maxRetryAttempts = maxRetryAttempts;
     }
 
-    public String getAutoCommit() {
+    public Boolean getAutoCommit() {
         return autoCommit;
     }
 
@@ -186,7 +186,7 @@ public void setFetchMaxBytes(Long fetchMaxBytes) {
         this.fetchMaxBytes = fetchMaxBytes;
     }
 
-    public void setAutoCommit(String autoCommit) {
+    public void setAutoCommit(Boolean autoCommit) {
         this.autoCommit = autoCommit;
     }
 
@@ -222,11 +222,11 @@ public void setMaxPollInterval(Duration maxPollInterval) {
         this.maxPollInterval = maxPollInterval;
     }
 
-    public Long getConsumerMaxPollRecords() {
+    public Integer getConsumerMaxPollRecords() {
         return consumerMaxPollRecords;
     }
 
-    public void setConsumerMaxPollRecords(Long consumerMaxPollRecords) {
+    public void setConsumerMaxPollRecords(Integer consumerMaxPollRecords) {
         this.consumerMaxPollRecords = consumerMaxPollRecords;
     }
 
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index f409193d1f..e5129e030e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -4,6 +4,9 @@ */ package org.opensearch.dataprepper.plugins.kafka.consumer; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -11,12 +14,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.common.TopicPartition; +import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceBufferAccumulator; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,84 +32,119 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; /** * * A utility class which will handle the core Kafka consumer operation. 
*/ -public class KafkaSourceCustomConsumer implements ConsumerRebalanceListener { +public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceListener { private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class); - private Map offsetsToCommit = new HashMap<>(); - private long lastReadOffset = 0L; - private volatile long lastCommitTime = System.currentTimeMillis(); - private KafkaConsumer consumer= null; - private AtomicBoolean status = new AtomicBoolean(false); - private Buffer> buffer= null; - private TopicConfig topicConfig = null; - private KafkaSourceConfig kafkaSourceConfig= null; + private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L; + private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1; + private volatile long lastCommitTime; + private KafkaConsumer consumer= null; + private AtomicBoolean shutdownInProgress; + private final String topicName; + private final TopicConfig topicConfig; private PluginMetrics pluginMetrics= null; - private String schemaType= null; + private MessageFormat schema; + private final BufferAccumulator> bufferAccumulator; + private final Buffer> buffer; + private static final ObjectMapper objectMapper = new ObjectMapper(); + private final JsonFactory jsonFactory = new JsonFactory(); + private Map offsetsToCommit; - public KafkaSourceCustomConsumer() { - } - - private KafkaSourceBufferAccumulator kafkaSourceBufferAccumulator= null; - public KafkaSourceCustomConsumer(KafkaConsumer consumer, - AtomicBoolean status, - Buffer> buffer, - TopicConfig topicConfig, - KafkaSourceConfig kafkaSourceConfig, - String schemaType, - PluginMetrics pluginMetrics) { + public KafkaSourceCustomConsumer(final KafkaConsumer consumer, + final AtomicBoolean shutdownInProgress, + final Buffer> buffer, + final TopicConfig topicConfig, + final String schemaType, + final PluginMetrics pluginMetrics) { + this.topicName = topicConfig.getName(); + this.topicConfig = topicConfig; + this.shutdownInProgress = shutdownInProgress; this.consumer = consumer; - this.status = status; this.buffer = buffer; - this.topicConfig = topicConfig; - this.kafkaSourceConfig = kafkaSourceConfig; - this.schemaType = schemaType; + this.offsetsToCommit = new HashMap<>(); this.pluginMetrics = pluginMetrics; - kafkaSourceBufferAccumulator= new KafkaSourceBufferAccumulator(topicConfig, kafkaSourceConfig, - schemaType, pluginMetrics); + schema = MessageFormat.getByMessageFormatByName(schemaType); + Duration bufferTimeout = Duration.ofSeconds(1); + bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout); + lastCommitTime = System.currentTimeMillis(); } + public void consumeRecords() throws Exception { + ConsumerRecords records = + consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2); + if (!records.isEmpty() && records.count() > 0) { + Map offsets = iterateRecordPartitions(records); + offsets.forEach((partition, offset) -> + offsetsToCommit.put(partition, offset)); + } + } + + Map getOffsetsToCommit() { + return offsetsToCommit; + } - @SuppressWarnings({"rawtypes", "unchecked"}) - public void consumeRecords() { + @Override + public void run() { try { - consumer.subscribe(Arrays.asList(topicConfig.getName())); - do { - offsetsToCommit.clear(); - ConsumerRecords records = poll(consumer); - if (!records.isEmpty() && records.count() > 0) { - iterateRecordPartitions(records); + consumer.subscribe(Arrays.asList(topicName)); + while (!shutdownInProgress.get()) { + consumeRecords(); + long 
currentTimeMillis = System.currentTimeMillis(); + if (!topicConfig.getAutoCommit() && !offsetsToCommit.isEmpty() && + (currentTimeMillis - lastCommitTime) >= COMMIT_OFFSET_INTERVAL_MS) { + try { + consumer.commitSync(offsetsToCommit); + offsetsToCommit.clear(); + lastCommitTime = currentTimeMillis; + } catch (CommitFailedException e) { + LOG.error("Failed to commit offsets in topic "+topicName); + } } - }while (!status.get()); + } } catch (Exception exp) { LOG.error("Error while reading the records from the topic...", exp); } } - private void iterateRecordPartitions(ConsumerRecords records) throws Exception { - for (TopicPartition partition : records.partitions()) { - List> kafkaRecords = new ArrayList<>(); - List> partitionRecords = records.records(partition); - iterateConsumerRecords(kafkaRecords, partitionRecords); - if (!kafkaRecords.isEmpty()) { - kafkaSourceBufferAccumulator.writeAllRecordToBuffer(kafkaRecords, buffer, topicConfig); + private Record getRecord(ConsumerRecord consumerRecord) { + Map data = new HashMap<>(); + Event event; + if (schema == MessageFormat.JSON || schema == MessageFormat.AVRO) { + Map message = new HashMap<>(); + try { + final JsonParser jsonParser = jsonFactory.createParser((String)consumerRecord.value().toString()); + message = objectMapper.readValue(jsonParser, Map.class); + } catch (Exception e){ + LOG.error("Failed to parse JSON or AVRO record"); + return null; } + data.put(consumerRecord.key(), message); + } else { + data.put(consumerRecord.key(), (String)consumerRecord.value()); } - if (!offsetsToCommit.isEmpty() && topicConfig.getAutoCommit().equalsIgnoreCase("false")) { - lastCommitTime = kafkaSourceBufferAccumulator.commitOffsets(consumer, lastCommitTime, offsetsToCommit); - } + event = JacksonLog.builder().withData(data).build(); + return new Record(event); } - private void iterateConsumerRecords(List> kafkaRecords, List> partitionRecords) { - for (ConsumerRecord consumerRecord : partitionRecords) { - lastReadOffset = kafkaSourceBufferAccumulator.processConsumerRecords(offsetsToCommit, kafkaRecords, lastReadOffset, consumerRecord, partitionRecords); + private Map iterateRecordPartitions(ConsumerRecords records) throws Exception { + Map offsets = new HashMap<>(); + for (TopicPartition topicPartition : records.partitions()) { + List> kafkaRecords = new ArrayList<>(); + List> partitionRecords = records.records(topicPartition); + for (ConsumerRecord consumerRecord : partitionRecords) { + Record record = getRecord(consumerRecord); + if (record != null) { + bufferAccumulator.add(record); + } + } + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + offsets.put(topicPartition, new OffsetAndMetadata(lastOffset + 1)); } - } - - private ConsumerRecords poll(final KafkaConsumer consumer) { - return consumer.poll(Duration.ofMillis(1)); + return offsets; } public void closeConsumer(){ @@ -117,17 +156,13 @@ public void shutdownConsumer(){ } @Override public void onPartitionsAssigned(Collection partitions) { - for (TopicPartition partition : partitions) { - consumer.seek(partition, lastReadOffset); + for (TopicPartition topicPartition : partitions) { + Long committedOffset = consumer.committed(topicPartition).offset(); + consumer.seek(topicPartition, committedOffset); } } @Override public void onPartitionsRevoked(Collection partitions) { - try { - consumer.commitSync(offsetsToCommit); - } catch (CommitFailedException e) { - LOG.error("Failed to commit the record for the Json consumer...", e); - } } } diff --git 
a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/MultithreadedConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/MultithreadedConsumer.java deleted file mode 100644 index cff5286a91..0000000000 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/MultithreadedConsumer.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.kafka.consumer; - -import com.fasterxml.jackson.databind.JsonNode; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.LocalDateTime; -import java.util.Objects; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * * A Multithreaded helper class which helps to process the records from multiple topics in an - * asynchronous way. - */ -@SuppressWarnings("deprecation") -public class MultithreadedConsumer implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(MultithreadedConsumer.class); - private final AtomicBoolean status = new AtomicBoolean(false); - private final KafkaSourceConfig sourceConfig; - private final TopicConfig topicConfig; - private final Buffer> buffer; - private final KafkaSourceCustomConsumer customConsumer = new KafkaSourceCustomConsumer(); - private String consumerId; - private String consumerGroupId; - private String schemaType; - private Properties consumerProperties; - private PluginMetrics pluginMetrics; - - public MultithreadedConsumer(String consumerId, - String consumerGroupId, - Properties properties, - TopicConfig topicConfig, - KafkaSourceConfig sourceConfig, - Buffer> buffer, - PluginMetrics pluginMetric, - String schemaType) { - this.consumerProperties = Objects.requireNonNull(properties); - this.consumerId = consumerId; - this.consumerGroupId = consumerGroupId; - this.sourceConfig = sourceConfig; - this.topicConfig = topicConfig; - this.buffer = buffer; - this.schemaType = schemaType; - this.pluginMetrics = pluginMetric; - } - - @SuppressWarnings({"unchecked"}) - @Override - public void run() { - LOG.info("Consumer group : {} and Consumer : {} executed on : {}", consumerGroupId, consumerId, LocalDateTime.now()); - try { - MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); - switch (schema) { - case JSON: - new KafkaSourceCustomConsumer(new KafkaConsumer(consumerProperties), status, buffer, topicConfig, sourceConfig, schemaType, pluginMetrics).consumeRecords(); - break; - case AVRO: - new KafkaSourceCustomConsumer(new KafkaConsumer(consumerProperties), status, buffer, topicConfig, sourceConfig, schemaType, pluginMetrics).consumeRecords(); - break; - case PLAINTEXT: - default: - new KafkaSourceCustomConsumer(new KafkaConsumer(consumerProperties), status, buffer, topicConfig, sourceConfig, schemaType, 
pluginMetrics).consumeRecords(); - break; - } - - } catch (Exception exp) { - if (exp.getCause() instanceof WakeupException && !status.get()) { - LOG.error("Error reading records from the topic...{}", exp.getMessage()); - } - } finally { - LOG.info("Closing the consumer... {}", consumerId); - closeConsumers(); - } - } - - private void closeConsumers() { - customConsumer.closeConsumer(); - } - - public void shutdownConsumer() { - status.set(false); - customConsumer.shutdownConsumer(); - } -} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 51b7362eee..9a1d96023f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.kafka.source; +import org.apache.avro.generic.GenericRecord; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; @@ -12,16 +13,18 @@ import io.micrometer.core.instrument.Counter; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.plugins.kafka.consumer.MultithreadedConsumer; +import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaSourceCustomConsumer; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import org.slf4j.Logger; @@ -43,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; +import java.util.concurrent.atomic.AtomicBoolean; /** * The starting point of the Kafka-source plugin and the Kafka consumer * properties and kafka multithreaded consumers are being handled here. 
@@ -50,18 +54,17 @@ @SuppressWarnings("deprecation") @DataPrepperPlugin(name = "kafka", pluginType = Source.class, pluginConfigurationType = KafkaSourceConfig.class) -public class KafkaSource implements Source> { +public class KafkaSource implements Source> { private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); private final KafkaSourceConfig sourceConfig; + private AtomicBoolean shutdownInProgress; private ExecutorService executorService; private static final String KAFKA_WORKER_THREAD_PROCESSING_ERRORS = "kafkaWorkerThreadProcessingErrors"; private final Counter kafkaWorkerThreadProcessingErrors; private final PluginMetrics pluginMetrics; - private String consumerGroupID; - private MultithreadedConsumer multithreadedConsumer; - private int totalWorkers; + private KafkaSourceCustomConsumer consumer; private String pipelineName; - private static String schemaType = MessageFormat.PLAINTEXT.toString(); + private String schemaType = MessageFormat.PLAINTEXT.toString(); private static final String SCHEMA_TYPE= "schemaType"; @DataPrepperPluginConstructor @@ -71,37 +74,52 @@ public KafkaSource(final KafkaSourceConfig sourceConfig, final PluginMetrics plu this.pluginMetrics = pluginMetrics; this.pipelineName = pipelineDescription.getPipelineName(); this.kafkaWorkerThreadProcessingErrors = pluginMetrics.counter(KAFKA_WORKER_THREAD_PROCESSING_ERRORS); + shutdownInProgress = new AtomicBoolean(false); } @Override - public void start(Buffer> buffer) { + public void start(Buffer> buffer) { sourceConfig.getTopics().forEach(topic -> { - totalWorkers = 0; + Properties consumerProperties = getConsumerProperties(topic); + MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); + KafkaConsumer kafkaConsumer; + switch (schema) { + case JSON: + kafkaConsumer = new KafkaConsumer(consumerProperties); + break; + case AVRO: + kafkaConsumer = new KafkaConsumer(consumerProperties); + break; + case PLAINTEXT: + default: + kafkaConsumer = new KafkaConsumer(consumerProperties); + break; + } + try { - Properties consumerProperties = getConsumerProperties(topic); - totalWorkers = topic.getWorkers(); - consumerGroupID = getGroupId(topic.getName()); - executorService = Executors.newFixedThreadPool(totalWorkers); - IntStream.range(0, totalWorkers + 1).forEach(index -> { - String consumerId = consumerGroupID + "::" + Integer.toString(index + 1); - multithreadedConsumer = new MultithreadedConsumer(consumerId, - consumerGroupID, consumerProperties, topic, sourceConfig, buffer, pluginMetrics, schemaType); - executorService.submit(multithreadedConsumer); + int numWorkers = topic.getWorkers(); + executorService = Executors.newFixedThreadPool(numWorkers); + IntStream.range(0, numWorkers + 1).forEach(index -> { + consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, topic, schemaType, pluginMetrics); + + executorService.submit(consumer); }); } catch (Exception e) { LOG.error("Failed to setup the Kafka Source Plugin.", e); throw new RuntimeException(); } + LOG.info("Started Kafka source for topic " + topic.getName()); }); } @Override public void stop() { LOG.info("Shutting down Consumers..."); + shutdownInProgress.set(true); executorService.shutdown(); try { if (!executorService.awaitTermination( - calculateLongestThreadWaitingTime(), TimeUnit.MILLISECONDS)) { + calculateLongestThreadWaitingTime(), TimeUnit.SECONDS)) { LOG.info("Consumer threads are waiting for shutting down..."); executorService.shutdownNow(); } @@ -115,10 +133,6 @@ public void stop() { 
LOG.info("Consumer shutdown successfully..."); } - private String getGroupId(String name) { - return pipelineName + "::" + name; - } - private long calculateLongestThreadWaitingTime() { List topicsList = sourceConfig.getTopics(); return topicsList.stream(). @@ -138,8 +152,12 @@ private Properties getConsumerProperties(TopicConfig topicConfig) { properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, sourceConfig.getBootStrapServers()); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, topicConfig.getAutoCommit()); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + topicConfig.getConsumerMaxPollRecords()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId()); - schemaType = getSchemaType(sourceConfig.getSchemaConfig().getRegistryURL(), topicConfig.getName(), sourceConfig.getSchemaConfig().getVersion()); + if (sourceConfig.getSchemaConfig() != null) { + schemaType = getSchemaType(sourceConfig.getSchemaConfig().getRegistryURL(), topicConfig.getName(), sourceConfig.getSchemaConfig().getVersion()); + } if (schemaType.isEmpty()) { schemaType = MessageFormat.PLAINTEXT.toString(); } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java index 3e60bb2771..6c26666adf 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java @@ -69,11 +69,11 @@ void testConfigValues_default() { assertEquals("my-topic-2", topicConfig.getName()); assertEquals("DPKafkaProj-2", topicConfig.getGroupId()); assertEquals("kafka-consumer-group-2", topicConfig.getGroupName()); - assertEquals("false", topicConfig.getAutoCommit()); + assertEquals(false, topicConfig.getAutoCommit()); assertEquals(Duration.ofSeconds(5), topicConfig.getAutoCommitInterval()); assertEquals(Duration.ofSeconds(45), topicConfig.getSessionTimeOut()); assertEquals("earliest", topicConfig.getAutoOffsetReset()); - assertEquals(Duration.ofSeconds(1), topicConfig.getThreadWaitingTime()); + assertEquals(TopicConfig.THREAD_WAITING_TIME, topicConfig.getThreadWaitingTime()); assertEquals(Duration.ofSeconds(4), topicConfig.getMaxRecordFetchTime()); assertEquals(Duration.ofSeconds(5), topicConfig.getBufferDefaultTimeout()); assertEquals(52428800L, topicConfig.getFetchMaxBytes().longValue()); @@ -93,7 +93,7 @@ void testConfigValues_from_yaml() { assertEquals("my-topic-1", topicConfig.getName()); assertEquals("DPKafkaProj-2", topicConfig.getGroupId()); assertEquals("kafka-consumer-group-2", topicConfig.getGroupName()); - assertEquals("false", topicConfig.getAutoCommit()); + assertEquals(false, topicConfig.getAutoCommit()); assertEquals(Duration.ofSeconds(5), topicConfig.getAutoCommitInterval()); assertEquals(Duration.ofSeconds(45), topicConfig.getSessionTimeOut()); assertEquals("earliest", topicConfig.getAutoOffsetReset()); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java index 8567658004..22172aece5 100644 --- 
a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java @@ -1,99 +1,101 @@ package org.opensearch.dataprepper.plugins.kafka.consumer; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.yaml.snakeyaml.Yaml; -import java.io.FileReader; -import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.MatcherAssert.assertThat; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.mockito.Mock; + import java.util.Map; import java.util.HashMap; import java.util.Arrays; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.LinkedHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doCallRealMethod; - +import java.time.Duration; +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) public class KafkaSourceCustomConsumerTest { + @Mock private KafkaConsumer kafkaConsumer; private AtomicBoolean status; - private Buffer> buffer; + private Buffer> buffer; @Mock private KafkaSourceConfig sourceConfig; + @Mock private TopicConfig topicConfig; @Mock private PluginMetrics pluginMetrics; - private String schemaType; - private KafkaSourceCustomConsumer consumer; - private final String TEST_PIPELINE_NAME = "test_pipeline"; - - private Map>> records = new LinkedHashMap>>(); + private ConsumerRecords consumerRecords; + private final String 
TEST_PIPELINE_NAME = "test_pipeline"; + private AtomicBoolean shutdownInProgress; + private final String testKey1 = "testkey1"; + private final String testKey2 = "testkey2"; + private final String testValue1 = "testValue1"; + private final String testValue2 = "testValue2"; + private final Map testMap1 = Map.of("key1", "value1", "key2", 2); + private final Map testMap2 = Map.of("key3", "value3", "key4", false); + private final String testJsonValue1 = "{ \"key1\": \"value1\", \"key2\": 2}"; + private final String testJsonValue2 = "{ \"key3\": \"value3\", \"key4\": false}"; + private final int testPartition = 0; + private final int testJsonPartition = 1; @BeforeEach - public void setUp() throws IOException { - Yaml yaml = new Yaml(); - FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines.yaml").getFile()); - Object data = yaml.load(fileReader); - if (data instanceof Map) { - Map propertyMap = (Map) data; - Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); - Map sourceMap = (Map) logPipelineMap.get("source"); - Map kafkaConfigMap = (Map) sourceMap.get("kafka"); - ObjectMapper mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - String json = mapper.writeValueAsString(kafkaConfigMap); - Reader reader = new StringReader(json); - sourceConfig = mapper.readValue(reader, KafkaSourceConfig.class); - topicConfig = sourceConfig.getTopics().get(0); - } + public void setUp() { + kafkaConsumer = mock(KafkaConsumer.class); pluginMetrics = mock(PluginMetrics.class); + topicConfig = mock(TopicConfig.class); + when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); + when(topicConfig.getAutoCommit()).thenReturn(false); + sourceConfig = mock(KafkaSourceConfig.class); buffer = getBuffer(); - status = new AtomicBoolean(true); - kafkaConsumer = mock(KafkaConsumer.class); - schemaType = "plaintext"; - consumer = new KafkaSourceCustomConsumer(kafkaConsumer, status, buffer, topicConfig, sourceConfig, schemaType, pluginMetrics); + shutdownInProgress = new AtomicBoolean(false); + when(topicConfig.getName()).thenReturn("topic1"); } - private BlockingBuffer> getBuffer() { + public KafkaSourceCustomConsumer createObjectUnderTest(String schemaType) { + return new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, topicConfig, schemaType, pluginMetrics); + } + + private BlockingBuffer> getBuffer() { final HashMap integerHashMap = new HashMap<>(); integerHashMap.put("buffer_size", 10); integerHashMap.put("batch_size", 10); @@ -103,55 +105,86 @@ private BlockingBuffer> getBuffer() { } @Test - public void testConsumeRecords() throws InterruptedException { - + public void testPlainTextConsumeRecords() throws InterruptedException { String topic = topicConfig.getName(); - - Thread producerThread = new Thread(() -> { - setTopicData(topic); + consumerRecords = createPlainTextRecords(topic); + when(kafkaConsumer.poll(anyLong())).thenReturn(consumerRecords); + consumer = createObjectUnderTest("plaintext"); + + try { + consumer.consumeRecords(); + } catch (Exception e){} + final Map.Entry>, CheckpointState> bufferRecords = buffer.read(1000); + ArrayList> bufferedRecords = new ArrayList<>(bufferRecords.getKey()); + Assertions.assertEquals(consumerRecords.count(), bufferedRecords.size()); + Map offsetsToCommit = consumer.getOffsetsToCommit(); + offsetsToCommit.forEach((topicPartition, offsetAndMetadata) -> { + Assertions.assertEquals(topicPartition.partition(), testPartition); + 
Assertions.assertEquals(topicPartition.topic(), topic); + Assertions.assertEquals(offsetAndMetadata.offset(), 2L); }); - producerThread.start(); - TimeUnit.SECONDS.sleep(1); - ConsumerRecords consumerRecords = new ConsumerRecords(records); - when(kafkaConsumer.poll(any())).thenReturn(consumerRecords); - KafkaSourceCustomConsumer spyConsumer = spy(consumer); - spyConsumer.consumeRecords(); - verify(spyConsumer).consumeRecords(); - final Map.Entry>, CheckpointState> bufferRecords = buffer.read(1000); - Assertions.assertEquals(2, new ArrayList<>(bufferRecords.getKey()).size()); - } - private void setTopicData(String topic) { - ConsumerRecord record1 = new ConsumerRecord<>(topic, 0, 0L, "mykey-1", "myvalue-1"); - ConsumerRecord record2 = new ConsumerRecord<>(topic, 0, 0L, "mykey-2", "myvalue-2"); - records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); + for (Record record: bufferedRecords) { + Event event = record.getData(); + String value1 = event.get(testKey1, String.class); + String value2 = event.get(testKey2, String.class); + assertTrue(value1 != null || value2 != null); + if (value1 != null) { + Assertions.assertEquals(value1, testValue1); + } + if (value2 != null) { + Assertions.assertEquals(value2, testValue2); + } + } } @Test - void testOnPartitionsRevoked() { - KafkaSourceCustomConsumer spyConsumer = spy(consumer); - setTopicData(topicConfig.getName()); - final List topicPartitions = records.keySet().stream().collect(Collectors.toList()); - spyConsumer.onPartitionsRevoked(topicPartitions); - verify(spyConsumer).onPartitionsRevoked(topicPartitions); + public void testJsonConsumeRecords() throws InterruptedException, Exception { + String topic = topicConfig.getName(); + consumerRecords = createJsonRecords(topic); + when(kafkaConsumer.poll(anyLong())).thenReturn(consumerRecords); + consumer = createObjectUnderTest("json"); + + consumer.consumeRecords(); + final Map.Entry>, CheckpointState> bufferRecords = buffer.read(1000); + ArrayList> bufferedRecords = new ArrayList<>(bufferRecords.getKey()); + Assertions.assertEquals(consumerRecords.count(), bufferedRecords.size()); + Map offsetsToCommit = consumer.getOffsetsToCommit(); + offsetsToCommit.forEach((topicPartition, offsetAndMetadata) -> { + Assertions.assertEquals(topicPartition.partition(), testJsonPartition); + Assertions.assertEquals(topicPartition.topic(), topic); + Assertions.assertEquals(offsetAndMetadata.offset(), 102L); + }); + + for (Record record: bufferedRecords) { + Event event = record.getData(); + Map value1 = event.get(testKey1, Map.class); + Map value2 = event.get(testKey2, Map.class); + assertTrue(value1 != null || value2 != null); + if (value1 != null) { + testMap1.forEach((k, v) -> assertThat(value1, hasEntry(k,v))); + } + if (value2 != null) { + testMap2.forEach((k, v) -> assertThat(value2, hasEntry(k,v))); + } + } } - @Test - void testOnPartitionsAssigned() { - //Map>> records = new LinkedHashMap<>(); - ConsumerRecord record1 = new ConsumerRecord<>("my-topic-1", 0, 0L, "mykey-1", "myvalue-1"); - ConsumerRecord record2 = new ConsumerRecord<>("my-topic-1", 0, 0L, "mykey-2", "myvalue-2"); - //records.put(new TopicPartition("my-topic-1", 1), Arrays.asList(record1, record2)); - TopicPartition partition = new TopicPartition("my-topic-1", 1); - //records.put(partition, Arrays.asList(record1, record2)); - - KafkaSourceCustomConsumer spyConsumer = spy(consumer); - doCallRealMethod().when(spyConsumer).onPartitionsAssigned(Arrays.asList(partition)); - - 
spyConsumer.onPartitionsAssigned(Arrays.asList(partition)); - verify(spyConsumer).onPartitionsAssigned(Arrays.asList(partition)); - - /*spyConsumer.onPartitionsRevoked(anyList()); - verify(spyConsumer).onPartitionsRevoked(anyList());*/ + private ConsumerRecords createPlainTextRecords(String topic) { + Map> records = new HashMap<>(); + ConsumerRecord record1 = new ConsumerRecord<>(topic, testPartition, 0L, testKey1, testValue1); + ConsumerRecord record2 = new ConsumerRecord<>(topic, testPartition, 1L, testKey2, testValue2); + records.put(new TopicPartition(topic, testPartition), Arrays.asList(record1, record2)); + return new ConsumerRecords(records); + } + + private ConsumerRecords createJsonRecords(String topic) throws Exception { + final ObjectMapper mapper = new ObjectMapper(); + Map> records = new HashMap<>(); + ConsumerRecord record1 = new ConsumerRecord<>(topic, testJsonPartition, 100L, testKey1, mapper.convertValue(testMap1, JsonNode.class)); + ConsumerRecord record2 = new ConsumerRecord<>(topic, testJsonPartition, 101L, testKey2, mapper.convertValue(testMap2, JsonNode.class)); + records.put(new TopicPartition(topic, testJsonPartition), Arrays.asList(record1, record2)); + return new ConsumerRecords(records); } + } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/MultithreadedConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/MultithreadedConsumerTest.java deleted file mode 100644 index da66a26531..0000000000 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/MultithreadedConsumerTest.java +++ /dev/null @@ -1,126 +0,0 @@ -package org.opensearch.dataprepper.plugins.kafka.consumer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.yaml.snakeyaml.Yaml; - -import java.io.FileReader; -import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Arrays; -import java.util.Properties; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.doCallRealMethod; - - -class MultithreadedConsumerTest { - - @Mock - MultithreadedConsumer multithreadedConsumer; - @Mock - Properties properties; - @Mock - KafkaSourceConfig sourceConfig; - @Mock - TopicConfig topicConfig; - @Mock - Buffer> buffer; - @Mock - PluginMetrics pluginMetrics; - private static final String BOOTSTRAP_SERVERS = "localhost:9092"; - @BeforeEach - void setUp()throws IOException { - Yaml yaml = new Yaml(); - FileReader fileReader = new 
FileReader(getClass().getClassLoader().getResource("sample-pipelines.yaml").getFile()); - Object data = yaml.load(fileReader); - if (data instanceof Map) { - Map propertyMap = (Map) data; - Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); - Map sourceMap = (Map) logPipelineMap.get("source"); - Map kafkaConfigMap = (Map) sourceMap.get("kafka"); - ObjectMapper mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - String json = mapper.writeValueAsString(kafkaConfigMap); - Reader reader = new StringReader(json); - sourceConfig = mapper.readValue(reader, KafkaSourceConfig.class); - topicConfig = sourceConfig.getTopics().get(0); - } - pluginMetrics = mock(PluginMetrics.class); - buffer = mock(Buffer.class); - } - - private MultithreadedConsumer createObjectUnderTest(String consumerId, - String consumerGroupId, - String schema){ - properties = new Properties(); - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - return new MultithreadedConsumer(consumerId, - consumerGroupId, - properties, - topicConfig, - sourceConfig, - buffer, - pluginMetrics, - schema); - } - - // @ParameterizedTest - // @ValueSource(strings = "plaintext") - @Test - void testRunWithPlainText() throws InterruptedException { - String topic = topicConfig.getName(); - //schemaType = "plaintext"; - Map>> records = new LinkedHashMap<>(); - Thread producerThread = new Thread(() -> { - ConsumerRecord record1 = new ConsumerRecord<>(topic, 0, 0L, "mykey-1", "myvalue-1"); - ConsumerRecord record2 = new ConsumerRecord<>(topic, 0, 0L, "mykey-2", "myvalue-2"); - records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); - }); - producerThread.start(); - TimeUnit.SECONDS.sleep(1); - producerThread.join(); - multithreadedConsumer = createObjectUnderTest("DPKafkaProj-1", - "DPKafkaProj-1","plaintext"); - MultithreadedConsumer spySource = spy(multithreadedConsumer); - doCallRealMethod().when(spySource).run(); - // spySource.run(); - //verify(spySource).run(); - } - - - /* @ParameterizedTest - @ValueSource(strings = {"plaintext", "json", "avro"}) - @Test - void testRunWithParameters(String schemaType) { - multithreadedConsumer = createObjectUnderTest("DPKafkaProj-1", - "DPKafkaProj-1", - schemaType); - MultithreadedConsumer spySource = spy(multithreadedConsumer); - doCallRealMethod().when(spySource).run(); - spySource.run(); - verify(spySource).run(); - }*/ - -} \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java index ec8d15e9b1..ba08ec5dbd 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java @@ -5,11 +5,16 @@ package org.opensearch.dataprepper.plugins.kafka.source; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; +import 
org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -18,37 +23,17 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; -import org.opensearch.dataprepper.plugins.kafka.consumer.MultithreadedConsumer; -import org.springframework.test.util.ReflectionTestUtils; -import org.yaml.snakeyaml.Yaml; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -import java.io.FileReader; -import java.io.Reader; -import java.io.StringReader; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ExecutorService; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.verify; - +import java.time.Duration; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) class KafkaSourceTest { - @Mock - private KafkaSource source; + private KafkaSource kafkaSource; @Mock private KafkaSourceConfig sourceConfig; @@ -56,9 +41,6 @@ class KafkaSourceTest { @Mock private PluginMetrics pluginMetrics; - @Mock - private ExecutorService executorService; - @Mock private SchemaConfig schemaConfig; @@ -67,63 +49,59 @@ class KafkaSourceTest { @Mock private PipelineDescription pipelineDescription; @Mock - OAuthConfig oAuthConfig; - @Mock PlainTextAuthConfig plainTextAuthConfig; + @Mock + TopicConfig topic1, topic2; + @Mock + private Buffer> buffer; + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String TOPIC = "my-topic"; + private static final String TEST_GROUP_ID = "testGroupId"; + + + public KafkaSource createObjectUnderTest() { + return new KafkaSource(sourceConfig, pluginMetrics, pipelineDescription); + } @BeforeEach void setUp() throws Exception { - Yaml yaml = new Yaml(); - FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines.yaml").getFile()); - Object data = yaml.load(fileReader); - if (data instanceof Map) { - Map propertyMap = (Map) data; - Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); - Map sourceMap = (Map) logPipelineMap.get("source"); - Map kafkaConfigMap = (Map) 
sourceMap.get("kafka"); - ObjectMapper mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - String json = mapper.writeValueAsString(kafkaConfigMap); - Reader reader = new StringReader(json); - sourceConfig = mapper.readValue(reader, KafkaSourceConfig.class); - topicConfig = sourceConfig.getTopics().get(0); - oAuthConfig = sourceConfig.getAuthConfig().getoAuthConfig(); - plainTextAuthConfig = sourceConfig.getAuthConfig().getPlainTextAuthConfig(); - schemaConfig = sourceConfig.getSchemaConfig(); - } + sourceConfig = mock(KafkaSourceConfig.class); + pipelineDescription = mock(PipelineDescription.class); + pluginMetrics = mock(PluginMetrics.class); + when(topic1.getName()).thenReturn("topic1"); + when(topic2.getName()).thenReturn("topic2"); + when(topic1.getWorkers()).thenReturn(2); + when(topic2.getWorkers()).thenReturn(3); + when(topic1.getAutoCommitInterval()).thenReturn(Duration.ofSeconds(1)); + when(topic2.getAutoCommitInterval()).thenReturn(Duration.ofSeconds(1)); + when(topic1.getAutoOffsetReset()).thenReturn("earliest"); + when(topic2.getAutoOffsetReset()).thenReturn("earliest"); + when(topic1.getConsumerMaxPollRecords()).thenReturn(1); + when(topic2.getConsumerMaxPollRecords()).thenReturn(1); + when(topic1.getGroupId()).thenReturn(TEST_GROUP_ID); + when(topic2.getGroupId()).thenReturn(TEST_GROUP_ID); + when(topic1.getAutoCommit()).thenReturn(false); + when(topic2.getAutoCommit()).thenReturn(false); + when(topic1.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10)); + when(topic2.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10)); + when(sourceConfig.getBootStrapServers()).thenReturn(List.of("http://localhost:1234")); + when(sourceConfig.getTopics()).thenReturn(Arrays.asList(topic1, topic2)); } @Test - void test_kafkaSource_start_execution_catch_block() { - source = new KafkaSource(null, pluginMetrics, pipelineDescription); - KafkaSource spySource = spy(source); - Assertions.assertThrows(Exception.class, () -> spySource.start(any())); + void test_kafkaSource_start_stop() { + kafkaSource = createObjectUnderTest(); + kafkaSource.start(buffer); + try { + Thread.sleep(10); + } catch (Exception e){} + kafkaSource.stop(); } @Test - void test_kafkaSource_stop_execution() throws Exception { - List consumers = buildKafkaSourceConsumer(); - source = new KafkaSource(sourceConfig, pluginMetrics,pipelineDescription); - KafkaSource spySource = spy(source); - ReflectionTestUtils.setField(spySource, "executorService", executorService); - doCallRealMethod().when(spySource).stop(); - spySource.stop(); - verify(spySource).stop(); - } - - private List buildKafkaSourceConsumer() { - List consumers = new ArrayList<>(); - Properties prop = new Properties(); - prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); - prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - MultithreadedConsumer kafkaSourceConsumer = new MultithreadedConsumer( - topicConfig.getGroupId(), - topicConfig.getGroupId(), - prop, null,sourceConfig, null, pluginMetrics,null); - consumers.add(kafkaSourceConsumer); - return consumers; + void test_kafkaSource_start_execution_catch_block() { + kafkaSource = createObjectUnderTest(); + Assertions.assertThrows(Exception.class, () -> kafkaSource.start(null)); } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml 
index 2c398a99dc..efd860a8a9 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml
+++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml
@@ -8,8 +8,8 @@ log-pipeline:
           group_name: kafka-consumer-group-2
           group_id: DPKafkaProj-2
           workers: 10 #optional and default is 10
-          autocommit: false #optional and dafault is false
-          autocommit_interval: 5 #optional and dafault is 5s
+          auto_commit: false #optional and dafault is false
+          auto_commit_interval: 5 #optional and dafault is 5s
           session_timeout: 45 #optional and dafault is 45s
           max_retry_attempts: 1000 #optional and dafault is 5
           max_retry_delay: 1 #optional and dafault is 5
@@ -48,4 +48,4 @@ log-pipeline:
       oauth_sasl_login_callback_handler_class: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
       oauth_jwks_endpoint_url: https://dev-13650048.okta.com/oauth2/default/v1/keys
   sink:
-    - stdout:
\ No newline at end of file
+    - stdout:
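
Reviewer note: the control flow introduced by the rewritten KafkaSourceCustomConsumer above — a Runnable that polls, writes each record toward the Data Prepper buffer, tracks the next offset per partition, and commits those offsets on an interval when auto_commit is false — is summarized in the simplified Java sketch below. This is not the plugin source: the class name ConsumerLoopSketch, the writeToBuffer stub, the topic name, and the poll timeout are placeholders, and the schema handling, metrics, and BufferAccumulator wiring from the real change are omitted.

// Simplified sketch of the new consumer loop: poll, hand each record to the buffer,
// track per-partition offsets, and commit them periodically (manual-commit path).
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLoopSketch implements Runnable {
    private static final long COMMIT_INTERVAL_MS = 300_000L; // mirrors COMMIT_OFFSET_INTERVAL_MS in the diff
    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean shutdownInProgress;
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private long lastCommitTime = System.currentTimeMillis();

    public ConsumerLoopSketch(final KafkaConsumer<String, String> consumer, final AtomicBoolean shutdownInProgress) {
        this.consumer = consumer;
        this.shutdownInProgress = shutdownInProgress;
    }

    @Override
    public void run() {
        consumer.subscribe(List.of("my-topic")); // placeholder topic name
        while (!shutdownInProgress.get()) {
            // Poll for a batch of records; the real consumer derives this timeout from thread_waiting_time.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> consumerRecord : partitionRecords) {
                    writeToBuffer(consumerRecord.key(), consumerRecord.value());
                }
                // Remember the offset after the last record so a restart resumes past it.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }
            long now = System.currentTimeMillis();
            if (!offsetsToCommit.isEmpty() && now - lastCommitTime >= COMMIT_INTERVAL_MS) {
                consumer.commitSync(offsetsToCommit); // manual commit when auto_commit is false
                offsetsToCommit.clear();
                lastCommitTime = now;
            }
        }
    }

    private void writeToBuffer(final String key, final String value) {
        // In the plugin this becomes a Record<Event> written through a BufferAccumulator;
        // it is stubbed out here so the sketch stands alone.
    }
}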