diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 4a640fedf7598..7fecc8abffeee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -151,7 +151,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception { public void endInput() throws Exception { if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - commitAndEmitCheckpoints(lastCompletedCheckpointId + 1); + commitAndEmitCheckpoints(Long.MAX_VALUE); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java index 03a0c791b3a13..0025acdf2f369 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java @@ -36,11 +36,14 @@ import org.assertj.core.api.ListAssert; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.util.Collection; import java.util.function.IntSupplier; +import java.util.stream.Stream; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; @@ -85,9 +88,17 @@ SinkAndCounters sinkWithoutPostCommit() { () -> committer.successfulCommits); } + static Stream testParameters() { + return Stream.of( + Arguments.of(true, false), + Arguments.of(true, true), + Arguments.of(false, false), + Arguments.of(false, true)); + } + @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitCommittables(boolean withPostCommitTopology) throws Exception { + @MethodSource("testParameters") + void testEmitCommittables(boolean withPostCommitTopology, boolean isBatch) throws Exception { SinkAndCounters sinkAndCounters; if (withPostCommitTopology) { // Insert global committer to simulate post commit topology @@ -99,7 +110,8 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception { CommittableMessage, CommittableMessage> testHarness = new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true)); + new CommitterOperatorFactory<>( + sinkAndCounters.sink, isBatch, true)); testHarness.open(); final CommittableSummary committableSummary = @@ -127,6 +139,33 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception { testHarness.close(); } + @Test + void testEmitCommittablesBatch() throws Exception { + SinkAndCounters sinkAndCounters = sinkWithoutPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sinkAndCounters.sink, true, false)); + testHarness.open(); + + // Test that all committables up to Long.MAX_VALUE are committed. + long checkpointId = Long.MAX_VALUE; + final CommittableSummary committableSummary = + new CommittableSummary<>(1, 1, checkpointId, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage committableWithLineage = + new CommittableWithLineage<>("1", checkpointId, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + testHarness.endInput(); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.close(); + } + @Test void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { SinkAndCounters sinkAndCounters = sinkWithPostCommit();