
Conversation

@sangreal
Contributor

@sangreal sangreal commented Oct 20, 2025

Description

Make offsetHighestSucceeded accurate when a partition is assigned.

We encountered an offset reset issue during frequent partition rebalancing.
The root cause:
(1) offsetHighestSucceeded is seeded with the offset from OffsetAndMetadata, which is the offset still to be processed
(2) incompletes is non-empty
(3) one WorkContainer is processed successfully, so dirty becomes true (that offset < offsetHighestSucceeded)
(4) the committer chooses (offsetHighestSucceeded + 1) to commit because incompletes has become empty (the offset was removed from incompletes)
(5) rebalancing happens; when the new consumer tries to poll from that offset it gets an out-of-range error and an offset reset begins

issue #894
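The race in the steps above can be modelled in a few lines. This is a minimal sketch, not the actual PartitionState implementation — class, field, and method names here are hypothetical:

```java
import java.util.concurrent.ConcurrentSkipListSet;

// Minimal model of the offset-to-commit selection described above
// (hypothetical names, not the real PartitionState API).
public class OffsetRaceSketch {
    // Seeded from the OffsetAndMetadata of a to-be-processed offset.
    static long offsetHighestSucceeded = 100;
    static final ConcurrentSkipListSet<Long> incompletes = new ConcurrentSkipListSet<>();

    // Committer picks the lowest incomplete offset, or
    // offsetHighestSucceeded + 1 when no incompletes remain.
    static long getOffsetToCommit() {
        return incompletes.isEmpty() ? offsetHighestSucceeded + 1
                                     : incompletes.first();
    }

    public static void main(String[] args) {
        incompletes.add(95L);
        System.out.println(getOffsetToCommit()); // 95 - safe: lowest incomplete
        // A WorkContainer for 95 completes concurrently and is removed...
        incompletes.remove(95L);
        System.out.println(getOffsetToCommit()); // 101 - jumps past offsets 96..100
    }
}
```

The jump from 95 to 101 is the hazard: once incompletes empties between two reads, the committer can skip offsets that were never processed.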

Checklist

  • Documentation (if applicable)
  • Changelog

@sangreal sangreal requested a review from a team as a code owner October 20, 2025 08:31
@sangreal sangreal changed the title make offsetHighestSucceeded accurate when partition assigned make offsetHighestSucceeded accurate when partition assigned to avoid offset reset Oct 20, 2025
@rkolesnev
Contributor

rkolesnev commented Oct 26, 2025

Hey @sangreal - I will dig a bit more into it, but it doesn't look right to me: whenever the partition is dirty (any offset was processed) we do need to commit, even if the highest succeeded offset was not advanced.
Something seems off to me in the steps above. There are two highest offsets that we care about: the highest succeeded overall, and the highest sequentially succeeded (which is the offset that gets committed). When the highest sequentially succeeded offset goes up, that is the base offset being committed; there cannot be any incompletes lower than that offset.

@rkolesnev
Contributor

If you could build a test / reproducible example of this - it would help as well...

@sangreal
Contributor Author

@rkolesnev It is really great that you could take your personal time to review my PR 👍
Initially I made the fix here: https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java#L207
by setting this.offsetHighestSucceeded = this.offsetHighestSeen - 1;
But if rebalancing keeps happening and we have enough lower-offset incompletes to commit, the offset keeps being committed one by one and may eventually go out of range.
So we have to make sure we only commit processed messages.
And when incompletes removes the offset before the commit happens, offsetHighestSucceeded is used instead.
So as you can see, a perfect fix would be quite complex (we would have to make sure the offset removal happens after the commit, or take some other measure).

Again, I am all ears for your suggestions.

@rkolesnev
Contributor

Hmm, I still don't fully understand where the issue is, though.
In Kafka it is the last processed offset + 1 that is committed, even in the standard Kafka consumer. That should not cause issues. I don't think that we should be doing highest seen - 1.
Can you please review what you are observing and describe the flow step by step?
Is the offset being increased somewhere where there was no actual progress?

@sangreal sangreal changed the title make offsetHighestSucceeded accurate when partition assigned to avoid offset reset make committed accurate when partition assigned to avoid offset reset Oct 29, 2025
@sangreal sangreal changed the title make committed accurate when partition assigned to avoid offset reset make committed offset accurate when partition assigned to avoid offset reset Oct 29, 2025
@sangreal
Contributor Author

sangreal commented Oct 29, 2025

I have updated the fix after more thought. The idea is to make sure getOffsetToCommit is invoked only once, to avoid a dirty read committing the wrong offset.
@rkolesnev

@sangreal
Contributor Author

@johnbyrnejb please help review as well, thanks a lot.

@sangreal
Contributor Author

Parallel Consumer Offset reset Issue flow.pdf
I have updated the diagram and the PR again; please have a look when you have time, thanks a lot.

@sangreal
Contributor Author

sangreal commented Nov 5, 2025

Let me explain further for review.

  1. The committer is supposed to commit 601266890 + 1, with incompletes = (601266891).
  2. After creating the OffsetAndMetadata, the incompletes (601266891) and completed offsets are run-length encoded, giving: (11).
  3. Right before getOffsetToCommit, 601266891 is processed and removed from incompletes. The offset to commit therefore changes to offsetHighestSucceeded + 1 (601266893), since incompletes is now empty.
  4. As a result, the final committed offset is 601266893 with OffsetAndMetadata (11).
  5. Rebalancing happens. When decoding the OffsetAndMetadata, offsets are calculated relative to the committed base, so the new consumer derives offsetHighestSucceeded = 601266894 and incompletes = (601266893).
  6. 601266893 is processed normally, so incompletes becomes empty.
  7. At commit time, the offset to commit is 601266894 + 1 = 601266895.
  8. Rebalancing happens again. When the new consumer tries to poll from 601266895, that offset doesn't exist, so an offset reset happens.

@rkolesnev @johnbyrnejb please help review when you have time; we are waiting for the fix, since this offset reset issue happens once every few days. Thanks a lot.
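The mismatch in steps 2-5 above boils down to metadata being encoded against one base offset but committed against another. A minimal sketch (the run-length encoding is simplified to a plain relative offset; names are hypothetical, not the actual OffsetMapCodecManager):

```java
// Sketch: why the committed base offset and its encoded metadata must
// come from the same snapshot. Encoding simplified to a relative offset.
public class RelativeOffsetSketch {
    // Metadata stores incompletes relative to the base offset at encode time.
    static long encodeRelative(long base, long incompleteAbs) {
        return incompleteAbs - base;
    }

    // Decoding re-applies whatever base was actually committed.
    static long decodeAbsolute(long committedBase, long relative) {
        return committedBase + relative;
    }

    public static void main(String[] args) {
        long baseAtEncodeTime = 601266891L; // lowest incomplete when metadata was built
        long rel = encodeRelative(baseAtEncodeTime, 601266891L); // 0
        // Race: the incomplete finishes, and the committer re-reads the
        // offset to commit, choosing a later base.
        long committedBase = 601266893L;
        System.out.println(decodeAbsolute(committedBase, rel)); // 601266893
    }
}
```

After the rebalance, the decoder marks 601266893 as incomplete even though the real incomplete was 601266891, which is exactly the shift seen in step 5.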

@rkolesnev
Contributor

@sangreal - I have spent more time looking into this and will try to get it done this week.
I think the main point is this: in Kafka the committed offset is not the offset that was last processed, but last processed + 1.
So if we have 10 messages on a topic with offsets 0-9, and we processed offset 9, we actually commit offset 10.
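That convention can be stated as a one-liner (a sketch of the semantics, not the consumer API itself):

```java
public class CommitSemantics {
    // Kafka's committed offset is the position of the NEXT record to
    // consume, i.e. the last processed offset + 1.
    static long commitOffsetFor(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    public static void main(String[] args) {
        // Topic with offsets 0..9: processing offset 9 commits offset 10.
        System.out.println(commitOffsetFor(9)); // 10
    }
}
```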

The Parallel Consumer has a bug somewhere in marking state dirty and advancing the offset to commit by 1, so after multiple rebalances it ends up committing not offset 10 but offset 11, which brings the subscription out of the valid range and causes an auto offset reset to happen.

I am in the process of mapping all possible state transitions for PartitionState to work out whether there are any other race conditions / state mismatches.

@sangreal sangreal closed this Nov 5, 2025
@sangreal sangreal reopened this Nov 5, 2025
@sangreal
Contributor Author

sangreal commented Nov 5, 2025

@rkolesnev thank you so much for your efforts!
IMO, the commit / incompletes / completed-offset encoding logic should be fine, although some duplicate processing is possible.
The main issue here is the double fetch of the to-be-committed offset: in between the two fetches, incompletes may change, which changes the offset to commit.
My solution is to avoid the double fetch by merging the two reads into one.
This may cause one or two offsets to be reprocessed (an incomplete offset may already have been processed), but I think this scenario is rare and minor.
Looking forward to your investigation and opinion.
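The single-read idea can be sketched as computing the commit offset and its metadata from one consistent snapshot under the same lock, so a concurrent completion cannot change the base after the metadata was encoded. Hypothetical names, a sketch only — not the actual PartitionState or committer code:

```java
import java.util.TreeSet;

// Sketch of the proposed fix: derive the offset to commit and its encoded
// metadata atomically, in one snapshot (hypothetical names).
public class SingleSnapshotCommit {
    final TreeSet<Long> incompletes = new TreeSet<>();
    long offsetHighestSucceeded;

    record CommitData(long offsetToCommit, String metadata) {}

    // One synchronized method computes BOTH values, so base and metadata
    // always agree even if onSuccess() races with the committer.
    synchronized CommitData buildCommitData() {
        long base = incompletes.isEmpty() ? offsetHighestSucceeded + 1
                                          : incompletes.first();
        StringBuilder sb = new StringBuilder();
        for (long off : incompletes) {
            sb.append(off - base).append(','); // encode relative to SAME base
        }
        return new CommitData(base, sb.toString());
    }

    synchronized void onSuccess(long offset) {
        incompletes.remove(offset);
        offsetHighestSucceeded = Math.max(offsetHighestSucceeded, offset);
    }
}
```

With this shape, a completion arriving between "build metadata" and "pick offset" is impossible by construction; it is either fully before or fully after the snapshot, at worst causing the one-or-two-offset reprocessing mentioned above.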
