Add AcknowledgementSet support to DocumentDB/MongoDB streams #4379

Merged · 18 commits · Apr 2, 2024
CollectionConfig.java
@@ -44,6 +44,16 @@ public ExportConfig getExportConfig() {
return this.exportConfig;
}

+ public boolean isExportRequired() {
+ return this.ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
+ this.ingestionMode == CollectionConfig.IngestionMode.EXPORT;
+ }
+
+ public boolean isStreamRequired() {
+ return this.ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
+ this.ingestionMode == CollectionConfig.IngestionMode.STREAM;
+ }
+
public static class ExportConfig {
private static final int DEFAULT_ITEMS_PER_PARTITION = 4000;
@JsonProperty("items_per_partition")
@@ -12,8 +12,6 @@
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
- import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
- import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -78,12 +76,7 @@ public Event convert(final String record,
}
final EventMetadata eventMetadata = event.getMetadata();

- if (dataType.equals(ExportPartition.PARTITION_TYPE)) {
- eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, ExportPartition.PARTITION_TYPE);
- } else if (dataType.equals(StreamPartition.PARTITION_TYPE)) {
- eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, StreamPartition.PARTITION_TYPE);
- }
-
+ eventMetadata.setAttribute(MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE, dataType);
eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_EVENT_COLLECTION_METADATA_ATTRIBUTE, collectionConfig.getCollection());
eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis);
eventMetadata.setAttribute(MetadataKeyAttributes.MONGODB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE, eventName);
DocumentDBService.java
@@ -6,6 +6,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+ import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;
import org.opensearch.dataprepper.plugins.mongo.export.MongoDBExportPartitionSupplier;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler;
@@ -15,6 +16,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.util.ArrayList;
+ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@@ -24,11 +27,7 @@ public class DocumentDBService {
private final PluginMetrics pluginMetrics;
private final MongoDBSourceConfig sourceConfig;
private final AcknowledgementSetManager acknowledgementSetManager;
- private final ExecutorService executor;
- private ExportScheduler exportScheduler;
- private ExportWorker exportWorker;
- private LeaderScheduler leaderScheduler;
- private StreamScheduler streamScheduler;
+ private ExecutorService executor;
private final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier;
public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
final MongoDBSourceConfig sourceConfig,
@@ -38,9 +37,7 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceConfig = sourceConfig;

this.mongoDBExportPartitionSupplier = new MongoDBExportPartitionSupplier(sourceConfig);
- executor = Executors.newFixedThreadPool(4);
}

/**
@@ -51,23 +48,35 @@ public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
* @param buffer Data Prepper Buffer
*/
public void start(Buffer<Record<Event>> buffer) {
- this.exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics);
- this.exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);
- this.leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());
- this.streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics);
+ final List<Runnable> runnableList = new ArrayList<>();
+
+ final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());
+ runnableList.add(leaderScheduler);
+
+ if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isExportRequired)) {
+ final ExportScheduler exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics);
+ final ExportWorker exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);
+ runnableList.add(exportScheduler);
+ runnableList.add(exportWorker);
+ }
+
+ if (sourceConfig.getCollections().stream().anyMatch(CollectionConfig::isStreamRequired)) {
+ final StreamScheduler streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, pluginMetrics);
+ runnableList.add(streamScheduler);
+ }
+
- executor.submit(leaderScheduler);
- executor.submit(exportScheduler);
- executor.submit(exportWorker);
- executor.submit(streamScheduler);
+ executor = Executors.newFixedThreadPool(runnableList.size());
Member: Also, use the BackgroundThreadFactory here.

Member Author: Done

+ runnableList.forEach(executor::submit);
}

/**
* Interrupt the running schedulers.
* Each scheduler must implement logic for graceful shutdown.
*/
public void shutdown() {
LOG.info("shutdown DocumentDB Service scheduler and worker");
executor.shutdownNow();
if (executor != null) {
LOG.info("shutdown DocumentDB Service scheduler and worker");
executor.shutdownNow();
}
}
}
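
The resolved version of the executor creation is not shown in this diff; a minimal sketch of the BackgroundThreadFactory change the reviewer requested might look like the following (the "documentdb-source" thread-name prefix is an assumption, not taken from this PR):

// Sketch only: BackgroundThreadFactory is Data Prepper's daemon-thread factory;
// the factory-method signature is taken from the reviewer's example below.
executor = Executors.newFixedThreadPool(runnableList.size(),
        BackgroundThreadFactory.defaultExecutorThreadFactory("documentdb-source"));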
LeaderScheduler.java
@@ -101,15 +101,7 @@ public void run() {
}
}

- private boolean isExportRequired(final CollectionConfig.IngestionMode ingestionMode) {
- return ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
- ingestionMode == CollectionConfig.IngestionMode.EXPORT;
- }

- private boolean isStreamRequired(final CollectionConfig.IngestionMode ingestionMode) {
- return ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM ||
- ingestionMode == CollectionConfig.IngestionMode.STREAM;
- }
private void init() {
LOG.info("Try to initialize DocumentDB Leader Partition");

@@ -120,13 +112,13 @@ private void init() {
coordinator.createPartition(new GlobalState(collectionConfig.getCollection(), null));

final Instant startTime = Instant.now();
- final boolean exportRequired = isExportRequired(collectionConfig.getIngestionMode());
+ final boolean exportRequired = collectionConfig.isExportRequired();
LOG.info("Ingestion mode {} for Collection {}", collectionConfig.getIngestionMode(), collectionConfig.getCollection());
if (exportRequired) {
createExportPartition(collectionConfig, startTime);
}

- if (isStreamRequired(collectionConfig.getIngestionMode())) {
+ if (collectionConfig.isStreamRequired()) {
createStreamPartition(collectionConfig, startTime, exportRequired);
}

CheckpointStatus.java (new file)
@@ -0,0 +1,45 @@
package org.opensearch.dataprepper.plugins.mongo.model;

public class CheckpointStatus {
Member: This class is only used by StreamAcknowledgementManager. So you can move this into the stream package and make it package-private to help keep it encapsulated.

Member Author: Sure, will do.

private final String resumeToken;
private final long recordCount;
private boolean acknowledged;
private final long createTimestamp;
private Long acknowledgedTimestamp;

public CheckpointStatus(final String resumeToken, final long recordCount, final long createTimestamp) {
this.resumeToken = resumeToken;
this.recordCount = recordCount;
this.acknowledged = false;
this.createTimestamp = createTimestamp;
}

public void setAcknowledgedTimestamp(final Long acknowledgedTimestamp) {
this.acknowledgedTimestamp = acknowledgedTimestamp;
}

public void setAcknowledged(boolean acknowledged) {
this.acknowledged = acknowledged;
}

public String getResumeToken() {
return resumeToken;
}
public long getRecordCount() {
return recordCount;
}

public boolean isAcknowledged() {
return acknowledged;
}

public long getCreateTimestamp() {
return createTimestamp;
}

public Long getAcknowledgedTimestamp() {
return acknowledgedTimestamp;
}
}
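
A minimal sketch of the relocation agreed in the thread above, assuming no callers outside the stream package:

package org.opensearch.dataprepper.plugins.mongo.stream;

// Package-private (no "public" modifier): visible only to StreamAcknowledgementManager
// and other classes in the stream package.
class CheckpointStatus {
    // fields and accessors unchanged from the version above
}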
DataStreamPartitionCheckpoint.java
@@ -76,4 +76,8 @@ public Optional<StreamLoadStatus> getGlobalStreamLoadStatus() {
public void updateStreamPartitionForAcknowledgmentWait(final Duration acknowledgmentSetTimeout) {
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, acknowledgmentSetTimeout);
}

+ public void giveUpPartition() {
+ enhancedSourceCoordinator.giveUpPartition(streamPartition);
+ }
}
StreamAcknowledgementManager.java (new file)
@@ -0,0 +1,127 @@
package org.opensearch.dataprepper.plugins.mongo.stream;

import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.mongo.model.CheckpointStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StreamAcknowledgementManager {
private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class);
private final ConcurrentLinkedQueue<CheckpointStatus> checkpoints = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<String, CheckpointStatus> ackStatus = new ConcurrentHashMap<>();

private final AcknowledgementSetManager acknowledgementSetManager;
private final DataStreamPartitionCheckpoint partitionCheckpoint;

private final Duration partitionAcknowledgmentTimeout;
private final int acknowledgementMonitorWaitTimeInMs;
private final int checkPointIntervalInMs;
private final ExecutorService executorService;

private boolean enableAcknowledgement = false;

public StreamAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
final DataStreamPartitionCheckpoint partitionCheckpoint,
final Duration partitionAcknowledgmentTimeout,
final int acknowledgementMonitorWaitTimeInMs,
final int checkPointIntervalInMs) {
this.acknowledgementSetManager = acknowledgementSetManager;
this.partitionCheckpoint = partitionCheckpoint;
this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout;
this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs;
this.checkPointIntervalInMs = checkPointIntervalInMs;
executorService = Executors.newSingleThreadExecutor();
Member: Let's use the BackgroundThreadFactory for all ExecutorServices going forward.

Here is an example of using it:

executorService = Executors.newFixedThreadPool(s3SourceConfig.getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("s3-source-sqs"));

This thread factory will give us more useful names and also make the thread a daemon thread to ensure Data Prepper shuts down properly.

Member Author: Done

}
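
As with the service-level pool above, the resolved constructor presumably passes a BackgroundThreadFactory here as well; a sketch, with an assumed thread name:

// Sketch only; the "mongodb-stream-ack-monitor" name is illustrative, not from this PR.
executorService = Executors.newSingleThreadExecutor(
        BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-ack-monitor"));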

void init() {
enableAcknowledgement = true;
final Thread currentThread = Thread.currentThread();
executorService.submit(() -> monitorCheckpoints(executorService, currentThread));
}

private void monitorCheckpoints(final ExecutorService executorService, final Thread parentThread) {
long lastCheckpointTime = System.currentTimeMillis();
CheckpointStatus lastCheckpointStatus = null;
while (!Thread.currentThread().isInterrupted()) {
final CheckpointStatus checkpointStatus = checkpoints.peek();
if (checkpointStatus != null) {
if (checkpointStatus.isAcknowledged()) {
lastCheckpointStatus = checkpoints.poll();
ackStatus.remove(checkpointStatus.getResumeToken());
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
lastCheckpointTime = System.currentTimeMillis();
}
} else {
LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken());
final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now());
// Acknowledgement not received for the checkpoint within twice the ack wait time
if (ackWaitDuration.getSeconds() > partitionAcknowledgmentTimeout.getSeconds() * 2) {
// Give up the partition and interrupt the parent thread to stop processing the stream
if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) {
partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
}
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken());
partitionCheckpoint.giveUpPartition();
Thread.currentThread().interrupt();
}
}
}

try {
Thread.sleep(acknowledgementMonitorWaitTimeInMs);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
parentThread.interrupt();
executorService.shutdown();
Collaborator: Should L88-L89 be in the init method instead of the callable?

Member Author: No; once this monitor exits because of the ack wait timeout, it should stop the stream worker.

}
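
To make the give-up rule in monitorCheckpoints concrete, a worked example under assumed configuration values (not taken from this diff):

// Assume partitionAcknowledgmentTimeout = 2 minutes and checkPointIntervalInMs = 120_000.
// - Acknowledged checkpoints are persisted at most once every 120 s.
// - A checkpoint created at time T that is still unacknowledged at T + 4 minutes
//   (2 x the timeout) triggers: persist the last acknowledged checkpoint (if any),
//   give up the partition, and interrupt the stream worker thread.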

Optional<AcknowledgementSet> createAcknowledgementSet(final String resumeToken, final long recordNumber) {
if (!enableAcknowledgement) {
return Optional.empty();
}

final CheckpointStatus checkpointStatus = new CheckpointStatus(resumeToken, recordNumber, Instant.now().toEpochMilli());
checkpoints.add(checkpointStatus);
ackStatus.put(resumeToken, checkpointStatus);
return Optional.of(acknowledgementSetManager.create((result) -> {
if (result) {
final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken);
ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli());
ackCheckpointStatus.setAcknowledged(true);
LOG.debug("Received acknowledgment of completion from sink for checkpoint {}", resumeToken);
} else {
LOG.warn("Negative acknowledgment received for checkpoint {}, resetting checkpoint", resumeToken);
// The default CheckpointStatus acknowledged value is false. The monitorCheckpoints method will time out
// and reprocess the stream from the last successful checkpoint, in order.
}
}, partitionAcknowledgmentTimeout));
}

void shutdown() {
executorService.shutdown();
}

@VisibleForTesting
ConcurrentHashMap<String, CheckpointStatus> getAcknowledgementStatus() {
return ackStatus;
}

@VisibleForTesting
ConcurrentLinkedQueue<CheckpointStatus> getCheckpoints() {
return checkpoints;
}
}
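
For orientation, a hypothetical sketch of how a caller drives this manager (the StreamWorker changes are part of this PR but not shown in this capture; events, resumeToken, and recordCount are assumed locals):

// One acknowledgement set per flushed batch, keyed by resume token.
final Optional<AcknowledgementSet> ackSet =
        streamAcknowledgementManager.createAcknowledgementSet(resumeToken, recordCount);
events.forEach(event -> ackSet.ifPresent(set -> set.add(event))); // attach events before completing
ackSet.ifPresent(AcknowledgementSet::complete); // signal that no further events will be added
// On a positive ack, the monitor thread checkpoints the token; on timeout it gives up the partition.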
StreamScheduler.java
@@ -23,14 +23,15 @@
public class StreamScheduler implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class);
private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000;
+ static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 120_000;
/**
* Number of records to accumulate before flushing to buffer
*/
static final int DEFAULT_BUFFER_BATCH_SIZE = 10;
/**
* Number of stream records to accumulate to write to buffer and checkpoint
*/
- private static final int DEFAULT_STREAM_BATCH_SIZE = 100;
+ static final int DEFAULT_STREAM_BATCH_SIZE = 100;
static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
private final EnhancedSourceCoordinator sourceCoordinator;
private final RecordBufferWriter recordBufferWriter;
@@ -62,7 +63,7 @@ public void run() {
streamPartition = (StreamPartition) sourcePartition.get();
final DataStreamPartitionCheckpoint partitionCheckpoint = new DataStreamPartitionCheckpoint(sourceCoordinator, streamPartition);
final StreamWorker streamWorker = StreamWorker.create(recordBufferWriter, acknowledgementSetManager,
- sourceConfig, partitionCheckpoint, pluginMetrics, DEFAULT_STREAM_BATCH_SIZE);
+ sourceConfig, partitionCheckpoint, pluginMetrics, DEFAULT_STREAM_BATCH_SIZE, DEFAULT_CHECKPOINT_INTERVAL_MILLS);
streamWorker.processStream(streamPartition);
}
try {