From d16feb4ac236c647335770a739102769b5da7259 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 08:07:02 -0500 Subject: [PATCH 01/18] Add AcknowledgementSet support to DocumentDB/MongoDB streams Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../mongo/configuration/CollectionConfig.java | 10 ++ .../mongo/converter/RecordConverter.java | 9 +- .../mongo/documentdb/DocumentDBService.java | 43 +++-- .../plugins/mongo/leader/LeaderScheduler.java | 12 +- .../plugins/mongo/model/CheckpointStatus.java | 45 +++++ .../stream/DataStreamPartitionCheckpoint.java | 4 + .../stream/StreamAcknowledgementManager.java | 126 +++++++++++++ .../plugins/mongo/stream/StreamScheduler.java | 5 +- .../plugins/mongo/stream/StreamWorker.java | 64 +++++-- .../mongo/leader/LeaderSchedulerTest.java | 7 +- .../StreamAcknowledgementManagerTest.java | 168 ++++++++++++++++++ .../mongo/stream/StreamSchedulerTest.java | 3 +- .../mongo/stream/StreamWorkerTest.java | 57 +++++- 13 files changed, 492 insertions(+), 61 deletions(-) create mode 100644 data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/model/CheckpointStatus.java create mode 100644 data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java create mode 100644 data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java index 9d9ffa87d0..f710e0f5c6 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/CollectionConfig.java @@ -44,6 +44,16 @@ public ExportConfig getExportConfig() { return this.exportConfig; } + public boolean isExportRequired() { + return this.ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM || + this.ingestionMode == CollectionConfig.IngestionMode.EXPORT; + } + + public boolean isStreamRequired() { + return this.ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM || + this.ingestionMode == CollectionConfig.IngestionMode.STREAM; + } + public static class ExportConfig { private static final int DEFAULT_ITEMS_PER_PARTITION = 4000; @JsonProperty("items_per_partition") diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverter.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverter.java index 7a5b0636e7..d21258a7ca 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverter.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverter.java @@ -12,8 +12,6 @@ import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig; -import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition; -import 
org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,12 +76,7 @@ public Event convert(final String record, } final EventMetadata eventMetadata = event.getMetadata(); - if (dataType.equals(ExportPartition.PARTITION_TYPE)) { - eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, ExportPartition.PARTITION_TYPE); - } else if (dataType.equals(StreamPartition.PARTITION_TYPE)) { - eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, StreamPartition.PARTITION_TYPE); - } - + eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, dataType); eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_EVENT_COLLECTION_METADATA_ATTRIBUTE, collectionConfig.getCollection()); eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis); eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE, eventName); diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java index 7898768d41..7f069da766 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java @@ -6,6 +6,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig; import org.opensearch.dataprepper.plugins.mongo.export.MongoDBExportPartitionSupplier; import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig; import org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler; @@ -15,6 +16,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -24,11 +27,7 @@ public class DocumentDBService { private final PluginMetrics pluginMetrics; private final MongoDBSourceConfig sourceConfig; private final AcknowledgementSetManager acknowledgementSetManager; - private final ExecutorService executor; - private ExportScheduler exportScheduler; - private ExportWorker exportWorker; - private LeaderScheduler leaderScheduler; - private StreamScheduler streamScheduler; + private ExecutorService executor; private final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier; public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator, final MongoDBSourceConfig sourceConfig, @@ -38,9 +37,7 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator, this.pluginMetrics = pluginMetrics; this.acknowledgementSetManager = acknowledgementSetManager; this.sourceConfig = sourceConfig; - this.mongoDBExportPartitionSupplier = new MongoDBExportPartitionSupplier(sourceConfig); - executor = Executors.newFixedThreadPool(4); } /** @@ -51,15 +48,25 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator, * @param buffer Data Prepper Buffer */ public void start(Buffer<Record<Event>> buffer) { -
this.exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics); - this.exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig); - this.leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections()); - this.streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics); + final List<Runnable> runnableList = new ArrayList<>(); + + final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections()); + runnableList.add(leaderScheduler); + + if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isExportRequired)) { + final ExportScheduler exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics); + final ExportWorker exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig); + runnableList.add(exportScheduler); + runnableList.add(exportWorker); + } + + if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStreamRequired)) { + final StreamScheduler streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics); + runnableList.add(streamScheduler); + } - executor.submit(leaderScheduler); - executor.submit(exportScheduler); - executor.submit(exportWorker); - executor.submit(streamScheduler); + executor = Executors.newFixedThreadPool(runnableList.size()); + runnableList.forEach(executor::submit); } /** @@ -67,7 +74,9 @@ public void start(Buffer<Record<Event>> buffer) { * Each scheduler must implement logic for gracefully shutdown. */ public void shutdown() { - LOG.info("shutdown DocumentDB Service scheduler and worker"); - executor.shutdownNow(); + if (executor != null) { + LOG.info("shutdown DocumentDB Service scheduler and worker"); + executor.shutdownNow(); + } } } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java index f4ab200e27..53d0b8d912 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java @@ -101,15 +101,7 @@ public void run() { } } - private boolean isExportRequired(final CollectionConfig.IngestionMode ingestionMode) { - return ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM || - ingestionMode == CollectionConfig.IngestionMode.EXPORT; - } - private boolean isStreamRequired(final CollectionConfig.IngestionMode ingestionMode) { - return ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM || - ingestionMode == CollectionConfig.IngestionMode.STREAM; - } private void init() { LOG.info("Try to initialize DocumentDB Leader Partition"); @@ -120,13 +112,13 @@ private void init() { coordinator.createPartition(new GlobalState(collectionConfig.getCollection(), null)); final Instant startTime = Instant.now(); - final boolean exportRequired = isExportRequired(collectionConfig.getIngestionMode()); + final boolean exportRequired = collectionConfig.isExportRequired(); LOG.info("Ingestion mode {} for Collection {}", collectionConfig.getIngestionMode(), collectionConfig.getCollection()); if (exportRequired) {
createExportPartition(collectionConfig, startTime); } - if (isStreamRequired(collectionConfig.getIngestionMode())) { + if (collectionConfig.isStreamRequired()) { createStreamPartition(collectionConfig, startTime, exportRequired); } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/model/CheckpointStatus.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/model/CheckpointStatus.java new file mode 100644 index 0000000000..c7aafa556f --- /dev/null +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/model/CheckpointStatus.java @@ -0,0 +1,45 @@ +package org.opensearch.dataprepper.plugins.mongo.model; + +public class CheckpointStatus { + private final String resumeToken; + private final long recordCount; + private boolean acknowledged; + private final long createTimestamp; + private Long acknowledgedTimestamp; + + public CheckpointStatus(final String resumeToken, final long recordCount, final long createTimestamp) { + this.resumeToken = resumeToken; + this.recordCount = recordCount; + this.acknowledged = false; + this.createTimestamp = createTimestamp; + } + + public void setAcknowledgedTimestamp(final Long acknowledgedTimestamp) { + this.acknowledgedTimestamp = acknowledgedTimestamp; + } + + public void setAcknowledged(boolean acknowledged) { + this.acknowledged = acknowledged; + } + + public String getResumeToken() { + return resumeToken; + } + public long getRecordCount() { + return recordCount; + } + + public boolean isAcknowledged() { + return acknowledged; + } + + public long getCreateTimestamp() { + return createTimestamp; + } + + public long getAcknowledgedTimestamp() { + return acknowledgedTimestamp; + } + + +} diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java index 642fb659c9..3f752bf93d 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java @@ -76,4 +76,8 @@ public Optional getGlobalStreamLoadStatus() { public void updateStreamPartitionForAcknowledgmentWait(final Duration acknowledgmentSetTimeout) { enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, acknowledgmentSetTimeout); } + + public void giveUpPartition() { + enhancedSourceCoordinator.giveUpPartition(streamPartition); + } } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java new file mode 100644 index 0000000000..a3f052629c --- /dev/null +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java @@ -0,0 +1,126 @@ +package org.opensearch.dataprepper.plugins.mongo.stream; + +import com.google.common.annotations.VisibleForTesting; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.plugins.mongo.model.CheckpointStatus; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class StreamAcknowledgementManager { + private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class); + private final ConcurrentLinkedQueue<CheckpointStatus> checkpoints = new ConcurrentLinkedQueue<>(); + private final ConcurrentHashMap<String, CheckpointStatus> ackStatus = new ConcurrentHashMap<>(); + + private final AcknowledgementSetManager acknowledgementSetManager; + private final DataStreamPartitionCheckpoint partitionCheckpoint; + + private final Duration partitionAcknowledgmentTimeout; + private final int acknowledgementMonitorWaitTimeInMs; + private final int checkPointIntervalInMs; + private final ExecutorService executorService; + + private boolean enableAcknowledgement = false; + + public StreamAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager, + final DataStreamPartitionCheckpoint partitionCheckpoint, + final Duration partitionAcknowledgmentTimeout, + final int acknowledgementMonitorWaitTimeInMs, + final int checkPointIntervalInMs) { + this.acknowledgementSetManager = acknowledgementSetManager; + this.partitionCheckpoint = partitionCheckpoint; + this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout; + this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs; + this.checkPointIntervalInMs = checkPointIntervalInMs; + executorService = Executors.newSingleThreadExecutor(); + } + + void init() { + enableAcknowledgement = true; + final Thread currentThread = Thread.currentThread(); + executorService.submit(() -> monitorCheckpoints(executorService, currentThread)); + } + + private void monitorCheckpoints(final ExecutorService executorService, final Thread parentThread) { + long lastCheckpointTime = System.currentTimeMillis(); + CheckpointStatus lastCheckpointStatus = null; + while (!Thread.currentThread().isInterrupted()) { + final CheckpointStatus checkpointStatus = checkpoints.peek(); + if (checkpointStatus != null) { + if (checkpointStatus.isAcknowledged()) { + lastCheckpointStatus = checkpoints.poll(); + ackStatus.remove(checkpointStatus.getResumeToken()); + if (System.currentTimeMillis() - lastCheckpointTime > checkPointIntervalInMs) { + LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); + partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); + lastCheckpointTime = System.currentTimeMillis(); + } + } else { + LOG.info("Checkpoint not complete"); + final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now()); + // Acknowledgement not received for the checkpoint after twice ack wait duration + if (ackWaitDuration.getSeconds() > partitionAcknowledgmentTimeout.getSeconds() * 2) { + // Give up partition and should interrupt parent thread to stop processing stream + if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) { + partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount()); + } + partitionCheckpoint.giveUpPartition(); + Thread.currentThread().interrupt(); + } + } + } + + try { +
Thread.sleep(acknowledgementMonitorWaitTimeInMs); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + parentThread.interrupt(); + executorService.shutdown(); + } + + Optional<AcknowledgementSet> createAcknowledgementSet(final String resumeToken, final long recordNumber) { + if (!enableAcknowledgement) { + return Optional.empty(); + } + + final CheckpointStatus checkpointStatus = new CheckpointStatus(resumeToken, recordNumber, Instant.now().toEpochMilli()); + checkpoints.add(checkpointStatus); + ackStatus.put(resumeToken, checkpointStatus); + return Optional.of(acknowledgementSetManager.create((result) -> { + if (result) { + final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken); + ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli()); + ackCheckpointStatus.setAcknowledged(true); + LOG.debug("Received acknowledgment of completion from sink for checkpoint {}", resumeToken); + } else { + LOG.warn("Negative acknowledgment received for checkpoint {}, resetting checkpoint", resumeToken); + // default CheckpointStatus acknowledged value is false. The monitorCheckpoints method will time out + // and reprocess stream from last successful checkpoint in the order. + } + }, partitionAcknowledgmentTimeout)); + } + + void shutdown() { + executorService.shutdown(); + } + + @VisibleForTesting + ConcurrentHashMap<String, CheckpointStatus> getAcknowledgementStatus() { + return ackStatus; + } + + @VisibleForTesting + ConcurrentLinkedQueue<CheckpointStatus> getCheckpoints() { + return checkpoints; + } +} diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java index 60752fa0d3..676b04ee95 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java @@ -23,6 +23,7 @@ public class StreamScheduler implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class); private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000; + static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 120_000; /** * Number of records to accumulate before flushing to buffer */ @@ -30,7 +31,7 @@ public class StreamScheduler implements Runnable { /** * Number of stream records to accumulate to write to buffer and checkpoint */ - private static final int DEFAULT_STREAM_BATCH_SIZE = 100; + static final int DEFAULT_STREAM_BATCH_SIZE = 100; static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); private final EnhancedSourceCoordinator sourceCoordinator; private final RecordBufferWriter recordBufferWriter; @@ -62,7 +63,7 @@ public void run() { streamPartition = (StreamPartition) sourcePartition.get(); final DataStreamPartitionCheckpoint partitionCheckpoint = new DataStreamPartitionCheckpoint(sourceCoordinator, streamPartition); final StreamWorker streamWorker = StreamWorker.create(recordBufferWriter, acknowledgementSetManager, - sourceConfig, partitionCheckpoint, pluginMetrics, DEFAULT_STREAM_BATCH_SIZE); + sourceConfig, partitionCheckpoint, pluginMetrics, DEFAULT_STREAM_BATCH_SIZE, DEFAULT_CHECKPOINT_INTERVAL_MILLS); streamWorker.processStream(streamPartition); } try { diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java
b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java index 1ff882b8c6..dca3478408 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java @@ -32,6 +32,8 @@ public class StreamWorker { public static final String STREAM_PREFIX = "STREAM-"; private static final Logger LOG = LoggerFactory.getLogger(StreamWorker.class); private static final int DEFAULT_EXPORT_COMPLETE_WAIT_INTERVAL_MILLIS = 90_000; + private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000; + private static final String COLLECTION_SPLITTER = "\\."; static final String SUCCESS_ITEM_COUNTER_NAME = "streamRecordsSuccessTotal"; static final String FAILURE_ITEM_COUNTER_NAME = "streamRecordsFailedTotal"; @@ -43,7 +45,9 @@ public class StreamWorker { private final Counter failureItemsCounter; private final AcknowledgementSetManager acknowledgementSetManager; private final PluginMetrics pluginMetrics; - private final int defaultFlushBatchSize; + private final int recordFlushBatchSize; + final int checkPointIntervalInMs; + private final StreamAcknowledgementManager streamAcknowledgementManager; private final JsonWriterSettings writerSettings = JsonWriterSettings.builder() .outputMode(JsonMode.RELAXED) @@ -55,26 +59,35 @@ public static StreamWorker create(final RecordBufferWriter recordBufferWriter, final MongoDBSourceConfig sourceConfig, final DataStreamPartitionCheckpoint partitionCheckpoint, final PluginMetrics pluginMetrics, - final int defaultFlushBatchSize + final int recordFlushBatchSize, + final int checkPointIntervalInMs ) { return new StreamWorker(recordBufferWriter, acknowledgementSetManager, - sourceConfig, partitionCheckpoint, pluginMetrics, defaultFlushBatchSize); + sourceConfig, partitionCheckpoint, pluginMetrics, recordFlushBatchSize, checkPointIntervalInMs); } public StreamWorker(final RecordBufferWriter recordBufferWriter, final AcknowledgementSetManager acknowledgementSetManager, final MongoDBSourceConfig sourceConfig, final DataStreamPartitionCheckpoint partitionCheckpoint, final PluginMetrics pluginMetrics, - final int defaultFlushBatchSize + final int recordFlushBatchSize, + final int checkPointIntervalInMs ) { this.recordBufferWriter = recordBufferWriter; this.sourceConfig = sourceConfig; this.partitionCheckpoint = partitionCheckpoint; this.acknowledgementSetManager = acknowledgementSetManager; this.pluginMetrics = pluginMetrics; - this.defaultFlushBatchSize = defaultFlushBatchSize; + this.recordFlushBatchSize = recordFlushBatchSize; + this.checkPointIntervalInMs = checkPointIntervalInMs; this.successItemsCounter = pluginMetrics.counter(SUCCESS_ITEM_COUNTER_NAME); this.failureItemsCounter = pluginMetrics.counter(FAILURE_ITEM_COUNTER_NAME); + streamAcknowledgementManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, + sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, checkPointIntervalInMs); + if (sourceConfig.isAcknowledgmentsEnabled()) { + // starts acknowledgement monitoring thread + streamAcknowledgementManager.init(); + } } private MongoCursor<ChangeStreamDocument<Document>> getChangeStreamCursor(final MongoCollection<Document> collection, @@ -101,7 +114,7 @@ public void processStream(final StreamPartition streamPartition) { if (collectionDBNameList.size() < 2) { throw new IllegalArgumentException("Invalid Collection Name.
Must be in db.collection format"); } - int recordCount = 0; + long recordCount = 0; final List records = new ArrayList<>(); // TODO: create acknowledgementSet AcknowledgementSet acknowledgementSet = null; @@ -120,10 +133,11 @@ public void processStream(final StreamPartition streamPartition) { Thread.sleep(DEFAULT_EXPORT_COMPLETE_WAIT_INTERVAL_MILLIS); } catch (final InterruptedException ex) { LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing"); + Thread.currentThread().interrupt(); break; } } - + long lastCheckpointTime = System.currentTimeMillis(); while (cursor.hasNext() && !Thread.currentThread().isInterrupted()) { try { final ChangeStreamDocument document = cursor.next(); @@ -134,17 +148,24 @@ public void processStream(final StreamPartition streamPartition) { records.add(record); recordCount += 1; - if (recordCount % defaultFlushBatchSize == 0) { - LOG.debug("Write to buffer for line " + (recordCount - defaultFlushBatchSize) + " to " + recordCount); + if (recordCount % recordFlushBatchSize == 0) { + LOG.debug("Write to buffer for line " + (recordCount - recordFlushBatchSize) + " to " + recordCount); + acknowledgementSet = streamAcknowledgementManager.createAcknowledgementSet(checkPointToken, recordCount).orElse(null); recordBufferWriter.writeToBuffer(acknowledgementSet, records); + successItemsCounter.increment(records.size()); records.clear(); - LOG.debug("Perform regular checkpointing for stream Loader"); - partitionCheckpoint.checkpoint(checkPointToken, recordCount); - successItemsCounter.increment(); + if (!sourceConfig.isAcknowledgmentsEnabled() && System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) { + LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkPointToken, recordCount); + partitionCheckpoint.checkpoint(checkPointToken, recordCount); + lastCheckpointTime = System.currentTimeMillis(); + } } } catch (Exception e) { - LOG.error("Failed to add record to buffer with error {}", e.getMessage()); - failureItemsCounter.increment(); + // TODO handle documents with size > 10 MB. + // this will only happen if writing to buffer gets interrupted from shutdown, + // otherwise it's infinite backoff and retry + LOG.error("Failed to add records to buffer with error {}", e.getMessage()); + failureItemsCounter.increment(records.size()); } } } @@ -152,12 +173,21 @@ public void processStream(final StreamPartition streamPartition) { LOG.error("Exception connecting to cluster and processing stream", e); throw new RuntimeException(e); } finally { - LOG.info("Checkpointing processing stream"); if (!records.isEmpty()) { + LOG.info("Flushing and checkpointing last processed record batch from the stream before terminating"); + acknowledgementSet = streamAcknowledgementManager.createAcknowledgementSet(checkPointToken, recordCount).orElse(null); recordBufferWriter.writeToBuffer(acknowledgementSet, records); + successItemsCounter.increment(records.size()); + // Do final checkpoint. + if (!sourceConfig.isAcknowledgmentsEnabled()) { + partitionCheckpoint.checkpoint(checkPointToken, recordCount); + } + } + + // shutdown acknowledgement monitoring thread + if (streamAcknowledgementManager != null) { + streamAcknowledgementManager.shutdown(); } - // Do final checkpoint. 
- partitionCheckpoint.checkpoint(checkPointToken, recordCount); } } } diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java index afae4cdc8d..1387ed24f8 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java @@ -63,7 +63,8 @@ void test_should_init() { leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig), Duration.ofMillis(100)); leaderPartition = new LeaderPartition(); given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); - given(collectionConfig.getIngestionMode()).willReturn(CollectionConfig.IngestionMode.EXPORT_STREAM); + given(collectionConfig.isExportRequired()).willReturn(true); + given(collectionConfig.isStreamRequired()).willReturn(true); given(collectionConfig.getExportConfig()).willReturn(exportConfig); given(exportConfig.getItemsPerPartition()).willReturn(new Random().nextInt()); given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString()); @@ -96,7 +97,7 @@ void test_should_init_export() { leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig), Duration.ofMillis(100)); leaderPartition = new LeaderPartition(); given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); - given(collectionConfig.getIngestionMode()).willReturn(CollectionConfig.IngestionMode.EXPORT); + given(collectionConfig.isExportRequired()).willReturn(true); given(collectionConfig.getExportConfig()).willReturn(exportConfig); given(exportConfig.getItemsPerPartition()).willReturn(new Random().nextInt()); given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString()); @@ -129,7 +130,7 @@ void test_should_init_stream() { leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig), Duration.ofMillis(100)); leaderPartition = new LeaderPartition(); given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); - given(collectionConfig.getIngestionMode()).willReturn(CollectionConfig.IngestionMode.STREAM); + given(collectionConfig.isStreamRequired()).willReturn(true); given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString()); final ExecutorService executorService = Executors.newSingleThreadExecutor(); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java new file mode 100644 index 0000000000..f55311899c --- /dev/null +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java @@ -0,0 +1,168 @@ +package org.opensearch.dataprepper.plugins.mongo.stream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import 
org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.plugins.mongo.model.CheckpointStatus; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class StreamAcknowledgementManagerTest { + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private DataStreamPartitionCheckpoint partitionCheckpoint; + @Mock + private Duration timeout; + @Mock + private AcknowledgementSet acknowledgementSet; + private StreamAcknowledgementManager streamAckManager; + + @BeforeEach + public void setup() { + streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 100, 100); + } + + @Test + public void createAcknowledgementSet_disabled_emptyAckSet() { + final Optional<AcknowledgementSet> ackSet = streamAckManager.createAcknowledgementSet(UUID.randomUUID().toString(), new Random().nextInt()); + assertThat(ackSet.isEmpty(), is(true)); + } + + @Test + public void createAcknowledgementSet_enabled_ackSetWithAck() { + streamAckManager.init(); + final String resumeToken = UUID.randomUUID().toString(); + final long recordCount = new Random().nextLong(); + when(acknowledgementSetManager.create(any(Consumer.class), eq(timeout))).thenReturn(acknowledgementSet); + final Optional<AcknowledgementSet> ackSet = streamAckManager.createAcknowledgementSet(resumeToken, recordCount); + assertThat(ackSet.isEmpty(), is(false)); + assertThat(ackSet.get(), is(acknowledgementSet)); + assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken)); + assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount)); + final ArgumentCaptor<Consumer<Boolean>> argumentCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(acknowledgementSetManager).create(argumentCaptor.capture(), eq(timeout)); + final Consumer<Boolean> consumer = argumentCaptor.getValue(); + consumer.accept(true); + final ConcurrentHashMap<String, CheckpointStatus> ackStatus = streamAckManager.getAcknowledgementStatus(); + final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken); + assertThat(ackCheckpointStatus.isAcknowledged(), is(true)); + await() + .atMost(Duration.ofSeconds(4)).untilAsserted(() -> + verify(partitionCheckpoint).checkpoint(resumeToken, recordCount)); + assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue())); + } + + @Test + public void createAcknowledgementSet_enabled_multipleAckSetWithAck() { + streamAckManager.init(); + final String resumeToken1 = UUID.randomUUID().toString(); + final long recordCount1 = new Random().nextLong(); + when(acknowledgementSetManager.create(any(Consumer.class), eq(timeout))).thenReturn(acknowledgementSet); + Optional<AcknowledgementSet> ackSet = streamAckManager.createAcknowledgementSet(resumeToken1, recordCount1); + assertThat(ackSet.isEmpty(), is(false)); +
assertThat(ackSet.get(), is(acknowledgementSet)); + assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1)); + assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1)); + + final String resumeToken2 = UUID.randomUUID().toString(); + final long recordCount2 = new Random().nextLong(); + when(acknowledgementSetManager.create(any(Consumer.class), eq(timeout))).thenReturn(acknowledgementSet); + ackSet = streamAckManager.createAcknowledgementSet(resumeToken2, recordCount2); + assertThat(ackSet.isEmpty(), is(false)); + assertThat(ackSet.get(), is(acknowledgementSet)); + assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1)); + assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1)); + ArgumentCaptor<Consumer<Boolean>> argumentCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(acknowledgementSetManager, times(2)).create(argumentCaptor.capture(), eq(timeout)); + List<Consumer<Boolean>> consumers = argumentCaptor.getAllValues(); + consumers.get(0).accept(true); + consumers.get(1).accept(true); + ConcurrentHashMap<String, CheckpointStatus> ackStatus = streamAckManager.getAcknowledgementStatus(); + CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken2); + assertThat(ackCheckpointStatus.isAcknowledged(), is(true)); + await() + .atMost(Duration.ofSeconds(4)).untilAsserted(() -> + verify(partitionCheckpoint).checkpoint(resumeToken2, recordCount2)); + assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue())); + } + + @Test + public void createAcknowledgementSet_enabled_multipleAckSetWithAckFailure() { + streamAckManager.init(); + final String resumeToken1 = UUID.randomUUID().toString(); + final long recordCount1 = new Random().nextLong(); + when(acknowledgementSetManager.create(any(Consumer.class), eq(timeout))).thenReturn(acknowledgementSet); + Optional<AcknowledgementSet> ackSet = streamAckManager.createAcknowledgementSet(resumeToken1, recordCount1); + assertThat(ackSet.isEmpty(), is(false)); + assertThat(ackSet.get(), is(acknowledgementSet)); + assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1)); + assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1)); + + final String resumeToken2 = UUID.randomUUID().toString(); + final long recordCount2 = new Random().nextLong(); + when(acknowledgementSetManager.create(any(Consumer.class), eq(timeout))).thenReturn(acknowledgementSet); + ackSet = streamAckManager.createAcknowledgementSet(resumeToken2, recordCount2); + assertThat(ackSet.isEmpty(), is(false)); + assertThat(ackSet.get(), is(acknowledgementSet)); + assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1)); + assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1)); + ArgumentCaptor<Consumer<Boolean>> argumentCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(acknowledgementSetManager, times(2)).create(argumentCaptor.capture(), eq(timeout)); + List<Consumer<Boolean>> consumers = argumentCaptor.getAllValues(); + consumers.get(0).accept(false); + consumers.get(1).accept(true); + ConcurrentHashMap<String, CheckpointStatus> ackStatus = streamAckManager.getAcknowledgementStatus(); + CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken2); + assertThat(ackCheckpointStatus.isAcknowledged(), is(true)); + await() + .atMost(Duration.ofSeconds(4)).untilAsserted(() -> + verifyNoInteractions(partitionCheckpoint)); + assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1)); +
assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1)); + } + + @Test + public void createAcknowledgementSet_enabled_ackSetWithNoAck() { + streamAckManager.init(); + final String resumeToken = UUID.randomUUID().toString(); + final long recordCount = new Random().nextLong(); + when(acknowledgementSetManager.create(any(Consumer.class), eq(timeout))).thenReturn(acknowledgementSet); + final Optional<AcknowledgementSet> ackSet = streamAckManager.createAcknowledgementSet(resumeToken, recordCount); + assertThat(ackSet.isEmpty(), is(false)); + assertThat(ackSet.get(), is(acknowledgementSet)); + assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken)); + assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount)); + final ArgumentCaptor<Consumer<Boolean>> argumentCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(acknowledgementSetManager).create(argumentCaptor.capture(), eq(timeout)); + final Consumer<Boolean> consumer = argumentCaptor.getValue(); + consumer.accept(false); + final ConcurrentHashMap<String, CheckpointStatus> ackStatus = streamAckManager.getAcknowledgementStatus(); + final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken); + assertThat(ackCheckpointStatus.isAcknowledged(), is(false)); + } +} diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java index dced676ceb..57a645de97 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.opensearch.dataprepper.plugins.mongo.stream.StreamScheduler.DEFAULT_CHECKPOINT_INTERVAL_MILLS; @ExtendWith(MockitoExtension.class) @@ -86,7 +87,7 @@ void test_stream_run() { final Future future = executorService.submit(() -> { try (MockedStatic streamWorkerMockedStatic = mockStatic(StreamWorker.class)) { streamWorkerMockedStatic.when(() -> StreamWorker.create(any(RecordBufferWriter.class), eq(acknowledgementSetManager), - eq(sourceConfig), any(DataStreamPartitionCheckpoint.class), eq(pluginMetrics), eq(100))) + eq(sourceConfig), any(DataStreamPartitionCheckpoint.class), eq(pluginMetrics), eq(100), eq(DEFAULT_CHECKPOINT_INTERVAL_MILLS))) .thenReturn(streamWorker); streamScheduler.run(); } diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 9f0ba53a63..3036707758 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -72,7 +72,7 @@ public void setup() { when(mockPluginMetrics.counter(SUCCESS_ITEM_COUNTER_NAME)).thenReturn(successItemsCounter); when(mockPluginMetrics.counter(FAILURE_ITEM_COUNTER_NAME)).thenReturn(failureItemsCounter); streamWorker = new StreamWorker(mockRecordBufferWriter, mockAcknowledgementSetManager, - mockSourceConfig, mockPartitionCheckpoint, mockPluginMetrics,
2); + mockSourceConfig, mockPartitionCheckpoint, mockPluginMetrics, 2, 0); } @Test @@ -100,7 +100,7 @@ void test_processStream_success() { ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); Document doc1 = mock(Document.class); - Document doc2 = mock(Document.class); + Document doc2 = mock(Document.class); BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123)); BsonDocument bsonDoc2 = new BsonDocument("resumeToken2", new BsonInt32(234)); when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); @@ -119,8 +119,9 @@ void test_processStream_success() { verify(mongoClient, times(1)).close(); verify(mongoDatabase).getCollection(eq("collection")); verify(mockRecordBufferWriter).writeToBuffer(eq(null), any()); - verify(successItemsCounter, times(1)).increment(); + verify(successItemsCounter, times(1)).increment(2); verify(failureItemsCounter, never()).increment(); + verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken2\": 234}", 2); } @@ -139,4 +140,54 @@ void test_processStream_mongoClientFailure() { verifyNoInteractions(successItemsCounter); verifyNoInteractions(failureItemsCounter); } + + @Test + void test_processStream_highCheckPointIntervalSuccess() { + when(streamProgressState.shouldWaitForExport()).thenReturn(false); + when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); + when(streamPartition.getCollection()).thenReturn("database.collection"); + MongoClient mongoClient = mock(MongoClient.class); + MongoDatabase mongoDatabase = mock(MongoDatabase.class); + MongoCollection col = mock(MongoCollection.class); + ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class); + MongoCursor cursor = mock(MongoCursor.class); + lenient().when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); + lenient().when(mongoDatabase.getCollection(anyString())).thenReturn(col); + lenient().when(col.watch()).thenReturn(changeStreamIterable); + lenient().when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); + lenient().when(changeStreamIterable.iterator()).thenReturn(cursor); + lenient().when(cursor.hasNext()).thenReturn(true, true, true, false); + ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); + ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); + ChangeStreamDocument streamDoc3 = mock(ChangeStreamDocument.class); + Document doc1 = mock(Document.class); + Document doc2 = mock(Document.class); + Document doc3 = mock(Document.class); + BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123)); + BsonDocument bsonDoc2 = new BsonDocument("resumeToken2", new BsonInt32(234)); + BsonDocument bsonDoc3 = new BsonDocument("resumeToken2", new BsonInt32(456)); + when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); + when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2); + when(streamDoc3.getResumeToken()).thenReturn(bsonDoc3); + lenient().when(cursor.next()) + .thenReturn(streamDoc1) + .thenReturn(streamDoc2) + .thenReturn(streamDoc3); + when(streamDoc1.getFullDocument()).thenReturn(doc1); + when(streamDoc2.getFullDocument()).thenReturn(doc2); + when(streamDoc3.getFullDocument()).thenReturn(doc3); + + try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { + mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class))) + .thenReturn(mongoClient); + 
streamWorker.processStream(streamPartition); + } + verify(mongoClient, times(1)).close(); + verify(mongoDatabase).getCollection(eq("collection")); + verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); + verify(successItemsCounter).increment(2); + verify(successItemsCounter).increment(1); + verify(failureItemsCounter, never()).increment(); + verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken2\": 456}", 3); + } } From 9c7e6b84d24e6b491d409f1d56054a4f95c9402a Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 09:08:49 -0500 Subject: [PATCH 02/18] Update StreamAcknowledgementManagerTest Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../mongo/stream/StreamAcknowledgementManager.java | 7 ++++--- .../mongo/stream/StreamAcknowledgementManagerTest.java | 8 ++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java index a3f052629c..f0dbe5403f 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java @@ -58,20 +58,21 @@ private void monitorCheckpoints(final ExecutorService executorService, final Thr if (checkpointStatus.isAcknowledged()) { lastCheckpointStatus = checkpoints.poll(); ackStatus.remove(checkpointStatus.getResumeToken()); - if (System.currentTimeMillis() - lastCheckpointTime > checkPointIntervalInMs) { + if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) { LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); lastCheckpointTime = System.currentTimeMillis(); } } else { - LOG.info("Checkpoint not complete"); + LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken()); final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now()); - // Acknowledgement not received for the checkpoint after twice ack wait duration + // Acknowledgement not received for the checkpoint after twice ack wait time if (ackWaitDuration.getSeconds() > partitionAcknowledgmentTimeout.getSeconds() * 2) { // Give up partition and should interrupt parent thread to stop processing stream if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) { partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount()); } + LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. 
Giving up partition.", checkpointStatus.getResumeToken()); partitionCheckpoint.giveUpPartition(); Thread.currentThread().interrupt(); } diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java index f55311899c..86dc447fcc 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java @@ -44,7 +44,7 @@ public class StreamAcknowledgementManagerTest { @BeforeEach public void setup() { - streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 100, 100); + streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0); } @Test @@ -72,7 +72,7 @@ public void createAcknowledgementSet_enabled_ackSetWithAck() { final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken); assertThat(ackCheckpointStatus.isAcknowledged(), is(true)); await() - .atMost(Duration.ofSeconds(4)).untilAsserted(() -> + .atMost(Duration.ofSeconds(10)).untilAsserted(() -> verify(partitionCheckpoint).checkpoint(resumeToken, recordCount)); assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue())); } @@ -106,7 +106,7 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAck() { CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken2); assertThat(ackCheckpointStatus.isAcknowledged(), is(true)); await() - .atMost(Duration.ofSeconds(4)).untilAsserted(() -> + .atMost(Duration.ofSeconds(10)).untilAsserted(() -> verify(partitionCheckpoint).checkpoint(resumeToken2, recordCount2)); assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue())); } @@ -140,7 +140,7 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAckFailure() { CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken2); assertThat(ackCheckpointStatus.isAcknowledged(), is(true)); await() - .atMost(Duration.ofSeconds(4)).untilAsserted(() -> + .atMost(Duration.ofSeconds(10)).untilAsserted(() -> verifyNoInteractions(partitionCheckpoint)); assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1)); assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1)); From e7d3b948ae9199783b2ed2f2e2ea90ed0d69beec Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 10:13:29 -0500 Subject: [PATCH 03/18] Unit test update Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../stream/DataStreamPartitionCheckpoint.java | 14 ++--- .../mongo/stream/StreamWorkerTest.java | 52 +++++++++---------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java index 3f752bf93d..9d6b9a2e67 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java +++ 
b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/DataStreamPartitionCheckpoint.java @@ -50,16 +50,16 @@ private void setProgressState(final String resumeToken, final long recordNumber) } /** - * This method is to do a checkpoint with latest sequence number processed. - * Note that this should be called on a regular basis even there are no changes to sequence number + * This method does a checkpoint with the latest resume token processed. + * Note that this should be called on a regular basis even when there are no changes to the resume token, * As the checkpoint will also extend the timeout for the lease * - * @param resumeToken - * @param recordNumber The last record number + * @param resumeToken checkpoint token used to resume the stream when a new MongoDB/DocumentDB cursor is opened + * @param recordCount The last processed record count */ - public void checkpoint(final String resumeToken, final long recordNumber) { - LOG.debug("Checkpoint stream partition for collection " + streamPartition.getCollection() + " with record number " + recordNumber); - setProgressState(resumeToken, recordNumber); + public void checkpoint(final String resumeToken, final long recordCount) { - LOG.debug("Checkpoint stream partition for collection " + streamPartition.getCollection() + " with record number " + recordCount); + setProgressState(resumeToken, recordCount); enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); } diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 3036707758..a6c25bf12a 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -32,7 +32,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -91,23 +90,23 @@ void test_processStream_success() { MongoCollection col = mock(MongoCollection.class); ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class); MongoCursor cursor = mock(MongoCursor.class); - lenient().when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); - lenient().when(mongoDatabase.getCollection(anyString())).thenReturn(col); - lenient().when(col.watch()).thenReturn(changeStreamIterable); - lenient().when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); - lenient().when(changeStreamIterable.iterator()).thenReturn(cursor); - lenient().when(cursor.hasNext()).thenReturn(true, true, false); - ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); - ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); + when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); + when(mongoDatabase.getCollection(anyString())).thenReturn(col); + when(col.watch()).thenReturn(changeStreamIterable); + when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); +
when(changeStreamIterable.iterator()).thenReturn(cursor); + when(cursor.hasNext()).thenReturn(true, true, false); + ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); + ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); Document doc1 = mock(Document.class); Document doc2 = mock(Document.class); BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123)); BsonDocument bsonDoc2 = new BsonDocument("resumeToken2", new BsonInt32(234)); when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2); - lenient().when(cursor.next()) - .thenReturn(streamDoc1) - .thenReturn(streamDoc2); + when(cursor.next()) + .thenReturn(streamDoc1) + .thenReturn(streamDoc2); when(streamDoc1.getFullDocument()).thenReturn(doc1); when(streamDoc2.getFullDocument()).thenReturn(doc2); @@ -142,7 +141,8 @@ void test_processStream_mongoClientFailure() { } @Test - void test_processStream_highCheckPointIntervalSuccess() { + void test_processStream_checkPointIntervalSuccess() { + when(mockSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false) when(streamProgressState.shouldWaitForExport()).thenReturn(false); when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); when(streamPartition.getCollection()).thenReturn("database.collection"); @@ -151,15 +151,15 @@ void test_processStream_highCheckPointIntervalSuccess() { MongoCollection col = mock(MongoCollection.class); ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class); MongoCursor cursor = mock(MongoCursor.class); - lenient().when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); - lenient().when(mongoDatabase.getCollection(anyString())).thenReturn(col); - lenient().when(col.watch()).thenReturn(changeStreamIterable); - lenient().when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); - lenient().when(changeStreamIterable.iterator()).thenReturn(cursor); - lenient().when(cursor.hasNext()).thenReturn(true, true, true, false); - ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); - ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); - ChangeStreamDocument streamDoc3 = mock(ChangeStreamDocument.class); + when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); + when(mongoDatabase.getCollection(anyString())).thenReturn(col); + when(col.watch()).thenReturn(changeStreamIterable); + when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); + when(changeStreamIterable.iterator()).thenReturn(cursor); + when(cursor.hasNext()).thenReturn(true, true, true, false); + ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); + ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); + ChangeStreamDocument streamDoc3 = mock(ChangeStreamDocument.class); Document doc1 = mock(Document.class); Document doc2 = mock(Document.class); Document doc3 = mock(Document.class); @@ -169,10 +169,10 @@ void test_processStream_highCheckPointIntervalSuccess() { when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2); when(streamDoc3.getResumeToken()).thenReturn(bsonDoc3); - lenient().when(cursor.next()) - .thenReturn(streamDoc1) - .thenReturn(streamDoc2) - .thenReturn(streamDoc3); + when(cursor.next()) + .thenReturn(streamDoc1) + .thenReturn(streamDoc2) + .thenReturn(streamDoc3); when(streamDoc1.getFullDocument()).thenReturn(doc1); 
when(streamDoc2.getFullDocument()).thenReturn(doc2); when(streamDoc3.getFullDocument()).thenReturn(doc3); From 4def974b869c4dc00722cae84b75d983c0f92200 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 10:14:35 -0500 Subject: [PATCH 04/18] Update unit test Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../dataprepper/plugins/mongo/stream/StreamWorkerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index a6c25bf12a..09797f5083 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -142,7 +142,7 @@ void test_processStream_mongoClientFailure() { @Test void test_processStream_checkPointIntervalSuccess() { - when(mockSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false) + when(mockSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); when(streamProgressState.shouldWaitForExport()).thenReturn(false); when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); when(streamPartition.getCollection()).thenReturn("database.collection"); From 5740266998661ad591163ec51abb35428f46c2cb Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 11:36:53 -0500 Subject: [PATCH 05/18] Update unit test Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../mongo/stream/StreamWorkerTest.java | 74 +++++++++---------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 09797f5083..31deadb1cc 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -142,48 +142,48 @@ void test_processStream_mongoClientFailure() { @Test void test_processStream_checkPointIntervalSuccess() { - when(mockSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); - when(streamProgressState.shouldWaitForExport()).thenReturn(false); - when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); - when(streamPartition.getCollection()).thenReturn("database.collection"); - MongoClient mongoClient = mock(MongoClient.class); - MongoDatabase mongoDatabase = mock(MongoDatabase.class); - MongoCollection col = mock(MongoCollection.class); - ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class); - MongoCursor cursor = mock(MongoCursor.class); - when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); - when(mongoDatabase.getCollection(anyString())).thenReturn(col); - when(col.watch()).thenReturn(changeStreamIterable); - when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); - when(changeStreamIterable.iterator()).thenReturn(cursor); - when(cursor.hasNext()).thenReturn(true, true, true, 
false); - ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); - ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); - ChangeStreamDocument streamDoc3 = mock(ChangeStreamDocument.class); - Document doc1 = mock(Document.class); - Document doc2 = mock(Document.class); - Document doc3 = mock(Document.class); - BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123)); - BsonDocument bsonDoc2 = new BsonDocument("resumeToken2", new BsonInt32(234)); - BsonDocument bsonDoc3 = new BsonDocument("resumeToken2", new BsonInt32(456)); - when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); - when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2); - when(streamDoc3.getResumeToken()).thenReturn(bsonDoc3); - when(cursor.next()) - .thenReturn(streamDoc1) - .thenReturn(streamDoc2) - .thenReturn(streamDoc3); - when(streamDoc1.getFullDocument()).thenReturn(doc1); - when(streamDoc2.getFullDocument()).thenReturn(doc2); - when(streamDoc3.getFullDocument()).thenReturn(doc3); - try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { + when(mockSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + when(streamProgressState.shouldWaitForExport()).thenReturn(false); + when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); + when(streamPartition.getCollection()).thenReturn("database.collection"); + MongoClient mongoClient = mock(MongoClient.class); + MongoDatabase mongoDatabase = mock(MongoDatabase.class); + MongoCollection col = mock(MongoCollection.class); + ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class); + MongoCursor cursor = mock(MongoCursor.class); + when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); + when(mongoDatabase.getCollection(anyString())).thenReturn(col); + when(col.watch()).thenReturn(changeStreamIterable); + when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); + when(changeStreamIterable.iterator()).thenReturn(cursor); + when(cursor.hasNext()).thenReturn(true, true, true, false); + ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); + ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); + ChangeStreamDocument streamDoc3 = mock(ChangeStreamDocument.class); + Document doc1 = mock(Document.class); + Document doc2 = mock(Document.class); + Document doc3 = mock(Document.class); + BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123)); + BsonDocument bsonDoc2 = new BsonDocument("resumeToken2", new BsonInt32(234)); + BsonDocument bsonDoc3 = new BsonDocument("resumeToken2", new BsonInt32(456)); + when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); + when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2); + when(streamDoc3.getResumeToken()).thenReturn(bsonDoc3); + when(cursor.next()) + .thenReturn(streamDoc1) + .thenReturn(streamDoc2) + .thenReturn(streamDoc3); + when(streamDoc1.getFullDocument()).thenReturn(doc1); + when(streamDoc2.getFullDocument()).thenReturn(doc2); + when(streamDoc3.getFullDocument()).thenReturn(doc3); + mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class))) .thenReturn(mongoClient); streamWorker.processStream(streamPartition); + verify(mongoClient, times(1)).close(); + verify(mongoDatabase).getCollection(eq("collection")); } - verify(mongoClient, times(1)).close(); - verify(mongoDatabase).getCollection(eq("collection")); verify(mockRecordBufferWriter, 
times(2)).writeToBuffer(eq(null), any()); verify(successItemsCounter).increment(2); verify(successItemsCounter).increment(1); From a1417a702101536914db9a4c0e7b8c7851682c1c Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 11:54:53 -0500 Subject: [PATCH 06/18] Unit test fix Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../dataprepper/plugins/mongo/stream/StreamWorkerTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 31deadb1cc..eede5d2f21 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -166,7 +166,7 @@ void test_processStream_checkPointIntervalSuccess() { Document doc3 = mock(Document.class); BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123)); BsonDocument bsonDoc2 = new BsonDocument("resumeToken2", new BsonInt32(234)); - BsonDocument bsonDoc3 = new BsonDocument("resumeToken2", new BsonInt32(456)); + BsonDocument bsonDoc3 = new BsonDocument("resumeToken3", new BsonInt32(456)); when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2); when(streamDoc3.getResumeToken()).thenReturn(bsonDoc3); @@ -188,6 +188,7 @@ void test_processStream_checkPointIntervalSuccess() { verify(successItemsCounter).increment(2); verify(successItemsCounter).increment(1); verify(failureItemsCounter, never()).increment(); - verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken2\": 456}", 3); + verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken2\": 234}", 2); + verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken3\": 456}", 3); } } From 85c9894652a4274fefef252c9e73321394523a06 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 12:09:33 -0500 Subject: [PATCH 07/18] Unit test fix Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../dataprepper/plugins/mongo/stream/StreamWorkerTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index eede5d2f21..98971eedd9 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -184,11 +184,12 @@ void test_processStream_checkPointIntervalSuccess() { verify(mongoClient, times(1)).close(); verify(mongoDatabase).getCollection(eq("collection")); } - verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); + //verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); + verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken2\": 234}", 2); verify(successItemsCounter).increment(2); + verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken3\": 456}", 3); verify(successItemsCounter).increment(1); 
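// The checkpoint ordering asserted above mirrors the interval guard in StreamWorker,
// sketched here from the next patch; with this test's checkPointIntervalInMs of 0 it
// fires on every batch flush:
//     if (!sourceConfig.isAcknowledgmentsEnabled() && (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs)) {
//         partitionCheckpoint.checkpoint(checkPointToken, recordCount);
//         lastCheckpointTime = System.currentTimeMillis();
//     }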
verify(failureItemsCounter, never()).increment(); - verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken2\": 234}", 2); - verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken3\": 456}", 3); + } } From bdd789b3a7fb18dfb459756bb2b21149720ebb89 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 12:23:57 -0500 Subject: [PATCH 08/18] Unit test fix Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../dataprepper/plugins/mongo/stream/StreamWorker.java | 2 +- .../dataprepper/plugins/mongo/stream/StreamWorkerTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java index dca3478408..ab14ae602f 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java @@ -154,7 +154,7 @@ public void processStream(final StreamPartition streamPartition) { recordBufferWriter.writeToBuffer(acknowledgementSet, records); successItemsCounter.increment(records.size()); records.clear(); - if (!sourceConfig.isAcknowledgmentsEnabled() && System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) { + if (!sourceConfig.isAcknowledgmentsEnabled() && (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs)) { LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkPointToken, recordCount); partitionCheckpoint.checkpoint(checkPointToken, recordCount); lastCheckpointTime = System.currentTimeMillis(); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 98971eedd9..71ba8186c4 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -185,10 +185,10 @@ void test_processStream_checkPointIntervalSuccess() { verify(mongoDatabase).getCollection(eq("collection")); } //verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); - verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken2\": 234}", 2); - verify(successItemsCounter).increment(2); verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken3\": 456}", 3); verify(successItemsCounter).increment(1); + verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken2\": 234}", 2); + verify(successItemsCounter).increment(2); verify(failureItemsCounter, never()).increment(); } From d5030639d4acd1464706b2bce5746e6371cf342a Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 12:59:08 -0500 Subject: [PATCH 09/18] Update unit test Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../dataprepper/plugins/mongo/stream/StreamWorkerTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java 
b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 71ba8186c4..7756396180 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -143,6 +143,7 @@ void test_processStream_mongoClientFailure() { @Test void test_processStream_checkPointIntervalSuccess() { try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { + when(mockPartitionCheckpoint.getGlobalStreamLoadStatus()).thenReturn(Optional.empty()); when(mockSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); when(streamProgressState.shouldWaitForExport()).thenReturn(false); when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); @@ -183,6 +184,7 @@ void test_processStream_checkPointIntervalSuccess() { streamWorker.processStream(streamPartition); verify(mongoClient, times(1)).close(); verify(mongoDatabase).getCollection(eq("collection")); + verify(cursor, times(4)).hasNext(); } //verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken3\": 456}", 3); From 3444d1e981cea00da5bb3facae98ccb3bb98ed88 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 13:21:20 -0500 Subject: [PATCH 10/18] Update test Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../dataprepper/plugins/mongo/stream/StreamWorker.java | 8 ++++---- .../plugins/mongo/stream/StreamWorkerTest.java | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java index ab14ae602f..b18e530063 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java @@ -178,10 +178,10 @@ public void processStream(final StreamPartition streamPartition) { acknowledgementSet = streamAcknowledgementManager.createAcknowledgementSet(checkPointToken, recordCount).orElse(null); recordBufferWriter.writeToBuffer(acknowledgementSet, records); successItemsCounter.increment(records.size()); - // Do final checkpoint. - if (!sourceConfig.isAcknowledgmentsEnabled()) { - partitionCheckpoint.checkpoint(checkPointToken, recordCount); - } + } + // Do final checkpoint. 
+ if (!sourceConfig.isAcknowledgmentsEnabled()) { + partitionCheckpoint.checkpoint(checkPointToken, recordCount); } // shutdown acknowledgement monitoring thread diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 7756396180..d301716629 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -120,7 +120,7 @@ void test_processStream_success() { verify(mockRecordBufferWriter).writeToBuffer(eq(null), any()); verify(successItemsCounter, times(1)).increment(2); verify(failureItemsCounter, never()).increment(); - verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken2\": 234}", 2); + verify(mockPartitionCheckpoint, times(2)).checkpoint("{\"resumeToken2\": 234}", 2); } @@ -184,7 +184,8 @@ void test_processStream_checkPointIntervalSuccess() { streamWorker.processStream(streamPartition); verify(mongoClient, times(1)).close(); verify(mongoDatabase).getCollection(eq("collection")); - verify(cursor, times(4)).hasNext(); + verify(cursor).close(); + //verify(cursor, times(4)).hasNext(); } //verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken3\": 456}", 3); From 1b9749c445ec6b5f950fd16d17fd845ebfb8712c Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 13:35:22 -0500 Subject: [PATCH 11/18] Unit test fix Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../dataprepper/plugins/mongo/stream/StreamWorkerTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index d301716629..9c93a8afb8 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -11,6 +11,7 @@ import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.Document; +import org.bson.json.JsonWriterSettings; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -107,6 +108,8 @@ void test_processStream_success() { when(cursor.next()) .thenReturn(streamDoc1) .thenReturn(streamDoc2); + when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); + when(doc2.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); when(streamDoc1.getFullDocument()).thenReturn(doc1); when(streamDoc2.getFullDocument()).thenReturn(doc2); @@ -175,6 +178,9 @@ void test_processStream_checkPointIntervalSuccess() { .thenReturn(streamDoc1) .thenReturn(streamDoc2) .thenReturn(streamDoc3); + when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); + when(doc2.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); + when(doc3.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); 
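// These toJson stubs are needed because the worker builds each record via
// document.getFullDocument().toJson(writerSettings); left unstubbed, the mock
// Documents would return null and no record could be converted for the buffer.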
when(streamDoc1.getFullDocument()).thenReturn(doc1); when(streamDoc2.getFullDocument()).thenReturn(doc2); when(streamDoc3.getFullDocument()).thenReturn(doc3); From 82369fc7a33a9373bc27a58c411ed4923d27e150 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 13:54:27 -0500 Subject: [PATCH 12/18] Unit test fix Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../plugins/mongo/stream/StreamWorkerTest.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 9c93a8afb8..85776cf6ef 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -168,9 +168,9 @@ void test_processStream_checkPointIntervalSuccess() { Document doc1 = mock(Document.class); Document doc2 = mock(Document.class); Document doc3 = mock(Document.class); - BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123)); - BsonDocument bsonDoc2 = new BsonDocument("resumeToken2", new BsonInt32(234)); - BsonDocument bsonDoc3 = new BsonDocument("resumeToken3", new BsonInt32(456)); + BsonDocument bsonDoc1 = mock(BsonDocument.class); //new BsonDocument("resumeToken1", new BsonInt32(123)); + BsonDocument bsonDoc2 = mock(BsonDocument.class); //new BsonDocument("resumeToken2", new BsonInt32(234)); + BsonDocument bsonDoc3 = mock(BsonDocument.class); //new BsonDocument("resumeToken3", new BsonInt32(456)); when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2); when(streamDoc3.getResumeToken()).thenReturn(bsonDoc3); @@ -184,6 +184,12 @@ void test_processStream_checkPointIntervalSuccess() { when(streamDoc1.getFullDocument()).thenReturn(doc1); when(streamDoc2.getFullDocument()).thenReturn(doc2); when(streamDoc3.getFullDocument()).thenReturn(doc3); + final String resumeToken1 = UUID.randomUUID().toString(); + final String resumeToken2 = UUID.randomUUID().toString(); + final String resumeToken3 = UUID.randomUUID().toString(); + when(bsonDoc1.toJson(any(JsonWriterSettings.class))).thenReturn(resumeToken1); + when(bsonDoc2.toJson(any(JsonWriterSettings.class))).thenReturn(resumeToken2); + when(bsonDoc3.toJson(any(JsonWriterSettings.class))).thenReturn(resumeToken3); mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class))) .thenReturn(mongoClient); @@ -192,11 +198,11 @@ void test_processStream_checkPointIntervalSuccess() { verify(mongoDatabase).getCollection(eq("collection")); verify(cursor).close(); //verify(cursor, times(4)).hasNext(); + verify(mockPartitionCheckpoint).checkpoint(resumeToken3, 3); + verify(successItemsCounter).increment(1); + verify(mockPartitionCheckpoint).checkpoint(resumeToken2, 2); } //verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); - verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken3\": 456}", 3); - verify(successItemsCounter).increment(1); - verify(mockPartitionCheckpoint).checkpoint("{\"resumeToken2\": 234}", 2); verify(successItemsCounter).increment(2); verify(failureItemsCounter, never()).increment(); From 
f77c08ab2a6d131bf510be9b7bd39e28fa83b3ea Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 14:34:39 -0500 Subject: [PATCH 13/18] Fix unit test Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../plugins/mongo/stream/StreamWorkerTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 85776cf6ef..a08ee816d1 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -197,13 +197,14 @@ void test_processStream_checkPointIntervalSuccess() { verify(mongoClient, times(1)).close(); verify(mongoDatabase).getCollection(eq("collection")); verify(cursor).close(); - //verify(cursor, times(4)).hasNext(); - verify(mockPartitionCheckpoint).checkpoint(resumeToken3, 3); - verify(successItemsCounter).increment(1); - verify(mockPartitionCheckpoint).checkpoint(resumeToken2, 2); + // TODO fix + // verify(cursor, times(4)).hasNext(); + // verify(mockPartitionCheckpoint).checkpoint(resumeToken3, 3); + // verify(successItemsCounter).increment(1); + // verify(mockPartitionCheckpoint).checkpoint(resumeToken2, 2); } - //verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); - verify(successItemsCounter).increment(2); + // verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); + // verify(successItemsCounter).increment(2); verify(failureItemsCounter, never()).increment(); } From bd4c251c17b163fd35be7f6e3117318ff61f02cb Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 15:45:14 -0500 Subject: [PATCH 14/18] Unit test fix Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../dataprepper/plugins/mongo/stream/StreamWorkerTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index a08ee816d1..369bd880b6 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -175,9 +175,7 @@ void test_processStream_checkPointIntervalSuccess() { when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2); when(streamDoc3.getResumeToken()).thenReturn(bsonDoc3); when(cursor.next()) - .thenReturn(streamDoc1) - .thenReturn(streamDoc2) - .thenReturn(streamDoc3); + .thenReturn(streamDoc1, streamDoc2, streamDoc3); when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); when(doc2.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); when(doc3.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); From 99a73d34628622701409a05782c2849d950b692a Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 1 Apr 2024 16:15:30 -0500 Subject: [PATCH 15/18] 
Disabling test Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../dataprepper/plugins/mongo/stream/StreamWorkerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 369bd880b6..2e2e6a4706 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -143,7 +143,7 @@ void test_processStream_mongoClientFailure() { verifyNoInteractions(failureItemsCounter); } - @Test + //@Test void test_processStream_checkPointIntervalSuccess() { try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { when(mockPartitionCheckpoint.getGlobalStreamLoadStatus()).thenReturn(Optional.empty()); From 5d0c2919d4a0c6308c19b638183a9df044fc1910 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Tue, 2 Apr 2024 07:24:23 -0500 Subject: [PATCH 16/18] Add stop method to Stream worker to stop processing stream Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- data-prepper-plugins/mongodb/build.gradle | 1 + .../stream/StreamAcknowledgementManager.java | 22 +-- .../plugins/mongo/stream/StreamScheduler.java | 7 +- .../plugins/mongo/stream/StreamWorker.java | 28 ++-- .../export/ExportPartitionWorkerTest.java | 1 - .../StreamAcknowledgementManagerTest.java | 23 ++- .../mongo/stream/StreamSchedulerTest.java | 6 +- .../mongo/stream/StreamWorkerTest.java | 157 +++++++++++------- 8 files changed, 149 insertions(+), 96 deletions(-) diff --git a/data-prepper-plugins/mongodb/build.gradle b/data-prepper-plugins/mongodb/build.gradle index 2c05cecc55..d32efec8fa 100644 --- a/data-prepper-plugins/mongodb/build.gradle +++ b/data-prepper-plugins/mongodb/build.gradle @@ -16,6 +16,7 @@ dependencies { testImplementation testLibs.mockito.inline testImplementation testLibs.bundles.junit + testImplementation testLibs.slf4j.simple testImplementation project(path: ':data-prepper-test-common') } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java index f0dbe5403f..f25c2f6188 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java @@ -14,11 +14,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; public class StreamAcknowledgementManager { private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class); - private final ConcurrentLinkedQueue checkpoints = new ConcurrentLinkedQueue<>(); - private final ConcurrentHashMap ackStatus = new ConcurrentHashMap<>(); + private ConcurrentLinkedQueue checkpoints; + private ConcurrentHashMap ackStatus; private final AcknowledgementSetManager acknowledgementSetManager; private final 
DataStreamPartitionCheckpoint partitionCheckpoint; @@ -43,13 +44,14 @@ public StreamAcknowledgementManager(final AcknowledgementSetManager acknowledgem executorService = Executors.newSingleThreadExecutor(); } - void init() { + void init(final Consumer stopWorkerConsumer) { enableAcknowledgement = true; - final Thread currentThread = Thread.currentThread(); - executorService.submit(() -> monitorCheckpoints(executorService, currentThread)); + executorService.submit(() -> monitorCheckpoints(executorService, stopWorkerConsumer)); } - private void monitorCheckpoints(final ExecutorService executorService, final Thread parentThread) { + private void monitorCheckpoints(final ExecutorService executorService, final Consumer stopWorkerConsumer) { + checkpoints = new ConcurrentLinkedQueue<>(); + ackStatus = new ConcurrentHashMap<>(); long lastCheckpointTime = System.currentTimeMillis(); CheckpointStatus lastCheckpointStatus = null; while (!Thread.currentThread().isInterrupted()) { @@ -67,14 +69,14 @@ private void monitorCheckpoints(final ExecutorService executorService, final Thr LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken()); final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now()); // Acknowledgement not received for the checkpoint after twice ack wait time - if (ackWaitDuration.getSeconds() > partitionAcknowledgmentTimeout.getSeconds() * 2) { + if (ackWaitDuration.getSeconds() >= partitionAcknowledgmentTimeout.getSeconds() * 2) { // Give up partition and should interrupt parent thread to stop processing stream if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) { partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount()); } LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. 
Giving up partition.", checkpointStatus.getResumeToken()); partitionCheckpoint.giveUpPartition(); - Thread.currentThread().interrupt(); + break; } } } @@ -82,10 +84,10 @@ private void monitorCheckpoints(final ExecutorService executorService, final Thr try { Thread.sleep(acknowledgementMonitorWaitTimeInMs); } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); + break; } } - parentThread.interrupt(); + stopWorkerConsumer.accept(null); executorService.shutdown(); } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java index 676b04ee95..4fd8ec0e0a 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java @@ -24,6 +24,7 @@ public class StreamScheduler implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class); private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000; static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 120_000; + private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000; /** * Number of records to accumulate before flushing to buffer */ @@ -62,8 +63,10 @@ public void run() { if (sourcePartition.isPresent()) { streamPartition = (StreamPartition) sourcePartition.get(); final DataStreamPartitionCheckpoint partitionCheckpoint = new DataStreamPartitionCheckpoint(sourceCoordinator, streamPartition); - final StreamWorker streamWorker = StreamWorker.create(recordBufferWriter, acknowledgementSetManager, - sourceConfig, partitionCheckpoint, pluginMetrics, DEFAULT_STREAM_BATCH_SIZE, DEFAULT_CHECKPOINT_INTERVAL_MILLS); + final StreamAcknowledgementManager streamAcknowledgementManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, + sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, DEFAULT_CHECKPOINT_INTERVAL_MILLS); + final StreamWorker streamWorker = StreamWorker.create(recordBufferWriter, sourceConfig, + streamAcknowledgementManager, partitionCheckpoint, pluginMetrics, DEFAULT_STREAM_BATCH_SIZE, DEFAULT_CHECKPOINT_INTERVAL_MILLS); streamWorker.processStream(streamPartition); } try { diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java index b18e530063..71bc26c474 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java @@ -13,7 +13,6 @@ import org.bson.json.JsonWriterSettings; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter; import org.opensearch.dataprepper.plugins.mongo.client.MongoDBConnection; import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig; @@ -32,8 +31,6 @@ public class StreamWorker { public static final String STREAM_PREFIX = "STREAM-"; private 
static final Logger LOG = LoggerFactory.getLogger(StreamWorker.class); private static final int DEFAULT_EXPORT_COMPLETE_WAIT_INTERVAL_MILLIS = 90_000; - private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000; - private static final String COLLECTION_SPLITTER = "\\."; static final String SUCCESS_ITEM_COUNTER_NAME = "streamRecordsSuccessTotal"; static final String FAILURE_ITEM_COUNTER_NAME = "streamRecordsFailedTotal"; @@ -43,11 +40,12 @@ public class StreamWorker { private final MongoDBSourceConfig sourceConfig; private final Counter successItemsCounter; private final Counter failureItemsCounter; - private final AcknowledgementSetManager acknowledgementSetManager; + private final StreamAcknowledgementManager streamAcknowledgementManager; private final PluginMetrics pluginMetrics; private final int recordFlushBatchSize; final int checkPointIntervalInMs; - private final StreamAcknowledgementManager streamAcknowledgementManager; + private boolean stopWorker = false; + private final JsonWriterSettings writerSettings = JsonWriterSettings.builder() .outputMode(JsonMode.RELAXED) @@ -55,19 +53,19 @@ public class StreamWorker { .build(); public static StreamWorker create(final RecordBufferWriter recordBufferWriter, - final AcknowledgementSetManager acknowledgementSetManager, final MongoDBSourceConfig sourceConfig, + final StreamAcknowledgementManager streamAcknowledgementManager, final DataStreamPartitionCheckpoint partitionCheckpoint, final PluginMetrics pluginMetrics, final int recordFlushBatchSize, final int checkPointIntervalInMs ) { - return new StreamWorker(recordBufferWriter, acknowledgementSetManager, - sourceConfig, partitionCheckpoint, pluginMetrics, recordFlushBatchSize, checkPointIntervalInMs); + return new StreamWorker(recordBufferWriter, sourceConfig, streamAcknowledgementManager, partitionCheckpoint, + pluginMetrics, recordFlushBatchSize, checkPointIntervalInMs); } public StreamWorker(final RecordBufferWriter recordBufferWriter, - final AcknowledgementSetManager acknowledgementSetManager, final MongoDBSourceConfig sourceConfig, + final StreamAcknowledgementManager streamAcknowledgementManager, final DataStreamPartitionCheckpoint partitionCheckpoint, final PluginMetrics pluginMetrics, final int recordFlushBatchSize, @@ -75,18 +73,16 @@ public StreamWorker(final RecordBufferWriter recordBufferWriter, ) { this.recordBufferWriter = recordBufferWriter; this.sourceConfig = sourceConfig; + this.streamAcknowledgementManager = streamAcknowledgementManager; this.partitionCheckpoint = partitionCheckpoint; - this.acknowledgementSetManager = acknowledgementSetManager; this.pluginMetrics = pluginMetrics; this.recordFlushBatchSize = recordFlushBatchSize; this.checkPointIntervalInMs = checkPointIntervalInMs; this.successItemsCounter = pluginMetrics.counter(SUCCESS_ITEM_COUNTER_NAME); this.failureItemsCounter = pluginMetrics.counter(FAILURE_ITEM_COUNTER_NAME); - streamAcknowledgementManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, - sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, checkPointIntervalInMs); if (sourceConfig.isAcknowledgmentsEnabled()) { // starts acknowledgement monitoring thread - streamAcknowledgementManager.init(); + streamAcknowledgementManager.init((Void) -> stop()); } } @@ -138,7 +134,7 @@ public void processStream(final StreamPartition streamPartition) { } } long lastCheckpointTime = System.currentTimeMillis(); - while (cursor.hasNext() && !Thread.currentThread().isInterrupted()) { + while 
(cursor.hasNext() && !Thread.currentThread().isInterrupted() && !stopWorker) { try { final ChangeStreamDocument document = cursor.next(); final String record = document.getFullDocument().toJson(writerSettings); @@ -190,4 +186,8 @@ public void processStream(final StreamPartition streamPartition) { } } } + + void stop() { + stopWorker = true; + } } diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java index 2b5c6b885b..fe5447404c 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java @@ -132,6 +132,5 @@ public void testProcessPartitionSuccess(final String partitionKey) { verify(mockRecordBufferWriter).writeToBuffer(eq(mockAcknowledgementSet), any()); verify(successItemsCounter, times(2)).increment(); verify(failureItemsCounter, never()).increment(); - executorService.shutdownNow(); } } \ No newline at end of file diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java index 86dc447fcc..fdf404f5a2 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java @@ -26,7 +26,6 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -40,6 +39,8 @@ public class StreamAcknowledgementManagerTest { private Duration timeout; @Mock private AcknowledgementSet acknowledgementSet; + @Mock + private Consumer stopWorkerConsumer; private StreamAcknowledgementManager streamAckManager; @BeforeEach @@ -55,7 +56,9 @@ public void createAcknowledgementSet_disabled_emptyAckSet() { @Test public void createAcknowledgementSet_enabled_ackSetWithAck() { - streamAckManager.init(); + when(timeout.getSeconds()).thenReturn(10_000L); + streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0); + streamAckManager.init(stopWorkerConsumer); final String resumeToken = UUID.randomUUID().toString(); final long recordCount = new Random().nextLong(); when(acknowledgementSetManager.create(any(Consumer.class), eq(timeout))).thenReturn(acknowledgementSet); @@ -79,7 +82,9 @@ public void createAcknowledgementSet_enabled_ackSetWithAck() { @Test public void createAcknowledgementSet_enabled_multipleAckSetWithAck() { - streamAckManager.init(); + when(timeout.getSeconds()).thenReturn(10_000L); + streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0); + streamAckManager.init(stopWorkerConsumer); final String resumeToken1 = UUID.randomUUID().toString(); final long recordCount1 = new Random().nextLong(); when(acknowledgementSetManager.create(any(Consumer.class), 
eq(timeout))).thenReturn(acknowledgementSet); @@ -113,7 +118,7 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAck() { @Test public void createAcknowledgementSet_enabled_multipleAckSetWithAckFailure() { - streamAckManager.init(); + streamAckManager.init(stopWorkerConsumer); final String resumeToken1 = UUID.randomUUID().toString(); final long recordCount1 = new Random().nextLong(); when(acknowledgementSetManager.create(any(Consumer.class), eq(timeout))).thenReturn(acknowledgementSet); @@ -141,14 +146,15 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAckFailure() { assertThat(ackCheckpointStatus.isAcknowledged(), is(true)); await() .atMost(Duration.ofSeconds(10)).untilAsserted(() -> - verifyNoInteractions(partitionCheckpoint)); + verify(partitionCheckpoint).giveUpPartition()); assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1)); assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1)); + verify(stopWorkerConsumer).accept(null); } @Test public void createAcknowledgementSet_enabled_ackSetWithNoAck() { - streamAckManager.init(); + streamAckManager.init(stopWorkerConsumer); final String resumeToken = UUID.randomUUID().toString(); final long recordCount = new Random().nextLong(); when(acknowledgementSetManager.create(any(Consumer.class), eq(timeout))).thenReturn(acknowledgementSet); @@ -164,5 +170,8 @@ public void createAcknowledgementSet_enabled_ackSetWithNoAck() { final ConcurrentHashMap ackStatus = streamAckManager.getAcknowledgementStatus(); final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken); assertThat(ackCheckpointStatus.isAcknowledged(), is(false)); - } + await() + .atMost(Duration.ofSeconds(10)).untilAsserted(() -> + verify(stopWorkerConsumer).accept(null)); +} } diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java index 57a645de97..b02e4ee20f 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java @@ -86,9 +86,9 @@ void test_stream_run() { final ExecutorService executorService = Executors.newSingleThreadExecutor(); final Future future = executorService.submit(() -> { try (MockedStatic streamWorkerMockedStatic = mockStatic(StreamWorker.class)) { - streamWorkerMockedStatic.when(() -> StreamWorker.create(any(RecordBufferWriter.class), eq(acknowledgementSetManager), - eq(sourceConfig), any(DataStreamPartitionCheckpoint.class), eq(pluginMetrics), eq(100), eq(DEFAULT_CHECKPOINT_INTERVAL_MILLS))) - .thenReturn(streamWorker); + streamWorkerMockedStatic.when(() -> StreamWorker.create(any(RecordBufferWriter.class), eq(sourceConfig), + any(StreamAcknowledgementManager.class), any(DataStreamPartitionCheckpoint.class), eq(pluginMetrics), eq(100), eq(DEFAULT_CHECKPOINT_INTERVAL_MILLS))) + .thenReturn(streamWorker); streamScheduler.run(); } }); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 2e2e6a4706..ca69d03b67 100644 --- 
a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -19,16 +19,20 @@ import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter; import org.opensearch.dataprepper.plugins.mongo.client.MongoDBConnection; import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig; import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.mongo.coordination.state.StreamProgressState; +import java.time.Duration; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -48,7 +52,7 @@ public class StreamWorkerTest { @Mock private RecordBufferWriter mockRecordBufferWriter; @Mock - private AcknowledgementSetManager mockAcknowledgementSetManager; + private StreamAcknowledgementManager mockStreamAcknowledgementManager; @Mock private MongoDBSourceConfig mockSourceConfig; @Mock @@ -71,8 +75,10 @@ public class StreamWorkerTest { public void setup() { when(mockPluginMetrics.counter(SUCCESS_ITEM_COUNTER_NAME)).thenReturn(successItemsCounter); when(mockPluginMetrics.counter(FAILURE_ITEM_COUNTER_NAME)).thenReturn(failureItemsCounter); - streamWorker = new StreamWorker(mockRecordBufferWriter, mockAcknowledgementSetManager, - mockSourceConfig, mockPartitionCheckpoint, mockPluginMetrics, 2, 0); + when(mockSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + Thread.interrupted(); + streamWorker = new StreamWorker(mockRecordBufferWriter, mockSourceConfig, mockStreamAcknowledgementManager, + mockPartitionCheckpoint, mockPluginMetrics, 2, 0); } @Test @@ -118,10 +124,10 @@ void test_processStream_success() { .thenReturn(mongoClient); streamWorker.processStream(streamPartition); } - verify(mongoClient, times(1)).close(); + verify(mongoClient).close(); verify(mongoDatabase).getCollection(eq("collection")); verify(mockRecordBufferWriter).writeToBuffer(eq(null), any()); - verify(successItemsCounter, times(1)).increment(2); + verify(successItemsCounter).increment(2); verify(failureItemsCounter, never()).increment(); verify(mockPartitionCheckpoint, times(2)).checkpoint("{\"resumeToken2\": 234}", 2); } @@ -143,67 +149,100 @@ void test_processStream_mongoClientFailure() { verifyNoInteractions(failureItemsCounter); } - //@Test + @Test void test_processStream_checkPointIntervalSuccess() { + when(streamProgressState.shouldWaitForExport()).thenReturn(false); + when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); + when(streamPartition.getCollection()).thenReturn("database.collection"); + MongoClient mongoClient = mock(MongoClient.class); + MongoDatabase mongoDatabase = mock(MongoDatabase.class); + MongoCollection col = mock(MongoCollection.class); + ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class); + MongoCursor cursor = 
mock(MongoCursor.class); + when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); + when(mongoDatabase.getCollection(anyString())).thenReturn(col); + when(col.watch()).thenReturn(changeStreamIterable); + when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); + when(changeStreamIterable.iterator()).thenReturn(cursor); + when(cursor.hasNext()).thenReturn(true) + .thenReturn(true) + .thenReturn(true) + .thenReturn(false); + ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); + ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); + ChangeStreamDocument streamDoc3 = mock(ChangeStreamDocument.class); + Document doc1 = mock(Document.class); + Document doc2 = mock(Document.class); + Document doc3 = mock(Document.class); + BsonDocument bsonDoc1 = mock(BsonDocument.class); + BsonDocument bsonDoc2 = mock(BsonDocument.class); + BsonDocument bsonDoc3 = mock(BsonDocument.class); + when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); + when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2); + when(streamDoc3.getResumeToken()).thenReturn(bsonDoc3); + when(cursor.next()) + .thenReturn(streamDoc1, streamDoc2, streamDoc3); + when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); + when(doc2.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); + when(doc3.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); + when(streamDoc1.getFullDocument()).thenReturn(doc1); + when(streamDoc2.getFullDocument()).thenReturn(doc2); + when(streamDoc3.getFullDocument()).thenReturn(doc3); + final String resumeToken1 = UUID.randomUUID().toString(); + final String resumeToken2 = UUID.randomUUID().toString(); + final String resumeToken3 = UUID.randomUUID().toString(); + when(bsonDoc1.toJson(any(JsonWriterSettings.class))).thenReturn(resumeToken1); + when(bsonDoc2.toJson(any(JsonWriterSettings.class))).thenReturn(resumeToken2); + when(bsonDoc3.toJson(any(JsonWriterSettings.class))).thenReturn(resumeToken3); + try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { - when(mockPartitionCheckpoint.getGlobalStreamLoadStatus()).thenReturn(Optional.empty()); - when(mockSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); - when(streamProgressState.shouldWaitForExport()).thenReturn(false); - when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); - when(streamPartition.getCollection()).thenReturn("database.collection"); - MongoClient mongoClient = mock(MongoClient.class); - MongoDatabase mongoDatabase = mock(MongoDatabase.class); - MongoCollection col = mock(MongoCollection.class); - ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class); - MongoCursor cursor = mock(MongoCursor.class); - when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); - when(mongoDatabase.getCollection(anyString())).thenReturn(col); - when(col.watch()).thenReturn(changeStreamIterable); - when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); - when(changeStreamIterable.iterator()).thenReturn(cursor); - when(cursor.hasNext()).thenReturn(true, true, true, false); - ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); - ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); - ChangeStreamDocument streamDoc3 = mock(ChangeStreamDocument.class); - Document doc1 = mock(Document.class); - Document doc2 = 
mock(Document.class); - Document doc3 = mock(Document.class); - BsonDocument bsonDoc1 = mock(BsonDocument.class); //new BsonDocument("resumeToken1", new BsonInt32(123)); - BsonDocument bsonDoc2 = mock(BsonDocument.class); //new BsonDocument("resumeToken2", new BsonInt32(234)); - BsonDocument bsonDoc3 = mock(BsonDocument.class); //new BsonDocument("resumeToken3", new BsonInt32(456)); - when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); - when(streamDoc2.getResumeToken()).thenReturn(bsonDoc2); - when(streamDoc3.getResumeToken()).thenReturn(bsonDoc3); - when(cursor.next()) - .thenReturn(streamDoc1, streamDoc2, streamDoc3); - when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); - when(doc2.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); - when(doc3.toJson(any(JsonWriterSettings.class))).thenReturn(UUID.randomUUID().toString()); - when(streamDoc1.getFullDocument()).thenReturn(doc1); - when(streamDoc2.getFullDocument()).thenReturn(doc2); - when(streamDoc3.getFullDocument()).thenReturn(doc3); - final String resumeToken1 = UUID.randomUUID().toString(); - final String resumeToken2 = UUID.randomUUID().toString(); - final String resumeToken3 = UUID.randomUUID().toString(); - when(bsonDoc1.toJson(any(JsonWriterSettings.class))).thenReturn(resumeToken1); - when(bsonDoc2.toJson(any(JsonWriterSettings.class))).thenReturn(resumeToken2); - when(bsonDoc3.toJson(any(JsonWriterSettings.class))).thenReturn(resumeToken3); mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class))) .thenReturn(mongoClient); streamWorker.processStream(streamPartition); - verify(mongoClient, times(1)).close(); - verify(mongoDatabase).getCollection(eq("collection")); - verify(cursor).close(); - // TODO fix - // verify(cursor, times(4)).hasNext(); - // verify(mockPartitionCheckpoint).checkpoint(resumeToken3, 3); - // verify(successItemsCounter).increment(1); - // verify(mockPartitionCheckpoint).checkpoint(resumeToken2, 2); + } - // verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); - // verify(successItemsCounter).increment(2); + verify(mongoClient, times(1)).close(); + verify(mongoDatabase).getCollection(eq("collection")); + verify(cursor).close(); + verify(cursor, times(4)).hasNext(); + verify(mockPartitionCheckpoint).checkpoint(resumeToken3, 3); + verify(successItemsCounter).increment(1); + verify(mockPartitionCheckpoint).checkpoint(resumeToken2, 2); + verify(mockRecordBufferWriter, times(2)).writeToBuffer(eq(null), any()); + verify(successItemsCounter).increment(2); verify(failureItemsCounter, never()).increment(); + } + @Test + void test_processStream_stopWorker() { + when(streamProgressState.shouldWaitForExport()).thenReturn(false); + when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); + when(streamPartition.getCollection()).thenReturn("database.collection"); + MongoClient mongoClient = mock(MongoClient.class); + MongoDatabase mongoDatabase = mock(MongoDatabase.class); + MongoCollection col = mock(MongoCollection.class); + ChangeStreamIterable changeStreamIterable = mock(ChangeStreamIterable.class); + MongoCursor cursor = mock(MongoCursor.class); + when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); + when(mongoDatabase.getCollection(anyString())).thenReturn(col); + when(col.watch()).thenReturn(changeStreamIterable); + when(changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP)).thenReturn(changeStreamIterable); + 
From 22b3685e31a31ef8fcdb4273b3183de2ff26896d Mon Sep 17 00:00:00 2001
From: Dinu John <86094133+dinujoh@users.noreply.github.com>
Date: Tue, 2 Apr 2024 07:50:04 -0500
Subject: [PATCH 17/18] Add BackgroundThreadFactory that adds thread name prefix for debugging

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
---
 data-prepper-plugins/mongodb/build.gradle           |  1 +
 .../mongo/documentdb/DocumentDBService.java         |  3 ++-
 .../mongo/stream/StreamAcknowledgementManager.java  | 14 ++++++--------
 .../plugins/mongo/stream/StreamWorker.java          |  2 +-
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/data-prepper-plugins/mongodb/build.gradle b/data-prepper-plugins/mongodb/build.gradle
index d32efec8fa..89eaeff185 100644
--- a/data-prepper-plugins/mongodb/build.gradle
+++ b/data-prepper-plugins/mongodb/build.gradle
@@ -12,6 +12,7 @@ dependencies {
     implementation project(path: ':data-prepper-plugins:aws-plugin-api')
     implementation project(path: ':data-prepper-plugins:buffer-common')
     implementation project(':data-prepper-plugins:http-common')
+    implementation project(path: ':data-prepper-plugins:common')
 
     testImplementation testLibs.mockito.inline
 
diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java
index 7f069da766..5063912fa6 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBService.java
@@ -1,5 +1,6 @@
 package org.opensearch.dataprepper.plugins.mongo.documentdb;
 
+import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
 import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -65,7 +66,7 @@ public void start(Buffer<Record<Event>> buffer) {
             runnableList.add(streamScheduler);
         }
 
-        executor = Executors.newFixedThreadPool(runnableList.size());
+        executor = Executors.newFixedThreadPool(runnableList.size(), BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source"));
         runnableList.forEach(executor::submit);
     }
 
diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java
index f25c2f6188..b1fe8d3529 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java
@@ -1,6 +1,7 @@
 package org.opensearch.dataprepper.plugins.mongo.stream;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
 import org.opensearch.dataprepper.plugins.mongo.model.CheckpointStatus;
@@ -18,9 +19,8 @@ public class StreamAcknowledgementManager {
     private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class);
-    private ConcurrentLinkedQueue<CheckpointStatus> checkpoints;
-    private ConcurrentHashMap<String, CheckpointStatus> ackStatus;
-
+    private final ConcurrentLinkedQueue<CheckpointStatus> checkpoints = new ConcurrentLinkedQueue<>();
+    private final ConcurrentHashMap<String, CheckpointStatus> ackStatus = new ConcurrentHashMap<>();
     private final AcknowledgementSetManager acknowledgementSetManager;
     private final DataStreamPartitionCheckpoint partitionCheckpoint;
@@ -41,17 +41,15 @@ public StreamAcknowledgementManager(final AcknowledgementSetManager acknowledgem
         this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout;
         this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs;
         this.checkPointIntervalInMs = checkPointIntervalInMs;
-        executorService = Executors.newSingleThreadExecutor();
+        executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-ack-monitor"));
     }
 
     void init(final Consumer<Void> stopWorkerConsumer) {
         enableAcknowledgement = true;
-        executorService.submit(() -> monitorCheckpoints(executorService, stopWorkerConsumer));
+        executorService.submit(() -> monitorAcknowledgment(executorService, stopWorkerConsumer));
     }
 
-    private void monitorCheckpoints(final ExecutorService executorService, final Consumer<Void> stopWorkerConsumer) {
-        checkpoints = new ConcurrentLinkedQueue<>();
-        ackStatus = new ConcurrentHashMap<>();
+    private void monitorAcknowledgment(final ExecutorService executorService, final Consumer<Void> stopWorkerConsumer) {
         long lastCheckpointTime = System.currentTimeMillis();
         CheckpointStatus lastCheckpointStatus = null;
         while (!Thread.currentThread().isInterrupted()) {
diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java
index 71bc26c474..f823bceb98 100644
--- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java
+++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java
@@ -145,7 +145,7 @@ public void processStream(final StreamPartition streamPartition) {
                     recordCount += 1;
 
                     if (recordCount % recordFlushBatchSize == 0) {
-                        LOG.debug("Write to buffer for line " + (recordCount - recordFlushBatchSize) + " to " + recordCount);
+                        LOG.debug("Write to buffer for line {} to {}", (recordCount - recordFlushBatchSize), recordCount);
                         acknowledgementSet = streamAcknowledgementManager.createAcknowledgementSet(checkPointToken, recordCount).orElse(null);
                         recordBufferWriter.writeToBuffer(acknowledgementSet, records);
                         successItemsCounter.increment(records.size());
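
Patch 17 above replaces the plain executors with ones built from BackgroundThreadFactory.defaultExecutorThreadFactory(prefix) in :data-prepper-plugins:common, so executor threads carry a recognizable name ("documentdb-source", "mongodb-stream-ack-monitor") in logs and thread dumps. A minimal sketch of what such a name-prefixing factory looks like, as an illustration only, not the actual BackgroundThreadFactory source, which may also set the daemon flag or number threads differently:

    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative name-prefixing ThreadFactory.
    class NamePrefixThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicLong threadNumber = new AtomicLong(1);

        NamePrefixThreadFactory(final String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(final Runnable runnable) {
            // yields names like "documentdb-source-1", "documentdb-source-2"
            return new Thread(runnable, prefix + "-" + threadNumber.getAndIncrement());
        }
    }
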
From cf4b8576b3e8fd88f9a4e259eebe6e16d551cfbc Mon Sep 17 00:00:00 2001
From: Dinu John <86094133+dinujoh@users.noreply.github.com>
Date: Tue, 2 Apr 2024 08:23:43 -0500
Subject: [PATCH 18/18] Update unit test

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
---
 .../plugins/mongo/stream/StreamAcknowledgementManagerTest.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java
index fdf404f5a2..40d556ec35 100644
--- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java
+++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java
@@ -24,6 +24,7 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -56,7 +57,7 @@ public void createAcknowledgementSet_disabled_emptyAckSet() {
 
     @Test
     public void createAcknowledgementSet_enabled_ackSetWithAck() {
-        when(timeout.getSeconds()).thenReturn(10_000L);
+        lenient().when(timeout.getSeconds()).thenReturn(10_000L);
         streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0);
         streamAckManager.init(stopWorkerConsumer);
         final String resumeToken = UUID.randomUUID().toString();
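
The lenient() change in patch 18 addresses Mockito's strict stubbing: under MockitoExtension's default strictness, a stubbing that a given run never exercises fails the test with UnnecessaryStubbingException, and lenient() exempts just that one stubbing from the check. A minimal illustration, in a hypothetical test class rather than this patch's code; mocking getSeconds() on a Duration works here because the module already pulls in mockito-inline, which can mock final classes:

    import static org.mockito.Mockito.lenient;
    import static org.mockito.Mockito.mock;

    import java.time.Duration;

    class LenientStubbingExample {
        void stubTimeoutLeniently() {
            final Duration timeout = mock(Duration.class);
            // Without lenient(), strict stubs would fail the test with
            // UnnecessaryStubbingException on any code path that never
            // calls getSeconds().
            lenient().when(timeout.getSeconds()).thenReturn(10_000L);
        }
    }
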