diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java index 005fee88fd..48d6859ba4 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java @@ -8,6 +8,7 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.network.SSLMode; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventFactory; @@ -50,6 +51,7 @@ public class RdsService { private final EventFactory eventFactory; private final PluginMetrics pluginMetrics; private final RdsSourceConfig sourceConfig; + private final AcknowledgementSetManager acknowledgementSetManager; private ExecutorService executor; private LeaderScheduler leaderScheduler; private ExportScheduler exportScheduler; @@ -60,11 +62,13 @@ public RdsService(final EnhancedSourceCoordinator sourceCoordinator, final RdsSourceConfig sourceConfig, final EventFactory eventFactory, final ClientFactory clientFactory, - final PluginMetrics pluginMetrics) { + final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager) { this.sourceCoordinator = sourceCoordinator; this.eventFactory = eventFactory; this.pluginMetrics = pluginMetrics; this.sourceConfig = sourceConfig; + this.acknowledgementSetManager = acknowledgementSetManager; rdsClient = clientFactory.buildRdsClient(); s3Client = clientFactory.buildS3Client(); @@ -94,7 +98,7 @@ public void start(Buffer> buffer) { exportScheduler = new ExportScheduler( sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics); dataFileScheduler = new DataFileScheduler( - sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics); + sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager); runnableList.add(exportScheduler); runnableList.add(dataFileScheduler); } @@ -106,7 +110,8 @@ public void start(Buffer> buffer) { } else { binaryLogClient.setSSLMode(SSLMode.DISABLED); } - streamScheduler = new StreamScheduler(sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics); + streamScheduler = new StreamScheduler( + sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager); runnableList.add(streamScheduler); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java index 071fc5889b..5c2b08fb5c 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; @@ -35,6 +36,7 @@ public class RdsSource implements Source>, UsesEnhancedSourceCoord private final PluginMetrics pluginMetrics; private final RdsSourceConfig sourceConfig; private final EventFactory eventFactory; + private final AcknowledgementSetManager acknowledgementSetManager; private EnhancedSourceCoordinator sourceCoordinator; private RdsService rdsService; @@ -42,10 +44,12 @@ public class RdsSource implements Source>, UsesEnhancedSourceCoord public RdsSource(final PluginMetrics pluginMetrics, final RdsSourceConfig sourceConfig, final EventFactory eventFactory, - final AwsCredentialsSupplier awsCredentialsSupplier) { + final AwsCredentialsSupplier awsCredentialsSupplier, + final AcknowledgementSetManager acknowledgementSetManager) { this.pluginMetrics = pluginMetrics; this.sourceConfig = sourceConfig; this.eventFactory = eventFactory; + this.acknowledgementSetManager = acknowledgementSetManager; clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig()); } @@ -56,7 +60,7 @@ public void start(Buffer> buffer) { Objects.requireNonNull(sourceCoordinator); sourceCoordinator.createPartition(new LeaderPartition()); - rdsService = new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics); + rdsService = new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics, acknowledgementSetManager); LOG.info("Start RDS service"); rdsService.start(buffer); @@ -80,4 +84,9 @@ public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordin public Function getPartitionFactory() { return new PartitionFactory(); } + + @Override + public boolean areAcknowledgementsEnabled() { + return sourceConfig.isAcknowledgmentsEnabled(); + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java index 548dc4a2fb..a1cb8c7e2f 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.plugins.source.rds.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.rds.configuration.TlsConfig; +import java.time.Duration; import java.util.List; /** @@ -56,6 +57,12 @@ public class RdsSourceConfig { @JsonProperty("acknowledgments") private boolean acknowledgments = false; + @JsonProperty("s3_data_file_acknowledgment_timeout") + private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(30); + + @JsonProperty("stream_acknowledgment_timeout") + private Duration streamAcknowledgmentTimeout = Duration.ofMinutes(10); + @JsonProperty("s3_bucket") private String s3Bucket; @@ -106,6 +113,14 @@ public boolean isAcknowledgmentsEnabled() { return acknowledgments; } + public Duration getDataFileAcknowledgmentTimeout() { + return dataFileAcknowledgmentTimeout; + } + + public Duration getStreamAcknowledgmentTimeout() { + return streamAcknowledgmentTimeout; + } + public String getS3Bucket() { return s3Bucket; } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java index 21873179da..81a3c7f5ac 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java @@ -10,23 +10,12 @@ public class StreamProgressState { - @JsonProperty("startPosition") - private BinlogCoordinate startPosition; - @JsonProperty("currentPosition") private BinlogCoordinate currentPosition; @JsonProperty("waitForExport") private boolean waitForExport = false; - public BinlogCoordinate getStartPosition() { - return startPosition; - } - - public void setStartPosition(BinlogCoordinate startPosition) { - this.startPosition = startPosition; - } - public BinlogCoordinate getCurrentPosition() { return currentPosition; } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java index c6815e37b7..64c613bc43 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java @@ -9,9 +9,12 @@ import io.micrometer.core.instrument.DistributionSummary; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState; @@ -30,6 +33,8 @@ public class DataFileLoader implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class); static final Duration VERSION_OVERLAP_TIME_FOR_EXPORT = Duration.ofMinutes(5); + static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); + static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal"; static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed"; static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordsProcessingErrors"; @@ -41,8 +46,11 @@ public class DataFileLoader implements Runnable { private final String objectKey; private final S3ObjectReader objectReader; private final InputCodec codec; - private final BufferAccumulator> bufferAccumulator; + private final Buffer> buffer; private final ExportRecordConverter recordConverter; + private final EnhancedSourceCoordinator sourceCoordinator; + private final AcknowledgementSet acknowledgementSet; + private final Duration acknowledgmentTimeout; private final Counter exportRecordsTotalCounter; private final Counter exportRecordSuccessCounter; private final Counter exportRecordErrorCounter; @@ -51,17 +59,23 @@ public class DataFileLoader implements Runnable { private DataFileLoader(final DataFilePartition dataFilePartition, final InputCodec codec, - final BufferAccumulator> bufferAccumulator, + final Buffer> buffer, final S3ObjectReader objectReader, final ExportRecordConverter recordConverter, - final PluginMetrics pluginMetrics) { + final PluginMetrics pluginMetrics, + final EnhancedSourceCoordinator sourceCoordinator, + final AcknowledgementSet acknowledgementSet, + final Duration acknowledgmentTimeout) { this.dataFilePartition = dataFilePartition; bucket = dataFilePartition.getBucket(); objectKey = dataFilePartition.getKey(); this.objectReader = objectReader; this.codec = codec; - this.bufferAccumulator = bufferAccumulator; + this.buffer = buffer; this.recordConverter = recordConverter; + this.sourceCoordinator = sourceCoordinator; + this.acknowledgementSet = acknowledgementSet; + this.acknowledgmentTimeout = acknowledgmentTimeout; exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT); exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT); @@ -72,17 +86,23 @@ private DataFileLoader(final DataFilePartition dataFilePartition, public static DataFileLoader create(final DataFilePartition dataFilePartition, final InputCodec codec, - final BufferAccumulator> bufferAccumulator, + final Buffer> buffer, final S3ObjectReader objectReader, final ExportRecordConverter recordConverter, - final PluginMetrics pluginMetrics) { - return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics); + final PluginMetrics pluginMetrics, + final EnhancedSourceCoordinator sourceCoordinator, + final AcknowledgementSet acknowledgementSet, + final Duration acknowledgmentTimeout) { + return new DataFileLoader(dataFilePartition, codec, buffer, objectReader, recordConverter, + pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout); } @Override public void run() { LOG.info(SENSITIVE, "Start loading s3://{}/{}", bucket, objectKey); + final BufferAccumulator> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); + AtomicLong eventCount = new AtomicLong(); try (InputStream inputStream = objectReader.readFile(bucket, objectKey)) { codec.parse(inputStream, record -> { @@ -100,15 +120,19 @@ public void run() { final long snapshotTime = progressState.getSnapshotTime(); final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis(); - Record transformedRecord = new Record<>( - recordConverter.convert( - record, - progressState.getSourceDatabase(), - progressState.getSourceTable(), - primaryKeys, - snapshotTime, - eventVersionNumber)); - bufferAccumulator.add(transformedRecord); + final Event transformedEvent = recordConverter.convert( + record, + progressState.getSourceDatabase(), + progressState.getSourceTable(), + primaryKeys, + snapshotTime, + eventVersionNumber); + + if (acknowledgementSet != null) { + acknowledgementSet.add(transformedEvent); + } + + bufferAccumulator.add(new Record<>(transformedEvent)); eventCount.getAndIncrement(); bytesProcessedSummary.record(bytes); } catch (Exception e) { @@ -125,6 +149,10 @@ public void run() { try { bufferAccumulator.flush(); + if (acknowledgementSet != null) { + sourceCoordinator.saveProgressStateForPartition(dataFilePartition, acknowledgmentTimeout); + acknowledgementSet.complete(); + } exportRecordSuccessCounter.increment(eventCount.get()); } catch (Exception e) { LOG.error("Failed to write events to buffer", e); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java index f766aec3d2..33c17d9d80 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java @@ -6,8 +6,9 @@ package org.opensearch.dataprepper.plugins.source.rds.export; import io.micrometer.core.instrument.Counter; -import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; @@ -32,6 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import static org.opensearch.dataprepper.plugins.source.rds.RdsService.DATA_LOADER_MAX_JOB_COUNT; @@ -48,8 +50,7 @@ public class DataFileScheduler implements Runnable { private static final Duration DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT = Duration.ofMinutes(30); - static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); - static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; + static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed"; static final String EXPORT_S3_OBJECTS_ERROR_COUNT = "exportS3ObjectsErrors"; static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers"; @@ -60,9 +61,10 @@ public class DataFileScheduler implements Runnable { private final RdsSourceConfig sourceConfig; private final S3ObjectReader objectReader; private final InputCodec codec; - private final BufferAccumulator> bufferAccumulator; private final ExportRecordConverter recordConverter; + private final Buffer> buffer; private final PluginMetrics pluginMetrics; + private final AcknowledgementSetManager acknowledgementSetManager; private final Counter exportFileSuccessCounter; private final Counter exportFileErrorCounter; @@ -75,15 +77,17 @@ public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator, final S3Client s3Client, final EventFactory eventFactory, final Buffer> buffer, - final PluginMetrics pluginMetrics) { + final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager) { this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; codec = new ParquetInputCodec(eventFactory); - bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); objectReader = new S3ObjectReader(s3Client); recordConverter = new ExportRecordConverter(); executor = Executors.newFixedThreadPool(DATA_LOADER_MAX_JOB_COUNT); + this.buffer = buffer; this.pluginMetrics = pluginMetrics; + this.acknowledgementSetManager = acknowledgementSetManager; this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT); this.exportFileErrorCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_ERROR_COUNT); @@ -133,23 +137,39 @@ public void shutdown() { } private void processDataFilePartition(DataFilePartition dataFilePartition) { + // Create AcknowledgmentSet + final boolean isAcknowledgmentsEnabled = sourceConfig.isAcknowledgmentsEnabled(); + AcknowledgementSet acknowledgementSet = null; + if (sourceConfig.isAcknowledgmentsEnabled()) { + acknowledgementSet = acknowledgementSetManager.create((result) -> { + if (result) { + completeDataLoader(dataFilePartition).accept(null, null); + LOG.info("Received acknowledgment of completion from sink for data file {}", dataFilePartition.getKey()); + } else { + exportFileErrorCounter.increment(); + LOG.warn("Negative acknowledgment received for data file {}, retrying", dataFilePartition.getKey()); + sourceCoordinator.giveUpPartition(dataFilePartition); + } + }, sourceConfig.getDataFileAcknowledgmentTimeout()); + } + Runnable loader = DataFileLoader.create( - dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics); + dataFilePartition, codec, buffer, objectReader, recordConverter, pluginMetrics, + sourceCoordinator, acknowledgementSet, sourceConfig.getDataFileAcknowledgmentTimeout()); CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor); - runLoader.whenComplete((v, ex) -> { - if (ex == null) { - exportFileSuccessCounter.increment(); - // Update global state so we know if all s3 files have been loaded - updateLoadStatus(dataFilePartition.getExportTaskId(), DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT); - sourceCoordinator.completePartition(dataFilePartition); - } else { - exportFileErrorCounter.increment(); - LOG.error("There was an exception while processing an S3 data file", ex); - sourceCoordinator.giveUpPartition(dataFilePartition); - } - numOfWorkers.decrementAndGet(); - }); + if (isAcknowledgmentsEnabled) { + runLoader.whenComplete((v, ex) -> { + if (ex != null) { + exportFileErrorCounter.increment(); + LOG.error("There was an exception while processing an S3 data file: {}", ex); + sourceCoordinator.giveUpPartition(dataFilePartition); + } + numOfWorkers.decrementAndGet(); + }); + } else { + runLoader.whenComplete(completeDataLoader(dataFilePartition)); + } numOfWorkers.incrementAndGet(); } @@ -183,4 +203,20 @@ private void updateLoadStatus(String exportTaskId, Duration timeout) { } } } + + private BiConsumer completeDataLoader(DataFilePartition dataFilePartition) { + return (v, ex) -> { + if (ex == null) { + exportFileSuccessCounter.increment(); + // Update global state, so we know if all s3 files have been loaded + updateLoadStatus(dataFilePartition.getExportTaskId(), DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT); + sourceCoordinator.completePartition(dataFilePartition); + } else { + exportFileErrorCounter.increment(); + LOG.error("There was an exception while processing an S3 data file", ex); + sourceCoordinator.giveUpPartition(dataFilePartition); + } + numOfWorkers.decrementAndGet(); + }; + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java index 882fb0242d..4fc00de3a5 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java @@ -148,12 +148,14 @@ private Map> getPrimaryKeyMap() { private void createStreamPartition(RdsSourceConfig sourceConfig) { final StreamProgressState progressState = new StreamProgressState(); progressState.setWaitForExport(sourceConfig.isExportEnabled()); - getCurrentBinlogPosition().ifPresent(progressState::setStartPosition); + getCurrentBinlogPosition().ifPresent(progressState::setCurrentPosition); StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState); sourceCoordinator.createPartition(streamPartition); } private Optional getCurrentBinlogPosition() { - return schemaManager.getCurrentBinaryLogPosition(); + Optional binlogCoordinate = schemaManager.getCurrentBinaryLogPosition(); + LOG.debug("Current binlog position: {}", binlogCoordinate.orElse(null)); + return binlogCoordinate; } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/BinlogCoordinate.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/BinlogCoordinate.java index 6818dabe9b..2b02de14b1 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/BinlogCoordinate.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/BinlogCoordinate.java @@ -30,4 +30,12 @@ public String getBinlogFilename() { public long getBinlogPosition() { return binlogPosition; } + + @Override + public String toString() { + return "BinlogCoordinate{" + + "binlogFilename='" + binlogFilename + '\'' + + ", binlogPosition=" + binlogPosition + + '}'; + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java index 487e9aefd0..181716a69a 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java @@ -7,7 +7,9 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import com.github.shyiko.mysql.binlog.event.EventHeaderV4; import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.RotateEventData; import com.github.shyiko.mysql.binlog.event.TableMapEventData; import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata; import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; @@ -16,18 +18,22 @@ import io.micrometer.core.instrument.DistributionSummary; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; import org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -52,25 +58,44 @@ public class BinlogEventListener implements BinaryLogClient.EventListener { private final Map tableMetadataMap; private final StreamRecordConverter recordConverter; + private final BinaryLogClient binaryLogClient; private final BufferAccumulator> bufferAccumulator; private final List tableNames; private final String s3Prefix; + private final boolean isAcknowledgmentsEnabled; private final PluginMetrics pluginMetrics; - + private final List pipelineEvents; + private final StreamCheckpointManager streamCheckpointManager; private final Counter changeEventSuccessCounter; private final Counter changeEventErrorCounter; private final DistributionSummary bytesReceivedSummary; private final DistributionSummary bytesProcessedSummary; + /** + * currentBinlogCoordinate is the coordinate where next event will start + */ + private BinlogCoordinate currentBinlogCoordinate; + public BinlogEventListener(final Buffer> buffer, final RdsSourceConfig sourceConfig, - final PluginMetrics pluginMetrics) { + final PluginMetrics pluginMetrics, + final BinaryLogClient binaryLogClient, + final StreamCheckpointer streamCheckpointer, + final AcknowledgementSetManager acknowledgementSetManager) { + this.binaryLogClient = binaryLogClient; tableMetadataMap = new HashMap<>(); recordConverter = new StreamRecordConverter(sourceConfig.getStream().getPartitionCount()); bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); s3Prefix = sourceConfig.getS3Prefix(); tableNames = sourceConfig.getTableNames(); + isAcknowledgmentsEnabled = sourceConfig.isAcknowledgmentsEnabled(); this.pluginMetrics = pluginMetrics; + pipelineEvents = new ArrayList<>(); + + this.streamCheckpointManager = new StreamCheckpointManager( + streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(), + acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout()); + streamCheckpointManager.start(); changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT); changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT); @@ -80,9 +105,12 @@ public BinlogEventListener(final Buffer> buffer, @Override public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) { - EventType eventType = event.getHeader().getEventType(); + final EventType eventType = event.getHeader().getEventType(); switch (eventType) { + case ROTATE: + handleEventAndErrors(event, this::handleRotateEvent); + break; case TABLE_MAP: handleEventAndErrors(event, this::handleTableMapEvent); break; @@ -101,6 +129,27 @@ public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) { } } + public void stopClient() { + try { + binaryLogClient.disconnect(); + LOG.info("Binary log client disconnected."); + } catch (Exception e) { + LOG.error("Binary log client failed to disconnect.", e); + } + } + + void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) { + final RotateEventData data = event.getData(); + currentBinlogCoordinate = new BinlogCoordinate(data.getBinlogFilename(), data.getBinlogPosition()); + + // Trigger a checkpoint update for this rotate when there're no row mutation events being processed + if (streamCheckpointManager.getChangeEventStatuses().isEmpty()) { + ChangeEventStatus changeEventStatus = streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate); + if (isAcknowledgmentsEnabled) { + changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.POSITIVE_ACK); + } + } + } void handleTableMapEvent(com.github.shyiko.mysql.binlog.event.Event event) { final TableMapEventData data = event.getData(); final TableMapEventMetadata tableMapEventMetadata = data.getEventMetadata(); @@ -116,109 +165,55 @@ void handleTableMapEvent(com.github.shyiko.mysql.binlog.event.Event event) { } void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) { - final long bytes = event.toString().getBytes().length; - bytesReceivedSummary.record(bytes); - LOG.debug("Handling insert event"); final WriteRowsEventData data = event.getData(); - if (!tableMetadataMap.containsKey(data.getTableId())) { - LOG.debug("Cannot find table metadata, the event is likely not from a table of interest or the table metadata was not read"); - return; - } - final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId()); - final String fullTableName = tableMetadata.getFullTableName(); - if (!isTableOfInterest(fullTableName)) { - LOG.debug("The event is not from a table of interest"); - return; - } - final List columnNames = tableMetadata.getColumnNames(); - final List primaryKeys = tableMetadata.getPrimaryKeys(); - final long eventTimestampMillis = event.getHeader().getTimestamp(); - - // Construct data prepper JacksonEvent - int eventCount = 0; - for (final Object[] rowDataArray : data.getRows()) { - final Map rowDataMap = new HashMap<>(); - for (int i = 0; i < rowDataArray.length; i++) { - rowDataMap.put(columnNames.get(i), rowDataArray[i]); - } - - Event pipelineEvent = recordConverter.convert( - rowDataMap, - tableMetadata.getDatabaseName(), - tableMetadata.getTableName(), - event.getHeader().getEventType(), - OpenSearchBulkActions.INDEX, - primaryKeys, - s3Prefix, - eventTimestampMillis, - eventTimestampMillis); - addToBuffer(new Record<>(pipelineEvent)); - eventCount++; - } - bytesProcessedSummary.record(bytes); - - flushBuffer(eventCount); + handleRowChangeEvent(event, data.getTableId(), data.getRows(), OpenSearchBulkActions.INDEX); } void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) { - final long bytes = event.toString().getBytes().length; - bytesReceivedSummary.record(bytes); - LOG.debug("Handling update event"); final UpdateRowsEventData data = event.getData(); - if (!tableMetadataMap.containsKey(data.getTableId())) { - return; - } - final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId()); - final String fullTableName = tableMetadata.getFullTableName(); - if (!isTableOfInterest(fullTableName)) { - LOG.debug("The event is not from a table of interest"); - return; - } - final List columnNames = tableMetadata.getColumnNames(); - final List primaryKeys = tableMetadata.getPrimaryKeys(); - final long eventTimestampMillis = event.getHeader().getTimestamp(); - int eventCount = 0; - for (Map.Entry updatedRow : data.getRows()) { - // updatedRow contains data before update as key and data after update as value - final Object[] rowData = updatedRow.getValue(); + // updatedRow contains data before update as key and data after update as value + final List rows = data.getRows().stream() + .map(Map.Entry::getValue) + .collect(Collectors.toList()); - final Map dataMap = new HashMap<>(); - for (int i = 0; i < rowData.length; i++) { - dataMap.put(columnNames.get(i), rowData[i]); - } + handleRowChangeEvent(event, data.getTableId(), rows, OpenSearchBulkActions.INDEX); + } - final Event pipelineEvent = recordConverter.convert( - dataMap, - tableMetadata.getDatabaseName(), - tableMetadata.getTableName(), - event.getHeader().getEventType(), - OpenSearchBulkActions.INDEX, - primaryKeys, - s3Prefix, - eventTimestampMillis, - eventTimestampMillis); - addToBuffer(new Record<>(pipelineEvent)); - eventCount++; - } - bytesProcessedSummary.record(bytes); + void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) { + LOG.debug("Handling delete event"); + final DeleteRowsEventData data = event.getData(); - flushBuffer(eventCount); + handleRowChangeEvent(event, data.getTableId(), data.getRows(), OpenSearchBulkActions.DELETE); } - void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) { + private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event, + long tableId, + List rows, + OpenSearchBulkActions bulkAction) { + + // Update binlog coordinate after it's first assigned in rotate event handler + if (currentBinlogCoordinate != null) { + final EventHeaderV4 eventHeader = event.getHeader(); + currentBinlogCoordinate = new BinlogCoordinate(currentBinlogCoordinate.getBinlogFilename(), eventHeader.getNextPosition()); + LOG.debug("Current binlog coordinate after receiving a row change event: " + currentBinlogCoordinate); + } + + AcknowledgementSet acknowledgementSet = null; + if (isAcknowledgmentsEnabled) { + acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(currentBinlogCoordinate); + } + final long bytes = event.toString().getBytes().length; bytesReceivedSummary.record(bytes); - LOG.debug("Handling delete event"); - final DeleteRowsEventData data = event.getData(); - if (!tableMetadataMap.containsKey(data.getTableId())) { + if (!tableMetadataMap.containsKey(tableId)) { LOG.debug("Cannot find table metadata, the event is likely not from a table of interest or the table metadata was not read"); return; } - final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId()); + final TableMetadata tableMetadata = tableMetadataMap.get(tableId); final String fullTableName = tableMetadata.getFullTableName(); if (!isTableOfInterest(fullTableName)) { LOG.debug("The event is not from a table of interest"); @@ -228,8 +223,7 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) { final List primaryKeys = tableMetadata.getPrimaryKeys(); final long eventTimestampMillis = event.getHeader().getTimestamp(); - int eventCount = 0; - for (Object[] rowDataArray : data.getRows()) { + for (Object[] rowDataArray : rows) { final Map rowDataMap = new HashMap<>(); for (int i = 0; i < rowDataArray.length; i++) { rowDataMap.put(columnNames.get(i), rowDataArray[i]); @@ -240,24 +234,41 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) { tableMetadata.getDatabaseName(), tableMetadata.getTableName(), event.getHeader().getEventType(), - OpenSearchBulkActions.DELETE, + bulkAction, primaryKeys, s3Prefix, eventTimestampMillis, eventTimestampMillis); - addToBuffer(new Record<>(pipelineEvent)); - eventCount++; + pipelineEvents.add(pipelineEvent); } + + writeToBuffer(acknowledgementSet); bytesProcessedSummary.record(bytes); - flushBuffer(eventCount); + if (isAcknowledgmentsEnabled) { + acknowledgementSet.complete(); + } else { + streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate); + } } private boolean isTableOfInterest(String tableName) { return new HashSet<>(tableNames).contains(tableName); } - private void addToBuffer(final Record record) { + private void writeToBuffer(AcknowledgementSet acknowledgementSet) { + for (Event pipelineEvent : pipelineEvents) { + addToBufferAccumulator(new Record<>(pipelineEvent)); + if (acknowledgementSet != null) { + acknowledgementSet.add(pipelineEvent); + } + } + + flushBufferAccumulator(pipelineEvents.size()); + pipelineEvents.clear(); + } + + private void addToBufferAccumulator(final Record record) { try { bufferAccumulator.add(record); } catch (Exception e) { @@ -265,11 +276,13 @@ private void addToBuffer(final Record record) { } } - private void flushBuffer(int eventCount) { + private void flushBufferAccumulator(int eventCount) { try { bufferAccumulator.flush(); changeEventSuccessCounter.increment(eventCount); } catch (Exception e) { + // this will only happen if writing to buffer gets interrupted from shutdown, + // otherwise bufferAccumulator will keep retrying with backoff LOG.error("Failed to flush buffer", e); changeEventErrorCounter.increment(eventCount); } @@ -280,7 +293,7 @@ private void handleEventAndErrors(com.github.shyiko.mysql.binlog.event.Event eve try { function.accept(event); } catch (Exception e) { - LOG.error("Failed to process change event", e); + LOG.error("Failed to process change event of type {}", event.getHeader().getEventType(), e); } } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ChangeEventStatus.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ChangeEventStatus.java new file mode 100644 index 0000000000..f2b70cbe7b --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ChangeEventStatus.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; + +public class ChangeEventStatus { + + private final BinlogCoordinate binlogCoordinate; + private final long timestamp; + private volatile AcknowledgmentStatus acknowledgmentStatus; + + public enum AcknowledgmentStatus { + POSITIVE_ACK, + NEGATIVE_ACK, + NO_ACK + } + + public ChangeEventStatus(final BinlogCoordinate binlogCoordinate, final long timestamp) { + this.binlogCoordinate = binlogCoordinate; + this.timestamp = timestamp; + acknowledgmentStatus = AcknowledgmentStatus.NO_ACK; + } + + public void setAcknowledgmentStatus(final AcknowledgmentStatus acknowledgmentStatus) { + this.acknowledgmentStatus = acknowledgmentStatus; + } + + public AcknowledgmentStatus getAcknowledgmentStatus() { + return acknowledgmentStatus; + } + + public boolean isPositiveAcknowledgment() { + return acknowledgmentStatus == AcknowledgmentStatus.POSITIVE_ACK; + } + + public boolean isNegativeAcknowledgment() { + return acknowledgmentStatus == AcknowledgmentStatus.NEGATIVE_ACK; + } + + public BinlogCoordinate getBinlogCoordinate() { + return binlogCoordinate; + } + + public long getTimestamp() { + return timestamp; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java new file mode 100644 index 0000000000..3827f2b822 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java @@ -0,0 +1,133 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class StreamCheckpointManager { + private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointManager.class); + static final int REGULAR_CHECKPOINT_INTERVAL_MILLIS = 60_000; + static final int CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH = 1000; + + private final ConcurrentLinkedQueue changeEventStatuses = new ConcurrentLinkedQueue<>(); + private final StreamCheckpointer streamCheckpointer; + private final ExecutorService executorService; + private final Runnable stopStreamRunnable; + private final boolean isAcknowledgmentEnabled; + private final AcknowledgementSetManager acknowledgementSetManager; + private final Duration acknowledgmentTimeout; + + public StreamCheckpointManager(final StreamCheckpointer streamCheckpointer, + final boolean isAcknowledgmentEnabled, + final AcknowledgementSetManager acknowledgementSetManager, + final Runnable stopStreamRunnable, + final Duration acknowledgmentTimeout) { + this.acknowledgementSetManager = acknowledgementSetManager; + this.streamCheckpointer = streamCheckpointer; + this.isAcknowledgmentEnabled = isAcknowledgmentEnabled; + this.stopStreamRunnable = stopStreamRunnable; + this.acknowledgmentTimeout = acknowledgmentTimeout; + executorService = Executors.newSingleThreadExecutor(); + } + + public void start() { + executorService.submit(this::runCheckpointing); + } + + void runCheckpointing() { + ChangeEventStatus currentChangeEventStatus; + + while (!Thread.currentThread().isInterrupted()) { + try { + if (changeEventStatuses.isEmpty()) { + LOG.debug("No records processed. Extend the lease on stream partition."); + streamCheckpointer.extendLease(); + } else { + if (isAcknowledgmentEnabled) { + ChangeEventStatus lastChangeEventStatus = null; + currentChangeEventStatus = changeEventStatuses.peek(); + while (currentChangeEventStatus != null && currentChangeEventStatus.isPositiveAcknowledgment()) { + lastChangeEventStatus = currentChangeEventStatus; + currentChangeEventStatus = changeEventStatuses.poll(); + } + + if (lastChangeEventStatus != null) { + streamCheckpointer.checkpoint(lastChangeEventStatus.getBinlogCoordinate()); + } + + // If negative ack is seen, give up partition and exit loop to stop processing stream + if (currentChangeEventStatus != null && currentChangeEventStatus.isNegativeAcknowledgment()) { + LOG.info("Received negative acknowledgement for change event at {}. Will restart from most recent checkpoint", currentChangeEventStatus.getBinlogCoordinate()); + streamCheckpointer.giveUpPartition(); + break; + } + } else { + int changeEventCount = 0; + do { + currentChangeEventStatus = changeEventStatuses.poll(); + changeEventCount++; + // In case queue are populated faster than the poll, checkpoint when reaching certain count + if (changeEventCount % CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH == 0) { + streamCheckpointer.checkpoint(currentChangeEventStatus.getBinlogCoordinate()); + } + } while (!changeEventStatuses.isEmpty()); + streamCheckpointer.checkpoint(currentChangeEventStatus.getBinlogCoordinate()); + } + } + } catch (Exception e) { + LOG.warn("Exception while checkpointing. The stream processing will start from previous checkpoint.", e); + break; + } + + try { + Thread.sleep(REGULAR_CHECKPOINT_INTERVAL_MILLIS); + } catch (InterruptedException ex) { + break; + } + } + + stopStreamRunnable.run(); + stop(); + } + + public void stop() { + executorService.shutdownNow(); + } + + public ChangeEventStatus saveChangeEventsStatus(BinlogCoordinate binlogCoordinate) { + final ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli()); + changeEventStatuses.add(changeEventStatus); + return changeEventStatus; + } + + public AcknowledgementSet createAcknowledgmentSet(BinlogCoordinate binlogCoordinate) { + LOG.debug("Create acknowledgment set for events receive prior to {}", binlogCoordinate); + final ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli()); + changeEventStatuses.add(changeEventStatus); + return acknowledgementSetManager.create((result) -> { + if (result) { + changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.POSITIVE_ACK); + } else { + changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.NEGATIVE_ACK); + } + }, acknowledgmentTimeout); + } + + //VisibleForTesting + ConcurrentLinkedQueue getChangeEventStatuses() { + return changeEventStatuses; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java new file mode 100644 index 0000000000..b76dbab7c9 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; +import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Optional; + +public class StreamCheckpointer { + + private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointer.class); + + static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5); + static final String CHECKPOINT_COUNT = "checkpoints"; + + private final EnhancedSourceCoordinator sourceCoordinator; + private final StreamPartition streamPartition; + private final PluginMetrics pluginMetrics; + private final Counter checkpointCounter; + + public StreamCheckpointer(final EnhancedSourceCoordinator sourceCoordinator, + final StreamPartition streamPartition, + final PluginMetrics pluginMetrics) { + this.sourceCoordinator = sourceCoordinator; + this.streamPartition = streamPartition; + this.pluginMetrics = pluginMetrics; + checkpointCounter = pluginMetrics.counter(CHECKPOINT_COUNT); + } + + public void checkpoint(final BinlogCoordinate binlogCoordinate) { + LOG.debug("Checkpointing stream partition {} with binlog coordinate {}", streamPartition.getPartitionKey(), binlogCoordinate); + Optional progressState = streamPartition.getProgressState(); + progressState.get().setCurrentPosition(binlogCoordinate); + sourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); + checkpointCounter.increment(); + } + + public void extendLease() { + LOG.debug("Extending lease of stream partition {}", streamPartition.getPartitionKey()); + sourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); + } + + public void giveUpPartition() { + LOG.debug("Giving up stream partition {}", streamPartition.getPartitionKey()); + sourceCoordinator.giveUpPartition(streamPartition); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java index acb4ea3f85..1886bba451 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java @@ -7,6 +7,7 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -18,6 +19,8 @@ import org.slf4j.LoggerFactory; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class StreamScheduler implements Runnable { @@ -29,7 +32,10 @@ public class StreamScheduler implements Runnable { private final EnhancedSourceCoordinator sourceCoordinator; private final RdsSourceConfig sourceConfig; private final BinaryLogClient binaryLogClient; + private final Buffer> buffer; private final PluginMetrics pluginMetrics; + private final AcknowledgementSetManager acknowledgementSetManager; + private final ExecutorService executorService; private volatile boolean shutdownRequested = false; @@ -37,13 +43,15 @@ public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator, final RdsSourceConfig sourceConfig, final BinaryLogClient binaryLogClient, final Buffer> buffer, - final PluginMetrics pluginMetrics) { + final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager) { this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; this.binaryLogClient = binaryLogClient; - this.binaryLogClient.registerEventListener(new BinlogEventListener(buffer, sourceConfig, pluginMetrics)); + this.buffer = buffer; this.pluginMetrics = pluginMetrics; - + this.acknowledgementSetManager = acknowledgementSetManager; + executorService = Executors.newCachedThreadPool(); } @Override @@ -56,12 +64,15 @@ public void run() { LOG.info("Acquired partition to read from stream"); final StreamPartition streamPartition = (StreamPartition) sourcePartition.get(); + final StreamCheckpointer streamCheckpointer = new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics); + binaryLogClient.registerEventListener(new BinlogEventListener( + buffer, sourceConfig, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager)); final StreamWorker streamWorker = StreamWorker.create(sourceCoordinator, binaryLogClient, pluginMetrics); - streamWorker.processStream(streamPartition); + executorService.submit(() -> streamWorker.processStream(streamPartition)); } try { - LOG.debug("Waiting to acquire stream partition."); + LOG.debug("Looping to acquire new stream partition or idle while stream worker is working"); Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS); } catch (final InterruptedException e) { LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing"); @@ -81,6 +92,7 @@ public void run() { } public void shutdown() { + executorService.shutdownNow(); shutdownRequested = true; } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java index 303ecc2a53..ed88b196b6 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java @@ -84,7 +84,7 @@ private boolean isExportDone(StreamPartition streamPartition) { } private void setStartBinlogPosition(final StreamPartition streamPartition) { - final BinlogCoordinate startBinlogPosition = streamPartition.getProgressState().get().getStartPosition(); + final BinlogCoordinate startBinlogPosition = streamPartition.getProgressState().get().getCurrentPosition(); // set start of binlog stream to current position if exists if (startBinlogPosition != null) { diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java index e0b69746c2..0a814e7fc1 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java @@ -13,6 +13,7 @@ import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventFactory; @@ -69,6 +70,9 @@ class RdsServiceTest { @Mock private Buffer> buffer; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @BeforeEach void setUp() { when(clientFactory.buildRdsClient()).thenReturn(rdsClient); @@ -166,6 +170,6 @@ void test_service_shutdown_calls_executor_shutdownNow() { } private RdsService createObjectUnderTest() { - return new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics); + return new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics, acknowledgementSetManager); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceTest.java index 682f16ed51..f414173b05 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceTest.java @@ -12,6 +12,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig; @@ -37,6 +38,9 @@ class RdsSourceTest { @Mock private AwsAuthenticationConfig awsAuthenticationConfig; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @BeforeEach void setUp() { when(sourceConfig.getAwsAuthenticationConfig()).thenReturn(awsAuthenticationConfig); @@ -49,6 +53,6 @@ void test_when_buffer_is_null_then_start_throws_exception() { } private RdsSource createObjectUnderTest() { - return new RdsSource(pluginMetrics, sourceConfig, eventFactory, awsCredentialsSupplier); + return new RdsSource(pluginMetrics, sourceConfig, eventFactory, awsCredentialsSupplier, acknowledgementSetManager); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java index ccb36347fa..c470021c6b 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java @@ -19,6 +19,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.BaseEventBuilder; import org.opensearch.dataprepper.model.event.Event; @@ -26,16 +28,20 @@ import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.io.InputFile; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.codec.parquet.ParquetInputCodec; import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState; import java.io.InputStream; +import java.time.Duration; import java.util.Optional; import java.util.UUID; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -56,7 +62,7 @@ class DataFileLoaderTest { private DataFilePartition dataFilePartition; @Mock - private BufferAccumulator> bufferAccumulator; + private Buffer> buffer; @Mock private EventFactory eventFactory; @@ -70,6 +76,15 @@ class DataFileLoaderTest { @Mock private PluginMetrics pluginMetrics; + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + + @Mock + private AcknowledgementSet acknowledgementSet; + + @Mock + private Duration acknowledgmentTimeout; + @Mock private Counter exportRecordsTotalCounter; @@ -115,19 +130,26 @@ void test_run_success() throws Exception { when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder); when(eventBuilder.withEventType(any()).withData(any()).build()).thenReturn(event); when(event.toJsonString()).thenReturn(randomString); + when(recordConverter.convert(any(), any(), any(), any(), anyLong(), anyLong())).thenReturn(event); + + AvroParquetReader.Builder builder = mock(AvroParquetReader.Builder.class); + ParquetReader parquetReader = mock(ParquetReader.class); + BufferAccumulator> bufferAccumulator = mock(BufferAccumulator.class); + when(builder.build()).thenReturn(parquetReader); + when(parquetReader.read()).thenReturn(mock(GenericRecord.class, RETURNS_DEEP_STUBS), null); + try (MockedStatic readerMockedStatic = mockStatic(AvroParquetReader.class); + MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - try (MockedStatic readerMockedStatic = mockStatic(AvroParquetReader.class)) { - ParquetReader parquetReader = mock(ParquetReader.class); - AvroParquetReader.Builder builder = mock(AvroParquetReader.Builder.class); readerMockedStatic.when(() -> AvroParquetReader.builder(any(InputFile.class), any())).thenReturn(builder); - when(builder.build()).thenReturn(parquetReader); - when(parquetReader.read()).thenReturn(mock(GenericRecord.class, RETURNS_DEEP_STUBS), null); + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(any(Buffer.class), anyInt(), any(Duration.class))).thenReturn(bufferAccumulator); dataFileLoader.run(); } verify(bufferAccumulator).add(any(Record.class)); verify(bufferAccumulator).flush(); + verify(acknowledgementSet).add(event); + verify(acknowledgementSet).complete(); verify(exportRecordsTotalCounter).increment(); verify(bytesReceivedSummary).record(sizeBytes); @@ -157,20 +179,27 @@ void test_flush_failure_then_error_metric_updated() throws Exception { when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder); when(eventBuilder.withEventType(any()).withData(any()).build()).thenReturn(event); when(event.toJsonString()).thenReturn(randomString); - doThrow(new RuntimeException("testing")).when(bufferAccumulator).flush(); - try (MockedStatic readerMockedStatic = mockStatic(AvroParquetReader.class)) { - ParquetReader parquetReader = mock(ParquetReader.class); - AvroParquetReader.Builder builder = mock(AvroParquetReader.Builder.class); + when(recordConverter.convert(any(), any(), any(), any(), anyLong(), anyLong())).thenReturn(event); + + ParquetReader parquetReader = mock(ParquetReader.class); + AvroParquetReader.Builder builder = mock(AvroParquetReader.Builder.class); + BufferAccumulator> bufferAccumulator = mock(BufferAccumulator.class); + doThrow(new RuntimeException("testing")).when(bufferAccumulator).flush(); + when(builder.build()).thenReturn(parquetReader); + when(parquetReader.read()).thenReturn(mock(GenericRecord.class, RETURNS_DEEP_STUBS), null); + try (MockedStatic readerMockedStatic = mockStatic(AvroParquetReader.class); + MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { readerMockedStatic.when(() -> AvroParquetReader.builder(any(InputFile.class), any())).thenReturn(builder); - when(builder.build()).thenReturn(parquetReader); - when(parquetReader.read()).thenReturn(mock(GenericRecord.class, RETURNS_DEEP_STUBS), null); + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(any(Buffer.class), anyInt(), any(Duration.class))).thenReturn(bufferAccumulator); dataFileLoader.run(); } verify(bufferAccumulator).add(any(Record.class)); verify(bufferAccumulator).flush(); + verify(acknowledgementSet).add(event); + verify(acknowledgementSet, never()).complete(); verify(exportRecordsTotalCounter).increment(); verify(bytesReceivedSummary).record(sizeBytes); @@ -181,6 +210,7 @@ void test_flush_failure_then_error_metric_updated() throws Exception { private DataFileLoader createObjectUnderTest() { final InputCodec codec = new ParquetInputCodec(eventFactory); - return DataFileLoader.create(dataFilePartition, codec, bufferAccumulator, s3ObjectReader, recordConverter, pluginMetrics); + return DataFileLoader.create(dataFilePartition, codec, buffer, s3ObjectReader, recordConverter, + pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java index 5a5a56c6fd..f249e0b025 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java @@ -12,8 +12,8 @@ import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; @@ -72,6 +72,9 @@ class DataFileSchedulerTest { @Mock private PluginMetrics pluginMetrics; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock private DataFilePartition dataFilePartition; @@ -112,7 +115,7 @@ void test_given_no_datafile_partition_then_no_export() throws InterruptedExcepti } @Test - void test_given_available_datafile_partition_then_load_datafile() { + void test_given_available_datafile_partition_then_load_datafile() throws InterruptedException { final String exportTaskId = UUID.randomUUID().toString(); when(dataFilePartition.getExportTaskId()).thenReturn(exportTaskId); @@ -129,9 +132,10 @@ void test_given_available_datafile_partition_then_load_datafile() { // MockedStatic needs to be created on the same thread it's used try (MockedStatic dataFileLoaderMockedStatic = mockStatic(DataFileLoader.class)) { DataFileLoader dataFileLoader = mock(DataFileLoader.class); - dataFileLoaderMockedStatic.when(() -> DataFileLoader.create( - eq(dataFilePartition), any(InputCodec.class), any(BufferAccumulator.class), any(S3ObjectReader.class), - any(ExportRecordConverter.class), any(PluginMetrics.class))) + dataFileLoaderMockedStatic.when(() -> DataFileLoader.create(eq(dataFilePartition), any(InputCodec.class), + any(Buffer.class), any(S3ObjectReader.class), + any(ExportRecordConverter.class), any(PluginMetrics.class), + any(EnhancedSourceCoordinator.class), any(), any(Duration.class))) .thenReturn(dataFileLoader); doNothing().when(dataFileLoader).run(); objectUnderTest.run(); @@ -157,9 +161,10 @@ void test_data_file_loader_throws_exception_then_give_up_partition() { // MockedStatic needs to be created on the same thread it's used try (MockedStatic dataFileLoaderMockedStatic = mockStatic(DataFileLoader.class)) { DataFileLoader dataFileLoader = mock(DataFileLoader.class); - dataFileLoaderMockedStatic.when(() -> DataFileLoader.create( - eq(dataFilePartition), any(InputCodec.class), any(BufferAccumulator.class), any(S3ObjectReader.class), - any(ExportRecordConverter.class), any(PluginMetrics.class))) + dataFileLoaderMockedStatic.when(() -> DataFileLoader.create(eq(dataFilePartition), any(InputCodec.class), + any(Buffer.class), any(S3ObjectReader.class), + any(ExportRecordConverter.class), any(PluginMetrics.class), + any(EnhancedSourceCoordinator.class), any(), any(Duration.class))) .thenReturn(dataFileLoader); doThrow(new RuntimeException()).when(dataFileLoader).run(); objectUnderTest.run(); @@ -187,6 +192,6 @@ void test_shutdown() { } private DataFileScheduler createObjectUnderTest() { - return new DataFileScheduler(sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics); + return new DataFileScheduler(sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java index 30f622c5d7..d33ecd3a70 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.rds.stream; +import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.event.EventType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -15,6 +16,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -37,6 +39,15 @@ class BinlogEventListenerTest { @Mock private PluginMetrics pluginMetrics; + @Mock + private BinaryLogClient binaryLogClient; + + @Mock + private StreamCheckpointer streamCheckpointer; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) private com.github.shyiko.mysql.binlog.event.Event binlogEvent; @@ -91,6 +102,6 @@ void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType) } private BinlogEventListener createObjectUnderTest() { - return new BinlogEventListener(buffer, sourceConfig, pluginMetrics); + return new BinlogEventListener(buffer, sourceConfig, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManagerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManagerTest.java new file mode 100644 index 0000000000..deddb45e32 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManagerTest.java @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; + + +@ExtendWith(MockitoExtension.class) +class StreamCheckpointManagerTest { + + static final Duration ACK_TIMEOUT = Duration.ofMinutes(5); + + @Mock + private StreamCheckpointer streamCheckpointer; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + @Mock + private Runnable stopStreamRunnable; + + private boolean isAcknowledgmentEnabled = false; + + @BeforeEach + void setUp() { + + } + + @Test + void test_start() { + final ExecutorService executorService = mock(ExecutorService.class); + try (MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { + executorsMockedStatic.when(Executors::newSingleThreadExecutor).thenReturn(executorService); + + final StreamCheckpointManager streamCheckpointManager = createObjectUnderTest(); + streamCheckpointManager.start(); + } + verify(executorService).submit(any(Runnable.class)); + } + + @Test + void test_shutdown() { + final ExecutorService executorService = mock(ExecutorService.class); + try (MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { + executorsMockedStatic.when(Executors::newSingleThreadExecutor).thenReturn(executorService); + + final StreamCheckpointManager streamCheckpointManager = createObjectUnderTest(); + streamCheckpointManager.start(); + streamCheckpointManager.stop(); + } + + verify(executorService).shutdownNow(); + } + + @Test + void test_saveChangeEventsStatus() { + final BinlogCoordinate binlogCoordinate = mock(BinlogCoordinate.class); + final StreamCheckpointManager streamCheckpointManager = createObjectUnderTest(); + streamCheckpointManager.saveChangeEventsStatus(binlogCoordinate); + + assertThat(streamCheckpointManager.getChangeEventStatuses().size(), is(1)); + assertThat(streamCheckpointManager.getChangeEventStatuses().peek().getBinlogCoordinate(), is(binlogCoordinate)); + } + + @Test + void test_createAcknowledgmentSet() { + final BinlogCoordinate binlogCoordinate = mock(BinlogCoordinate.class); + final StreamCheckpointManager streamCheckpointManager = createObjectUnderTest(); + streamCheckpointManager.createAcknowledgmentSet(binlogCoordinate); + + assertThat(streamCheckpointManager.getChangeEventStatuses().size(), is(1)); + ChangeEventStatus changeEventStatus = streamCheckpointManager.getChangeEventStatuses().peek(); + assertThat(changeEventStatus.getBinlogCoordinate(), is(binlogCoordinate)); + verify(acknowledgementSetManager).create(any(Consumer.class), eq(ACK_TIMEOUT)); + } + + private StreamCheckpointManager createObjectUnderTest() { + return new StreamCheckpointManager( + streamCheckpointer, isAcknowledgmentEnabled, acknowledgementSetManager, stopStreamRunnable, ACK_TIMEOUT); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java new file mode 100644 index 0000000000..2fdac1065f --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; +import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; + +import java.util.Optional; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointer.CHECKPOINT_COUNT; +import static org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointer.CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE; + + +@ExtendWith(MockitoExtension.class) +class StreamCheckpointerTest { + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + + @Mock + private StreamPartition streamPartition; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter checkpointCounter; + + private StreamCheckpointer streamCheckpointer; + + + @BeforeEach + void setUp() { + when(pluginMetrics.counter(CHECKPOINT_COUNT)).thenReturn(checkpointCounter); + streamCheckpointer = createObjectUnderTest(); + } + + @Test + void test_checkpoint() { + final BinlogCoordinate binlogCoordinate = mock(BinlogCoordinate.class); + final StreamProgressState streamProgressState = mock(StreamProgressState.class); + when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); + + streamCheckpointer.checkpoint(binlogCoordinate); + + verify(streamProgressState).setCurrentPosition(binlogCoordinate); + verify(sourceCoordinator).saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); + verify(checkpointCounter).increment(); + } + + @Test + void test_extendLease() { + streamCheckpointer.extendLease(); + + verify(sourceCoordinator).saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); + } + + @Test + void test_giveUpPartition() { + streamCheckpointer.giveUpPartition(); + + verify(sourceCoordinator).giveUpPartition(streamPartition); + } + + private StreamCheckpointer createObjectUnderTest() { + return new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java index 1a152137ee..3c8b70cab2 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java @@ -14,6 +14,7 @@ import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -27,12 +28,12 @@ import java.util.concurrent.Executors; import static org.awaitility.Awaitility.await; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -53,6 +54,9 @@ class StreamSchedulerTest { @Mock private Buffer> buffer; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + private StreamScheduler objectUnderTest; @BeforeEach @@ -71,8 +75,7 @@ void test_given_no_stream_partition_then_no_stream_actions() throws InterruptedE Thread.sleep(100); executorService.shutdownNow(); - verify(binaryLogClient).registerEventListener(any(BinlogEventListener.class)); - verifyNoMoreInteractions(binaryLogClient); + verifyNoInteractions(binaryLogClient); } @Test @@ -111,6 +114,6 @@ void test_shutdown() { } private StreamScheduler createObjectUnderTest() { - return new StreamScheduler(sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics); + return new StreamScheduler(sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics, acknowledgementSetManager); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java index 1392c852e8..ecc7d86d47 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java @@ -54,7 +54,7 @@ void test_processStream_with_given_binlog_coordinates() throws IOException { final String binlogFilename = UUID.randomUUID().toString(); final long binlogPosition = 100L; when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); - when(streamProgressState.getStartPosition()).thenReturn(new BinlogCoordinate(binlogFilename, binlogPosition)); + when(streamProgressState.getCurrentPosition()).thenReturn(new BinlogCoordinate(binlogFilename, binlogPosition)); when(streamProgressState.shouldWaitForExport()).thenReturn(false); streamWorker.processStream(streamPartition); @@ -70,7 +70,7 @@ void test_processStream_without_current_binlog_coordinates() throws IOException when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); final String binlogFilename = "binlog-001"; final Long binlogPosition = 100L; - when(streamProgressState.getStartPosition()).thenReturn(null); + when(streamProgressState.getCurrentPosition()).thenReturn(null); when(streamProgressState.shouldWaitForExport()).thenReturn(false); streamWorker.processStream(streamPartition);