diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 543a21f07..f33e618d2 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -23,7 +23,7 @@ endif::[] == 0.5.3.3 === Fixes - +* fix: make committed offset accurate when partition assigned to avoid offset reset (#894) * fix: close parallel consumer on transactional mode when InvalidPidMappingException (#830) * fix: support kafka-clients 3.9.0 (#841) * fix: Paused consumption across multiple consumers (#857) diff --git a/README.adoc b/README.adoc index 39f4db1be..14eadefe1 100644 --- a/README.adoc +++ b/README.adoc @@ -1543,7 +1543,7 @@ endif::[] == 0.5.3.3 === Fixes - +* fix: make committed offset accurate when partition assigned to avoid offset reset (#894) * fix: close parallel consumer on transactional mode when InvalidPidMappingException (#830) * fix: support kafka-clients 3.9.0 (#841) * fix: Paused consumption across multiple consumers (#857) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java index 5f2e036fe..67c476992 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java @@ -1,9 +1,10 @@ package io.confluent.parallelconsumer.state; /*- - * Copyright (C) 2020-2024 Confluent, Inc. + * Copyright (C) 2020-2025 Confluent, Inc. 
*/ +import io.confluent.parallelconsumer.ParallelConsumer; import io.confluent.parallelconsumer.internal.BrokerPollSystem; import io.confluent.parallelconsumer.internal.EpochAndRecordsMap; import io.confluent.parallelconsumer.internal.PCModule; @@ -410,8 +411,11 @@ public Optional<OffsetAndMetadata> getCommitDataIfDirty() { // visible for testing protected OffsetAndMetadata createOffsetAndMetadata() { - Optional<String> payloadOpt = tryToEncodeOffsets(); - long nextOffset = getOffsetToCommit(); + // use tuple to make sure getOffsetToCommit is invoked only once to avoid dirty read + // and commit the wrong offset + ParallelConsumer.Tuple<Optional<String>, Long> tuple = tryToEncodeOffsets(); + Optional<String> payloadOpt = tuple.getLeft(); + long nextOffset = tuple.getRight(); return payloadOpt .map(encodedOffsets -> new OffsetAndMetadata(nextOffset, encodedOffsets)) .orElseGet(() -> new OffsetAndMetadata(nextOffset)); @@ -481,29 +485,30 @@ public long getOffsetHighestSequentialSucceeded() { * * @return if possible, the String encoded offset map */ - private Optional<String> tryToEncodeOffsets() { + private ParallelConsumer.Tuple<Optional<String>, Long> tryToEncodeOffsets() { + long offsetOfNextExpectedMessage = getOffsetToCommit(); + if (incompleteOffsets.isEmpty()) { setAllowedMoreRecords(true); - return empty(); + return ParallelConsumer.Tuple.pairOf(empty(), offsetOfNextExpectedMessage); } try { // todo refactor use of null shouldn't be needed. Is OffsetMapCodecManager stateful? 
remove null #233 - long offsetOfNextExpectedMessage = getOffsetToCommit(); var offsetRange = getOffsetHighestSucceeded() - offsetOfNextExpectedMessage; String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, this); ratioPayloadUsedDistributionSummary.record(offsetMapPayload.length() / (double) offsetRange); ratioMetadataSpaceUsedDistributionSummary.record(offsetMapPayload.length() / (double) OffsetMapCodecManager.DefaultMaxMetadataSize); boolean mustStrip = updateBlockFromEncodingResult(offsetMapPayload); if (mustStrip) { - return empty(); + return ParallelConsumer.Tuple.pairOf(empty(), offsetOfNextExpectedMessage); } else { - return of(offsetMapPayload); + return ParallelConsumer.Tuple.pairOf(of(offsetMapPayload), offsetOfNextExpectedMessage); } } catch (NoEncodingPossibleException e) { setAllowedMoreRecords(false); log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", e); - return empty(); + return ParallelConsumer.Tuple.pairOf(empty(), offsetOfNextExpectedMessage); } }