
Commit 5b0aa37

Refactor state management in HashJoinExec and use CASE expressions for more precise filters (#18451)
## Background

This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171. A "target state" is tracked in #18393. There is a series of PRs to get us to this target state in smaller, more reviewable changes that are still valuable on their own:

- #18448
- #18449 (depends on #18448)
- (This PR): #18451

## Changes in this PR

This PR refactors state management in HashJoinExec to make filter pushdown more efficient and to prepare for pushing down membership tests.

- Refactors internal data structures to clean up state management and make usage more idiomatic (use `Option` instead of comparing integers, etc.)
- Uses CASE expressions to evaluate pushed-down filters selectively by partition. Example: `CASE hash_repartition % N WHEN partition_id THEN condition ELSE false END`

---------

Co-authored-by: Lía Adriana <lia.castaneda@datadoghq.com>
1 parent f9231fc commit 5b0aa37
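The CASE rewrite described in the message means each scan only evaluates the bounds computed for the build partition a probe row actually hashes into; rows that hash to a partition with no build-side rows are rejected outright. A minimal sketch of that row-level semantics — `case_filter` is a hypothetical helper and `DefaultHasher` stands in for DataFusion's repartitioning hash; none of these names are DataFusion APIs:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Row-level reading of `CASE hash_repartition % n WHEN p THEN <bounds_p> ELSE false END`:
/// a probe row is kept only if the bounds recorded for its own hash partition accept it.
fn case_filter(
    row_key: &str,
    n: u64,
    partition_bounds: &[(u64, Box<dyn Fn(&str) -> bool>)],
) -> bool {
    let mut hasher = DefaultHasher::new();
    row_key.hash(&mut hasher);
    let partition = hasher.finish() % n;
    partition_bounds
        .iter()
        .find(|(p, _)| *p == partition)
        // Partitions that produced no build-side rows have no WHEN arm: ELSE false.
        .map_or(false, |(_, in_bounds)| in_bounds(row_key))
}

fn main() {
    // Suppose partition 0 saw build keys between "aa" and "ab" and the other
    // three partitions were empty.
    let bounds: Vec<(u64, Box<dyn Fn(&str) -> bool>)> =
        vec![(0, Box::new(|k: &str| ("aa"..="ab").contains(&k)))];
    for key in ["aa", "zz"] {
        println!("{key} passes: {}", case_filter(key, 4, &bounds));
    }
}
```

Compared with OR-ing all partitions' bounds together (the old behavior), the CASE form is more precise: a row is tested only against the bounds of the one partition it can possibly join with.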

File tree

8 files changed: +796 -239 lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 285 additions & 6 deletions
@@ -18,7 +18,7 @@
 use std::sync::{Arc, LazyLock};
 
 use arrow::{
-    array::record_batch,
+    array::{record_batch, Float64Array, Int32Array, RecordBatch, StringArray},
     datatypes::{DataType, Field, Schema, SchemaRef},
     util::pretty::pretty_format_batches,
 };
@@ -278,7 +278,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
     - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
     "
 );
 }
@@ -1305,7 +1305,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     - CoalesceBatchesExec: target_batch_size=8192
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ELSE false END ]
     "
 );
 
@@ -1322,7 +1322,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     - CoalesceBatchesExec: target_batch_size=8192
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ELSE false END ]
     "
 );
 
@@ -1667,8 +1667,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND b@0 <= ab ELSE false END ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= ca AND d@0 <= cb ELSE false END ]
     "
 );
 }
@@ -2330,3 +2330,282 @@ fn test_pushdown_with_computed_grouping_key() {
     "
 );
 }
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Test scenario where all build-side partitions are empty.
+    // This validates the code path that sets the filter to `false` when no rows can match.
+
+    // Create an empty build side.
+    let build_batches = vec![];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create a probe side with some data.
+    let probe_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab", "ac"]),
+        ("b", Utf8, ["ba", "bb", "bc"])
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create RepartitionExec nodes for both sides.
+    let partition_count = 4;
+
+    let build_hash_exprs = vec![
+        col("a", &build_side_schema).unwrap(),
+        col("b", &build_side_schema).unwrap(),
+    ];
+    let build_repartition = Arc::new(
+        RepartitionExec::try_new(
+            build_scan,
+            Partitioning::Hash(build_hash_exprs, partition_count),
+        )
+        .unwrap(),
+    );
+    let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192));
+
+    let probe_hash_exprs = vec![
+        col("a", &probe_side_schema).unwrap(),
+        col("b", &probe_side_schema).unwrap(),
+    ];
+    let probe_repartition = Arc::new(
+        RepartitionExec::try_new(
+            Arc::clone(&probe_scan),
+            Partitioning::Hash(probe_hash_exprs, partition_count),
+        )
+        .unwrap(),
+    );
+    let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));
+
+    // Create the HashJoinExec.
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let hash_join = Arc::new(
+        HashJoinExec::try_new(
+            build_coalesce,
+            probe_coalesce,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    let plan =
+        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
+
+    // Apply the filter pushdown optimizer.
+    let mut config = SessionConfig::new();
+    config.options_mut().execution.parquet.pushdown_filters = true;
+    let optimizer = FilterPushdown::new_post_optimization();
+    let plan = optimizer.optimize(plan, config.options()).unwrap();
+
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    - CoalesceBatchesExec: target_batch_size=8192
+    - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
+    - CoalesceBatchesExec: target_batch_size=8192
+    - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+    "
+    );
+
+    // Run data through the plan to check that the filter is updated to reflect
+    // the hash join's build-side state.
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    // Execute all partitions (required for partitioned hash join coordination).
+    let _batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // Verify that the filter on the probe side was updated to `false`.
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    - CoalesceBatchesExec: target_batch_size=8192
+    - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
+    - CoalesceBatchesExec: target_batch_size=8192
+    - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ]
+    "
+    );
+}
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_with_nulls() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Test scenario where the build side has NULL values in the join keys.
+    // This validates NULL handling in bounds computation and filter generation.
+
+    // Create a build side with NULL values.
+    let build_batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),  // nullable
+            Field::new("b", DataType::Int32, true), // nullable
+        ])),
+        vec![
+            Arc::new(StringArray::from(vec![Some("aa"), None, Some("ab")])),
+            Arc::new(Int32Array::from(vec![Some(1), Some(2), None])),
+        ],
+    )
+    .unwrap();
+    let build_batches = vec![build_batch];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, true),
+        Field::new("b", DataType::Int32, true),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create a probe side with nullable fields.
+    let probe_batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Float64, false),
+        ])),
+        vec![
+            Arc::new(StringArray::from(vec![
+                Some("aa"),
+                Some("ab"),
+                Some("ac"),
+                None,
+            ])),
+            Arc::new(Int32Array::from(vec![Some(1), Some(3), Some(4), Some(5)])),
+            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
+        ],
+    )
+    .unwrap();
+    let probe_batches = vec![probe_batch];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, true),
+        Field::new("b", DataType::Int32, true),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create the HashJoinExec in CollectLeft mode (simpler for this test).
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let hash_join = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            Arc::clone(&probe_scan),
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::CollectLeft,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    let plan =
+        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
+
+    // Apply the filter pushdown optimizer.
+    let mut config = SessionConfig::new();
+    config.options_mut().execution.parquet.pushdown_filters = true;
+    let optimizer = FilterPushdown::new_post_optimization();
+    let plan = optimizer.optimize(plan, config.options()).unwrap();
+
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+    "
+    );
+
+    // Run data through the plan to check that the filter is updated to reflect
+    // the hash join's build-side state.
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    // Execute the plan to completion so the build side is fully consumed.
+    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // Verify that the filter was updated with bounds that exclude the NULL keys.
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - CoalesceBatchesExec: target_batch_size=8192
+    - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 ]
+    "
+    );
+
+    #[rustfmt::skip]
+    let expected = [
+        "+----+---+----+---+-----+",
+        "| a  | b | a  | b | c   |",
+        "+----+---+----+---+-----+",
+        "| aa | 1 | aa | 1 | 1.0 |",
+        "+----+---+----+---+-----+",
+    ];
+    assert_batches_eq!(&expected, &batches);
+}
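
The two tests above exercise the dynamic filter's lifecycle: it renders as `DynamicFilter [ empty ]` right after planning, and once the build side has been consumed it is updated in place — to `false` when the build side produced no rows (so the probe scan can skip everything), or to the computed key bounds otherwise. A toy sketch of that shared, updatable predicate slot (illustrative only, not DataFusion's actual dynamic filter API):

```rust
use std::sync::{Arc, RwLock};

/// A shared predicate slot that probe-side scans can poll while the join
/// builds its hash table. `None` models the pre-execution "empty" state.
#[derive(Clone, Default)]
struct DynamicFilter(Arc<RwLock<Option<String>>>);

impl DynamicFilter {
    fn describe(&self) -> String {
        match self.0.read().unwrap().as_deref() {
            None => "DynamicFilter [ empty ]".to_string(),
            Some(p) => format!("DynamicFilter [ {p} ]"),
        }
    }

    /// Called once the build side has been fully consumed.
    fn update(&self, build_side_bounds: Option<String>) {
        // An empty build side means no probe row can ever join: push `false`.
        *self.0.write().unwrap() =
            Some(build_side_bounds.unwrap_or_else(|| "false".to_string()));
    }
}

fn main() {
    let filter = DynamicFilter::default();
    assert_eq!(filter.describe(), "DynamicFilter [ empty ]");

    // All build-side partitions were empty, as in the first test above.
    filter.update(None);
    assert_eq!(filter.describe(), "DynamicFilter [ false ]");
}
```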

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 9 additions & 5 deletions
@@ -60,6 +60,9 @@ pub struct TestOpener {
 impl FileOpener for TestOpener {
     fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
         let mut batches = self.batches.clone();
+        if self.batches.is_empty() {
+            return Ok((async { Ok(TestStream::new(vec![]).boxed()) }).boxed());
+        }
         if let Some(batch_size) = self.batch_size {
             let batch = concat_batches(&batches[0].schema(), &batches)?;
             let mut new_batches = Vec::new();
@@ -337,11 +340,12 @@ impl TestStream {
     /// least one entry in data (for the schema)
     pub fn new(data: Vec<RecordBatch>) -> Self {
         // check that there is at least one entry in data and that all batches have the same schema
-        assert!(!data.is_empty(), "data must not be empty");
-        assert!(
-            data.iter().all(|batch| batch.schema() == data[0].schema()),
-            "all batches must have the same schema"
-        );
+        if let Some(first) = data.first() {
+            assert!(
+                data.iter().all(|batch| batch.schema() == first.schema()),
+                "all batches must have the same schema"
+            );
+        }
         Self {
             data,
             ..Default::default()