-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add AcknowledgementSet support to DocumentDB/MongoDB streams #4379
Changes from 15 commits
d16feb4
9c7e6b8
e7d3b94
4def974
5740266
a1417a7
85c9894
bdd789b
d503063
3444d1e
1b9749c
82369fc
f77c08a
bd4c251
99a73d3
5d0c291
22b3685
cf4b857
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package org.opensearch.dataprepper.plugins.mongo.model; | ||
|
||
/**
 * Acknowledgement state of a single stream checkpoint.
 *
 * <p>An instance records the resume token and record count it was created with, its
 * creation time, and whether (and when) it has been acknowledged. The identifying
 * fields are immutable; only the acknowledgement flag and timestamp can change.
 *
 * <p>The acknowledgement fields are {@code volatile} because
 * {@code StreamAcknowledgementManager} sets them from an acknowledgement callback and
 * reads them from its monitor task, which runs on a separate executor thread.
 *
 * <p>Review note carried over from the pull-request discussion: this class currently has
 * a single consumer ({@code StreamAcknowledgementManager}).
 */
public class CheckpointStatus {

    private final String resumeToken;
    private final long recordCount;
    private final long createTimestamp;

    // Written by the acknowledgement callback, read by the checkpoint monitor.
    private volatile boolean acknowledged;
    // Stays null until an acknowledgement timestamp has been recorded.
    private volatile Long acknowledgedTimestamp;

    /**
     * Creates a checkpoint status that starts out unacknowledged.
     *
     * @param resumeToken     resume token associated with this checkpoint
     * @param recordCount     record count associated with this checkpoint
     * @param createTimestamp creation time; callers in this change set pass epoch milliseconds
     */
    public CheckpointStatus(final String resumeToken, final long recordCount, final long createTimestamp) {
        this.resumeToken = resumeToken;
        this.recordCount = recordCount;
        this.acknowledged = false;
        this.createTimestamp = createTimestamp;
    }

    /**
     * Records when the checkpoint was acknowledged.
     *
     * @param acknowledgedTimestamp acknowledgement time, or {@code null} to clear it
     */
    public void setAcknowledgedTimestamp(final Long acknowledgedTimestamp) {
        this.acknowledgedTimestamp = acknowledgedTimestamp;
    }

    /**
     * Marks the checkpoint as acknowledged or not acknowledged.
     *
     * @param acknowledged new acknowledgement flag
     */
    public void setAcknowledged(boolean acknowledged) {
        this.acknowledged = acknowledged;
    }

    /** @return the resume token supplied at construction */
    public String getResumeToken() {
        return resumeToken;
    }

    /** @return the record count supplied at construction */
    public long getRecordCount() {
        return recordCount;
    }

    /** @return {@code true} once {@link #setAcknowledged(boolean)} has been called with {@code true} */
    public boolean isAcknowledged() {
        return acknowledged;
    }

    /** @return the creation timestamp supplied at construction */
    public long getCreateTimestamp() {
        return createTimestamp;
    }

    /**
     * Returns the recorded acknowledgement timestamp.
     *
     * @return the value last passed to {@link #setAcknowledgedTimestamp(Long)}
     * @throws IllegalStateException if no acknowledgement timestamp has been recorded yet
     */
    public long getAcknowledgedTimestamp() {
        // Read the volatile field once so the null check and the unboxing see the same value.
        final Long timestamp = acknowledgedTimestamp;
        if (timestamp == null) {
            throw new IllegalStateException(
                    "Acknowledged timestamp is not set for checkpoint with resume token " + resumeToken);
        }
        return timestamp;
    }
}
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,127 @@ | ||||
package org.opensearch.dataprepper.plugins.mongo.stream; | ||||
|
||||
import com.google.common.annotations.VisibleForTesting; | ||||
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; | ||||
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; | ||||
import org.opensearch.dataprepper.plugins.mongo.model.CheckpointStatus; | ||||
import org.slf4j.Logger; | ||||
import org.slf4j.LoggerFactory; | ||||
|
||||
import java.time.Duration; | ||||
import java.time.Instant; | ||||
import java.util.Optional; | ||||
import java.util.concurrent.ConcurrentHashMap; | ||||
import java.util.concurrent.ConcurrentLinkedQueue; | ||||
import java.util.concurrent.ExecutorService; | ||||
import java.util.concurrent.Executors; | ||||
|
||||
public class StreamAcknowledgementManager { | ||||
private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class); | ||||
private final ConcurrentLinkedQueue<CheckpointStatus> checkpoints = new ConcurrentLinkedQueue<>(); | ||||
private final ConcurrentHashMap<String, CheckpointStatus> ackStatus = new ConcurrentHashMap<>(); | ||||
|
||||
private final AcknowledgementSetManager acknowledgementSetManager; | ||||
private final DataStreamPartitionCheckpoint partitionCheckpoint; | ||||
|
||||
private final Duration partitionAcknowledgmentTimeout; | ||||
private final int acknowledgementMonitorWaitTimeInMs; | ||||
private final int checkPointIntervalInMs; | ||||
private final ExecutorService executorService; | ||||
|
||||
private boolean enableAcknowledgement = false; | ||||
|
||||
public StreamAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager, | ||||
final DataStreamPartitionCheckpoint partitionCheckpoint, | ||||
final Duration partitionAcknowledgmentTimeout, | ||||
final int acknowledgementMonitorWaitTimeInMs, | ||||
final int checkPointIntervalInMs) { | ||||
this.acknowledgementSetManager = acknowledgementSetManager; | ||||
this.partitionCheckpoint = partitionCheckpoint; | ||||
this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout; | ||||
this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs; | ||||
this.checkPointIntervalInMs = checkPointIntervalInMs; | ||||
executorService = Executors.newSingleThreadExecutor(); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use the Here is an example of using it: Line 48 in 19b18a1
This thread factory will give us more useful names and also make the thread a daemon thread to ensure Data Prepper shuts down properly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||
} | ||||
|
||||
void init() { | ||||
enableAcknowledgement = true; | ||||
final Thread currentThread = Thread.currentThread(); | ||||
executorService.submit(() -> monitorCheckpoints(executorService, currentThread)); | ||||
} | ||||
|
||||
private void monitorCheckpoints(final ExecutorService executorService, final Thread parentThread) { | ||||
long lastCheckpointTime = System.currentTimeMillis(); | ||||
CheckpointStatus lastCheckpointStatus = null; | ||||
while (!Thread.currentThread().isInterrupted()) { | ||||
final CheckpointStatus checkpointStatus = checkpoints.peek(); | ||||
if (checkpointStatus != null) { | ||||
if (checkpointStatus.isAcknowledged()) { | ||||
lastCheckpointStatus = checkpoints.poll(); | ||||
ackStatus.remove(checkpointStatus.getResumeToken()); | ||||
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) { | ||||
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); | ||||
partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); | ||||
lastCheckpointTime = System.currentTimeMillis(); | ||||
} | ||||
} else { | ||||
LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken()); | ||||
final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now()); | ||||
// Acknowledgement not received for the checkpoint after twice ack wait time | ||||
if (ackWaitDuration.getSeconds() > partitionAcknowledgmentTimeout.getSeconds() * 2) { | ||||
// Give up partition and should interrupt parent thread to stop processing stream | ||||
if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) { | ||||
partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount()); | ||||
} | ||||
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken()); | ||||
partitionCheckpoint.giveUpPartition(); | ||||
Thread.currentThread().interrupt(); | ||||
} | ||||
} | ||||
} | ||||
|
||||
try { | ||||
Thread.sleep(acknowledgementMonitorWaitTimeInMs); | ||||
} catch (InterruptedException ex) { | ||||
Thread.currentThread().interrupt(); | ||||
} | ||||
} | ||||
parentThread.interrupt(); | ||||
executorService.shutdown(); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should L88-L89 be in init method instead of the callable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, once this monitor exists because of ack wait timeout, it should stop the stream worker. |
||||
} | ||||
|
||||
Optional<AcknowledgementSet> createAcknowledgementSet(final String resumeToken, final long recordNumber) { | ||||
if (!enableAcknowledgement) { | ||||
return Optional.empty(); | ||||
} | ||||
|
||||
final CheckpointStatus checkpointStatus = new CheckpointStatus(resumeToken, recordNumber, Instant.now().toEpochMilli()); | ||||
checkpoints.add(checkpointStatus); | ||||
ackStatus.put(resumeToken, checkpointStatus); | ||||
return Optional.of(acknowledgementSetManager.create((result) -> { | ||||
if (result) { | ||||
final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken); | ||||
ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli()); | ||||
ackCheckpointStatus.setAcknowledged(true); | ||||
LOG.debug("Received acknowledgment of completion from sink for checkpoint {}", resumeToken); | ||||
} else { | ||||
LOG.warn("Negative acknowledgment received for checkpoint {}, resetting checkpoint", resumeToken); | ||||
// default CheckpointStatus acknowledged value is false. The monitorCheckpoints method will time out | ||||
// and reprocess stream from last successful checkpoint in the order. | ||||
} | ||||
}, partitionAcknowledgmentTimeout)); | ||||
} | ||||
|
||||
void shutdown() { | ||||
executorService.shutdown(); | ||||
} | ||||
|
||||
@VisibleForTesting | ||||
ConcurrentHashMap<String, CheckpointStatus> getAcknowledgementStatus() { | ||||
return ackStatus; | ||||
} | ||||
|
||||
@VisibleForTesting | ||||
ConcurrentLinkedQueue<CheckpointStatus> getCheckpoints() { | ||||
return checkpoints; | ||||
} | ||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, use the BackgroundThreadFactory here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done