diff --git a/data-prepper-plugins/kinesis-source/build.gradle b/data-prepper-plugins/kinesis-source/build.gradle index ae380ba0e1..c4a0614e36 100644 --- a/data-prepper-plugins/kinesis-source/build.gradle +++ b/data-prepper-plugins/kinesis-source/build.gradle @@ -10,12 +10,13 @@ plugins { dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:common') + implementation project(path: ':data-prepper-plugins:buffer-common') + implementation project(path: ':data-prepper-plugins:aws-plugin-api') implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'io.micrometer:micrometer-core' implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0' compileOnly 'org.projectlombok:lombok:1.18.20' annotationProcessor 'org.projectlombok:lombok:1.18.20' - implementation project(path: ':data-prepper-plugins:aws-plugin-api') testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' @@ -25,6 +26,8 @@ dependencies { testImplementation project(':data-prepper-plugin-framework') testImplementation project(':data-prepper-pipeline-parser') testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + testImplementation project(':data-prepper-plugins:parse-json-processor') + testImplementation project(':data-prepper-plugins:newline-codecs') } jacocoTestCoverageVerification { diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 442f8e4fe6..4ed15833f6 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -14,8 +14,12 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig; @@ -53,18 +57,18 @@ public class KinesisService { private final String kclMetricsNamespaceName; private final String pipelineName; private final AcknowledgementSetManager acknowledgementSetManager; - private final KinesisSourceConfig sourceConfig; + private final KinesisSourceConfig kinesisSourceConfig; private final KinesisAsyncClient kinesisClient; private final DynamoDbAsyncClient dynamoDbClient; private final CloudWatchAsyncClient cloudWatchClient; private final WorkerIdentifierGenerator workerIdentifierGenerator; + private final InputCodec codec; @Setter private Scheduler scheduler; - private final ExecutorService executorService; - public KinesisService(final KinesisSourceConfig sourceConfig, + public KinesisService(final KinesisSourceConfig kinesisSourceConfig, final KinesisClientFactory kinesisClientFactory, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, @@ -73,7 +77,7 @@ public KinesisService(final KinesisSourceConfig sourceConfig, final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier, final WorkerIdentifierGenerator workerIdentifierGenerator ){ - this.sourceConfig = sourceConfig; + this.kinesisSourceConfig = kinesisSourceConfig; this.pluginMetrics = pluginMetrics; this.pluginFactory = pluginFactory; this.acknowledgementSetManager = acknowledgementSetManager; @@ -85,21 +89,24 @@ public KinesisService(final KinesisSourceConfig sourceConfig, this.tableName = kinesisLeaseConfig.getLeaseCoordinationTable().getTableName(); this.kclMetricsNamespaceName = this.tableName; this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); - this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(sourceConfig.getAwsAuthenticationConfig().getAwsRegion()); + this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion()); this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); this.pipelineName = pipelineDescription.getPipelineName(); this.applicationName = pipelineName; this.workerIdentifierGenerator = workerIdentifierGenerator; this.executorService = Executors.newFixedThreadPool(1); + final PluginModel codecConfiguration = kinesisSourceConfig.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + this.codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); } public void start(final Buffer> buffer) { if (buffer == null) { - throw new IllegalStateException("Buffer provided is null"); + throw new IllegalStateException("Buffer provided is null."); } - if (sourceConfig.getStreams() == null || sourceConfig.getStreams().isEmpty()) { - throw new IllegalStateException("Streams are empty!"); + if (kinesisSourceConfig.getStreams() == null || kinesisSourceConfig.getStreams().isEmpty()) { + throw new InvalidPluginConfigurationException("No Kinesis streams provided."); } scheduler = getScheduler(buffer); @@ -129,31 +136,30 @@ public Scheduler getScheduler(final Buffer> buffer) { public Scheduler createScheduler(final Buffer> buffer) { final ShardRecordProcessorFactory processorFactory = new KinesisShardRecordProcessorFactory( - buffer, sourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory); + buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, codec); ConfigsBuilder configsBuilder = new ConfigsBuilder( - new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName), + new KinesisMultiStreamTracker(kinesisClient, kinesisSourceConfig, applicationName), applicationName, kinesisClient, dynamoDbClient, cloudWatchClient, workerIdentifierGenerator.generate(), processorFactory ) .tableName(tableName) .namespace(kclMetricsNamespaceName); - ConsumerStrategy consumerStrategy = sourceConfig.getConsumerStrategy(); + ConsumerStrategy consumerStrategy = kinesisSourceConfig.getConsumerStrategy(); if (consumerStrategy == ConsumerStrategy.POLLING) { configsBuilder.retrievalConfig().retrievalSpecificConfig( new PollingConfig(kinesisClient) - .maxRecords(sourceConfig.getPollingConfig().getMaxPollingRecords()) + .maxRecords(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords()) .idleTimeBetweenReadsInMillis( - sourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis())); + kinesisSourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis())); } return new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), - configsBuilder.leaseManagementConfig() - .billingMode(BillingMode.PAY_PER_REQUEST), + configsBuilder.leaseManagementConfig().billingMode(BillingMode.PAY_PER_REQUEST), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java index b6e4480229..b26732e357 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java @@ -35,10 +35,6 @@ public class KinesisStreamConfig { @JsonProperty("checkpoint_interval") private Duration checkPointInterval = MINIMAL_CHECKPOINT_INTERVAL; - @Getter - @JsonProperty("enable_checkpoint") - private boolean enableCheckPoint = DEFAULT_ENABLE_CHECKPOINT; - public InitialPositionInStream getInitialPosition() { return initialPosition.getPositionInStream(); } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java new file mode 100644 index 0000000000..5a70b95c10 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.kinesis.source.converter; + +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +public class KinesisRecordConverter { + + private final InputCodec codec; + + public KinesisRecordConverter(final InputCodec codec) { + this.codec = codec; + } + + public List> convert(List kinesisClientRecords) throws IOException { + List> records = new ArrayList<>(); + for (KinesisClientRecord record : kinesisClientRecords) { + processRecord(record, records::add); + } + return records; + } + + private void processRecord(KinesisClientRecord record, Consumer> eventConsumer) throws IOException { + // Read bytebuffer + byte[] arr = new byte[record.data().remaining()]; + record.data().get(arr); + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr); + codec.parse(byteArrayInputStream, eventConsumer); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerRecord.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerRecord.java new file mode 100644 index 0000000000..b891de2bd0 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerRecord.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.kinesis.source.processor; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import software.amazon.kinesis.processor.RecordProcessorCheckpointer; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +@Builder +@Getter +@Setter +public class KinesisCheckpointerRecord { + private RecordProcessorCheckpointer checkpointer; + private ExtendedSequenceNumber extendedSequenceNumber; + private boolean readyToCheckpoint; +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTracker.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTracker.java new file mode 100644 index 0000000000..9f8e5f7625 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTracker.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.kinesis.source.processor; + +import software.amazon.kinesis.processor.RecordProcessorCheckpointer; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class KinesisCheckpointerTracker { + private final Map checkpointerRecordList = new LinkedHashMap<>(); + + public synchronized void addRecordForCheckpoint(final ExtendedSequenceNumber extendedSequenceNumber, + final RecordProcessorCheckpointer checkpointer) { + checkpointerRecordList.put(extendedSequenceNumber, KinesisCheckpointerRecord.builder() + .extendedSequenceNumber(extendedSequenceNumber) + .checkpointer(checkpointer) + .readyToCheckpoint(false) + .build()); + } + + public synchronized void markSequenceNumberForCheckpoint(final ExtendedSequenceNumber extendedSequenceNumber) { + if (!checkpointerRecordList.containsKey(extendedSequenceNumber)) { + throw new IllegalArgumentException("checkpointer not available"); + } + checkpointerRecordList.get(extendedSequenceNumber).setReadyToCheckpoint(true); + } + + public synchronized Optional getLatestAvailableCheckpointRecord() { + Optional kinesisCheckpointerRecordOptional = Optional.empty(); + List toRemoveRecords = new ArrayList<>(); + + for (Map.Entry entry: checkpointerRecordList.entrySet()) { + KinesisCheckpointerRecord kinesisCheckpointerRecord = entry.getValue(); + + // Break out of the loop on the first record which is not ready for checkpoint + if (!kinesisCheckpointerRecord.isReadyToCheckpoint()) { + break; + } + + kinesisCheckpointerRecordOptional = Optional.of(kinesisCheckpointerRecord); + toRemoveRecords.add(entry.getKey()); + } + + //Cleanup the ones which are already marked for checkpoint + for (ExtendedSequenceNumber extendedSequenceNumber: toRemoveRecords) { + checkpointerRecordList.remove(extendedSequenceNumber); + } + + return kinesisCheckpointerRecordOptional; + } + + public synchronized int size() { + return checkpointerRecordList.size(); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java index 956ba88f7a..7d02ff959a 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java @@ -1,19 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + package org.opensearch.dataprepper.plugins.kinesis.source.processor; +import com.google.common.annotations.VisibleForTesting; import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.configuration.PluginModel; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kinesis.source.KinesisSource; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.converter.KinesisRecordConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.kinesis.common.StreamIdentifier; @@ -28,55 +37,67 @@ import software.amazon.kinesis.processor.RecordProcessorCheckpointer; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.io.ByteArrayInputStream; -import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; import java.util.List; +import java.util.ListIterator; import java.util.Optional; -import java.util.function.Consumer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; public class KinesisRecordProcessor implements ShardRecordProcessor { - private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class); + private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordProcessor.class); + private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000; private final StreamIdentifier streamIdentifier; private final KinesisStreamConfig kinesisStreamConfig; private final Duration checkpointInterval; private final KinesisSourceConfig kinesisSourceConfig; - private final Buffer> buffer; + private final BufferAccumulator> bufferAccumulator; + private final KinesisRecordConverter kinesisRecordConverter; private String kinesisShardId; - private final InputCodec codec; private long lastCheckpointTimeInMillis; private final int bufferTimeoutMillis; private final AcknowledgementSetManager acknowledgementSetManager; - private final Counter acknowledgementSetCallbackCounter; + private final Counter acknowledgementSetSuccesses; + private final Counter acknowledgementSetFailures; + private final Counter recordsProcessed; private final Counter recordProcessingErrors; private final Counter checkpointFailures; private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofSeconds(20); - public static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; + public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses"; + public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures"; + public static final String KINESIS_RECORD_PROCESSED = "recordProcessed"; public static final String KINESIS_RECORD_PROCESSING_ERRORS = "recordProcessingErrors"; public static final String KINESIS_CHECKPOINT_FAILURES = "checkpointFailures"; public static final String KINESIS_STREAM_TAG_KEY = "stream"; + private KinesisCheckpointerTracker kinesisCheckpointerTracker; + private final ExecutorService executorService; + private AtomicBoolean isStopRequested; - public KinesisRecordProcessor(Buffer> buffer, + public KinesisRecordProcessor(final BufferAccumulator> bufferAccumulator, final KinesisSourceConfig kinesisSourceConfig, final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics, - final PluginFactory pluginFactory, + final KinesisRecordConverter kinesisRecordConverter, final StreamIdentifier streamIdentifier) { this.bufferTimeoutMillis = (int) kinesisSourceConfig.getBufferTimeout().toMillis(); this.streamIdentifier = streamIdentifier; this.kinesisSourceConfig = kinesisSourceConfig; this.kinesisStreamConfig = getStreamConfig(kinesisSourceConfig); - final PluginModel codecConfiguration = kinesisSourceConfig.getCodec(); - final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); - this.codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); + this.kinesisRecordConverter = kinesisRecordConverter; this.acknowledgementSetManager = acknowledgementSetManager; - this.acknowledgementSetCallbackCounter = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); + this.acknowledgementSetSuccesses = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); + this.acknowledgementSetFailures = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); + this.recordsProcessed = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); this.recordProcessingErrors = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); this.checkpointFailures = pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName()); this.checkpointInterval = kinesisStreamConfig.getCheckPointInterval(); - this.buffer = buffer; + this.bufferAccumulator = bufferAccumulator; + this.kinesisCheckpointerTracker = new KinesisCheckpointerTracker(); + this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("kinesis-ack-monitor")); + this.isStopRequested = new AtomicBoolean(false); } private KinesisStreamConfig getStreamConfig(final KinesisSourceConfig kinesisSourceConfig) { @@ -87,49 +108,101 @@ private KinesisStreamConfig getStreamConfig(final KinesisSourceConfig kinesisSou public void initialize(InitializationInput initializationInput) { // Called once when the processor is initialized. kinesisShardId = initializationInput.shardId(); - LOG.info("Initialize Processor for shard: {}", kinesisShardId); + String kinesisStreamName = streamIdentifier.streamName(); + LOG.info("Initialize Processor for stream: {}, shard: {}", kinesisStreamName, kinesisShardId); lastCheckpointTimeInMillis = System.currentTimeMillis(); + + if (kinesisSourceConfig.isAcknowledgments()) { + executorService.submit(() -> monitorCheckpoint(executorService)); + } } - private AcknowledgementSet createAcknowledgmentSet(final ProcessRecordsInput processRecordsInput) { + private void monitorCheckpoint(final ExecutorService executorService) { + while (!isStopRequested.get()) { + if (System.currentTimeMillis() - lastCheckpointTimeInMillis >= checkpointInterval.toMillis()) { + LOG.debug("Regular checkpointing for shard {}", kinesisShardId); + + Optional kinesisCheckpointerRecordOptional = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); + if (kinesisCheckpointerRecordOptional.isPresent()) { + RecordProcessorCheckpointer recordProcessorCheckpointer = kinesisCheckpointerRecordOptional.get().getCheckpointer(); + String sequenceNumber = kinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber().sequenceNumber(); + Long subSequenceNumber = kinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber().subSequenceNumber(); + checkpoint(recordProcessorCheckpointer, sequenceNumber, subSequenceNumber); + lastCheckpointTimeInMillis = System.currentTimeMillis(); + } + } + try { + Thread.sleep(DEFAULT_MONITOR_WAIT_TIME_MS); + } catch (InterruptedException ex) { + break; + } + } + executorService.shutdown(); + } + + private AcknowledgementSet createAcknowledgmentSet(final ProcessRecordsInput processRecordsInput, + final ExtendedSequenceNumber extendedSequenceNumber) { return acknowledgementSetManager.create((result) -> { - acknowledgementSetCallbackCounter.increment(); + String kinesisStreamName = streamIdentifier.streamName(); if (result) { - LOG.info("acknowledgements received"); - checkpoint(processRecordsInput.checkpointer()); + acknowledgementSetSuccesses.increment(); + kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber); + LOG.debug("acknowledgements received for stream: {}, shardId: {}", kinesisStreamName, kinesisShardId); } else { - LOG.info("acknowledgements received with false"); + acknowledgementSetFailures.increment(); + LOG.debug("acknowledgements received with false for stream: {}, shardId: {}", kinesisStreamName, kinesisShardId); } - }, ACKNOWLEDGEMENT_SET_TIMEOUT); } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { - List> records = new ArrayList<>(); - try { Optional acknowledgementSetOpt = Optional.empty(); boolean acknowledgementsEnabled = kinesisSourceConfig.isAcknowledgments(); + ExtendedSequenceNumber extendedSequenceNumber = getLatestSequenceNumberFromInput(processRecordsInput); if (acknowledgementsEnabled) { - acknowledgementSetOpt = Optional.of(createAcknowledgmentSet(processRecordsInput)); + acknowledgementSetOpt = Optional.of(createAcknowledgmentSet(processRecordsInput, extendedSequenceNumber)); } - for (KinesisClientRecord record : processRecordsInput.records()) { - processRecord(record, records::add); + // Track the records for checkpoint purpose + kinesisCheckpointerTracker.addRecordForCheckpoint(extendedSequenceNumber, processRecordsInput.checkpointer()); + List> records = kinesisRecordConverter.convert(processRecordsInput.records()); + + int eventCount = 0; + for (Record record: records) { + Event event = record.getData(); + acknowledgementSetOpt.ifPresent(acknowledgementSet -> acknowledgementSet.add(event)); + EventMetadata eventMetadata = event.getMetadata(); + eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, + streamIdentifier.streamName().toLowerCase()); + bufferAccumulator.add(record); + eventCount++; } - acknowledgementSetOpt.ifPresent(acknowledgementSet -> records.forEach(record -> acknowledgementSet.add(record.getData()))); + // Flush buffer at the end + bufferAccumulator.flush(); + recordsProcessed.increment(eventCount); - buffer.writeAll(records, bufferTimeoutMillis); + // If acks are not enabled, mark the sequence number for checkpoint + if (!acknowledgementsEnabled) { + kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber); + } + + LOG.debug("Number of Records {} written for stream: {}, shardId: {} to buffer: {}", eventCount, streamIdentifier.streamName(), kinesisShardId, records.size()); acknowledgementSetOpt.ifPresent(AcknowledgementSet::complete); // Checkpoint for shard - if (kinesisStreamConfig.isEnableCheckPoint() && System.currentTimeMillis() - lastCheckpointTimeInMillis > checkpointInterval.toMillis()) { - LOG.info("Regular checkpointing for shard " + kinesisShardId); - checkpoint(processRecordsInput.checkpointer()); - lastCheckpointTimeInMillis = System.currentTimeMillis(); + if (!acknowledgementsEnabled && (System.currentTimeMillis() - lastCheckpointTimeInMillis >= checkpointInterval.toMillis())) { + LOG.debug("Regular checkpointing for shard {}", kinesisShardId); + + Optional KinesisCheckpointerRecordOptional = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); + if (KinesisCheckpointerRecordOptional.isPresent()) { + ExtendedSequenceNumber lastExtendedSequenceNumber = KinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber(); + checkpoint(processRecordsInput.checkpointer(), lastExtendedSequenceNumber.sequenceNumber(), lastExtendedSequenceNumber.subSequenceNumber()); + lastCheckpointTimeInMillis = System.currentTimeMillis(); + } } } catch (Exception ex) { recordProcessingErrors.increment(); @@ -137,14 +210,6 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { } } - private void processRecord(KinesisClientRecord record, Consumer> eventConsumer) throws IOException { - // Read bytebuffer - byte[] arr = new byte[record.data().remaining()]; - record.data().get(arr); - ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr); - codec.parse(byteArrayInputStream, eventConsumer); - } - @Override public void leaseLost(LeaseLostInput leaseLostInput) { LOG.debug("Lease Lost"); @@ -152,22 +217,60 @@ public void leaseLost(LeaseLostInput leaseLostInput) { @Override public void shardEnded(ShardEndedInput shardEndedInput) { - LOG.debug("Reached shard end, checkpointing shard: {}", kinesisShardId); + String kinesisStream = streamIdentifier.streamName(); + LOG.debug("Reached shard end, checkpointing for stream: {}, shardId: {}", kinesisStream, kinesisShardId); checkpoint(shardEndedInput.checkpointer()); } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { - LOG.info("Scheduler is shutting down, checkpointing shard: {}", kinesisShardId); + String kinesisStream = streamIdentifier.streamName(); + isStopRequested.set(true); + LOG.debug("Scheduler is shutting down, checkpointing for stream: {}, shardId: {}", kinesisStream, kinesisShardId); checkpoint(shutdownRequestedInput.checkpointer()); } + @VisibleForTesting + public void checkpoint(RecordProcessorCheckpointer checkpointer, String sequenceNumber, long subSequenceNumber) { + try { + String kinesisStream = streamIdentifier.streamName(); + LOG.debug("Checkpoint for stream: {}, shardId: {}, sequence: {}, subsequence: {}", kinesisStream, kinesisShardId, sequenceNumber, subSequenceNumber); + checkpointer.checkpoint(sequenceNumber, subSequenceNumber); + } catch (ShutdownException | ThrottlingException | InvalidStateException ex) { + LOG.debug("Caught exception at checkpoint, skipping checkpoint.", ex); + checkpointFailures.increment(); + } + } + private void checkpoint(RecordProcessorCheckpointer checkpointer) { try { + String kinesisStream = streamIdentifier.streamName(); + LOG.debug("Checkpoint for stream: {}, shardId: {}", kinesisStream, kinesisShardId); checkpointer.checkpoint(); } catch (ShutdownException | ThrottlingException | InvalidStateException ex) { - LOG.info("Caught exception at checkpoint, skipping checkpoint.", ex); + LOG.debug("Caught exception at checkpoint, skipping checkpoint.", ex); checkpointFailures.increment(); } } + + private ExtendedSequenceNumber getLatestSequenceNumberFromInput(final ProcessRecordsInput processRecordsInput) { + ListIterator recordIterator = processRecordsInput.records().listIterator(); + ExtendedSequenceNumber largestExtendedSequenceNumber = null; + while (recordIterator.hasNext()) { + KinesisClientRecord record = recordIterator.next(); + ExtendedSequenceNumber extendedSequenceNumber = + new ExtendedSequenceNumber(record.sequenceNumber(), record.subSequenceNumber()); + + if (largestExtendedSequenceNumber == null + || largestExtendedSequenceNumber.compareTo(extendedSequenceNumber) < 0) { + largestExtendedSequenceNumber = extendedSequenceNumber; + } + } + return largestExtendedSequenceNumber; + } + + @VisibleForTesting + public void setKinesisCheckpointerTracker(final KinesisCheckpointerTracker kinesisCheckpointerTracker) { + this.kinesisCheckpointerTracker = kinesisCheckpointerTracker; + } } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java index f551c503e5..e326789312 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java @@ -1,12 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + package org.opensearch.dataprepper.plugins.kinesis.source.processor; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.converter.KinesisRecordConverter; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; @@ -17,18 +29,18 @@ public class KinesisShardRecordProcessorFactory implements ShardRecordProcessorF private final KinesisSourceConfig kinesisSourceConfig; private final AcknowledgementSetManager acknowledgementSetManager; private final PluginMetrics pluginMetrics; - private final PluginFactory pluginFactory; + private final KinesisRecordConverter kinesisRecordConverter; public KinesisShardRecordProcessorFactory(Buffer> buffer, KinesisSourceConfig kinesisSourceConfig, final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics, - final PluginFactory pluginFactory) { + final InputCodec codec) { this.kinesisSourceConfig = kinesisSourceConfig; this.buffer = buffer; this.acknowledgementSetManager = acknowledgementSetManager; this.pluginMetrics = pluginMetrics; - this.pluginFactory = pluginFactory; + this.kinesisRecordConverter = new KinesisRecordConverter(codec); } @Override @@ -38,6 +50,8 @@ public ShardRecordProcessor shardRecordProcessor() { @Override public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) { - return new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + BufferAccumulator> bufferAccumulator = BufferAccumulator.create(buffer, + kinesisSourceConfig.getNumberOfRecordsToAccumulate(), kinesisSourceConfig.getBufferTimeout()); + return new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); } } \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/MetadataKeyAttributes.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/MetadataKeyAttributes.java new file mode 100644 index 0000000000..e2debba54e --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/MetadataKeyAttributes.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.kinesis.source.processor; + +public class MetadataKeyAttributes { + static final String KINESIS_STREAM_NAME_METADATA_ATTRIBUTE = "kinesis_stream_name"; +} diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index 49f0c898a0..12986d9969 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -16,8 +16,11 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig; @@ -43,6 +46,7 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -55,6 +59,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -63,6 +69,7 @@ public class KinesisServiceTest { private final String PIPELINE_NAME = "kinesis-pipeline-test"; private final String streamId = "stream-1"; + private static final String codec_plugin_name = "json"; private static final Duration CHECKPOINT_INTERVAL = Duration.ofMillis(0); private static final int NUMBER_OF_RECORDS_TO_ACCUMULATE = 10; @@ -183,6 +190,15 @@ void setup() { when(kinesisSourceConfig.getStreams()).thenReturn(streamConfigs); when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE); + PluginModel pluginModel = mock(PluginModel.class); + when(pluginModel.getPluginName()).thenReturn(codec_plugin_name); + when(pluginModel.getPluginSettings()).thenReturn(Collections.emptyMap()); + when(kinesisSourceConfig.getCodec()).thenReturn(pluginModel); + + pluginFactory = mock(PluginFactory.class); + InputCodec codec = mock(InputCodec.class); + when(pluginFactory.loadPlugin(eq(InputCodec.class), any())).thenReturn(codec); + when(kinesisClientFactory.buildDynamoDBClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(dynamoDbClient); when(kinesisClientFactory.buildKinesisAsyncClient(awsAuthenticationConfig.getAwsRegion())).thenReturn(kinesisClient); when(kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(cloudWatchClient); @@ -263,7 +279,7 @@ void testServiceStartNullStreams() { when(kinesisSourceConfig.getStreams()).thenReturn(null); KinesisService kinesisService = createObjectUnderTest(); - assertThrows(IllegalStateException.class, () -> kinesisService.start(buffer)); + assertThrows(InvalidPluginConfigurationException.class, () -> kinesisService.start(buffer)); verify(scheduler, times(0)).run(); } @@ -273,7 +289,7 @@ void testServiceStartEmptyStreams() { when(kinesisSourceConfig.getStreams()).thenReturn(new ArrayList<>()); KinesisService kinesisService = createObjectUnderTest(); - assertThrows(IllegalStateException.class, () -> kinesisService.start(buffer)); + assertThrows(InvalidPluginConfigurationException.class, () -> kinesisService.start(buffer)); verify(scheduler, times(0)).run(); } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java index e65ce1dc1a..fad335dd63 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java @@ -18,7 +18,9 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; @@ -31,6 +33,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -40,6 +43,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -48,6 +52,7 @@ public class KinesisSourceTest { private final String PIPELINE_NAME = "kinesis-pipeline-test"; private final String streamId = "stream-1"; + private static final String codec_plugin_name = "json"; @Mock private PluginMetrics pluginMetrics; @@ -93,6 +98,16 @@ void setup() { awsAuthenticationConfig = mock(AwsAuthenticationConfig.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); kinesisService = mock(KinesisService.class); + + PluginModel pluginModel = mock(PluginModel.class); + when(pluginModel.getPluginName()).thenReturn(codec_plugin_name); + when(pluginModel.getPluginSettings()).thenReturn(Collections.emptyMap()); + when(kinesisSourceConfig.getCodec()).thenReturn(pluginModel); + + pluginFactory = mock(PluginFactory.class); + InputCodec codec = mock(InputCodec.class); + when(pluginFactory.loadPlugin(eq(InputCodec.class), any())).thenReturn(codec); + kinesisLeaseConfigSupplier = mock(KinesisLeaseConfigSupplier.class); kinesisLeaseConfig = mock(KinesisLeaseConfig.class); kinesisLeaseCoordinationTableConfig = mock(KinesisLeaseCoordinationTableConfig.class); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java index 166d5c587e..5846fe4b04 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java @@ -88,9 +88,10 @@ void testSourceConfig() { assertEquals(kinesisSourceConfig.getConsumerStrategy(), ConsumerStrategy.ENHANCED_FAN_OUT); assertNull(kinesisSourceConfig.getPollingConfig()); + assertEquals(streamConfigs.size(), 3); + for (KinesisStreamConfig kinesisStreamConfig: streamConfigs) { assertTrue(kinesisStreamConfig.getName().contains("stream")); - assertFalse(kinesisStreamConfig.isEnableCheckPoint()); assertEquals(kinesisStreamConfig.getInitialPosition(), InitialPositionInStream.LATEST); assertEquals(kinesisStreamConfig.getCheckPointInterval(), MINIMAL_CHECKPOINT_INTERVAL); } @@ -117,9 +118,10 @@ void testSourceConfigWithStreamCodec() { assertEquals(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords(), 10); assertEquals(kinesisSourceConfig.getPollingConfig().getIdleTimeBetweenReads(), Duration.ofSeconds(10)); + assertEquals(streamConfigs.size(), 3); + for (KinesisStreamConfig kinesisStreamConfig: streamConfigs) { assertTrue(kinesisStreamConfig.getName().contains("stream")); - assertFalse(kinesisStreamConfig.isEnableCheckPoint()); assertEquals(kinesisStreamConfig.getInitialPosition(), InitialPositionInStream.LATEST); assertEquals(kinesisStreamConfig.getCheckPointInterval(), MINIMAL_CHECKPOINT_INTERVAL); } @@ -148,9 +150,10 @@ void testSourceConfigWithInitialPosition() { expectedCheckpointIntervals.put("stream-2", Duration.ofMinutes(15)); expectedCheckpointIntervals.put("stream-3", Duration.ofHours(2)); + assertEquals(streamConfigs.size(), 3); + for (KinesisStreamConfig kinesisStreamConfig: streamConfigs) { assertTrue(kinesisStreamConfig.getName().contains("stream")); - assertTrue(kinesisStreamConfig.isEnableCheckPoint()); assertEquals(kinesisStreamConfig.getInitialPosition(), InitialPositionInStream.TRIM_HORIZON); assertEquals(kinesisStreamConfig.getCheckPointInterval(), expectedCheckpointIntervals.get(kinesisStreamConfig.getName())); } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java new file mode 100644 index 0000000000..6b0646e993 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java @@ -0,0 +1,96 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.kinesis.source.converter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.event.TestEventFactory; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputCodec; +import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputConfig; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class KinesisRecordConverterTest { + + @Test + void setup() throws IOException { + InputCodec codec = mock(InputCodec.class); + KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec); + doNothing().when(codec).parse(any(InputStream.class), any(Consumer.class)); + + String sample_record_data = "sample record data"; + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap(sample_record_data.getBytes())) + .build(); + kinesisRecordConverter.convert(List.of(kinesisClientRecord)); + verify(codec, times(1)).parse(any(InputStream.class), any(Consumer.class)); + } + + @Test + public void testRecordConverterWithNdJsonInputCodec() throws IOException { + + ObjectMapper objectMapper = new ObjectMapper(); + + int numRecords = 10; + final List> jsonObjects = IntStream.range(0, numRecords) + .mapToObj(i -> generateJson()) + .collect(Collectors.toList()); + + final StringWriter writer = new StringWriter(); + + for (final Map jsonObject : jsonObjects) { + writer.append(objectMapper.writeValueAsString(jsonObject)); + writer.append(System.lineSeparator()); + } + + KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter( + new NdjsonInputCodec(new NdjsonInputConfig(), TestEventFactory.getTestEventFactory())); + + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap(writer.toString().getBytes())) + .build(); + List> events = kinesisRecordConverter.convert(List.of(kinesisClientRecord)); + + assertEquals(events.size(), numRecords); + } + + private static Map generateJson() { + final Map jsonObject = new LinkedHashMap<>(); + for (int i = 0; i < 1; i++) { + jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString())); + + return jsonObject; + } +} diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerRecordTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerRecordTest.java new file mode 100644 index 0000000000..a2cf8fecaf --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerRecordTest.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ +package org.opensearch.dataprepper.plugins.kinesis.source.processor; + +import org.junit.jupiter.api.Test; +import software.amazon.kinesis.processor.RecordProcessorCheckpointer; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; + +public class KinesisCheckpointerRecordTest { + private String shardId = "shardId-123"; + private String testConcurrencyToken = "testToken"; + + @Test + public void validateTwoRecords() { + + KinesisCheckpointerRecord kinesisCheckpointerRecord1 = KinesisCheckpointerRecord.builder() + .extendedSequenceNumber(ExtendedSequenceNumber.LATEST) + .readyToCheckpoint(false) + .build(); + KinesisCheckpointerRecord kinesisCheckpointerRecord2 = KinesisCheckpointerRecord.builder() + .extendedSequenceNumber(ExtendedSequenceNumber.LATEST) + .readyToCheckpoint(false) + .build(); + + assertEquals(kinesisCheckpointerRecord1.isReadyToCheckpoint(), kinesisCheckpointerRecord2.isReadyToCheckpoint()); + assertEquals(kinesisCheckpointerRecord1.getCheckpointer(), kinesisCheckpointerRecord2.getCheckpointer()); + assertEquals(kinesisCheckpointerRecord1.getExtendedSequenceNumber(), kinesisCheckpointerRecord2.getExtendedSequenceNumber()); + } + + @Test + public void validateTwoRecordsWithSetterMethods() { + RecordProcessorCheckpointer recordProcessorCheckpointer = mock(RecordProcessorCheckpointer.class); + KinesisCheckpointerRecord kinesisCheckpointerRecord1 = KinesisCheckpointerRecord.builder().build(); + kinesisCheckpointerRecord1.setCheckpointer(recordProcessorCheckpointer); + kinesisCheckpointerRecord1.setExtendedSequenceNumber(ExtendedSequenceNumber.LATEST); + kinesisCheckpointerRecord1.setReadyToCheckpoint(false); + + KinesisCheckpointerRecord kinesisCheckpointerRecord2 = KinesisCheckpointerRecord.builder().build(); + kinesisCheckpointerRecord2.setCheckpointer(recordProcessorCheckpointer); + kinesisCheckpointerRecord2.setExtendedSequenceNumber(ExtendedSequenceNumber.LATEST); + kinesisCheckpointerRecord2.setReadyToCheckpoint(false); + + assertEquals(kinesisCheckpointerRecord1.isReadyToCheckpoint(), kinesisCheckpointerRecord2.isReadyToCheckpoint()); + assertEquals(kinesisCheckpointerRecord1.getCheckpointer(), kinesisCheckpointerRecord2.getCheckpointer()); + assertEquals(kinesisCheckpointerRecord1.getExtendedSequenceNumber(), kinesisCheckpointerRecord2.getExtendedSequenceNumber()); + } + + @Test + public void testInvalidRecords() { + KinesisCheckpointerRecord kinesisCheckpointerRecord = KinesisCheckpointerRecord.builder().build(); + assertNotNull(kinesisCheckpointerRecord); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTrackerTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTrackerTest.java new file mode 100644 index 0000000000..ea76d1f789 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisCheckpointerTrackerTest.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.kinesis.source.processor; + +import org.junit.jupiter.api.Test; +import software.amazon.kinesis.processor.RecordProcessorCheckpointer; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class KinesisCheckpointerTrackerTest { + + private Random random = new Random(); + + @Test + void testCheckPointerAddAndGet() { + KinesisCheckpointerTracker kinesisCheckpointerTracker = new KinesisCheckpointerTracker(); + + List extendedSequenceNumberList = new ArrayList<>(); + int numRecords = 10; + for (int i=0; i checkpointRecord = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); + assertTrue(checkpointRecord.isEmpty()); + assertEquals(kinesisCheckpointerTracker.size(), numRecords); + + int idx = random.nextInt(numRecords); + ExtendedSequenceNumber extendedSequenceNumber1 = extendedSequenceNumberList.get(idx); + kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber1); + + Optional firstcheckpointer = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); + if (idx != 0) { + assertTrue(firstcheckpointer.isEmpty()); + assertEquals(kinesisCheckpointerTracker.size(), numRecords); + } else { + assertFalse(firstcheckpointer.isEmpty()); + assertEquals(kinesisCheckpointerTracker.size(), numRecords-1); + } + } + @Test + void testGetLastCheckpointerAndStoreIsEmpty() { + KinesisCheckpointerTracker kinesisCheckpointerTracker = new KinesisCheckpointerTracker(); + + List extendedSequenceNumberList = new ArrayList<>(); + int numRecords = 10; + for (int i=0; i checkpointer = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); + assertTrue(checkpointer.isPresent()); + assertEquals(0, kinesisCheckpointerTracker.size()); + } + + @Test + public void testMarkCheckpointerReadyForCheckpoint() { + + KinesisCheckpointerTracker kinesisCheckpointerTracker = new KinesisCheckpointerTracker(); + + ExtendedSequenceNumber extendedSequenceNumber = mock(ExtendedSequenceNumber.class); + assertThrows(IllegalArgumentException.class, () -> kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber)); + + Optional checkpointer = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord(); + assertTrue(checkpointer.isEmpty()); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java index 87ae19031a..902a70bfa6 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java @@ -15,20 +15,23 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.converter.KinesisRecordConverter; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; @@ -39,21 +42,24 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyCollection; -import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -61,8 +67,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME; import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_CHECKPOINT_FAILURES; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSED; import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSING_ERRORS; import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_STREAM_TAG_KEY; @@ -71,6 +79,8 @@ public class KinesisRecordProcessorTest { private static final String shardId = "123"; private static final String streamId = "stream-1"; private static final String codec_plugin_name = "json"; + private static final String sequence_number = "10001"; + private static final Long sub_sequence_number = 1L; private static final Duration CHECKPOINT_INTERVAL = Duration.ofMillis(1000); private static final int NUMBER_OF_RECORDS_TO_ACCUMULATE = 10; @@ -96,9 +106,6 @@ public class KinesisRecordProcessorTest { @Mock private RecordProcessorCheckpointer checkpointer; - @Mock - private Buffer> buffer; - @Mock StreamIdentifier streamIdentifier; @@ -108,6 +115,9 @@ public class KinesisRecordProcessorTest { @Mock private AcknowledgementSet acknowledgementSet; + @Mock + private Counter recordProcessed; + @Mock private Counter recordProcessingErrors; @@ -115,7 +125,22 @@ public class KinesisRecordProcessorTest { private Counter checkpointFailures; @Mock - private Counter acknowledgementSetCallbackCounter; + private Counter acknowledgementSetSuccesses; + + @Mock + private Counter acknowledgementSetFailures; + + @Mock + private InputCodec codec; + + @Mock + private BufferAccumulator> bufferAccumulator; + + @Mock + private KinesisRecordConverter kinesisRecordConverter; + + @Mock + private KinesisCheckpointerTracker kinesisCheckpointerTracker; @BeforeEach public void setup() { @@ -123,6 +148,9 @@ public void setup() { pluginMetrics = mock(PluginMetrics.class); pluginFactory = mock(PluginFactory.class); acknowledgementSet = mock(AcknowledgementSet.class); + bufferAccumulator = mock(BufferAccumulator.class); + kinesisRecordConverter = mock(KinesisRecordConverter.class); + kinesisCheckpointerTracker = mock(KinesisCheckpointerTracker.class); when(initializationInput.shardId()).thenReturn(shardId); when(streamIdentifier.streamName()).thenReturn(streamId); @@ -132,73 +160,116 @@ public void setup() { when(pluginModel.getPluginSettings()).thenReturn(Collections.emptyMap()); when(kinesisSourceConfig.getCodec()).thenReturn(pluginModel); - InputCodec codec = mock(InputCodec.class); + codec = mock(InputCodec.class); when(pluginFactory.loadPlugin(eq(InputCodec.class), any())).thenReturn(codec); when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(CHECKPOINT_INTERVAL); when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE); when(kinesisSourceConfig.getStreams()).thenReturn(List.of(kinesisStreamConfig)); when(processRecordsInput.checkpointer()).thenReturn(checkpointer); - when(pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(acknowledgementSetCallbackCounter); + when(pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME, KINESIS_STREAM_TAG_KEY, + streamIdentifier.streamName())).thenReturn(acknowledgementSetSuccesses); + when(pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, KINESIS_STREAM_TAG_KEY, + streamIdentifier.streamName())).thenReturn(acknowledgementSetFailures); + + recordProcessed = mock(Counter.class); + when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(recordProcessed); + + recordProcessingErrors = mock(Counter.class); + when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(recordProcessingErrors); } @Test - void testProcessRecordsWithoutAcknowledgementsCheckpointsEnabled() + void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied() throws Exception { - when(processRecordsInput.records()).thenReturn(createInputKinesisClientRecords()); + List kinesisClientRecords = createInputKinesisClientRecords(); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); - when(kinesisStreamConfig.isEnableCheckPoint()).thenReturn(true); + when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(Duration.ofMillis(0)); when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); - kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + List> records = new ArrayList<>(); + Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + Record record = new Record<>(event); + records.add(record); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); + + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + KinesisCheckpointerRecord kinesisCheckpointerRecord = mock(KinesisCheckpointerRecord.class); + ExtendedSequenceNumber extendedSequenceNumber = mock(ExtendedSequenceNumber.class); + when(extendedSequenceNumber.sequenceNumber()).thenReturn(sequence_number); + when(extendedSequenceNumber.subSequenceNumber()).thenReturn(sub_sequence_number); + when(kinesisCheckpointerRecord.getExtendedSequenceNumber()).thenReturn(extendedSequenceNumber); + when(kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord()).thenReturn(Optional.of(kinesisCheckpointerRecord)); + kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); kinesisRecordProcessor.initialize(initializationInput); - Thread.sleep(2000); - kinesisRecordProcessor.processRecords(processRecordsInput); - verify(checkpointer).checkpoint(); - verify(buffer).writeAll(anyCollection(), anyInt()); + verify(checkpointer).checkpoint(eq(sequence_number), eq(sub_sequence_number)); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + + verify(bufferAccumulator).add(recordArgumentCaptor.capture()); + verify(bufferAccumulator).flush(); + + List> recordsCaptured = recordArgumentCaptor.getAllValues(); + assertEquals(recordsCaptured.size(), records.size()); + for (Record eventRecord: recordsCaptured) { + EventMetadata eventMetadata = eventRecord.getData().getMetadata(); + assertEquals(eventMetadata.getAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE), streamIdentifier.streamName()); + } + verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class)); + verify(recordProcessed, times(1)).increment(anyDouble()); } @Test - void testProcessRecordsWithAcknowledgementsCheckpointsEnabled() + public void testProcessRecordsWithoutAcknowledgementsEnabled() throws Exception { List kinesisClientRecords = createInputKinesisClientRecords(); when(processRecordsInput.records()).thenReturn(kinesisClientRecords); - when(kinesisSourceConfig.isAcknowledgments()).thenReturn(true); - when(kinesisStreamConfig.isEnableCheckPoint()).thenReturn(false); - AtomicReference numEventsAdded = new AtomicReference<>(0); - doAnswer(a -> { - numEventsAdded.getAndSet(numEventsAdded.get() + 1); - return null; - }).when(acknowledgementSet).add(any()); + when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); + when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(Duration.ofMillis(0)); + when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); - doAnswer(invocation -> { - Consumer consumer = invocation.getArgument(0); - consumer.accept(true); - return acknowledgementSet; - }).when(acknowledgementSetManager).create(any(Consumer.class), any(Duration.class)); + List> records = new ArrayList<>(); + Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + Record record = new Record<>(event); + records.add(record); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); - kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + when(kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord()).thenReturn(Optional.empty()); + kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); kinesisRecordProcessor.initialize(initializationInput); - Thread.sleep(2000); - kinesisRecordProcessor.processRecords(processRecordsInput); - verify(checkpointer).checkpoint(); - verify(buffer).writeAll(anyCollection(), anyInt()); - verify(acknowledgementSetManager, times(1)).create(any(), any(Duration.class)); + verifyNoInteractions(checkpointer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + + verify(bufferAccumulator).add(recordArgumentCaptor.capture()); + verify(bufferAccumulator).flush(); + + List> recordsCaptured = recordArgumentCaptor.getAllValues(); + assertEquals(recordsCaptured.size(), records.size()); + for (Record eventRecord: recordsCaptured) { + EventMetadata eventMetadata = eventRecord.getData().getMetadata(); + assertEquals(eventMetadata.getAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE), streamIdentifier.streamName()); + } + + verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class)); + verify(recordProcessed, times(1)).increment(anyDouble()); } @Test - void testProcessRecordsWithAcknowledgementsEnabledAndAcksReturnFalse() + void testProcessRecordsWithAcknowledgementsEnabled() throws Exception { List kinesisClientRecords = createInputKinesisClientRecords(); when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisSourceConfig.isAcknowledgments()).thenReturn(true); - when(kinesisStreamConfig.isEnableCheckPoint()).thenReturn(false); + when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(Duration.ofMillis(0)); AtomicReference numEventsAdded = new AtomicReference<>(0); doAnswer(a -> { numEventsAdded.getAndSet(numEventsAdded.get() + 1); @@ -207,28 +278,52 @@ void testProcessRecordsWithAcknowledgementsEnabledAndAcksReturnFalse() doAnswer(invocation -> { Consumer consumer = invocation.getArgument(0); - consumer.accept(false); + consumer.accept(true); return acknowledgementSet; }).when(acknowledgementSetManager).create(any(Consumer.class), any(Duration.class)); - kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + List> records = new ArrayList<>(); + Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + Record record = new Record<>(event); + records.add(record); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); + + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + KinesisCheckpointerRecord kinesisCheckpointerRecord = mock(KinesisCheckpointerRecord.class); + ExtendedSequenceNumber extendedSequenceNumber = mock(ExtendedSequenceNumber.class); + when(extendedSequenceNumber.sequenceNumber()).thenReturn(sequence_number); + when(extendedSequenceNumber.subSequenceNumber()).thenReturn(sub_sequence_number); + when(kinesisCheckpointerRecord.getExtendedSequenceNumber()).thenReturn(extendedSequenceNumber); + when(kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord()).thenReturn(Optional.of(kinesisCheckpointerRecord)); + kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); kinesisRecordProcessor.initialize(initializationInput); - Thread.sleep(2000); - kinesisRecordProcessor.processRecords(processRecordsInput); - verify(checkpointer, times(0)).checkpoint(); - verify(buffer).writeAll(anyCollection(), anyInt()); + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + + verify(bufferAccumulator).add(recordArgumentCaptor.capture()); + verify(bufferAccumulator).flush(); + + List> recordsCaptured = recordArgumentCaptor.getAllValues(); + assertEquals(recordsCaptured.size(), records.size()); + for (Record eventRecord: recordsCaptured) { + EventMetadata eventMetadata = eventRecord.getData().getMetadata(); + assertEquals(eventMetadata.getAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE), streamIdentifier.streamName()); + } verify(acknowledgementSetManager, times(1)).create(any(), any(Duration.class)); + verify(acknowledgementSetSuccesses, atLeastOnce()).increment(); + verify(recordProcessed, times(1)).increment(anyDouble()); + verifyNoInteractions(recordProcessingErrors); } @Test void testProcessRecordsWithNDJsonInputCodec() throws Exception { - when(processRecordsInput.records()).thenReturn(createInputKinesisClientRecords()); + List kinesisClientRecords = createInputKinesisClientRecords(); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); - when(kinesisStreamConfig.isEnableCheckPoint()).thenReturn(true); + when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(Duration.ofMillis(0)); PluginModel pluginModel = mock(PluginModel.class); when(pluginModel.getPluginName()).thenReturn("ndjson"); @@ -239,79 +334,96 @@ void testProcessRecordsWithNDJsonInputCodec() when(pluginFactory.loadPlugin(eq(InputCodec.class), any())).thenReturn(codec); when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); - kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + + List> records = new ArrayList<>(); + Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + Record record = new Record<>(event); + records.add(record); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); + + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); + KinesisCheckpointerRecord kinesisCheckpointerRecord = mock(KinesisCheckpointerRecord.class); + ExtendedSequenceNumber extendedSequenceNumber = mock(ExtendedSequenceNumber.class); + when(extendedSequenceNumber.sequenceNumber()).thenReturn(sequence_number); + when(extendedSequenceNumber.subSequenceNumber()).thenReturn(sub_sequence_number); + when(kinesisCheckpointerRecord.getExtendedSequenceNumber()).thenReturn(extendedSequenceNumber); + when(kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord()).thenReturn(Optional.of(kinesisCheckpointerRecord)); kinesisRecordProcessor.initialize(initializationInput); - Thread.sleep(2000); kinesisRecordProcessor.processRecords(processRecordsInput); - verify(checkpointer).checkpoint(); - verify(buffer).writeAll(anyCollection(), anyInt()); + verify(checkpointer).checkpoint(eq(sequence_number), eq(sub_sequence_number)); + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + + verify(bufferAccumulator).add(recordArgumentCaptor.capture()); + + List> recordsCaptured = recordArgumentCaptor.getAllValues(); + assertEquals(recordsCaptured.size(), records.size()); + for (Record eventRecord: recordsCaptured) { + EventMetadata eventMetadata = eventRecord.getData().getMetadata(); + assertEquals(eventMetadata.getAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE), streamIdentifier.streamName()); + } + verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class)); + verify(recordProcessed, times(1)).increment(anyDouble()); } - @Test void testProcessRecordsNoThrowException() throws Exception { - when(processRecordsInput.records()).thenReturn(createInputKinesisClientRecords()); + List kinesisClientRecords = createInputKinesisClientRecords(); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); - when(kinesisStreamConfig.isEnableCheckPoint()).thenReturn(false); + List> records = new ArrayList<>(); + Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + Record record = new Record<>(event); + records.add(record); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); final Throwable exception = mock(RuntimeException.class); - doThrow(exception).when(buffer).writeAll(any(), anyInt()); - - recordProcessingErrors = mock(Counter.class); - when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(recordProcessingErrors); + doThrow(exception).when(bufferAccumulator).add(any(Record.class)); - kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); kinesisRecordProcessor.initialize(initializationInput); assertDoesNotThrow(() -> kinesisRecordProcessor.processRecords(processRecordsInput)); verify(recordProcessingErrors, times(1)).increment(); + verify(recordProcessed, times(0)).increment(anyDouble()); } @Test - void testProcessRecordsWithoutAcknowledgementsAndCheckpoints() + void testProcessRecordsBufferFlushNoThrowException() throws Exception { - when(processRecordsInput.records()).thenReturn(createInputKinesisClientRecords()); + List kinesisClientRecords = createInputKinesisClientRecords(); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); - when(kinesisStreamConfig.isEnableCheckPoint()).thenReturn(false); - when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); - - kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); - kinesisRecordProcessor.initialize(initializationInput); - - kinesisRecordProcessor.processRecords(processRecordsInput); - verifyNoInteractions(checkpointer); - verify(buffer).writeAll(anyCollection(), anyInt()); - verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class)); - } - - @Test - void testProcessRecordsWithAcknowledgements() - throws Exception { - when(processRecordsInput.records()).thenReturn(createInputKinesisClientRecords()); - when(kinesisSourceConfig.isAcknowledgments()).thenReturn(true); - when(kinesisStreamConfig.isEnableCheckPoint()).thenReturn(false); - when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); + List> records = new ArrayList<>(); + Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + Record record = new Record<>(event); + records.add(record); + when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records); + final Throwable exception = mock(RuntimeException.class); + doThrow(exception).when(bufferAccumulator).flush(); - kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + kinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); kinesisRecordProcessor.initialize(initializationInput); - kinesisRecordProcessor.processRecords(processRecordsInput); + assertDoesNotThrow(() -> kinesisRecordProcessor.processRecords(processRecordsInput)); + verify(recordProcessingErrors, times(1)).increment(); + verify(recordProcessed, times(0)).increment(anyDouble()); - verifyNoInteractions(checkpointer); - verify(buffer).writeAll(anyCollection(), anyInt()); - verify(acknowledgementSetManager, times(1)).create(any(), any(Duration.class)); } @Test - void testShardEndedFlushCalled() throws Exception { - KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + void testShardEndedLatestCheckpoint() { + KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); ShardEndedInput shardEndedInput = mock(ShardEndedInput.class); when(shardEndedInput.checkpointer()).thenReturn(checkpointer); + mockKinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); mockKinesisRecordProcessor.shardEnded(shardEndedInput); @@ -324,7 +436,8 @@ void testShardEndedCheckpointerThrowsNoThrowException(final Class exc checkpointFailures = mock(Counter.class); when(pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(checkpointFailures); - KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + mockKinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); ShardEndedInput shardEndedInput = mock(ShardEndedInput.class); when(shardEndedInput.checkpointer()).thenReturn(checkpointer); doThrow(exceptionType).when(checkpointer).checkpoint(); @@ -337,11 +450,12 @@ void testShardEndedCheckpointerThrowsNoThrowException(final Class exc } @Test - void testShutdownRequested() { + void testShutdownRequestedWithLatestCheckpoint() { checkpointFailures = mock(Counter.class); when(pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(checkpointFailures); - KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier); + KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + mockKinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); ShutdownRequestedInput shutdownRequestedInput = mock(ShutdownRequestedInput.class); when(shutdownRequestedInput.checkpointer()).thenReturn(checkpointer); @@ -357,7 +471,24 @@ void testShutdownRequestedCheckpointerThrowsNoThrowException(final Class mockKinesisRecordProcessor.checkpoint(checkpointer, sequence_number, sub_sequence_number)); + + verify(checkpointer).checkpoint(eq(sequence_number), eq(sub_sequence_number)); + verify(checkpointFailures, times(1)).increment(); + } + + @ParameterizedTest + @ValueSource(classes = {ShutdownException.class, ThrottlingException.class, InvalidStateException.class}) + void testShutdownRequestedCheckpointerThrowsNoThrowExceptionRegularCheckpoint(final Class exceptionType) throws Exception { + checkpointFailures = mock(Counter.class); + when(pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName())).thenReturn(checkpointFailures); + + KinesisRecordProcessor mockKinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier); + mockKinesisRecordProcessor.setKinesisCheckpointerTracker(kinesisCheckpointerTracker); ShutdownRequestedInput shutdownRequestedInput = mock(ShutdownRequestedInput.class); when(shutdownRequestedInput.checkpointer()).thenReturn(checkpointer); doThrow(exceptionType).when(checkpointer).checkpoint(); @@ -373,7 +504,9 @@ private List createInputKinesisClientRecords() { List kinesisClientRecords = new ArrayList<>(); for (int i = 0; i< KinesisRecordProcessorTest.NUMBER_OF_RECORDS_TO_ACCUMULATE; i++) { Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder().data(ByteBuffer.wrap(event.toJsonString().getBytes())).build(); + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap(event.toJsonString().getBytes())) + .sequenceNumber(Integer.toString(100 + i)).subSequenceNumber(i).build(); kinesisClientRecords.add(kinesisClientRecord); } return kinesisClientRecords; diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactoryTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactoryTest.java index 1a74514b6b..9f0a555253 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactoryTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactoryTest.java @@ -63,6 +63,9 @@ public class KinesisShardRecordProcessorFactoryTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private InputCodec codec; + @BeforeEach void setup() { MockitoAnnotations.initMocks(this); @@ -71,8 +74,9 @@ void setup() { when(pluginModel.getPluginName()).thenReturn(codec_plugin_name); when(pluginModel.getPluginSettings()).thenReturn(Collections.emptyMap()); when(kinesisSourceConfig.getCodec()).thenReturn(pluginModel); + when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(100); - InputCodec codec = mock(InputCodec.class); + codec = mock(InputCodec.class); when(pluginFactory.loadPlugin(eq(InputCodec.class), any())).thenReturn(codec); when(streamIdentifier.streamName()).thenReturn(streamId); @@ -82,13 +86,13 @@ void setup() { @Test void testKinesisRecordProcessFactoryReturnsKinesisRecordProcessor() { - kinesisShardRecordProcessorFactory = new KinesisShardRecordProcessorFactory(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory); + kinesisShardRecordProcessorFactory = new KinesisShardRecordProcessorFactory(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, codec); assertInstanceOf(KinesisRecordProcessor.class, kinesisShardRecordProcessorFactory.shardRecordProcessor(streamIdentifier)); } @Test void testKinesisRecordProcessFactoryDefaultUnsupported() { - kinesisShardRecordProcessorFactory = new KinesisShardRecordProcessorFactory(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory); + kinesisShardRecordProcessorFactory = new KinesisShardRecordProcessorFactory(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, codec); assertThrows(UnsupportedOperationException.class, () -> kinesisShardRecordProcessorFactory.shardRecordProcessor()); } } diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_checkpoint_enabled.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_checkpoint_enabled.yaml index e918048529..c8b58725fd 100644 --- a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_checkpoint_enabled.yaml +++ b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_checkpoint_enabled.yaml @@ -3,15 +3,12 @@ source: streams: - stream_name: "stream-1" initial_position: "EARLIEST" - enable_checkpoint: true checkpoint_interval: "20s" - stream_name: "stream-2" initial_position: "EARLIEST" - enable_checkpoint: true checkpoint_interval: "PT15M" - stream_name: "stream-3" initial_position: "EARLIEST" - enable_checkpoint: true checkpoint_interval: "PT2H" codec: ndjson: