From 64cd5f46ae580eda5383797a47348d73c19f5232 Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Mon, 20 Oct 2025 16:19:06 +0800
Subject: [PATCH 01/15] make offsetHighestSucceeded accurate when partition assigned

---
 .../io/confluent/parallelconsumer/state/PartitionState.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index 71e7a4c5a..38e993873 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -204,7 +204,7 @@ private void initStateFromOffsetData(OffsetMapCodecManager.HighestOffsetAndIncom
         offsetData.getIncompleteOffsets()
                 .forEach(offset -> incompleteOffsets.put(offset, Optional.empty()));

-        this.offsetHighestSucceeded = this.offsetHighestSeen; // by definition, as we only encode up to the highest seen offset (inclusive)
+        this.offsetHighestSucceeded = this.offsetHighestSeen - 1; // we need to make sure offset in OffsetAndMetadata -1 is the correct processed offset
     }

     private void maybeRaiseHighestSeenOffset(final long offset) {
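The "- 1" in PATCH 01 leans on Kafka's commit convention: an OffsetAndMetadata stores the offset of the next record to consume, so the last record actually processed is always the committed offset minus one. A minimal, illustrative sketch of that convention (standalone; the class and variable names are hypothetical and not part of the patch):

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;

    class CommitConventionSketch {
        public static void main(String[] args) {
            long lastProcessed = 41L;
            // Kafka convention: commit the offset of the NEXT record to consume
            OffsetAndMetadata commit = new OffsetAndMetadata(lastProcessed + 1); // stores 42
            // so, on assignment, the last processed offset is recovered as committed - 1
            long recoveredLastProcessed = commit.offset() - 1;                   // 41 again
            System.out.println(recoveredLastProcessed);
        }
    }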
From 8e66cb166a30cb15c2377fb4c79f15eb73106c8a Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Mon, 20 Oct 2025 17:01:25 +0800
Subject: [PATCH 02/15] update readme

---
 CHANGELOG.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc
index d3922ffa5..5c9ec9810 100644
--- a/CHANGELOG.adoc
+++ b/CHANGELOG.adoc
@@ -17,7 +17,7 @@ endif::[]
 == 0.5.3.3

 === Fixes
-
+* fix: make offsetHighestSucceeded accurate when partition assigned to avoid offset reset (#894)
 * fix: close parallel consumer on transactional mode when InvalidPidMappingException (#830)
 * fix: support kafka-clients 3.9.0 (#841)

From c458c9848569cee04bdf2bcdf8ba059ba08ef5ee Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Mon, 20 Oct 2025 17:03:50 +0800
Subject: [PATCH 03/15] fix header

---
 README.adoc                                                  | 3 ++-
 .../io/confluent/parallelconsumer/state/PartitionState.java | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/README.adoc b/README.adoc
index 0e1dbd1e5..2dee541f8 100644
--- a/README.adoc
+++ b/README.adoc
@@ -1537,8 +1537,9 @@ endif::[]
 == 0.5.3.3

 === Fixes
-
+* fix: make offsetHighestSucceeded accurate when partition assigned to avoid offset reset (#894)
 * fix: close parallel consumer on transactional mode when InvalidPidMappingException (#830)
+* fix: support kafka-clients 3.9.0 (#841)

 == 0.5.3.2

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index 38e993873..4b738813e 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -1,7 +1,7 @@
 package io.confluent.parallelconsumer.state;

 /*-
- * Copyright (C) 2020-2024 Confluent, Inc.
+ * Copyright (C) 2020-2025 Confluent, Inc.
  */

 import io.confluent.parallelconsumer.internal.BrokerPollSystem;

From e280a55858673a30caebdb03cce1b00a5ef7e2bd Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Mon, 20 Oct 2025 22:36:19 +0800
Subject: [PATCH 04/15] add guard to avoid wrong commit

---
 .../io/confluent/parallelconsumer/state/PartitionState.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index 4b738813e..8deba7f7d 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -162,6 +162,9 @@ public class PartitionState {
     @Getter
     private final long partitionsAssignmentEpoch;

+    @Getter
+    private boolean needToCommit = false;
+
     private long lastCommittedOffset;
     private Gauge lastCommittedOffsetGauge;
     private Gauge highestSeenOffsetGauge;
@@ -274,6 +277,7 @@ private void updateHighestSucceededOffsetSoFar(long thisOffset) {
         if (thisOffset > highestSucceeded) {
             log.trace("Updating highest completed - was: {} now: {}", highestSucceeded, thisOffset);
             this.offsetHighestSucceeded = thisOffset;
+            needToCommit = true;
         }
     }

@@ -397,7 +401,7 @@ public boolean isRemoved() {
     }

     public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
-        if (isDirty()) {
+        if (isDirty() && needToCommit) {
             // setting the flag so that any subsequent offset completed while commit is being performed could mark state as dirty
             // and retain the dirty state on commit completion.
             stateChangedSinceCommitStart = false;
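The guard introduced in PATCH 04 only lets a dirty partition hand back commit data once at least one record has actually succeeded since assignment. A condensed, illustrative sketch of that behaviour (this is not the real PartitionState; the offset bookkeeping is reduced to a single field and the class name is hypothetical):

    import java.util.Optional;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;

    class CommitGuardSketch {
        private boolean dirty;            // state changed since the last commit
        private boolean needToCommit;     // set once a record has actually succeeded
        private long nextOffsetToCommit;  // stand-in for the real offset tracking

        void onRecordSuccess(long offset) {
            nextOffsetToCommit = offset + 1; // commit points at the next record to consume
            needToCommit = true;             // mirrors the flag set in updateHighestSucceededOffsetSoFar()
            dirty = true;
        }

        Optional<OffsetAndMetadata> getCommitDataIfDirty() {
            if (dirty && needToCommit) {     // a freshly assigned, untouched partition stays silent
                return Optional.of(new OffsetAndMetadata(nextOffsetToCommit));
            }
            return Optional.empty();
        }
    }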
From 727a140d143b840a2a47bbcf1aa25fedf9ec6a14 Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Tue, 21 Oct 2025 10:00:14 +0800
Subject: [PATCH 05/15] add more comment

---
 .../io/confluent/parallelconsumer/state/PartitionState.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index 8deba7f7d..541f84f2a 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -162,6 +162,10 @@ public class PartitionState {
     @Getter
     private final long partitionsAssignmentEpoch;

+    /**
+     * Additional flag to prevent unnecessary commit if no records has been processed yet.
+     * especially to prevent incorrect commit of offsetHighestSucceeded when partition just assigned
+     */
     @Getter
     private boolean needToCommit = false;

From 7c6c8baf593850115b5c66c726ff2ddeab31b1f7 Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Fri, 24 Oct 2025 19:54:16 +0900
Subject: [PATCH 06/15] fix UT

---
 .../io/confluent/parallelconsumer/state/PartitionState.java | 3 ++-
 .../state/PartitionStateCommittedOffsetTest.java            | 1 +
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index 541f84f2a..85dc2847b 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -167,6 +167,7 @@ public class PartitionState {
      * especially to prevent incorrect commit of offsetHighestSucceeded when partition just assigned
      */
     @Getter
+    @Setter
     private boolean needToCommit = false;

     private long lastCommittedOffset;
@@ -211,7 +212,7 @@ private void initStateFromOffsetData(OffsetMapCodecManager.HighestOffsetAndIncom
         offsetData.getIncompleteOffsets()
                 .forEach(offset -> incompleteOffsets.put(offset, Optional.empty()));

-        this.offsetHighestSucceeded = this.offsetHighestSeen - 1; // we need to make sure offset in OffsetAndMetadata -1 is the correct processed offset
+        this.offsetHighestSucceeded = this.offsetHighestSeen ; // we need to make sure offset in OffsetAndMetadata -1 is the correct processed offset
     }

     private void maybeRaiseHighestSeenOffset(final long offset) {

diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java
index 9c95aece2..68b094b37 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java
@@ -183,6 +183,7 @@ void workCompletedDuringAsyncCommitShouldKeepStateAsDirty(){
                 new TreeSet<>(Arrays.asList(completedOffset, incompleteOffset)));
         PartitionState<String, String> state = new PartitionState<>(0, mu.getModule(), tp, offsetData);
         state.onSuccess(completedOffset);
+        state.setNeedToCommit(true);

         // fetch committable/completed offset
         OffsetAndMetadata offsetAndMetadata = state.getCommitDataIfDirty().get();

From ff5c7b2a4aa9d2bd2c8f873062559e0b0344a8c6 Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Fri, 24 Oct 2025 19:56:33 +0900
Subject: [PATCH 07/15] revert comment

---
 .../io/confluent/parallelconsumer/state/PartitionState.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index 85dc2847b..c92aca003 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -212,7 +212,7 @@ private void initStateFromOffsetData(OffsetMapCodecManager.HighestOffsetAndIncom
         offsetData.getIncompleteOffsets()
                 .forEach(offset -> incompleteOffsets.put(offset, Optional.empty()));

-        this.offsetHighestSucceeded = this.offsetHighestSeen ; // we need to make sure offset in OffsetAndMetadata -1 is the correct processed offset
+        this.offsetHighestSucceeded = this.offsetHighestSeen; // by definition, as we only encode up to the highest seen offset (inclusive)
     }

     private void maybeRaiseHighestSeenOffset(final long offset) {

From 9a210854aa37caf3dbf90a65dbfa6ca4c4091fa0 Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Tue, 28 Oct 2025 23:15:47 +0900
Subject: [PATCH 08/15] lastProcessedOffset to track the last incompletes offset

---
 .../parallelconsumer/state/PartitionState.java   | 16 +++++++++-------
 .../state/PartitionStateCommittedOffsetTest.java |  1 -
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index c92aca003..afd4c1ebc 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -163,12 +163,11 @@ public class PartitionState {
     private final long partitionsAssignmentEpoch;

     /**
-     * Additional flag to prevent unnecessary commit if no records has been processed yet.
-     * especially to prevent incorrect commit of offsetHighestSucceeded when partition just assigned
+     * we need to persist the last incompletes offset when size is 1, to avoid wrongly commit with offsetHighestSucceeded
+     * if the incompletes is empty
      */
     @Getter
-    @Setter
-    private boolean needToCommit = false;
+    private Long lastProcessedOffset = null;

     private long lastCommittedOffset;
     private Gauge lastCommittedOffsetGauge;
     private Gauge highestSeenOffsetGauge;
@@ -213,6 +212,7 @@ private void initStateFromOffsetData(OffsetMapCodecManager.HighestOffsetAndIncom
                 .forEach(offset -> incompleteOffsets.put(offset, Optional.empty()));

         this.offsetHighestSucceeded = this.offsetHighestSeen; // by definition, as we only encode up to the highest seen offset (inclusive)
+        this.lastProcessedOffset = null;
     }

     private void maybeRaiseHighestSeenOffset(final long offset) {
@@ -262,6 +262,9 @@ public int getNumberOfIncompleteOffsets() {

     public void onSuccess(long offset) {
         //noinspection OptionalAssignedToNull - null check to see if key existed
+        if (this.incompleteOffsets.size() == 1) {
+            this.lastCommittedOffset = offset;
+        }
         boolean removedFromIncompletes = this.incompleteOffsets.remove(offset) != null; // NOSONAR
         assert (removedFromIncompletes);

@@ -282,7 +285,6 @@ private void updateHighestSucceededOffsetSoFar(long thisOffset) {
         if (thisOffset > highestSucceeded) {
             log.trace("Updating highest completed - was: {} now: {}", highestSucceeded, thisOffset);
             this.offsetHighestSucceeded = thisOffset;
-            needToCommit = true;
         }
     }

@@ -406,7 +408,7 @@ public boolean isRemoved() {
     }

     public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
-        if (isDirty() && needToCommit) {
+        if (isDirty()) {
             // setting the flag so that any subsequent offset completed while commit is being performed could mark state as dirty
             // and retain the dirty state on commit completion.
             stateChangedSinceCommitStart = false;
@@ -474,7 +476,7 @@ public long getOffsetHighestSequentialSucceeded() {
         boolean incompleteOffsetsWasEmpty = firstIncompleteOffset == null;

         if (incompleteOffsetsWasEmpty) {
-            return currentOffsetHighestSeen;
+            return lastProcessedOffset == null ? currentOffsetHighestSeen : lastProcessedOffset;
         } else {
             return firstIncompleteOffset - 1;
         }

diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java
index 68b094b37..9c95aece2 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java
@@ -183,7 +183,6 @@ void workCompletedDuringAsyncCommitShouldKeepStateAsDirty(){
                 new TreeSet<>(Arrays.asList(completedOffset, incompleteOffset)));
         PartitionState<String, String> state = new PartitionState<>(0, mu.getModule(), tp, offsetData);
         state.onSuccess(completedOffset);
-        state.setNeedToCommit(true);

         // fetch committable/completed offset
         OffsetAndMetadata offsetAndMetadata = state.getCommitDataIfDirty().get();

From 7e73047934c1d98cdf0aeb050924feb442abea38 Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Wed, 29 Oct 2025 09:12:51 +0900
Subject: [PATCH 09/15] clear up lastProcessedOffset after commit

---
 .../io/confluent/parallelconsumer/state/PartitionState.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index afd4c1ebc..cd86fa983 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -164,7 +164,7 @@ public class PartitionState {

     /**
      * we need to persist the last incompletes offset when size is 1, to avoid wrongly commit with offsetHighestSucceeded
-     * if the incompletes is empty
+     * if the incompletes is empty, since we expect incompletes offset should be always higher than committed offset
      */
     @Getter
     private Long lastProcessedOffset = null;
@@ -225,6 +225,8 @@ private void maybeRaiseHighestSeenOffset(final long offset) {

     public void onOffsetCommitSuccess(OffsetAndMetadata committed) { //NOSONAR
         lastCommittedOffset = committed.offset();
+        // clear up lastProcessedOffset after commit
+        lastProcessedOffset = null;
         setClean();
     }

From 551c9cf443b4346ff5476a9f660fdcc52ee45c29 Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Wed, 29 Oct 2025 09:20:34 +0900
Subject: [PATCH 10/15] add clear func

---
 .../confluent/parallelconsumer/state/PartitionState.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index cd86fa983..bfdf49952 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -212,7 +212,7 @@ private void initStateFromOffsetData(OffsetMapCodecManager.HighestOffsetAndIncom
                 .forEach(offset -> incompleteOffsets.put(offset, Optional.empty()));

         this.offsetHighestSucceeded = this.offsetHighestSeen; // by definition, as we only encode up to the highest seen offset (inclusive)
-        this.lastProcessedOffset = null;
+        clearLastProcessedOffset();
     }

     private void maybeRaiseHighestSeenOffset(final long offset) {
@@ -226,7 +226,7 @@ private void maybeRaiseHighestSeenOffset(final long offset) {
     public void onOffsetCommitSuccess(OffsetAndMetadata committed) { //NOSONAR
         lastCommittedOffset = committed.offset();
         // clear up lastProcessedOffset after commit
-        lastProcessedOffset = null;
+        clearLastProcessedOffset();
         setClean();
     }

@@ -241,6 +241,10 @@ private void setDirty() {
         setDirty(true);
     }

+    private void clearLastProcessedOffset() {
+        lastProcessedOffset = null;
+    }
+
     // todo rename isRecordComplete()
     // todo add support for this to TruthGen
     public boolean isRecordPreviouslyCompleted(final ConsumerRecord<K, V> rec) {
@@ -382,6 +386,7 @@ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
             // truncate
             final NavigableSet<Long> incompletesToPrune = incompleteOffsets.keySet().headSet(bootstrapPolledOffset, false);
             incompletesToPrune.forEach(incompleteOffsets::remove);
+            clearLastProcessedOffset();
         } else if (pollBelowExpected) {
             // reset to lower offset detected, so we need to reset our state to match
             log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) for partition {} of topic {} - truncating state - all records " +

From 40fba3e734777ca9858ea5d76ff9008af7f4191e Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Wed, 29 Oct 2025 13:42:50 +0900
Subject: [PATCH 11/15] add update logic check

---
 .../parallelconsumer/state/PartitionState.java | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index bfdf49952..a717e86b4 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -164,7 +164,9 @@ public class PartitionState {

     /**
      * we need to persist the last incompletes offset when size is 1, to avoid wrongly commit with offsetHighestSucceeded
-     * if the incompletes is empty, since we expect incompletes offset should be always higher than committed offset
+     * if the incompletes is empty, since we expect incompletes offset should be always higher than committed offset。
+     * While race condition should be rare since commit is not frequent and incompletes normally contains multi-offsets and this will
+     * not be used for commit offset
      */
     @Getter
     private Long lastProcessedOffset = null;
@@ -225,8 +227,10 @@ private void maybeRaiseHighestSeenOffset(final long offset) {

     public void onOffsetCommitSuccess(OffsetAndMetadata committed) { //NOSONAR
         lastCommittedOffset = committed.offset();
-        // clear up lastProcessedOffset after commit
-        clearLastProcessedOffset();
+        // clear up lastProcessedOffset after commit, only commit when the offset matches to avoid race condition
+        if (lastCommittedOffset == lastProcessedOffset) {
+            clearLastProcessedOffset();
+        }
         setClean();
     }

From bbed50f7406ed8cc6ee2090793ab210acdde3432 Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Thu, 30 Oct 2025 10:53:42 +0900
Subject: [PATCH 12/15] add null check

---
 .../io/confluent/parallelconsumer/state/PartitionState.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index a717e86b4..dc594ff4e 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -228,7 +228,7 @@ private void maybeRaiseHighestSeenOffset(final long offset) {
     public void onOffsetCommitSuccess(OffsetAndMetadata committed) { //NOSONAR
         lastCommittedOffset = committed.offset();
         // clear up lastProcessedOffset after commit, only commit when the offset matches to avoid race condition
-        if (lastCommittedOffset == lastProcessedOffset) {
+        if (lastProcessedOffset != null && lastCommittedOffset == lastProcessedOffset) {
             clearLastProcessedOffset();
         }
         setClean();

From 6fe9d1230abaf46030a65992551703321099cfcb Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Thu, 30 Oct 2025 10:53:57 +0900
Subject: [PATCH 13/15] update readme

---
 CHANGELOG.adoc                 | 2 +-
 README.adoc                    | 2 +-
 parallel-consumer-core/pom.xml | 5 +++++
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc
index ca1898332..fa9b1e4f1 100644
--- a/CHANGELOG.adoc
+++ b/CHANGELOG.adoc
@@ -17,7 +17,7 @@ endif::[]
 == 0.5.3.3

 === Fixes
-* fix: make offsetHighestSucceeded accurate when partition assigned to avoid offset reset (#894)
+* fix: make committed offset accurate when partition assigned to avoid offset reset (#894)
 * fix: close parallel consumer on transactional mode when InvalidPidMappingException (#830)
 * fix: support kafka-clients 3.9.0 (#841)
 * fix: Paused consumption across multiple consumers (#857)

diff --git a/README.adoc b/README.adoc
index 128477eab..322d9455d 100644
--- a/README.adoc
+++ b/README.adoc
@@ -1537,7 +1537,7 @@ endif::[]
 == 0.5.3.3

 === Fixes
-* fix: make offsetHighestSucceeded accurate when partition assigned to avoid offset reset (#894)
+* fix: make committed offset accurate when partition assigned to avoid offset reset (#894)
 * fix: close parallel consumer on transactional mode when InvalidPidMappingException (#830)
 * fix: support kafka-clients 3.9.0 (#841)
 * fix: Paused consumption across multiple consumers (#857)

diff --git a/parallel-consumer-core/pom.xml b/parallel-consumer-core/pom.xml
index 5970f43b4..256547b81 100644
--- a/parallel-consumer-core/pom.xml
+++ b/parallel-consumer-core/pom.xml
@@ -116,6 +116,11 @@
             <version>8.0.1.RELEASE</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+            <version>1.3.2</version>
+        </dependency>

From 6c7711e999461d06679c6bfa78876ee0e56b3a0c Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Thu, 30 Oct 2025 11:58:58 +0900
Subject: [PATCH 14/15] revert

---
 parallel-consumer-core/pom.xml | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/parallel-consumer-core/pom.xml b/parallel-consumer-core/pom.xml
index 256547b81..5970f43b4 100644
--- a/parallel-consumer-core/pom.xml
+++ b/parallel-consumer-core/pom.xml
@@ -116,11 +116,6 @@
             <version>8.0.1.RELEASE</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>javax.annotation</groupId>
-            <artifactId>javax.annotation-api</artifactId>
-            <version>1.3.2</version>
-        </dependency>

From d7005295db0eb01be4134b267c240ff47e91c458 Mon Sep 17 00:00:00 2001
From: Martyn Ye
Date: Fri, 31 Oct 2025 18:09:03 +0900
Subject: [PATCH 15/15] fix race condition when getOffsetToCommit

---
 .../state/PartitionState.java | 45 ++++++++-------------
 1 file changed, 14 insertions(+), 31 deletions(-)

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index 515079291..67c476992 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -4,6 +4,7 @@
  * Copyright (C) 2020-2025 Confluent, Inc.
  */

+import io.confluent.parallelconsumer.ParallelConsumer;
 import io.confluent.parallelconsumer.internal.BrokerPollSystem;
 import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
 import io.confluent.parallelconsumer.internal.PCModule;
@@ -162,15 +163,6 @@ public class PartitionState {
     @Getter
     private final long partitionsAssignmentEpoch;

-    /**
-     * we need to persist the last incompletes offset when size is 1, to avoid wrongly commit with offsetHighestSucceeded
-     * if the incompletes is empty, since we expect incompletes offset should be always higher than committed offset。
-     * While race condition should be rare since commit is not frequent and incompletes normally contains multi-offsets and this will
-     * not be used for commit offset
-     */
-    @Getter
-    private Long lastProcessedOffset = null;
-
     private long lastCommittedOffset;
     private Gauge lastCommittedOffsetGauge;
     private Gauge highestSeenOffsetGauge;
@@ -216,7 +208,6 @@ private void initStateFromOffsetData(OffsetMapCodecManager.HighestOffsetAndIncom
                 .forEach(offset -> incompleteOffsets.put(offset, Optional.empty()));

         this.offsetHighestSucceeded = this.offsetHighestSeen; // by definition, as we only encode up to the highest seen offset (inclusive)
-        clearLastProcessedOffset();
     }

     private void maybeRaiseHighestSeenOffset(final long offset) {
@@ -229,10 +220,6 @@ private void maybeRaiseHighestSeenOffset(final long offset) {

     public void onOffsetCommitSuccess(OffsetAndMetadata committed) { //NOSONAR
         lastCommittedOffset = committed.offset();
-        // clear up lastProcessedOffset after commit, only commit when the offset matches to avoid race condition
-        if (lastProcessedOffset != null && lastCommittedOffset == lastProcessedOffset) {
-            clearLastProcessedOffset();
-        }
         setClean();
     }

@@ -247,10 +234,6 @@ private void setDirty() {
         setDirty(true);
     }

-    private void clearLastProcessedOffset() {
-        lastProcessedOffset = null;
-    }
-
     // todo rename isRecordComplete()
     // todo add support for this to TruthGen
     public boolean isRecordPreviouslyCompleted(final ConsumerRecord<K, V> rec) {
@@ -274,9 +257,6 @@ public int getNumberOfIncompleteOffsets() {

     public void onSuccess(long offset) {
         //noinspection OptionalAssignedToNull - null check to see if key existed
-        if (this.incompleteOffsets.size() == 1) {
-            this.lastCommittedOffset = offset;
-        }
         boolean removedFromIncompletes = this.incompleteOffsets.remove(offset) != null; // NOSONAR
         assert (removedFromIncompletes);

@@ -392,7 +372,6 @@ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
             // truncate
             final NavigableSet<Long> incompletesToPrune = incompleteOffsets.keySet().headSet(bootstrapPolledOffset, false);
             incompletesToPrune.forEach(incompleteOffsets::remove);
-            clearLastProcessedOffset();
         } else if (pollBelowExpected) {
             // reset to lower offset detected, so we need to reset our state to match
             log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) for partition {} of topic {} - truncating state - all records " +
@@ -432,8 +411,11 @@ public Optional<OffsetAndMetadata> getCommitDataIfDirty() {

     // visible for testing
     protected OffsetAndMetadata createOffsetAndMetadata() {
-        Optional<String> payloadOpt = tryToEncodeOffsets();
-        long nextOffset = getOffsetToCommit();
+        // use tuple to make sure getOffsetToCommit is invoked only once to avoid dirty read
+        // and commit the wrong offset
+        ParallelConsumer.Tuple<Optional<String>, Long> tuple = tryToEncodeOffsets();
+        Optional<String> payloadOpt = tuple.getLeft();
+        long nextOffset = tuple.getRight();
         return payloadOpt
                 .map(encodedOffsets -> new OffsetAndMetadata(nextOffset, encodedOffsets))
                 .orElseGet(() -> new OffsetAndMetadata(nextOffset));
@@ -489,7 +471,7 @@ public long getOffsetHighestSequentialSucceeded() {
         boolean incompleteOffsetsWasEmpty = firstIncompleteOffset == null;

         if (incompleteOffsetsWasEmpty) {
-            return lastProcessedOffset == null ? currentOffsetHighestSeen : lastProcessedOffset;
+            return currentOffsetHighestSeen;
         } else {
             return firstIncompleteOffset - 1;
         }
@@ -503,29 +485,30 @@ public long getOffsetHighestSequentialSucceeded() {
      *
      * @return if possible, the String encoded offset map
      */
-    private Optional<String> tryToEncodeOffsets() {
+    private ParallelConsumer.Tuple<Optional<String>, Long> tryToEncodeOffsets() {
+        long offsetOfNextExpectedMessage = getOffsetToCommit();
+
         if (incompleteOffsets.isEmpty()) {
             setAllowedMoreRecords(true);
-            return empty();
+            return ParallelConsumer.Tuple.pairOf(empty(), offsetOfNextExpectedMessage);
         }

         try {
             // todo refactor use of null shouldn't be needed. Is OffsetMapCodecManager stateful? remove null #233
-            long offsetOfNextExpectedMessage = getOffsetToCommit();
             var offsetRange = getOffsetHighestSucceeded() - offsetOfNextExpectedMessage;
             String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, this);
             ratioPayloadUsedDistributionSummary.record(offsetMapPayload.length() / (double) offsetRange);
             ratioMetadataSpaceUsedDistributionSummary.record(offsetMapPayload.length() / (double) OffsetMapCodecManager.DefaultMaxMetadataSize);
             boolean mustStrip = updateBlockFromEncodingResult(offsetMapPayload);
             if (mustStrip) {
-                return empty();
+                return ParallelConsumer.Tuple.pairOf(empty(), offsetOfNextExpectedMessage);
             } else {
-                return of(offsetMapPayload);
+                return ParallelConsumer.Tuple.pairOf(of(offsetMapPayload), offsetOfNextExpectedMessage);
             }
         } catch (NoEncodingPossibleException e) {
             setAllowedMoreRecords(false);
             log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", e);
-            return empty();
+            return ParallelConsumer.Tuple.pairOf(empty(), offsetOfNextExpectedMessage);
         }
     }
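PATCH 15 reshapes tryToEncodeOffsets() so the commit offset and the encoded payload come from a single read of the partition state and are returned together, instead of two separate calls between which the state could change. A minimal sketch of that "read once, return both" pattern (illustrative; encodePayloadFor(...) and the constant return values are hypothetical stand-ins for the real offset-map encoding):

    import java.util.Optional;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import io.confluent.parallelconsumer.ParallelConsumer;

    class SnapshotCommitSketch {
        ParallelConsumer.Tuple<Optional<String>, Long> tryToEncodeOffsets() {
            long nextOffset = getOffsetToCommit();                   // single read of the shared state
            Optional<String> payload = encodePayloadFor(nextOffset); // empty when encoding is skipped or fails
            return ParallelConsumer.Tuple.pairOf(payload, nextOffset);
        }

        OffsetAndMetadata createOffsetAndMetadata() {
            var tuple = tryToEncodeOffsets();                        // payload and offset describe the same snapshot
            return tuple.getLeft()
                    .map(meta -> new OffsetAndMetadata(tuple.getRight(), meta))
                    .orElseGet(() -> new OffsetAndMetadata(tuple.getRight()));
        }

        // hypothetical stand-ins so the sketch compiles on its own
        private long getOffsetToCommit() { return 42L; }
        private Optional<String> encodePayloadFor(long offset) { return Optional.empty(); }
    }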