Refactor hash join dynamic filtering for progressive bounds application #17632
Conversation
Force-pushed from a7da859 to fadb4f6
cc @rkrishn7
Pull Request Overview
This PR refactors DataFusion's hash join dynamic filtering to use progressive bounds application instead of waiting for all build-side partitions to complete, improving join performance by reducing probe-side scan overhead.
Key Changes:
- Eliminates barrier-based synchronization in favor of immediate progressive filtering
- Implements hash-based filter expressions to ensure correctness without coordination
- Optimizes final filter to remove hash computations when all partitions complete
Reviewed Changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `stream.rs` | Removes barrier synchronization; simplifies the state machine by eliminating the `WaitPartitionBoundsReport` state |
| `shared_bounds.rs` | Implements progressive filter logic with hash-based expressions and partition completion tracking |
| `physical-plan/Cargo.toml` | Adds a dependency on `datafusion-functions` for hash function access |
| `functions/lib.rs` | Exposes the new `hash` module for internal use |
| `hash.rs` | New hash function implementation using DataFusion's internal hash algorithm |
| `functions/Cargo.toml` | Adds the `ahash` dependency for consistent hashing |
```rust
Ok(Arc::new(ScalarFunctionExpr::new(
    "hash",
    hash_udf,
    self.on_right.clone(),
    Field::new("hash_result", DataType::UInt64, false).into(),
    Arc::new(ConfigOptions::default()),
)))
```
Creating a new `ConfigOptions::default()` wrapped in an `Arc` for every hash expression is inefficient. Consider caching this value as a field in `SharedBoundsAccumulator` to avoid repeated allocations.
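A minimal sketch of what that caching could look like, assuming `SharedBoundsAccumulator` can build the options once at construction (the helper method is illustrative, not the PR's code):

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;

/// Illustrative subset of SharedBoundsAccumulator; only the cached field is shown.
struct SharedBoundsAccumulator {
    /// Built once; cloning the Arc is a cheap refcount bump, so every hash
    /// expression can share it instead of allocating ConfigOptions::default().
    config_options: Arc<ConfigOptions>,
}

impl SharedBoundsAccumulator {
    fn new() -> Self {
        Self {
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Hand the shared options to each ScalarFunctionExpr being built.
    fn config_options(&self) -> Arc<ConfigOptions> {
        Arc::clone(&self.config_options)
    }
}
```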
datafusion/functions/src/hash.rs (outdated)
```rust
pub fn new() -> Self {
    // Use the same seed as hash joins for consistency
    let random_state =
        RandomState::with_seeds('H' as u64, 'A' as u64, 'S' as u64, 'H' as u64);
```
The hardcoded seed values ('H', 'A', 'S', 'H') should be defined as constants to ensure consistency across the codebase and make them easier to maintain. Consider defining these as module-level constants.
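For example, such constants might look like this (the names are hypothetical):

```rust
use ahash::RandomState;

// Hypothetical module-level constants so the seeds are defined in one place.
const HASH_JOIN_SEED_0: u64 = 'H' as u64;
const HASH_JOIN_SEED_1: u64 = 'A' as u64;
const HASH_JOIN_SEED_2: u64 = 'S' as u64;
const HASH_JOIN_SEED_3: u64 = 'H' as u64;

fn join_random_state() -> RandomState {
    RandomState::with_seeds(
        HASH_JOIN_SEED_0,
        HASH_JOIN_SEED_1,
        HASH_JOIN_SEED_2,
        HASH_JOIN_SEED_3,
    )
}
```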
datafusion/functions/src/hash.rs (outdated)
```rust
#[user_doc(
    doc_section(label = "Hashing Functions"),
    description = "Computes hash values for one or more arrays using DataFusion's internal hash algorithm. When multiple arrays are provided, their hash values are combined using the same logic as multi-column joins.",
```
My feeling is that long term we should just switch from using ahash to murmurhash or something so that it is (1) not machine specific (works for distributed use cases, pre-computed hash columns, etc.) and (2) we can specify the algorithm used instead of a generic "internal hash"
Yes, I think switching to something like xxhash64 would be ideal. It suits our needs, is stable/reusable for distributed cases, and I'm pretty sure it's faster than murmur (not sure about ahash). I'm aware that https://github.com/DoumanAsh/xxhash-rust is used by Polars, so looking into this repo could be a good place to start.
I think @Dandandan did some initial work on hashing a few years ago, so may remember why we went with ahash vs others
Ahash probably is faster (I didn't benchmark it against all alternatives, TBH, but it had a lot of benchmarks showing it is faster than xxhash/murmurhash, and it gave a good speedup compared to SipHash, which was used before it via `HashMap`).
Maybe https://github.com/orlp/foldhash/ is a good candidate for an alternative speed-wise (it is now used by hashbrown as the default instead of aHash).
There is also something to be said for keeping it documented as an "internal hash", as that will allow changing it without breaking the API in the future.
I also found https://github.com/hoxxep/rapidhash in the meantime.
Really great idea!
```rust
Ok(Arc::new(ScalarFunctionExpr::new(
    "hash",
    hash_udf,
    self.on_right.clone(),
```
Don't we want to specify the `RandomState` here?
The notion of the "correct" partition (and thus whether the bounds are relevant) arises downstream of repartitioning, since the build side builds hash tables and independent bounds according to those partitions.
So I would think we would want to specify the same random state as the repartition operator to ensure that the `hash(...) % n != partition_id` portion returns the right result, right? Otherwise we may evaluate incorrect bounds.
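A standalone illustration of the concern, assuming `ahash` (this hashes a single key with `hash_one` for brevity rather than going through DataFusion's actual row-hashing path):

```rust
use ahash::RandomState;

fn main() {
    let n = 4u64; // number of partitions
    let key = "some_join_key_value";

    // Seeds used by RepartitionExec vs. the custom seeds in the filter expression.
    let repartition_state = RandomState::with_seeds(0, 0, 0, 0);
    let filter_state =
        RandomState::with_seeds('H' as u64, 'A' as u64, 'S' as u64, 'H' as u64);

    let actual_partition = repartition_state.hash_one(key) % n;
    let filter_partition = filter_state.hash_one(key) % n;

    // These generally disagree, so a `hash(...) % n != partition_id` check
    // evaluated with the wrong RandomState can skip the bounds check for rows
    // that really do belong to a completed partition.
    println!("actual = {actual_partition}, filter thinks = {filter_partition}");
}
```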
Yes! I'll fix it and think about how we can add a test
This commit addresses a critical correctness issue identified in PR review where the hash function used in filter expressions had different RandomState seeds than the RepartitionExec operator, potentially causing incorrect partition assignments and filtering.

## Key Fixes

**RandomState Consistency**:
- Use RepartitionExec's RandomState (0,0,0,0) instead of custom seeds ('H','A','S','H')
- Ensures `hash(row) % num_partitions` produces the same partition assignments as actual partitioning
- Prevents false negatives in progressive filtering logic

**Performance Optimization**:
- Cache ConfigOptions in SharedBoundsAccumulator to avoid repeated allocations
- Single Arc<ConfigOptions> shared across all hash expression creations

**Code Quality**:
- Improved comment clarity, replacing outdated "HEAD" references
- Better documentation of deduplication logic

## Why This Matters

Without matching RandomState seeds, our progressive filter expressions could:
- Incorrectly filter out valid join candidates (correctness issue)
- Allow through data that doesn't belong to a partition (performance issue)
- Break the fundamental assumption that hash-based filtering matches partitioning

Resolves: apache#17632 (comment)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Force-pushed from 8e3023f to b63bb3e
cc @gabotechs
I think one thing to note with this approach is that we are running the hash function an additional time on the probe side. I wonder if we should wait until we can memoize non-volatile function calls with the same arguments within a filter? (which I think is tracked by #17599)
It shouldn't be too bad: without filters you'd still have to run the hash function once in RepartitionExec and another hash function in HashJoinExec. So we're running 2 times instead of 3 for rows that match the filter. For rows that are pruned we run it 1 time instead of 2. And that's only until all of the build sides are done; then we may run it 0 times.
But we should run some benchmarks.
```rust
}

// Combine all column predicates for this partition with AND
if column_predicates.is_empty() {
```
If we return `lit(true)` here it will nullify the filter once everything is `OR`ed together in the final phase. Maybe we can pass back an `Option` and skip the partition filters that return `None`.
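A sketch of how the `Option` approach could look (not the PR's code): `reduce` over the predicates naturally yields `None` for an empty list, which the caller can then skip instead of OR-ing in a `lit(true)`:

```rust
use std::sync::Arc;

use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::PhysicalExpr;

/// AND-combine one partition's column predicates. `None` means "no usable
/// bounds for this partition", so the caller omits it from the final OR
/// rather than inserting a filter-nullifying `true`.
fn combine_partition_predicates(
    column_predicates: Vec<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn PhysicalExpr>> {
    column_predicates.into_iter().reduce(|acc, pred| {
        Arc::new(BinaryExpr::new(acc, Operator::And, pred)) as Arc<dyn PhysicalExpr>
    })
}
```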
I think we should add a doc explaining how each function leads into the next. Makes it easier for reviewers or people working on this code to make changes.
Simple solution: use a
This article may be helpful in selecting the hash: https://medium.com/@tprodanov/benchmarking-non-cryptographic-hash-functions-in-rust-2e6091077d11 and this doc: https://docs.google.com/spreadsheets/d/1HmqDj-suH4wBFNg7etwE8WVBlfCufvD5-gAnIENs94k/edit?gid=1915335726#gid=1915335726
I will try and review this PR tomorrow
I'm going to rename the
```rust
/// RandomState used by RepartitionExec for consistent hash partitioning.
/// This must match the seeds used in RepartitionExec to ensure our hash-based
/// filter expressions compute the same partition assignments as the actual partitioning.
const REPARTITION_RANDOM_STATE: RandomState = RandomState::with_seeds(0, 0, 0, 0);
```
Can we make this available in the repartition physical plan module and re-use it from there?
I've done this, in part because testing this change required deterministic partitioning (i.e. given the input values, I know some will end up in each partition). That change will also allow an optimizer rule to swap out hash functions without us having to quibble about which one to choose.
```rust
}

/// Create final optimized filter from all partition bounds using OR logic
pub(crate) fn create_optimized_filter_from_partition_bounds(
```
I can see it being worthwhile to skip this step. For example, it ties in nicely with #17529 where we can just push the hash comparison physical expr as a partition dependent predicate.
IMO it's always worth having stats-based predicates, e.g. because they can be used for stats pruning of files / row groups / pages, whereas hash tables / bloom filters cannot (not sure about `IN LIST`).
Agreed I just mean the part where we remove the hash check
I see, your point is that #17632 will need the hash check anyway? I guess what we'd probably end up doing is something like `((col >= 1 and col <= 2) or (col >= 2 and col <= 5)) and (case hash(col) % n when 0 then <check hash map 0> when 1 then <check hash map 1> else true end)`,
that is, keep the hash check for the hash table lookups but remove it for the min/max stats so that those become "simple" expressions that can be used by `PruningPredicate` and such.
If it's solely for internal use at the moment I wonder if it's better to just make it a private
I made it a
This commit refactors the hash join dynamic filtering implementation to apply filters progressively as each build-side partition completes, rather than waiting for all partitions to finish. This reduces probe-side scan overhead and improves performance.

- **Immediate filtering**: Filters are applied as soon as each partition completes
- **Hash-based expressions**: Uses `(hash(cols) % num_partitions != partition_id OR bounds)` to ensure correctness without waiting for all partitions
- **Optimization**: Removes hash checks when all partitions complete
- **SharedBoundsAccumulator**: Removed barrier-based coordination, added progressive filter injection
- **HashJoinStream**: Removed WaitPartitionBoundsReport state, made bounds reporting synchronous
- **Filter expressions**: Progressive phase uses AND-combined hash filters, final phase uses OR-combined bounds

The hash-based filter expression ensures no false negatives:
- Data for completed partitions passes through via exact bounds checks
- Data for incomplete partitions passes through via hash modulo checks
- Progressive improvement in filter selectivity as more partitions complete

Maintains correctness while enabling immediate probe-side filtering.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Force-pushed from fb6a78e to db2ac80
@alamb when you review, would you mind kicking off some join benchmarks? thank you
```
@@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
```
These changes (a good chunk of this PR) are split out into #17648
```rust
drop(inner);

// Update the dynamic filter
if let Some(filter_expr) = filter_expr {
    self.dynamic_filter.update(filter_expr)?;
```
The manual `drop(guard)` before `self.dynamic_filter.update()` may create a race condition.
This opens a window where a final, optimized filter calculated by one thread could be overwritten by a filter generated in the progressive phase by another thread that wins the race to call `update()`.
```
Time  Thread 1                           Thread 2                           Thread 3
  |
  V   self.inner.lock() [ACQUIRED]
      +-> calculates filter_expr_1
      drop(guard) [RELEASED]
                                         self.inner.lock() [ACQUIRED]
                                         +-> calculates filter_expr_2
                                         drop(guard) [RELEASED]
  V   self.dynamic_filter.update(...)                                       self.inner.lock() [ACQUIRED]
      +-> acquires write lock                                               +-> calculates filter_expr_3
      +-> writes filter_1 to state                                              (the *optimal* one)
          (current state: filter_1)
      +-> releases write lock                                               drop(guard) [RELEASED]
  V   ---------------------------------------------------------------------------------------
      Thread 1's update is complete. Now Thread 2 and Thread 3 race to apply their updates.
      ---------------------------------------------------------------------------------------
                                                                            self.dynamic_filter.update(...)
                                                                            +-> acquires write lock
                                                                            +-> writes filter_3
                                                                                (current state: filter_3)
                                                                            +-> releases write lock
                                         self.dynamic_filter.update(...)
                                         +-> acquires write lock
                                         +-> writes filter_2 (stale)
                                             (current state: filter_2)
                                         +-> releases write lock
  V
```
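One standard way to close this window (a sketch under the assumption that updates can be totally ordered, not the PR's code) is to stamp each candidate filter with a generation number taken under the same lock that computed it, and refuse to install anything older than the current state:

```rust
use std::sync::Mutex;

/// Stand-in for the real physical filter expression type.
type FilterExpr = String;

struct DynamicFilter {
    /// (generation, currently installed filter)
    slot: Mutex<(u64, Option<FilterExpr>)>,
}

impl DynamicFilter {
    /// Install `filter` only if it is at least as new as what is already
    /// there. A stale progressive filter can then no longer overwrite the
    /// final optimized one, regardless of which thread wins the race.
    fn update_if_newer(&self, generation: u64, filter: FilterExpr) {
        let mut slot = self.slot.lock().unwrap();
        if generation >= slot.0 {
            *slot = (generation, Some(filter));
        }
    }
}
```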
Will try to look and address next week
Hi folks, thanks for looking at this. I need to do some work on this to make it a stacked PR on top of #17648. I also already have some feedback to address. I will be at a team offsite next week, but I hope to do some work on it Sunday at the airport.
🤖
🤖: Benchmark completed
👀
Several other improvements in the tpch benches, it looks like. Promising if true! I've been flat out at a company offsite this past week and will probably need some time next week to get back into the swing of things. But I promise I'll come back here, revisit, break it up into smaller PRs, etc. It will be nice in the end, I think.
Didn't look at the PR details yet, but why did aggregation benchmarks improve as well (clickbench)?
## Summary

This PR refactors the DataFusion hash join dynamic filtering implementation to support progressive bounds application instead of waiting for all build-side partitions to complete. This optimization reduces probe-side scan overhead and improves join performance.

## Background

Previously, the dynamic filtering system used a barrier-based approach that waited for ALL build-side partitions to complete before applying any filters. This caused unnecessary delays in probe-side filtering.

## Key Innovation

The refactored approach uses hash-based filter expressions that ensure correctness without coordination.

### Why This Works

- For rows that hash to a completed partition N, the bounds check `(col >= min_N AND col <= max_N)` correctly filters
- For rows that hash to any other partition, `(hash(cols) % num_partitions != N)` lets all potential matches pass through

## Changes Made
### Core Components

- **SharedBoundsAccumulator** (`shared_bounds.rs`): removed the `tokio::sync::Barrier` coordination, added progressive filter injection
- **HashJoinStream** (`stream.rs`): removed the `WaitPartitionBoundsReport` state, made bounds reporting synchronous
- **Hash Function** (`hash.rs`): new hash function implementation using DataFusion's internal hash algorithm

### Filter Expression Evolution

- Progressive phase: `(hash(cols) % n != partition_id OR bounds_partition)`, AND-combined across completed partitions
- Final phase: `(bounds_0 OR bounds_1 OR ...)`, with hash checks removed once all partitions complete
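A hedged sketch of how the two phases' expressions could be assembled from DataFusion physical expressions (the `hash_mod` and `bounds` arguments are assumed prebuilt; this is illustrative, not the PR's exact code):

```rust
use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::PhysicalExpr;

/// Progressive phase: `hash(cols) % n != partition_id OR bounds_partition`.
/// `hash_mod` is assumed to already compute `hash(cols) % num_partitions`.
fn progressive_partition_filter(
    hash_mod: Arc<dyn PhysicalExpr>,
    partition_id: u64,
    bounds: Arc<dyn PhysicalExpr>,
) -> Arc<dyn PhysicalExpr> {
    let pid = Arc::new(Literal::new(ScalarValue::UInt64(Some(partition_id))));
    let not_this_partition =
        Arc::new(BinaryExpr::new(hash_mod, Operator::NotEq, pid)) as Arc<dyn PhysicalExpr>;
    Arc::new(BinaryExpr::new(not_this_partition, Operator::Or, bounds))
}

/// Final phase: `bounds_0 OR bounds_1 OR ...` once every partition has
/// reported, so the per-row hash computation can be dropped entirely.
fn final_filter(all_bounds: Vec<Arc<dyn PhysicalExpr>>) -> Option<Arc<dyn PhysicalExpr>> {
    all_bounds.into_iter().reduce(|acc, b| {
        Arc::new(BinaryExpr::new(acc, Operator::Or, b)) as Arc<dyn PhysicalExpr>
    })
}
```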
## Performance Benefits

## Testing

## Test plan

🤖 Generated with Claude Code