
Add AcknowledgementSet support to DocumentDB/MongoDB streams #4379

Merged · 18 commits · Apr 2, 2024
2 changes: 2 additions & 0 deletions data-prepper-plugins/mongodb/build.gradle
@@ -12,10 +12,12 @@ dependencies {
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:http-common')
implementation project(path: ':data-prepper-plugins:common')


testImplementation testLibs.mockito.inline
testImplementation testLibs.bundles.junit
testImplementation testLibs.slf4j.simple
testImplementation project(path: ':data-prepper-test-common')

}
@@ -44,6 +44,16 @@ public ExportConfig getExportConfig() {
return this.exportConfig;
}

public boolean isExportRequired() {
return this.ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
this.ingestionMode == CollectionConfig.IngestionMode.EXPORT;
}

public boolean isStreamRequired() {
return this.ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
this.ingestionMode == CollectionConfig.IngestionMode.STREAM;
}

public static class ExportConfig {
private static final int DEFAULT_ITEMS_PER_PARTITION = 4000;
@JsonProperty("items_per_partition")
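For illustration, a minimal JUnit 5 sketch of the truth table these two helpers encode (they replace the duplicated checks removed from the scheduler further down). The Jackson-based construction and the `ingestion_mode` property name are assumptions for the sketch; this diff does not show how `ingestionMode` is bound:

```java
// Sketch only: assumes CollectionConfig deserializes an "ingestion_mode"
// JSON property into CollectionConfig.IngestionMode (not shown in this diff).
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class CollectionConfigIngestionModeTest {
    private final ObjectMapper objectMapper = new ObjectMapper();

    private CollectionConfig configFor(final String mode) throws Exception {
        return objectMapper.readValue(
                "{\"ingestion_mode\": \"" + mode + "\"}", CollectionConfig.class);
    }

    @Test
    void exportStreamModeRequiresBoth() throws Exception {
        final CollectionConfig config = configFor("export_stream");
        assertTrue(config.isExportRequired());
        assertTrue(config.isStreamRequired());
    }

    @Test
    void exportModeRequiresExportOnly() throws Exception {
        final CollectionConfig config = configFor("export");
        assertTrue(config.isExportRequired());
        assertFalse(config.isStreamRequired());
    }

    @Test
    void streamModeRequiresStreamOnly() throws Exception {
        final CollectionConfig config = configFor("stream");
        assertFalse(config.isExportRequired());
        assertTrue(config.isStreamRequired());
    }
}
```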
@@ -12,8 +12,6 @@
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -78,12 +76,7 @@ public Event convert(final String record,
}
final EventMetadata eventMetadata = event.getMetadata();

if (dataType.equals(ExportPartition.PARTITION_TYPE)) {
eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, ExportPartition.PARTITION_TYPE);
} else if (dataType.equals(StreamPartition.PARTITION_TYPE)) {
eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, StreamPartition.PARTITION_TYPE);
}

eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, dataType);
eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_EVENT_COLLECTION_METADATA_ATTRIBUTE, collectionConfig.getCollection());
eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis);
eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE, eventName);
@@ -1,11 +1,13 @@
package org.opensearch.dataprepper.plugins.mongo.documentdb;

import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
import org.opensearch.dataprepper.plugins.mongo.export.MongoDBExportPartitionSupplier;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler;
@@ -15,6 +17,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@@ -24,11 +28,7 @@ public class DocumentDBService {
private final PluginMetrics pluginMetrics;
private final MongoDBSourceConfig sourceConfig;
private final AcknowledgementSetManager acknowledgementSetManager;
private final ExecutorService executor;
private ExportScheduler exportScheduler;
private ExportWorker exportWorker;
private LeaderScheduler leaderScheduler;
private StreamScheduler streamScheduler;
private ExecutorService executor;
private final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier;
public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
final MongoDBSourceConfig sourceConfig,
@@ -38,9 +38,7 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceConfig = sourceConfig;

this.mongoDBExportPartitionSupplier = new MongoDBExportPartitionSupplier(sourceConfig);
executor = Executors.newFixedThreadPool(4);
}

/**
@@ -51,23 +49,35 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
* @param buffer Data Prepper Buffer
*/
public void start(Buffer<Record<Event>> buffer) {
this.exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics);
this.exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);
this.leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());
this.streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics);
final List<Runnable> runnableList = new ArrayList<>();

final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());
runnableList.add(leaderScheduler);

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isExportRequired)) {
final ExportScheduler exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics);
final ExportWorker exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);
runnableList.add(exportScheduler);
runnableList.add(exportWorker);
}

if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStreamRequired)) {
final StreamScheduler streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics);
runnableList.add(streamScheduler);
}

executor.submit(leaderScheduler);
executor.submit(exportScheduler);
executor.submit(exportWorker);
executor.submit(streamScheduler);
executor = Executors.newFixedThreadPool(runnableList.size(), BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source"));
runnableList.forEach(executor::submit);
}

/**
* Interrupt the running of schedulers.
* Each scheduler must implement logic for graceful shutdown.
*/
public void shutdown() {
LOG.info("shutdown DocumentDB Service scheduler and worker");
executor.shutdownNow();
if (executor != null) {
LOG.info("shutdown DocumentDB Service scheduler and worker");
executor.shutdownNow();
}
}
}
@@ -101,15 +101,7 @@ public void run() {
}
}

private boolean isExportRequired(final CollectionConfig.IngestionMode ingestionMode) {
return ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
ingestionMode == CollectionConfig.IngestionMode.EXPORT;
}

private boolean isStreamRequired(final CollectionConfig.IngestionMode ingestionMode) {
return ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
ingestionMode == CollectionConfig.IngestionMode.STREAM;
}
private void init() {
LOG.info("Try to initialize DocumentDB Leader Partition");

@@ -120,13 +112,13 @@ private void init() {
coordinator.createPartition(new GlobalState(collectionConfig.getCollection(), null));

final Instant startTime = Instant.now();
final boolean exportRequired = isExportRequired(collectionConfig.getIngestionMode());
final boolean exportRequired = collectionConfig.isExportRequired();
LOG.info("Ingestion mode {} for Collection {}", collectionConfig.getIngestionMode(), collectionConfig.getCollection());
if (exportRequired) {
createExportPartition(collectionConfig, startTime);
}

if (isStreamRequired(collectionConfig.getIngestionMode())) {
if (collectionConfig.isStreamRequired()) {
createStreamPartition(collectionConfig, startTime, exportRequired);
}

@@ -0,0 +1,45 @@
package org.opensearch.dataprepper.plugins.mongo.model;

public class CheckpointStatus {
Member: This class is only used by StreamAcknowledgementManager. So you can move this into the stream package and make it package-private to help keep it encapsulated.

Member (Author): Sure. Will do.

private final String resumeToken;
private final long recordCount;
private boolean acknowledged;
private final long createTimestamp;
private Long acknowledgedTimestamp;

public CheckpointStatus(final String resumeToken, final long recordCount, final long createTimestamp) {
this.resumeToken = resumeToken;
this.recordCount = recordCount;
this.acknowledged = false;
this.createTimestamp = createTimestamp;
}

public void setAcknowledgedTimestamp(final Long acknowledgedTimestamp) {
this.acknowledgedTimestamp = acknowledgedTimestamp;
}

public void setAcknowledged(boolean acknowledged) {
this.acknowledged = acknowledged;
}

public String getResumeToken() {
return resumeToken;
}
public long getRecordCount() {
return recordCount;
}

public boolean isAcknowledged() {
return acknowledged;
}

public long getCreateTimestamp() {
return createTimestamp;
}

public long getAcknowledgedTimestamp() {
return acknowledgedTimestamp;
}


}
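A sketch of the relocation suggested in the thread above: the class moves beside its only consumer and drops the `public` modifier so it stays package-private. The target package is inferred from the discussion, not shown in this diff.

```java
// Sketch of the suggested move: same fields as above, package-private
// visibility, colocated with StreamAcknowledgementManager.
package org.opensearch.dataprepper.plugins.mongo.stream;

class CheckpointStatus {
    private final String resumeToken;
    private final long recordCount;
    private final long createTimestamp;
    private boolean acknowledged;
    private Long acknowledgedTimestamp;

    CheckpointStatus(final String resumeToken, final long recordCount, final long createTimestamp) {
        this.resumeToken = resumeToken;
        this.recordCount = recordCount;
        this.createTimestamp = createTimestamp;
        this.acknowledged = false;
    }

    // getters and setters unchanged from the version above
}
```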
@@ -50,16 +50,16 @@ private void setProgressState(final String resumeToken, final long recordNumber) {
}

/**
* This method is to do a checkpoint with latest sequence number processed.
* Note that this should be called on a regular basis even there are no changes to sequence number
* This method does a checkpoint with the latest processed resume token.
* Note that this should be called on a regular basis even when there are no changes to the resume token,
* as the checkpoint also extends the timeout for the lease.
*
* @param resumeToken
* @param recordNumber The last record number
* @param resumeToken token used to resume the stream when the MongoDB/DocumentDB cursor is opened
* @param recordCount The last processed record count
*/
public void checkpoint(final String resumeToken, final long recordNumber) {
LOG.debug("Checkpoint stream partition for collection " + streamPartition.getCollection() + " with record number " + recordNumber);
setProgressState(resumeToken, recordNumber);
public void checkpoint(final String resumeToken, final long recordCount) {
LOG.debug("Checkpoint stream partition for collection " + streamPartition.getCollection() + " with record number " + recordCount);
setProgressState(resumeToken, recordCount);
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

@@ -76,4 +76,8 @@ public Optional<StreamLoadStatus> getGlobalStreamLoadStatus() {
public void updateStreamPartitionForAcknowledgmentWait(final Duration acknowledgmentSetTimeout) {
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, acknowledgmentSetTimeout);
}

public void giveUpPartition() {
enhancedSourceCoordinator.giveUpPartition(streamPartition);
}
}
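To make the Javadoc contract concrete, here is a hedged sketch of a caller that checkpoints on a timer even when the resume token has not advanced, so the partition lease keeps getting extended. The loop shape, interval, and helper names are illustrative assumptions:

```java
// Illustrative caller of DataStreamPartitionCheckpoint.checkpoint().
// shouldRun() and pollNextResumeToken() are hypothetical placeholders.
String resumeToken = null;
long recordCount = 0;
long lastCheckpointTime = System.currentTimeMillis();
final long checkpointIntervalMillis = 60_000L;

while (shouldRun()) {
    final String nextToken = pollNextResumeToken();
    if (nextToken != null) {
        resumeToken = nextToken;
        recordCount++;
    }
    // Checkpoint on the interval even without new tokens: the underlying
    // saveProgressStateForPartition call also extends the partition lease.
    if (System.currentTimeMillis() - lastCheckpointTime >= checkpointIntervalMillis) {
        partitionCheckpoint.checkpoint(resumeToken, recordCount);
        lastCheckpointTime = System.currentTimeMillis();
    }
}
```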
@@ -0,0 +1,127 @@
package org.opensearch.dataprepper.plugins.mongo.stream;

import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.mongo.model.CheckpointStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class StreamAcknowledgementManager {
private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class);
private final ConcurrentLinkedQueue<CheckpointStatus> checkpoints = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<String, CheckpointStatus> ackStatus = new ConcurrentHashMap<>();
private final AcknowledgementSetManager acknowledgementSetManager;
private final DataStreamPartitionCheckpoint partitionCheckpoint;

private final Duration partitionAcknowledgmentTimeout;
private final int acknowledgementMonitorWaitTimeInMs;
private final int checkPointIntervalInMs;
private final ExecutorService executorService;

private boolean enableAcknowledgement = false;

public StreamAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
final DataStreamPartitionCheckpoint partitionCheckpoint,
final Duration partitionAcknowledgmentTimeout,
final int acknowledgementMonitorWaitTimeInMs,
final int checkPointIntervalInMs) {
this.acknowledgementSetManager = acknowledgementSetManager;
this.partitionCheckpoint = partitionCheckpoint;
this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout;
this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs;
this.checkPointIntervalInMs = checkPointIntervalInMs;
executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-ack-monitor"));
}

void init(final Consumer<Void> stopWorkerConsumer) {
enableAcknowledgement = true;
executorService.submit(() -> monitorAcknowledgment(executorService, stopWorkerConsumer));
}

private void monitorAcknowledgment(final ExecutorService executorService, final Consumer<Void> stopWorkerConsumer) {
long lastCheckpointTime = System.currentTimeMillis();
CheckpointStatus lastCheckpointStatus = null;
while (!Thread.currentThread().isInterrupted()) {
final CheckpointStatus checkpointStatus = checkpoints.peek();
if (checkpointStatus != null) {
if (checkpointStatus.isAcknowledged()) {
lastCheckpointStatus = checkpoints.poll();
ackStatus.remove(checkpointStatus.getResumeToken());
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
lastCheckpointTime = System.currentTimeMillis();
}
} else {
LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken());
final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now());
// Acknowledgement not received for the checkpoint within twice the ack wait time
if (ackWaitDuration.getSeconds() >= partitionAcknowledgmentTimeout.getSeconds() * 2) {
// Give up the partition and interrupt the parent thread to stop processing the stream
if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) {
partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
}
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken());
partitionCheckpoint.giveUpPartition();
break;
Member: I don't understand this. This will end the thread and then also the stream thread. But would these ever start again? It seems that when this happens, this node is now done working and unable to contribute.

Member (Author): Yes, this will end the stream. A new node or the same node can acquire the partition and work from the checkpoint state.

Member: OK, I see. The StreamScheduler is still running and can re-acquire.

}
}
}

try {
Thread.sleep(acknowledgementMonitorWaitTimeInMs);
} catch (InterruptedException ex) {
break;
}
}
stopWorkerConsumer.accept(null);
executorService.shutdown();
Collaborator: Should L88-L89 be in the init method instead of the callable?

Member (Author): No, once this monitor exits because of the ack wait timeout, it should stop the stream worker.

}

Optional<AcknowledgementSet> createAcknowledgementSet(final String resumeToken, final long recordNumber) {
if (!enableAcknowledgement) {
return Optional.empty();
}

final CheckpointStatus checkpointStatus = new CheckpointStatus(resumeToken, recordNumber, Instant.now().toEpochMilli());
checkpoints.add(checkpointStatus);
ackStatus.put(resumeToken, checkpointStatus);
return Optional.of(acknowledgementSetManager.create((result) -> {
if (result) {
final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken);
ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli());
ackCheckpointStatus.setAcknowledged(true);
LOG.debug("Received acknowledgment of completion from sink for checkpoint {}", resumeToken);
} else {
LOG.warn("Negative acknowledgment received for checkpoint {}, resetting checkpoint", resumeToken);
// The default CheckpointStatus acknowledged value is false; the monitorAcknowledgment
// loop will time out and reprocess the stream from the last successful checkpoint, in order.
}
}, partitionAcknowledgmentTimeout));
}

void shutdown() {
executorService.shutdown();
}

@VisibleForTesting
ConcurrentHashMap<String, CheckpointStatus> getAcknowledgementStatus() {
return ackStatus;
}

@VisibleForTesting
ConcurrentLinkedQueue<CheckpointStatus> getCheckpoints() {
return checkpoints;
}
}
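For context, a sketch of how the stream worker side might drive this manager: create an acknowledgement set per batch, register each event with it, and complete the set once the batch is handed to the buffer. The batch handling is an assumption; `AcknowledgementSet.add(Event)` and `complete()` are the Data Prepper acknowledgement APIs this class builds on.

```java
// Worker-side sketch; `batch`, `resumeToken`, and `recordCount` are assumed
// to come from the change-stream read loop (not shown in this diff).
final Optional<AcknowledgementSet> ackSet =
        streamAcknowledgementManager.createAcknowledgementSet(resumeToken, recordCount);

for (final Record<Event> record : batch) {
    ackSet.ifPresent(set -> set.add(record.getData()));
}
// ... write the batch to the Data Prepper buffer here ...

// complete() arms the callback registered in createAcknowledgementSet(): a
// positive ack marks the CheckpointStatus acknowledged so the monitor thread
// can checkpoint it; if no ack arrives within twice the timeout, the monitor
// checkpoints the last acknowledged status, gives up the partition, and stops
// the worker so a StreamScheduler (on this or another node) can re-acquire it.
ackSet.ifPresent(AcknowledgementSet::complete);
```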