
Conversation

@yangzhg yangzhg commented Dec 22, 2025

What problem does this PR solve?

Issue Number: close #51

Type of Change

  • 🐛 Bug fix (non-breaking change which fixes an issue)
  • ✨ New feature (non-breaking change which adds functionality)
  • 🚀 Performance improvement (optimization)
  • ⚠️ Breaking change (fix or feature that would cause existing functionality to change)
  • 🔨 Refactoring (no logic changes)
  • 🔧 Build/CI or Infrastructure changes
  • 📝 Documentation only

Description

Summary:

This PR introduces an optimization for Hash Join in Array mode. When the build side has low cardinality but a high number of rows (high duplication factor), the linked lists in the hash table often point to scattered memory locations in the RowContainer. This causes severe cache thrashing during the probe phase.

This change adds a tryRecluster step in HashTable::prepareJoinTable. If the duplication ratio exceeds a threshold, it creates a new, sorted RowContainer where rows with the same key are stored contiguously.

Implementation Details:

  • Added a QueryConfig option, enable_hash_join_array_recluster.
  • Implemented reclusterDataByKey in HashTable, with both hash-based and sort-based recluster modes.
  • Implemented RowContainer::cloneByOrder to deep-copy rows into a new container following the sorted order.
  • Added logic to estimate the probe-side row count to decide whether the optimization cost is justified (a minimal sketch of the overall flow follows this list).
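
To make the mechanism concrete, here is a minimal standalone sketch of the decision and the sort-based copy, using a plain std::vector<Row> in place of Velox's RowContainer. The threshold value and helper names (shouldRecluster, reclusterByKey) are illustrative assumptions, not the PR's actual code.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

struct Row {
  int64_t key;
  // Payload columns would follow in a real row layout.
};

// Reclustering only pays off when duplicate chains are long: many rows per
// distinct key means the hash table's next-row pointers jump around memory.
// The threshold of 4.0 is an assumed tuning knob, not the PR's value.
bool shouldRecluster(std::size_t numRows, std::size_t numDistinctKeys,
                     double duplicationThreshold = 4.0) {
  if (numDistinctKeys == 0) {
    return false;
  }
  double ratio = static_cast<double>(numRows) / numDistinctKeys;
  return ratio >= duplicationThreshold;
}

// Sort an index permutation by key, then deep-copy rows in that order into
// a fresh container -- the counterpart of sorting plus cloneByOrder.
std::vector<Row> reclusterByKey(const std::vector<Row>& rows) {
  std::vector<std::size_t> order(rows.size());
  std::iota(order.begin(), order.end(), 0);
  std::sort(order.begin(), order.end(),
            [&](std::size_t a, std::size_t b) {
              return rows[a].key < rows[b].key;
            });
  std::vector<Row> clustered;
  clustered.reserve(rows.size());
  for (std::size_t i : order) {
    clustered.push_back(rows[i]);  // equal keys now physically contiguous
  }
  return clustered;
}
```

The real implementation additionally weighs the estimated probe-side row count, since the build-time copy only pays off if enough probe lookups hit the clustered rows.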

Performance Impact

  • Positive Impact: I have run benchmarks (results below).

Performance Results (TPC-DS 100G):

Environment:

```text
spark.master                     local[32]
spark.executor.memoryOverhead    5g
spark.driver.maxResultSize       2g
spark.driver.memory              40g
spark.executor.memory            40g
spark.memory.offHeap.enabled     true
spark.memory.offHeap.size        30g
spark.gluten.memory.task.offHeap.size.in.bytes 102400
spark.gluten.memory.offHeap.size.in.bytes 1024000
spark.executor.instances         1
spark.executor.cores             1
```

BF config:

```text
spark.sql.optimizer.runtime.bloomFilter.enabled true
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold 0
```

Pre-sorted: Sort inventory by inv_item_sk
Target Query: Q72 (High duplication, low cardinality join).

| Scenario | Optimization | Time (s) | Improvement |
| --- | --- | --- | --- |
| BF Disabled | Baseline | 2112.70 | - |
| BF Disabled | New (Recluster) | 620.66 | 3.4x |
| BF Enabled | Baseline | 20.18 | - |
| BF Enabled | New (Recluster) | 20.30 | Neutral |
| Pre-sorted Input | Baseline | 630.44 | - |
| Pre-sorted Input | New (Recluster) | 496.33 | ~1.27x |

For a simple join query on TPC-DS 100G:

```sql
SELECT count(*) total_cnt
FROM   catalog_sales
       JOIN inventory
         ON cs_item_sk = inv_item_sk;
```

Baseline: 1711.516 s
New: 178.003 s

Analysis:

  • Significant Gain: When the Bloom Filter is disabled (or, more generally, when filter selectivity is low), the optimization improves execution time by 3.4x. This confirms the bottleneck was indeed random memory access.
  • Neutral with BF: With aggressive runtime Bloom Filters, the probe phase is skipped for most rows, hiding the hash table lookup latency. The optimization nevertheless makes the operator much more robust when the filter is less effective.
  • With Pre-Sort: Even when inputs are pre-sorted (the Pre-sorted Input case), strictly reclustering within the RowContainer yields an additional ~27% speedup (630s -> 496s), because it guarantees physical compactness better than the upstream iterator order (which may be chunked).

Release Note

Release Note:
Introduces an optimization for Hash Join in Array mode: when the build side has low cardinality but a high number of rows (high duplication factor), build rows are reclustered so that rows with the same key are stored contiguously in the RowContainer, eliminating the cache thrashing that scattered hash-table linked lists cause during the probe phase.

Checklist (For Author)

  • I have added/updated unit tests (ctest).
  • I have verified the code with local build (Release/Debug).
  • I have run clang-format / linters.
  • (Optional) I have run Sanitizers (ASAN/TSAN) locally for complex C++ changes.
  • No need to test or manual test.

Breaking Changes

  • No
  • Yes (Description: ...)

@yangzhg force-pushed the opt_array_hash_join branch 2 times, most recently from 5268135 to 9b97269 on December 22, 2025 at 11:21
@yangzhg force-pushed the opt_array_hash_join branch 3 times, most recently from 3f5e8a9 to fe7a1cd on December 23, 2025 at 11:39
Comment on lines +407 to +427
```cpp
if (typeKind == TypeKind::ROW || typeKind == TypeKind::ARRAY ||
    typeKind == TypeKind::MAP) {
  auto sourceView = valueAt<std::string_view>(sourceRow, col.offset());
  if (!sourceView.empty()) {
    RowSizeTracker tracker(
        targetRow[rowSizeOffset_], targetStringAllocator);
    targetStringAllocator.copyMultipart(
        StringView(sourceView.data(), sourceView.size()),
        targetRow,
        col.offset());
  }
} else if (
    typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
  StringView sourceView = valueAt<StringView>(sourceRow, col.offset());
  if (!sourceView.isInline()) {
    RowSizeTracker tracker(
        targetRow[rowSizeOffset_], targetStringAllocator);
    targetStringAllocator.copyMultipart(
        sourceView, targetRow, col.offset());
  }
}
```
Collaborator

No need to copy too-large strings? Their internal memory may also be discontiguous.

Collaborator Author

I'm also considering skipping the reordering/copying for large string keys.

For large strings, the performance bottleneck during the probe phase shifts from row locality to memory bandwidth and the cost of string comparison. Even if we physically reorder the rows, loading large strings into the cache effectively 'pollutes' the cache lines anyway, offering diminishing returns on locality. Therefore, the cost of copying these huge strings during the build phase likely outweighs the minimal gain in the probe phase.
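
A guard like the following would express that trade-off; the threshold constant and function name are hypothetical, not something this PR necessarily adds.

```cpp
#include <cstddef>

// Hypothetical cutoff (assumed tuning knob, not from the PR): above this
// size, copying a string at build time likely costs more than contiguity
// saves at probe time, since large values evict cache lines regardless of
// row order.
constexpr std::size_t kMaxReclusterStringBytes = 1024;

bool worthCopyingString(std::size_t stringBytes) {
  return stringBytes <= kMaxReclusterStringBytes;
}
```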

```diff
 if (adaptive_ && allowPreload_.load()) {
   allowPreload_ = false;
-  LOG(WARNING) << "Disallow scan preload due to limited memory";
+  LOG(INFO) << "Disallow scan preload due to limited memory";
```
Collaborator

Disallowing preload is unexpected, so I'd prefer a warning message.

Collaborator Author

This log fires very frequently when running workloads like TPC-DS from the CLI. At WARNING level it produces significant console noise (especially in spark-sql/shell).
I switched it to INFO to keep it off the console while still recording it in the log files.

Comment on lines +1855 to +1857
```cpp
} else if (
    reclusterConfig_.reclusterMode ==
    HashTableReclusterConfig::ReclusterMode::kHash) {
```
Collaborator

try to reduce memory cost?

Collaborator Author

No, the goal isn't to save memory. Both methods are designed to improve cache locality during the probe phase by physically reordering the RowContainer.

The sort-based method guarantees that identical keys are stored contiguously, but it introduces significant overhead: it is roughly 7x slower than the hash-based method.
The hash-based method is a trade-off: while it doesn't guarantee strict continuity like sorting, it effectively clusters similar keys with much lower computational overhead. It provides a better balance between build time and probe performance.
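
For illustration, a standalone sketch of the two orderings over plain integer keys (simplified; these helpers are not the PR's code):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <numeric>
#include <vector>

// Sort-based ordering: identical keys become strictly adjacent, at
// O(n log n) cost (the "roughly 7x slower" option above).
std::vector<std::size_t> orderBySort(const std::vector<int64_t>& keys) {
  std::vector<std::size_t> order(keys.size());
  std::iota(order.begin(), order.end(), 0);
  std::sort(order.begin(), order.end(),
            [&](std::size_t a, std::size_t b) { return keys[a] < keys[b]; });
  return order;
}

// Hash-based ordering: a single O(n) counting pass groups rows by hash
// bucket. Equal keys land in the same bucket and end up close together,
// but distinct keys sharing a bucket may interleave -- clustering is
// approximate, which is the trade-off described above.
std::vector<std::size_t> orderByHash(const std::vector<int64_t>& keys,
                                     std::size_t numBuckets) {
  auto bucketOf = [&](int64_t k) {
    return std::hash<int64_t>{}(k) % numBuckets;
  };
  // Prefix sums of bucket sizes give each bucket's start offset.
  std::vector<std::size_t> offsets(numBuckets + 1, 0);
  for (int64_t k : keys) ++offsets[bucketOf(k) + 1];
  std::partial_sum(offsets.begin(), offsets.end(), offsets.begin());
  // Scatter row indices into their bucket's region.
  std::vector<std::size_t> order(keys.size());
  for (std::size_t i = 0; i < keys.size(); ++i) {
    order[offsets[bucketOf(keys[i])]++] = i;
  }
  return order;
}
```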

Collaborator

I mean that the memory cost needs to be reduced here.

Collaborator Author

In-place reclustering is much slower than copying the clustered data into a new container and then replacing the RowContainer.
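
In sketch form, the copy-then-replace pattern being described (the container type and names are illustrative stand-ins, not the PR's code):

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

struct RowContainerLike {
  std::vector<int64_t> rows;  // stand-in for the real row storage
};

// Materialize the clustered copy into a brand-new container with purely
// sequential writes, then swap it in. An in-place permutation would need
// cycle-following moves with random reads *and* random writes, plus care
// around variable-length data.
void replaceWithClustered(std::unique_ptr<RowContainerLike>& container,
                          const std::vector<std::size_t>& order) {
  auto clustered = std::make_unique<RowContainerLike>();
  clustered->rows.reserve(order.size());
  for (std::size_t i : order) {
    clustered->rows.push_back(container->rows[i]);
  }
  container = std::move(clustered);  // old container is freed here
}
```

The trade-off is transiently holding two copies of the build data, which is the memory cost raised above; the win is sequential writes instead of the scattered accesses an in-place permutation would require.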

@yangzhg added labels enhancement (New feature or request) and performance (performance improvement needed) on Jan 9, 2026
```cpp
  return;
}

if (!reclusterConfig_.enableArrayRecluster) {
```
Collaborator

move this line to the beginning of this function?

Collaborator Author

done

@yangzhg force-pushed the opt_array_hash_join branch from fe7a1cd to 43d0716 on January 14, 2026 at 02:31
@yangzhg yangzhg requested a review from kexianda January 15, 2026 10:46
…ve cache locality

Introduces an optimization for Hash Join in Array mode. When the build side has low cardinality but a high number of rows (high duplication factor), the linked lists in the hash table often point to scattered memory locations in the RowContainer. This causes severe cache thrashing during the probe phase.
@yangzhg force-pushed the opt_array_hash_join branch from 43d0716 to bc48b0f on January 19, 2026 at 07:54
@yangzhg yangzhg requested review from fzhedu and zhangxffff January 19, 2026 07:55
Labels: enhancement (New feature or request), performance (performance improvement needed)

Merging this pull request may close issue #51: [Performance] Improve HashJoin performance in Array Mode via RowContainer reclustering to reduce cache misses