Skip to content

KAFKA-10409: Refactor Kakfa Streams RocksDB Iterators #18610

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from

Conversation

fonsdant
Copy link
Contributor

@fonsdant fonsdant commented Jan 18, 2025

Key refactorings:

  • Use RocksDBDualCFRangeIterator for DualColumnFamilyAccessor#all, since it iterate over all when passing null as from and to key ranges;
  • Create helper methods to:
    • loadNextKeys();
    • fetchNextKeyValue();
    • handleWithTimestampOnly();
    • handleNoTimestampOnly();
    • compareAndHandleKeys();
    • check if some key-value isInRange(); and
    • ensureOpen().
  • Rewrite methods with new helper methods:
    • makeNext();
    • next(); and
    • hasNext().
  • Improve members naming with:
    • noTimestampNext;
    • withTimestampNext;
    • noTimestampIterator; and
    • withTimestampIterator.
  • Sort members and method arguments alphanumerically;
  • Add documentation.

@github-actions github-actions bot added triage PRs from the community streams labels Jan 18, 2025
@fonsdant fonsdant force-pushed the kafka-10409/refactor-kafka-streams-rocksdb-iterators branch from c5f8d03 to f147674 Compare January 18, 2025 15:15
@fonsdant fonsdant force-pushed the kafka-10409/refactor-kafka-streams-rocksdb-iterators branch from f147674 to 3338e09 Compare January 18, 2025 15:18
Signed-off-by: Joao Pedro Fonseca Dantas <fonsdant@gmail.com>
Signed-off-by: Joao Pedro Fonseca Dantas <fonsdant@gmail.com>
@fonsdant fonsdant force-pushed the kafka-10409/refactor-kafka-streams-rocksdb-iterators branch from 3338e09 to 9a614c1 Compare January 18, 2025 19:06
@fonsdant fonsdant changed the title WIP KAFKA 10409: Refactor KAFKA Streams RocksDB Iterators KAFKA 10409: Refactor KAFKA Streams RocksDB Iterators Jan 18, 2025
@fonsdant fonsdant marked this pull request as ready for review January 18, 2025 19:09
@fonsdant fonsdant changed the title KAFKA 10409: Refactor KAFKA Streams RocksDB Iterators KAFKA 10409: Refactor Kakfa Streams RocksDB Iterators Jan 18, 2025
Copy link

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@fonsdant
Copy link
Contributor Author

Hi, @ableegoldman! Could you help me with this PR? :)

@ableegoldman ableegoldman changed the title KAFKA 10409: Refactor Kakfa Streams RocksDB Iterators KAFKA-10409: Refactor Kakfa Streams RocksDB Iterators Feb 18, 2025
@ableegoldman
Copy link
Member

Yep! Will take a look in the next day or so 🙂

Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @fonsdant this is a great cleanup! I left some minor suggestions inline, I'm not super familiar with this part of the code so feel free to take it or leave it

Comment on lines +324 to +325
* <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
* threads without external synchronization.</p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this mean? doesn't that just mean it's not thread safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

* @throws NoSuchElementException If there are no more elements in the iterator.
*/
@Override
public Bytes peekNextKey() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

existing test coverage for this class is below average (e.g. there's not peekNextKey in the test) - could you add that?

cc @ableegoldman it may be good to refactor the existing tests alongside this one so that instead of one really big test we had individual tests for each piece of functionality so it's easier to confirm the test coverage is good

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In progress.

return super.hasNext();
protected KeyValue<Bytes, byte[]> makeNext() {
loadNextKeys();
if (noTimestampNext == null && withTimestampNext == null) return allDone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a functionality change. Not sure if this is intentional or not but it looks like we drop checking the condition where the iterators are not valid. (In other words, there's a branch where after loadNextKeys(), noTimestampNext is still null but iterator.isValid().

Not sure what might cause this condition.

The old code:

            if (nextNoTimestamp == null && iterNoTimestamp.isValid()) {
                nextNoTimestamp = iterNoTimestamp.key();
            }

            if (nextWithTimestamp == null && iterWithTimestamp.isValid()) {
                nextWithTimestamp = iterWithTimestamp.key();
            }
            if (nextNoTimestamp == null && !iterNoTimestamp.isValid()) {
                if (nextWithTimestamp == null && !iterWithTimestamp.isValid()) {
                    return allDone();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this branch is not explicit, I think it is covered. In the old code, allDone is only called if both nextTimestamp are null and both are invalid. So this is already checked on previous branches (in the new code, on loadNextKeys). That is, if both next are null, in loadNextKeys: 1) if they are valid, they will get next keys and will no longer be null (and will not go into allDone when loadNextKeys is finished since they have become different from null); 2) if they are invalid, they will remain null and will go into allDone after loadNextKeys is finished.

Comment on lines 552 to 554
return forward
? comparison < 0 || (toInclusive && comparison == 0)
: comparison > 0 || (toInclusive && comparison == 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return forward
? comparison < 0 || (toInclusive && comparison == 0)
: comparison > 0 || (toInclusive && comparison == 0);
return (toInclusive && comparison == 0) || (forward ? comparison < 0 : comparison > 0);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

this.toInclusive = toInclusive;
this.withTimestampIterator = withTimestampIterator;

this.rawLastKey = initializeIterators(from, to);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally it's better for constructors not to have side effects - otherwise it's possible that if something throws an exception during instantiation the close method won't be called.

this doesn't seem to be a regression (the old code did that as well for the RangeIterator) so consider this comment an optional suggestion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

@github-actions github-actions bot removed the triage PRs from the community label Feb 19, 2025
Signed-off-by: Joao Pedro Fonseca Dantas <fonsdant@gmail.com>
Signed-off-by: Joao Pedro Fonseca Dantas <fonsdant@gmail.com>
@fonsdant
Copy link
Contributor Author

fonsdant commented Mar 28, 2025

@agavra, thanks for reviewing! I have pushed some commits :)

Signed-off-by: Joao Pedro Fonseca Dantas <fonsdant@gmail.com>
Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This LGTM once the tests are in :) will make it easier to convince me that there's no functionality changes. Thanks for addressing the comments.

Copy link
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a quick look and I think it looks good, but as Almog mentioned the existing test coverage is seriously lacking so it'll be great to fill in some the tests here. I'll do a closer pass once you push the new tests 🙂

Copy link

github-actions bot commented Jul 3, 2025

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions bot added the stale Stale PRs label Jul 3, 2025
@fonsdant
Copy link
Contributor Author

fonsdant commented Jul 11, 2025

@agavra, sorry for the delay, I am resuming the work. How could I check the coverage report for streams only? According on readme, I can run gradle streams:reportCoverage, but when I do this, I get: * What went wrong: Cannot locate tasks that match 'streams:reportCoverage' as task 'reportCoverage' not found in project ':streams'.

P.S.: I was missing -PenableTestCoverage=true, I got it :)

@github-actions github-actions bot removed the stale Stale PRs label Jul 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants