Skip to content

Conversation

AHeise
Copy link
Contributor

@AHeise AHeise commented Sep 30, 2025

KafkaEnumerator's state contains the TopicPartitions only but not the offsets, so it doesn't contain the full split state contrary to the design intent.

There are a couple of issues with that approach. It implicitly assumes that splits are fully assigned to readers before the first checkpoint. Else the enumerator will invoke the offset initializer again on recovery from such a checkpoint leading to inconsistencies (LATEST may be initialized during the first attempt for some partitions and initialized during second attempt for others).

Through addSplitBack callback, you may also get these scenarios later for BATCH which actually leads to duplicate rows (in case of EARLIEST or SPECIFIC-OFFSETS) or data loss (in case of LATEST). Finally, it's not possible to safely use KafkaSource as part of a HybridSource because the offset initializer cannot even be recreated on recovery.

All cases are solved by also retaining the offset in the enumerator state. To that end, this commit merges the async discovery phases to immediately initialize the splits from the partitions. Any subsequent checkpoint will contain the proper start offset.

KafkaEnumerator's state contains the TopicPartitions only but not the offsets, so it doesn't contain the full split state contrary to the design intent.

There are a couple of issues with that approach. It implicitly assumes that splits are fully assigned to readers before the first checkpoint. Else the enumerator will invoke the offset initializer again on recovery from such a checkpoint leading to inconsistencies (LATEST may be initialized during the first attempt for some partitions and initialized during second attempt for others).

Through addSplitBack callback, you may also get these scenarios later for BATCH which actually leads to duplicate rows (in case of EARLIEST or SPECIFIC-OFFSETS) or data loss (in case of LATEST). Finally, it's not possible to safely use KafkaSource as part of a HybridSource because the offset initializer cannot even be recreated on recovery.

All cases are solved by also retaining the offset in the enumerator state. To that end, this commit merges the async discovery phases to immediately initialize the splits from the partitions. Any subsequent checkpoint will contain the proper start offset.
topicPartitions.add(
new KafkaPartitionSplit(
new TopicPartition(TOPIC_PREFIX + readerId, partition),
STARTING_OFFSET));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. This is a very good improvement for the connector.
I noticed that the current test creates splits using the constant KafkaPartitionSplit.EARLIEST_OFFSET, would it make sense to add a test case that uses a real-world offset (e.g., 123)?

public void testAddSplitsBack() throws Throwable {
@ParameterizedTest
@EnumSource(StandardOffsetsInitializer.class)
public void testAddSplitsBack(StandardOffsetsInitializer offsetsInitializer) throws Throwable {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is my understanding correct that the test verifies that the offset is correctly recalculated on recovery, but doesn't verify that the original offset(before the failure) was preserved and restored?

@fapaul fapaul self-requested a review October 2, 2025 06:41
Copy link
Contributor

@fapaul fapaul left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks mostly good left some inline comments

new SplitAndAssignmentStatus(
new KafkaPartitionSplit(
new TopicPartition(topic, partition),
DEFAULT_STARTING_OFFSET),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a behavioral change? Previously the unassigned split would get the starting offset configured by the user on reassignment.

private boolean noMoreNewPartitionSplits = false;
// this flag will be marked as true if initial partitions are discovered after enumerator starts
private boolean initialDiscoveryFinished;
private volatile boolean initialDiscoveryFinished;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why volatile is needed here? Afaik we don't access it from a different thread than before.

* @param fetchedPartitions Map from topic name to its description
* @param t Exception in worker thread
*/
private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable t) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It would have been good to have a small refactoring commit to simplify the method calls it makes the reviewer easier :)

assertThat(expectedAssignmentsForReader)
.contains(split.getTopicPartition());
assertThat(actualAssignments).containsOnlyKeys(expectedAssignments.keySet());
SoftAssertions.assertSoftly(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a big fan of soft assertions but this pattern with the lambda I think is "okay". It's just a dangerous precedent when folks start using the the version that requires explicitly calling assertAll() and we silently disable all exceptions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants