Kafka Source code refactoring (#2951)
* Kafka Source code refactoring

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Fixes for failing build/tests

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed review comments. Cleaned up code

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
kkondaka and Krishna Kondaka committed Jun 30, 2023
1 parent 0d29418 commit ed7001c
Showing 12 changed files with 341 additions and 499 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
@@ -9,6 +9,7 @@ plugins {

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
implementation 'org.apache.kafka:kafka-clients:3.4.0'
implementation 'org.apache.avro:avro:1.11.0'
implementation 'com.fasterxml.jackson.core:jackson-databind'
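The only build change is the new dependency on buffer-common, which supplies the BufferAccumulator used by the refactored consumer further down. A minimal sketch of the accumulate-and-write pattern, assuming only the create and add calls that appear in KafkaSourceCustomConsumer below:

// Sketch only: assumes the BufferAccumulator.create(buffer, recordCount, timeout)
// and add(record) signatures that appear in the consumer diff below.
import java.time.Duration;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

class BufferAccumulatorSketch {
    void writeRecord(final Buffer<Record<Event>> buffer, final Record<Event> record) throws Exception {
        // One record per flush with a one-second timeout, mirroring
        // DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1 in the new consumer.
        final BufferAccumulator<Record<Event>> accumulator =
                BufferAccumulator.create(buffer, 1, Duration.ofSeconds(1));
        accumulator.add(record);
    }
}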
@@ -25,6 +25,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
@@ -49,7 +50,7 @@ public class JSONConsumerIT {
private KafkaSourceConfig kafkaSourceConfig;

private KafkaSource kafkaSource;
private Buffer<Record<Object>> buffer;
private Buffer<Record<Event>> buffer;

@ClassRule
public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode();
@@ -111,4 +112,4 @@ private void produceTestMessages() throws JsonProcessingException {
throw new RuntimeException(e);
}
}
}
}
@@ -20,6 +20,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
@@ -44,7 +45,7 @@ public class PlainTextConsumerIT {
private KafkaSourceConfig kafkaSourceConfig;

private KafkaSource kafkaSource;
private Buffer<Record<Object>> buffer;
private Buffer<Record<Event>> buffer;

@ClassRule
public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode();
@@ -101,4 +102,4 @@ private void produceTestMessages() {
throw new RuntimeException(e);
}
}
}
}
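Both integration tests now type the buffer as Buffer<Record<Event>> rather than Buffer<Record<Object>>, matching the refactored consumer, which wraps every Kafka message in a Data Prepper Event before buffering it. A minimal sketch of that wrapping, built from the JacksonLog calls in the consumer below; the key-to-value layout is illustrative only:

import java.util.HashMap;
import java.util.Map;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;

class EventRecordSketch {
    // Mirrors the consumer's getRecord() for the plaintext case: the Kafka key maps
    // to the raw value, and the map backs a JacksonLog event.
    Record<Event> toRecord(final String key, final String value) {
        final Map<String, Object> data = new HashMap<>();
        data.put(key, value);
        final Event event = JacksonLog.builder().withData(data).build();
        return new Record<>(event);
    }
}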
@@ -21,7 +21,7 @@ public class TopicConfig {
private static final Duration SESSION_TIMEOUT = Duration.ofSeconds(45);
private static final int MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
private static final String AUTO_OFFSET_RESET = "earliest";
private static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(1);
static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5);
private static final Duration MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
private static final Duration BUFFER_DEFAULT_TIMEOUT = Duration.ofSeconds(5);
private static final Duration MAX_RETRY_DELAY = Duration.ofSeconds(1);
@@ -30,7 +30,7 @@ public class TopicConfig {
private static final Long FETCH_MIN_BYTES = 1L;
private static final Duration RETRY_BACKOFF = Duration.ofSeconds(100);
private static final Duration MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
private static final Long CONSUMER_MAX_POLL_RECORDS = 500L;
private static final Integer CONSUMER_MAX_POLL_RECORDS = 500;
private static final Integer NUM_OF_WORKERS = 10;
private static final Duration HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(3);

@@ -54,10 +54,10 @@ public class TopicConfig {
@Size(min = 1)
private Duration maxRetryDelay = MAX_RETRY_DELAY;

@JsonProperty("autocommit")
private String autoCommit = AUTO_COMMIT;
@JsonProperty("auto_commit")
private Boolean autoCommit = false;

@JsonProperty("autocommit_interval")
@JsonProperty("auto_commit_interval")
@Size(min = 1)
private Duration autoCommitInterval = AUTOCOMMIT_INTERVAL;

@@ -100,7 +100,7 @@ public class TopicConfig {
private Duration maxPollInterval = MAX_POLL_INTERVAL;

@JsonProperty("consumer_max_poll_records")
private Long consumerMaxPollRecords = CONSUMER_MAX_POLL_RECORDS;
private Integer consumerMaxPollRecords = CONSUMER_MAX_POLL_RECORDS;

@JsonProperty("heart_beat_interval")
@Size(min = 1)
@@ -118,7 +118,7 @@ public void setMaxRetryAttempts(Integer maxRetryAttempts) {
this.maxRetryAttempts = maxRetryAttempts;
}

public String getAutoCommit() {
public Boolean getAutoCommit() {
return autoCommit;
}

@@ -186,7 +186,7 @@ public void setFetchMaxBytes(Long fetchMaxBytes) {
this.fetchMaxBytes = fetchMaxBytes;
}

public void setAutoCommit(String autoCommit) {
public void setAutoCommit(Boolean autoCommit) {
this.autoCommit = autoCommit;
}

@@ -222,11 +222,11 @@ public void setMaxPollInterval(Duration maxPollInterval) {
this.maxPollInterval = maxPollInterval;
}

public Long getConsumerMaxPollRecords() {
public Integer getConsumerMaxPollRecords() {
return consumerMaxPollRecords;
}

public void setConsumerMaxPollRecords(Long consumerMaxPollRecords) {
public void setConsumerMaxPollRecords(Integer consumerMaxPollRecords) {
this.consumerMaxPollRecords = consumerMaxPollRecords;
}

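In summary, the topic-level auto-commit setting is renamed from autocommit to auto_commit and changes from a String to a Boolean that defaults to false, and consumer_max_poll_records changes from Long to Integer. A small sketch against the new setter signatures shown above; it assumes TopicConfig keeps a default constructor for Jackson, and in practice these values come from the pipeline YAML rather than code:

import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;

class TopicConfigSketch {
    TopicConfig exampleTopicConfig() {
        final TopicConfig topicConfig = new TopicConfig();  // assumed default constructor
        topicConfig.setAutoCommit(false);            // Boolean now, previously a String
        topicConfig.setConsumerMaxPollRecords(500);  // Integer now, previously a Long
        return topicConfig;
    }
}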
@@ -4,19 +4,23 @@
*/
package org.opensearch.dataprepper.plugins.kafka.consumer;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceBufferAccumulator;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -28,84 +32,119 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

/**
* * A utility class which will handle the core Kafka consumer operation.
*/
public class KafkaSourceCustomConsumer implements ConsumerRebalanceListener {
public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceListener {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class);
private Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
private long lastReadOffset = 0L;
private volatile long lastCommitTime = System.currentTimeMillis();
private KafkaConsumer<String, Object> consumer= null;
private AtomicBoolean status = new AtomicBoolean(false);
private Buffer<Record<Object>> buffer= null;
private TopicConfig topicConfig = null;
private KafkaSourceConfig kafkaSourceConfig= null;
private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L;
private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1;
private volatile long lastCommitTime;
private KafkaConsumer consumer= null;
private AtomicBoolean shutdownInProgress;
private final String topicName;
private final TopicConfig topicConfig;
private PluginMetrics pluginMetrics= null;
private String schemaType= null;
private MessageFormat schema;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final Buffer<Record<Event>> buffer;
private static final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();
private Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;

public KafkaSourceCustomConsumer() {
}

private KafkaSourceBufferAccumulator kafkaSourceBufferAccumulator= null;
public KafkaSourceCustomConsumer(KafkaConsumer consumer,
AtomicBoolean status,
Buffer<Record<Object>> buffer,
TopicConfig topicConfig,
KafkaSourceConfig kafkaSourceConfig,
String schemaType,
PluginMetrics pluginMetrics) {
public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
final AtomicBoolean shutdownInProgress,
final Buffer<Record<Event>> buffer,
final TopicConfig topicConfig,
final String schemaType,
final PluginMetrics pluginMetrics) {
this.topicName = topicConfig.getName();
this.topicConfig = topicConfig;
this.shutdownInProgress = shutdownInProgress;
this.consumer = consumer;
this.status = status;
this.buffer = buffer;
this.topicConfig = topicConfig;
this.kafkaSourceConfig = kafkaSourceConfig;
this.schemaType = schemaType;
this.offsetsToCommit = new HashMap<>();
this.pluginMetrics = pluginMetrics;
kafkaSourceBufferAccumulator= new KafkaSourceBufferAccumulator(topicConfig, kafkaSourceConfig,
schemaType, pluginMetrics);
schema = MessageFormat.getByMessageFormatByName(schemaType);
Duration bufferTimeout = Duration.ofSeconds(1);
bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout);
lastCommitTime = System.currentTimeMillis();
}

public <T> void consumeRecords() throws Exception {
ConsumerRecords<String, T> records =
consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2);
if (!records.isEmpty() && records.count() > 0) {
Map<TopicPartition, OffsetAndMetadata> offsets = iterateRecordPartitions(records);
offsets.forEach((partition, offset) ->
offsetsToCommit.put(partition, offset));
}
}

Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
return offsetsToCommit;
}

@SuppressWarnings({"rawtypes", "unchecked"})
public void consumeRecords() {
@Override
public void run() {
try {
consumer.subscribe(Arrays.asList(topicConfig.getName()));
do {
offsetsToCommit.clear();
ConsumerRecords<String, Object> records = poll(consumer);
if (!records.isEmpty() && records.count() > 0) {
iterateRecordPartitions(records);
consumer.subscribe(Arrays.asList(topicName));
while (!shutdownInProgress.get()) {
consumeRecords();
long currentTimeMillis = System.currentTimeMillis();
if (!topicConfig.getAutoCommit() && !offsetsToCommit.isEmpty() &&
(currentTimeMillis - lastCommitTime) >= COMMIT_OFFSET_INTERVAL_MS) {
try {
consumer.commitSync(offsetsToCommit);
offsetsToCommit.clear();
lastCommitTime = currentTimeMillis;
} catch (CommitFailedException e) {
LOG.error("Failed to commit offsets in topic "+topicName);
}
}
}while (!status.get());
}
} catch (Exception exp) {
LOG.error("Error while reading the records from the topic...", exp);
}
}

private void iterateRecordPartitions(ConsumerRecords<String, Object> records) throws Exception {
for (TopicPartition partition : records.partitions()) {
List<Record<Object>> kafkaRecords = new ArrayList<>();
List<ConsumerRecord<String, Object>> partitionRecords = records.records(partition);
iterateConsumerRecords(kafkaRecords, partitionRecords);
if (!kafkaRecords.isEmpty()) {
kafkaSourceBufferAccumulator.writeAllRecordToBuffer(kafkaRecords, buffer, topicConfig);
private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord) {
Map<String, Object> data = new HashMap<>();
Event event;
if (schema == MessageFormat.JSON || schema == MessageFormat.AVRO) {
Map<String, Object> message = new HashMap<>();
try {
final JsonParser jsonParser = jsonFactory.createParser((String)consumerRecord.value().toString());
message = objectMapper.readValue(jsonParser, Map.class);
} catch (Exception e){
LOG.error("Failed to parse JSON or AVRO record");
return null;
}
data.put(consumerRecord.key(), message);
} else {
data.put(consumerRecord.key(), (String)consumerRecord.value());
}
if (!offsetsToCommit.isEmpty() && topicConfig.getAutoCommit().equalsIgnoreCase("false")) {
lastCommitTime = kafkaSourceBufferAccumulator.commitOffsets(consumer, lastCommitTime, offsetsToCommit);
}
event = JacksonLog.builder().withData(data).build();
return new Record<Event>(event);
}

private void iterateConsumerRecords(List<Record<Object>> kafkaRecords, List<ConsumerRecord<String, Object>> partitionRecords) {
for (ConsumerRecord<String, Object> consumerRecord : partitionRecords) {
lastReadOffset = kafkaSourceBufferAccumulator.processConsumerRecords(offsetsToCommit, kafkaRecords, lastReadOffset, consumerRecord, partitionRecords);
private <T> Map<TopicPartition, OffsetAndMetadata> iterateRecordPartitions(ConsumerRecords<String, T> records) throws Exception {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition topicPartition : records.partitions()) {
List<Record<Event>> kafkaRecords = new ArrayList<>();
List<ConsumerRecord<String, T>> partitionRecords = records.records(topicPartition);
for (ConsumerRecord<String, T> consumerRecord : partitionRecords) {
Record<Event> record = getRecord(consumerRecord);
if (record != null) {
bufferAccumulator.add(record);
}
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsets.put(topicPartition, new OffsetAndMetadata(lastOffset + 1));
}
}

private ConsumerRecords<String, Object> poll(final KafkaConsumer<String, Object> consumer) {
return consumer.poll(Duration.ofMillis(1));
return offsets;
}

public void closeConsumer(){
Expand All @@ -117,17 +156,13 @@ public void shutdownConsumer(){
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
consumer.seek(partition, lastReadOffset);
for (TopicPartition topicPartition : partitions) {
Long committedOffset = consumer.committed(topicPartition).offset();
consumer.seek(topicPartition, committedOffset);
}
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
try {
consumer.commitSync(offsetsToCommit);
} catch (CommitFailedException e) {
LOG.error("Failed to commit the record for the Json consumer...", e);
}
}
}
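KafkaSourceCustomConsumer now implements Runnable, takes a shared AtomicBoolean shutdown flag in place of the old status flag and KafkaSourceConfig, and owns its offset bookkeeping, so the source can run one consumer per worker thread and stop them cooperatively. A rough sketch of how such a consumer might be scheduled, based only on the constructor and run() shown above; the buffer, metrics, Kafka consumer wiring, and the "plaintext" schema type are placeholder assumptions:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaSourceCustomConsumer;

class ConsumerRunnerSketch {
    void start(final KafkaConsumer<String, Object> kafkaConsumer,
               final Buffer<Record<Event>> buffer,
               final TopicConfig topicConfig,
               final PluginMetrics pluginMetrics) throws InterruptedException {
        final AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
        // Constructor signature as shown in the diff; "plaintext" is an assumed schema type.
        final KafkaSourceCustomConsumer consumer = new KafkaSourceCustomConsumer(
                kafkaConsumer, shutdownInProgress, buffer, topicConfig, "plaintext", pluginMetrics);

        final ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(consumer);   // run() subscribes to the topic and polls until the flag flips

        // ... later, during source shutdown:
        shutdownInProgress.set(true);       // run() exits its poll loop
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
    }
}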