From ccf3bb1f186b0692fa7d71f2ce5b75bca0a50c4a Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 19 Sep 2025 10:08:17 +0200 Subject: [PATCH] [FLINK-38370] Ensure CommitterOperator commits all pending committables in batch mode In #26433, we removed the EOI marker in the form of Long.MAX_VALUE as the checkpoint id. Since streaming pipelines can continue to checkpoint even after their respective operators have been shut down, it is not safe to use a constant as this can lead to duplicate commits. However, in batch pipelines we only have one commit on job shutdown. Using any checkpoint id should suffice in this scenario. Any pending committables should be processed by the CommitterOperator when the operator shuts down. No further checkpoints will take place. There are various connectors which rely on this behavior. I don't see any drawbacks from keeping this behavior for batch pipelines. --- .../operators/sink/CommitterOperator.java | 2 +- .../sink/CommitterOperatorTestBase.java | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 6954ad24e36f2..2f766a341cbb0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -148,7 +148,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception { public void endInput() throws Exception { if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - commitAndEmitCheckpoints(lastCompletedCheckpointId + 1); + commitAndEmitCheckpoints(Long.MAX_VALUE); } } diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index c8b37943846af..ee58c8b94acb7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -276,6 +276,33 @@ void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Ex assertThat(testHarness.getOutput()).hasSize(2); } + @Test + void testEmitCommittablesBatch() throws Exception { + SinkAndCounters sinkAndCounters = sinkWithoutPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sinkAndCounters.sink, true, false)); + testHarness.open(); + + // Test that all committables up to Long.MAX_VALUE are committed. + long checkpointId = Long.MAX_VALUE; + final CommittableSummary committableSummary = + new CommittableSummary<>(1, 1, checkpointId, 1, 0, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage committableWithLineage = + new CommittableWithLineage<>("1", checkpointId, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + testHarness.endInput(); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.close(); + } + private OneInputStreamOperatorTestHarness< CommittableMessage, CommittableMessage> createTestHarness(