feat: implement hash join spill for memory pressure handling #23795

aunjgr wants to merge 13 commits into matrixorigin:main from
Conversation
Implement spill-to-disk functionality for hash join operations to handle memory pressure by partitioning data into 32 buckets and writing to disk when the memory threshold is exceeded.

Key changes:
- Add SpillThreshold configuration to HashBuild and HashJoin operators
- Implement xxhash-based partitioning for consistent bucket distribution
- Add buffering (8192 rows) before flushing to disk to reduce I/O overhead
- Reuse existing probe logic for processing spilled buckets
- Track spill statistics (SpillSize, SpillRows) in the operator analyzer
- Properly clean up spill files and batches to prevent memory leaks

Implementation details:
- HashBuild spills all batches when the threshold is exceeded and sends an empty JoinMap with spill metadata to HashJoin
- HashJoin creates matching probe spill files and partitions probe batches
- After the normal join completes, spilled buckets are processed one at a time by rebuilding the hashmap and probing in memory
- Uses consistent xxhash computation across build and probe sides
- Spill files use the format [count][size][batch_data][magic] for validation

Performance optimizations:
- Buffer 8192 rows per bucket before flushing to reduce syscalls
- Reuse probe logic to avoid code duplication
- Clean up batches immediately after processing to minimize memory usage

This enables hash joins to handle datasets larger than available memory without OOM errors, with graceful degradation to disk-based processing.
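The key invariant behind the xxhash-based partitioning is that the build and probe sides apply the same hash, so matching keys always land in the same bucket. A minimal sketch of that idea follows; it uses the stdlib `hash/fnv` as a stand-in for xxhash so it is dependency-free, and `bucketFor` is an illustrative name, not the PR's actual API.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const numBuckets = 32 // per the PR: data is partitioned into 32 spill buckets

// bucketFor maps a join key's bytes to one of the 32 spill buckets.
// The PR uses xxhash; hash/fnv stands in here to keep the sketch
// self-contained. What matters is that build and probe sides apply
// the *same* hash so matching keys land in the same bucket.
func bucketFor(key []byte) uint64 {
	h := fnv.New64a()
	h.Write(key)
	return h.Sum64() % numBuckets
}

func main() {
	// The same key hashes to the same bucket on both sides.
	b := bucketFor([]byte("order_id=42")) // build side
	p := bucketFor([]byte("order_id=42")) // probe side
	fmt.Println(b == p, b < numBuckets)   // prints: true true
}
```

Because the mapping is deterministic, bucket i of the build spill only ever needs to be joined against bucket i of the probe spill, never against the other 31.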
Review Summary by Qodo

Implement hash join spill-to-disk for memory pressure handling

Walkthrough:
- Implement spill-to-disk for hash join operations handling memory pressure
- Partition data into 32 buckets using xxhash with 8192-row buffering
- Process spilled buckets by rebuilding the hashmap and probing in memory
- Track spill statistics (SpillSize, SpillRows) in the operator analyzer
- Add join_spill_mem system variable for a configurable memory threshold

Diagram:

```mermaid
flowchart LR
    A["Build Phase"] -->|"Memory Threshold Exceeded"| B["Create 32 Spill Buckets"]
    B -->|"Partition by xxhash"| C["Write to Disk with Buffering"]
    C -->|"Send Empty JoinMap"| D["HashJoin Receives Spill Info"]
    D -->|"Partition Probe Batches"| E["Create Probe Spill Files"]
    E -->|"Process Each Bucket"| F["Rebuild Hashmap + Probe"]
    F -->|"Output Results"| G["Final Join Output"]
```
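The last two diagram steps, rebuilding the hashmap and probing per bucket, can be sketched as a loop that holds only one bucket's build side in memory at a time. Types, loader callbacks, and `joinSpilledBuckets` are all illustrative names under assumed signatures, not the operator's real interface.

```go
package main

import "fmt"

// After the in-memory join finishes, spilled buckets are processed one
// at a time: load bucket i's build rows, rebuild a hash table, then
// stream bucket i's probe rows through it. Only one bucket is resident
// at once, which bounds peak memory.
type row struct {
	key string
	val int
}

func joinSpilledBuckets(loadBuild, loadProbe func(bucket int) []row, numBuckets int) [][2]int {
	var out [][2]int
	for i := 0; i < numBuckets; i++ {
		// Rebuild the hashmap for this bucket only.
		ht := make(map[string][]int)
		for _, r := range loadBuild(i) {
			ht[r.key] = append(ht[r.key], r.val)
		}
		// Probe with the matching probe-side bucket; keys from other
		// buckets cannot match because both sides used the same hash.
		for _, p := range loadProbe(i) {
			for _, v := range ht[p.key] {
				out = append(out, [2]int{v, p.val})
			}
		}
		// ht goes out of scope here, releasing this bucket's memory.
	}
	return out
}

func main() {
	build := [][]row{{{"a", 1}}, {{"b", 2}}}
	probe := [][]row{{{"a", 10}}, {{"b", 20}, {"c", 30}}}
	res := joinSpilledBuckets(
		func(i int) []row { return build[i] },
		func(i int) []row { return probe[i] },
		2)
	fmt.Println(res) // prints: [[1 10] [2 20]]
}
```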
File Changes:
1. pkg/sql/colexec/hashbuild/spill.go
What type of PR is this?
Which issue(s) this PR fixes:
issue #3433 #23353
What this PR does / why we need it:
Implement spill-to-disk functionality for hash join operations to handle memory pressure by partitioning data into 32 buckets and writing to disk when memory threshold is exceeded.
This enables hash joins to handle datasets larger than available memory without OOM errors, with graceful degradation to disk-based processing.
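The 8192-row buffering mentioned above can be sketched as a per-bucket buffer that defers the disk write until it fills, turning thousands of tiny writes into a few large ones. `spillBucket` and `flushFn` are illustrative names, not the PR's actual types.

```go
package main

import "fmt"

const flushThreshold = 8192 // per the PR: buffer 8192 rows per bucket before flushing

// spillBucket accumulates rows and flushes only when the buffer fills,
// trading a bounded amount of memory for far fewer write syscalls.
// flushFn stands in for the real spill-file write.
type spillBucket struct {
	rows    [][]byte
	flushes int
	flushFn func(rows [][]byte)
}

func (b *spillBucket) add(row []byte) {
	b.rows = append(b.rows, row)
	if len(b.rows) >= flushThreshold {
		b.flush()
	}
}

func (b *spillBucket) flush() {
	if len(b.rows) == 0 {
		return
	}
	b.flushFn(b.rows)
	b.flushes++
	b.rows = b.rows[:0] // keep the backing array for reuse
}

func main() {
	bkt := &spillBucket{flushFn: func(rows [][]byte) { /* write to spill file */ }}
	for i := 0; i < 20000; i++ {
		bkt.add([]byte("row"))
	}
	bkt.flush() // final partial flush when the bucket is closed
	fmt.Println(bkt.flushes) // prints: 3 (at 8192, at 16384, and the final 3616)
}
```

Reusing the buffer's backing array after each flush also matches the PR's goal of cleaning up batches promptly to minimize memory usage.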