Address review comments
Signed-off-by: Souvik Bose <souvbose@amazon.com>
sbose2k21 committed Sep 13, 2024
1 parent 0fd763b commit 57f5d42
Showing 18 changed files with 883 additions and 179 deletions.
5 changes: 4 additions & 1 deletion data-prepper-plugins/kinesis-source/build.gradle
@@ -10,12 +10,13 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation project(path: ':data-prepper-plugins:aws-plugin-api')

testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
@@ -25,6 +26,8 @@ dependencies {
testImplementation project(':data-prepper-plugin-framework')
testImplementation project(':data-prepper-pipeline-parser')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:parse-json-processor')
testImplementation project(':data-prepper-plugins:newline-codecs')
}

jacocoTestCoverageVerification {
KinesisService.java
@@ -14,8 +14,12 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig;
@@ -53,18 +57,18 @@ public class KinesisService {
private final String kclMetricsNamespaceName;
private final String pipelineName;
private final AcknowledgementSetManager acknowledgementSetManager;
private final KinesisSourceConfig sourceConfig;
private final KinesisSourceConfig kinesisSourceConfig;
private final KinesisAsyncClient kinesisClient;
private final DynamoDbAsyncClient dynamoDbClient;
private final CloudWatchAsyncClient cloudWatchClient;
private final WorkerIdentifierGenerator workerIdentifierGenerator;
private final InputCodec codec;

@Setter
private Scheduler scheduler;

private final ExecutorService executorService;

public KinesisService(final KinesisSourceConfig sourceConfig,
public KinesisService(final KinesisSourceConfig kinesisSourceConfig,
final KinesisClientFactory kinesisClientFactory,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
@@ -73,7 +77,7 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier,
final WorkerIdentifierGenerator workerIdentifierGenerator
){
this.sourceConfig = sourceConfig;
this.kinesisSourceConfig = kinesisSourceConfig;
this.pluginMetrics = pluginMetrics;
this.pluginFactory = pluginFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
@@ -85,21 +89,24 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
this.tableName = kinesisLeaseConfig.getLeaseCoordinationTable().getTableName();
this.kclMetricsNamespaceName = this.tableName;
this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(sourceConfig.getAwsAuthenticationConfig().getAwsRegion());
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion());
this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.pipelineName = pipelineDescription.getPipelineName();
this.applicationName = pipelineName;
this.workerIdentifierGenerator = workerIdentifierGenerator;
this.executorService = Executors.newFixedThreadPool(1);
final PluginModel codecConfiguration = kinesisSourceConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
this.codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
}

public void start(final Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
throw new IllegalStateException("Buffer provided is null.");
}

if (sourceConfig.getStreams() == null || sourceConfig.getStreams().isEmpty()) {
throw new IllegalStateException("Streams are empty!");
if (kinesisSourceConfig.getStreams() == null || kinesisSourceConfig.getStreams().isEmpty()) {
throw new InvalidPluginConfigurationException("No Kinesis streams provided.");
}

scheduler = getScheduler(buffer);
@@ -129,31 +136,30 @@ public Scheduler getScheduler(final Buffer<Record<Event>> buffer) {

public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
final ShardRecordProcessorFactory processorFactory = new KinesisShardRecordProcessorFactory(
buffer, sourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory);
buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, codec);

ConfigsBuilder configsBuilder =
new ConfigsBuilder(
new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName),
new KinesisMultiStreamTracker(kinesisClient, kinesisSourceConfig, applicationName),
applicationName, kinesisClient, dynamoDbClient, cloudWatchClient,
workerIdentifierGenerator.generate(), processorFactory
)
.tableName(tableName)
.namespace(kclMetricsNamespaceName);

ConsumerStrategy consumerStrategy = sourceConfig.getConsumerStrategy();
ConsumerStrategy consumerStrategy = kinesisSourceConfig.getConsumerStrategy();
if (consumerStrategy == ConsumerStrategy.POLLING) {
configsBuilder.retrievalConfig().retrievalSpecificConfig(
new PollingConfig(kinesisClient)
.maxRecords(sourceConfig.getPollingConfig().getMaxPollingRecords())
.maxRecords(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords())
.idleTimeBetweenReadsInMillis(
sourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis()));
kinesisSourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis()));
}

return new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig()
.billingMode(BillingMode.PAY_PER_REQUEST),
configsBuilder.leaseManagementConfig().billingMode(BillingMode.PAY_PER_REQUEST),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
KinesisStreamConfig.java
@@ -35,10 +35,6 @@ public class KinesisStreamConfig {
@JsonProperty("checkpoint_interval")
private Duration checkPointInterval = MINIMAL_CHECKPOINT_INTERVAL;

@Getter
@JsonProperty("enable_checkpoint")
private boolean enableCheckPoint = DEFAULT_ENABLE_CHECKPOINT;

public InitialPositionInStream getInitialPosition() {
return initialPosition.getPositionInStream();
}
New file: KinesisRecordConverter.java
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.converter;

import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class KinesisRecordConverter {

private final InputCodec codec;

public KinesisRecordConverter(final InputCodec codec) {
this.codec = codec;
}

public List<Record<Event>> convert(List<KinesisClientRecord> kinesisClientRecords) throws IOException {
List<Record<Event>> records = new ArrayList<>();
for (KinesisClientRecord record : kinesisClientRecords) {
processRecord(record, records::add);
}
return records;
}

private void processRecord(KinesisClientRecord record, Consumer<Record<Event>> eventConsumer) throws IOException {
// Read bytebuffer
byte[] arr = new byte[record.data().remaining()];
record.data().get(arr);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr);
codec.parse(byteArrayInputStream, eventConsumer);
}
}
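A minimal usage sketch, not part of this commit, showing how the new converter might be driven in a test. It assumes an already-constructed InputCodec instance (for example the newline codec pulled in as a test dependency above) and assumes the KCL's builder-based construction of KinesisClientRecord.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

class KinesisRecordConverterSketch {
    // Decodes a single raw payload through the converter; the codec instance and the
    // builder-based KinesisClientRecord construction are assumptions for illustration.
    static List<Record<Event>> decode(final InputCodec codec, final String payload) throws Exception {
        final KinesisRecordConverter converter = new KinesisRecordConverter(codec);
        // Wrap the raw payload the same way KCL delivers it: as a ByteBuffer on the record.
        final KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder()
                .data(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))
                .build();
        // convert() drains each record's ByteBuffer and lets the codec emit Record<Event> objects.
        return converter.convert(List.of(kinesisClientRecord));
    }
}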
New file: KinesisCheckpointerRecord.java
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.processor;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@Builder
@Getter
@Setter
public class KinesisCheckpointerRecord {
private RecordProcessorCheckpointer checkpointer;
private ExtendedSequenceNumber extendedSequenceNumber;
private boolean readyToCheckpoint;
}
New file: KinesisCheckpointerTracker.java
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.processor;

import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class KinesisCheckpointerTracker {
private final Map<ExtendedSequenceNumber, KinesisCheckpointerRecord> checkpointerRecordList = new LinkedHashMap<>();

public synchronized void addRecordForCheckpoint(final ExtendedSequenceNumber extendedSequenceNumber,
final RecordProcessorCheckpointer checkpointer) {
checkpointerRecordList.put(extendedSequenceNumber, KinesisCheckpointerRecord.builder()
.extendedSequenceNumber(extendedSequenceNumber)
.checkpointer(checkpointer)
.readyToCheckpoint(false)
.build());
}

public synchronized void markSequenceNumberForCheckpoint(final ExtendedSequenceNumber extendedSequenceNumber) {
if (!checkpointerRecordList.containsKey(extendedSequenceNumber)) {
throw new IllegalArgumentException("checkpointer not available");
}
checkpointerRecordList.get(extendedSequenceNumber).setReadyToCheckpoint(true);
}

public synchronized Optional<KinesisCheckpointerRecord> getLatestAvailableCheckpointRecord() {
Optional<KinesisCheckpointerRecord> kinesisCheckpointerRecordOptional = Optional.empty();
List<ExtendedSequenceNumber> toRemoveRecords = new ArrayList<>();

for (Map.Entry<ExtendedSequenceNumber, KinesisCheckpointerRecord> entry: checkpointerRecordList.entrySet()) {
KinesisCheckpointerRecord kinesisCheckpointerRecord = entry.getValue();

// Break out of the loop on the first record which is not ready for checkpoint
if (!kinesisCheckpointerRecord.isReadyToCheckpoint()) {
break;
}

kinesisCheckpointerRecordOptional = Optional.of(kinesisCheckpointerRecord);
toRemoveRecords.add(entry.getKey());
}

//Cleanup the ones which are already marked for checkpoint
for (ExtendedSequenceNumber extendedSequenceNumber: toRemoveRecords) {
checkpointerRecordList.remove(extendedSequenceNumber);
}

return kinesisCheckpointerRecordOptional;
}

public synchronized int size() {
return checkpointerRecordList.size();
}
}
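To make the checkpointing semantics concrete, here is an illustrative walkthrough, not taken from the commit, of the tracker's "furthest contiguous ready record" behavior. The checkpointer argument is left as null only because this sketch never invokes it.

import java.util.Optional;

import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

class KinesisCheckpointerTrackerSketch {
    static void walkThrough() {
        final KinesisCheckpointerTracker tracker = new KinesisCheckpointerTracker();
        final ExtendedSequenceNumber seq1 = new ExtendedSequenceNumber("1");
        final ExtendedSequenceNumber seq2 = new ExtendedSequenceNumber("2");
        final ExtendedSequenceNumber seq3 = new ExtendedSequenceNumber("3");

        // Register records in processing order; none are ready to checkpoint yet.
        tracker.addRecordForCheckpoint(seq1, null);
        tracker.addRecordForCheckpoint(seq2, null);
        tracker.addRecordForCheckpoint(seq3, null);

        // Acknowledge the first two records.
        tracker.markSequenceNumberForCheckpoint(seq1);
        tracker.markSequenceNumberForCheckpoint(seq2);

        // The tracker returns seq2 (the furthest record whose predecessors are all ready)
        // and removes seq1 and seq2; seq3 remains until it is marked ready.
        final Optional<KinesisCheckpointerRecord> latest = tracker.getLatestAvailableCheckpointRecord();
        assert latest.isPresent();
        assert latest.get().getExtendedSequenceNumber().equals(seq2);
        assert tracker.size() == 1;
    }
}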
(Remaining changed files not shown.)
