From e44ec9d2a033ac8bd3742e9cd411977a154d9518 Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Wed, 21 Aug 2024 23:12:57 -0500 Subject: [PATCH] Address review comments Signed-off-by: Hai Yan --- .../plugins/source/rds/RdsSourceConfig.java | 2 +- .../source/rds/stream/StreamCheckpointManager.java | 7 +++++++ .../source/rds/stream/StreamCheckpointer.java | 12 +++++++++++- .../plugins/source/rds/stream/StreamScheduler.java | 2 +- .../source/rds/stream/StreamCheckpointerTest.java | 13 ++++++++++++- 5 files changed, 32 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java index c861ab8b99..a1cb8c7e2f 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java @@ -58,7 +58,7 @@ public class RdsSourceConfig { private boolean acknowledgments = false; @JsonProperty("s3_data_file_acknowledgment_timeout") - private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(5); + private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(30); @JsonProperty("stream_acknowledgment_timeout") private Duration streamAcknowledgmentTimeout = Duration.ofMinutes(10); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java index 8e6d9d549f..3827f2b822 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointManager.java @@ -20,6 +20,7 @@ public class StreamCheckpointManager { private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointManager.class); static final int REGULAR_CHECKPOINT_INTERVAL_MILLIS = 60_000; + static final int CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH = 1000; private final ConcurrentLinkedQueue changeEventStatuses = new ConcurrentLinkedQueue<>(); private final StreamCheckpointer streamCheckpointer; @@ -74,8 +75,14 @@ void runCheckpointing() { break; } } else { + int changeEventCount = 0; do { currentChangeEventStatus = changeEventStatuses.poll(); + changeEventCount++; + // In case queue are populated faster than the poll, checkpoint when reaching certain count + if (changeEventCount % CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH == 0) { + streamCheckpointer.checkpoint(currentChangeEventStatus.getBinlogCoordinate()); + } } while (!changeEventStatuses.isEmpty()); streamCheckpointer.checkpoint(currentChangeEventStatus.getBinlogCoordinate()); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java index e488db2586..b76dbab7c9 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.source.rds.stream; +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; @@ -20,13 +22,20 @@ public class StreamCheckpointer { private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointer.class); static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5); + static final String CHECKPOINT_COUNT = "checkpoints"; private final EnhancedSourceCoordinator sourceCoordinator; private final StreamPartition streamPartition; + private final PluginMetrics pluginMetrics; + private final Counter checkpointCounter; - public StreamCheckpointer(final EnhancedSourceCoordinator sourceCoordinator, final StreamPartition streamPartition) { + public StreamCheckpointer(final EnhancedSourceCoordinator sourceCoordinator, + final StreamPartition streamPartition, + final PluginMetrics pluginMetrics) { this.sourceCoordinator = sourceCoordinator; this.streamPartition = streamPartition; + this.pluginMetrics = pluginMetrics; + checkpointCounter = pluginMetrics.counter(CHECKPOINT_COUNT); } public void checkpoint(final BinlogCoordinate binlogCoordinate) { @@ -34,6 +43,7 @@ public void checkpoint(final BinlogCoordinate binlogCoordinate) { Optional progressState = streamPartition.getProgressState(); progressState.get().setCurrentPosition(binlogCoordinate); sourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); + checkpointCounter.increment(); } public void extendLease() { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java index daea1e5df3..1886bba451 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java @@ -64,7 +64,7 @@ public void run() { LOG.info("Acquired partition to read from stream"); final StreamPartition streamPartition = (StreamPartition) sourcePartition.get(); - final StreamCheckpointer streamCheckpointer = new StreamCheckpointer(sourceCoordinator, streamPartition); + final StreamCheckpointer streamCheckpointer = new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics); binaryLogClient.registerEventListener(new BinlogEventListener( buffer, sourceConfig, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager)); final StreamWorker streamWorker = StreamWorker.create(sourceCoordinator, binaryLogClient, pluginMetrics); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java index 7e04e9af8b..2fdac1065f 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java @@ -5,11 +5,13 @@ package org.opensearch.dataprepper.plugins.source.rds.stream; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; @@ -20,6 +22,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointer.CHECKPOINT_COUNT; import static org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointer.CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE; @@ -32,11 +35,18 @@ class StreamCheckpointerTest { @Mock private StreamPartition streamPartition; + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter checkpointCounter; + private StreamCheckpointer streamCheckpointer; @BeforeEach void setUp() { + when(pluginMetrics.counter(CHECKPOINT_COUNT)).thenReturn(checkpointCounter); streamCheckpointer = createObjectUnderTest(); } @@ -50,6 +60,7 @@ void test_checkpoint() { verify(streamProgressState).setCurrentPosition(binlogCoordinate); verify(sourceCoordinator).saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); + verify(checkpointCounter).increment(); } @Test @@ -67,6 +78,6 @@ void test_giveUpPartition() { } private StreamCheckpointer createObjectUnderTest() { - return new StreamCheckpointer(sourceCoordinator, streamPartition); + return new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics); } } \ No newline at end of file