Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <oeyh@amazon.com>
  • Loading branch information
oeyh committed Aug 22, 2024
1 parent 847128b commit e44ec9d
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class RdsSourceConfig {
private boolean acknowledgments = false;

@JsonProperty("s3_data_file_acknowledgment_timeout")
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(5);
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(30);

@JsonProperty("stream_acknowledgment_timeout")
private Duration streamAcknowledgmentTimeout = Duration.ofMinutes(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
public class StreamCheckpointManager {
private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointManager.class);
static final int REGULAR_CHECKPOINT_INTERVAL_MILLIS = 60_000;
static final int CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH = 1000;

private final ConcurrentLinkedQueue<ChangeEventStatus> changeEventStatuses = new ConcurrentLinkedQueue<>();
private final StreamCheckpointer streamCheckpointer;
Expand Down Expand Up @@ -74,8 +75,14 @@ void runCheckpointing() {
break;
}
} else {
int changeEventCount = 0;
do {
currentChangeEventStatus = changeEventStatuses.poll();
changeEventCount++;
// In case queue are populated faster than the poll, checkpoint when reaching certain count
if (changeEventCount % CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH == 0) {
streamCheckpointer.checkpoint(currentChangeEventStatus.getBinlogCoordinate());
}
} while (!changeEventStatuses.isEmpty());
streamCheckpointer.checkpoint(currentChangeEventStatus.getBinlogCoordinate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.source.rds.stream;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
Expand All @@ -20,20 +22,28 @@ public class StreamCheckpointer {
private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointer.class);

static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5);
static final String CHECKPOINT_COUNT = "checkpoints";

private final EnhancedSourceCoordinator sourceCoordinator;
private final StreamPartition streamPartition;
private final PluginMetrics pluginMetrics;
private final Counter checkpointCounter;

public StreamCheckpointer(final EnhancedSourceCoordinator sourceCoordinator, final StreamPartition streamPartition) {
public StreamCheckpointer(final EnhancedSourceCoordinator sourceCoordinator,
final StreamPartition streamPartition,
final PluginMetrics pluginMetrics) {
this.sourceCoordinator = sourceCoordinator;
this.streamPartition = streamPartition;
this.pluginMetrics = pluginMetrics;
checkpointCounter = pluginMetrics.counter(CHECKPOINT_COUNT);
}

public void checkpoint(final BinlogCoordinate binlogCoordinate) {
LOG.debug("Checkpointing stream partition {} with binlog coordinate {}", streamPartition.getPartitionKey(), binlogCoordinate);
Optional<StreamProgressState> progressState = streamPartition.getProgressState();
progressState.get().setCurrentPosition(binlogCoordinate);
sourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
checkpointCounter.increment();
}

public void extendLease() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void run() {
LOG.info("Acquired partition to read from stream");

final StreamPartition streamPartition = (StreamPartition) sourcePartition.get();
final StreamCheckpointer streamCheckpointer = new StreamCheckpointer(sourceCoordinator, streamPartition);
final StreamCheckpointer streamCheckpointer = new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics);
binaryLogClient.registerEventListener(new BinlogEventListener(
buffer, sourceConfig, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager));
final StreamWorker streamWorker = StreamWorker.create(sourceCoordinator, binaryLogClient, pluginMetrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

package org.opensearch.dataprepper.plugins.source.rds.stream;

import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
Expand All @@ -20,6 +22,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointer.CHECKPOINT_COUNT;
import static org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointer.CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE;


Expand All @@ -32,11 +35,18 @@ class StreamCheckpointerTest {
@Mock
private StreamPartition streamPartition;

@Mock
private PluginMetrics pluginMetrics;

@Mock
private Counter checkpointCounter;

private StreamCheckpointer streamCheckpointer;


@BeforeEach
void setUp() {
when(pluginMetrics.counter(CHECKPOINT_COUNT)).thenReturn(checkpointCounter);
streamCheckpointer = createObjectUnderTest();
}

Expand All @@ -50,6 +60,7 @@ void test_checkpoint() {

verify(streamProgressState).setCurrentPosition(binlogCoordinate);
verify(sourceCoordinator).saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
verify(checkpointCounter).increment();
}

@Test
Expand All @@ -67,6 +78,6 @@ void test_giveUpPartition() {
}

private StreamCheckpointer createObjectUnderTest() {
return new StreamCheckpointer(sourceCoordinator, streamPartition);
return new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics);
}
}

0 comments on commit e44ec9d

Please sign in to comment.