Skip to content

Commit 374da4a

Browse files
committed
refactor old hash join
1 parent 7836cc0 commit 374da4a

File tree

5 files changed

+338
-66
lines changed

5 files changed

+338
-66
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use parking_lot::RwLock;
5959

6060
use super::concat_buffer::ConcatBuffer;
6161
use super::desc::RuntimeFilterDesc;
62+
use super::runtime_filter::JoinRuntimeFilterPacket;
6263
use crate::pipelines::memory_settings::MemorySettingsExt;
6364
use crate::pipelines::processors::transforms::hash_join::common::wrap_true_validity;
6465
use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE;
@@ -104,6 +105,7 @@ pub struct HashJoinBuildState {
104105
pub(crate) concat_buffer: Mutex<ConcatBuffer>,
105106
pub(crate) broadcast_id: Option<u32>,
106107
pub(crate) is_runtime_filter_added: AtomicBool,
108+
runtime_filter_packets: Mutex<Vec<JoinRuntimeFilterPacket>>,
107109
}
108110

109111
impl HashJoinBuildState {
@@ -154,6 +156,7 @@ impl HashJoinBuildState {
154156
concat_buffer: Mutex::new(ConcatBuffer::new(concat_threshold)),
155157
broadcast_id,
156158
is_runtime_filter_added: AtomicBool::new(false),
159+
runtime_filter_packets: Mutex::new(Vec::new()),
157160
}))
158161
}
159162

@@ -875,6 +878,15 @@ impl HashJoinBuildState {
875878
&self.hash_join_state.hash_join_desc.runtime_filter.filters
876879
}
877880

881+
/// Appends `packet` to the runtime filter packets stored on this build state.
///
/// Takes the `runtime_filter_packets` lock for the duration of the push.
pub fn add_runtime_filter_packet(&self, packet: JoinRuntimeFilterPacket) {
882+
self.runtime_filter_packets.lock().push(packet);
883+
}
884+
885+
/// Removes and returns every runtime filter packet stored so far.
///
/// After this call the stored list is empty, so a second call returns an
/// empty `Vec` unless more packets were added in between.
pub fn take_runtime_filter_packets(&self) -> Vec<JoinRuntimeFilterPacket> {
886+
let mut guard = self.runtime_filter_packets.lock();
887+
// `drain(..)` moves all elements out of the stored vector, leaving it empty.
guard.drain(..).collect()
888+
}
889+
878890
/// only used for test
879891
pub fn get_enable_bloom_runtime_filter(&self) -> bool {
880892
self.hash_join_state

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs

Lines changed: 9 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -12,64 +12,22 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::atomic::Ordering;
1615
use std::time::Instant;
1716

1817
use databend_common_exception::Result;
19-
use databend_common_expression::DataBlock;
2018
use databend_common_storages_fuse::TableContext;
2119

2220
use super::convert::build_runtime_filter_infos;
2321
use super::global::get_global_runtime_filter_packet;
24-
use crate::pipelines::processors::transforms::build_runtime_filter_packet;
22+
use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket;
2523
use crate::pipelines::processors::HashJoinBuildState;
2624

2725
pub async fn build_and_push_down_runtime_filter(
28-
build_chunks: &[DataBlock],
29-
build_num_rows: usize,
26+
mut packet: JoinRuntimeFilterPacket,
3027
join: &HashJoinBuildState,
3128
) -> Result<()> {
3229
let overall_start = Instant::now();
3330

34-
let is_spill_happened = join.hash_join_state.need_next_round.load(Ordering::Acquire)
35-
|| join
36-
.hash_join_state
37-
.is_spill_happened
38-
.load(Ordering::Acquire);
39-
40-
let inlist_threshold = join
41-
.ctx
42-
.get_settings()
43-
.get_inlist_runtime_filter_threshold()? as usize;
44-
let bloom_threshold = join
45-
.ctx
46-
.get_settings()
47-
.get_bloom_runtime_filter_threshold()? as usize;
48-
let min_max_threshold = join
49-
.ctx
50-
.get_settings()
51-
.get_min_max_runtime_filter_threshold()? as usize;
52-
let selectivity_threshold = join
53-
.ctx
54-
.get_settings()
55-
.get_join_runtime_filter_selectivity_threshold()?;
56-
57-
let build_start = Instant::now();
58-
let mut packet = build_runtime_filter_packet(
59-
build_chunks,
60-
build_num_rows,
61-
join.runtime_filter_desc(),
62-
&join.func_ctx,
63-
inlist_threshold,
64-
bloom_threshold,
65-
min_max_threshold,
66-
selectivity_threshold,
67-
is_spill_happened,
68-
)?;
69-
let build_time = build_start.elapsed();
70-
71-
log::info!("RUNTIME-FILTER: build runtime filter packet: {:?}, build_num_rows: {}, runtime_filter_desc: {:?}", packet, build_num_rows, join.runtime_filter_desc());
72-
7331
if let Some(broadcast_id) = join.broadcast_id {
7432
let merge_start = Instant::now();
7533
packet = get_global_runtime_filter_packet(broadcast_id, packet, &join.ctx).await?;
@@ -85,7 +43,12 @@ pub async fn build_and_push_down_runtime_filter(
8543
.iter()
8644
.map(|r| (r.id, r))
8745
.collect();
46+
let selectivity_threshold = join
47+
.ctx
48+
.get_settings()
49+
.get_join_runtime_filter_selectivity_threshold()?;
8850
let max_threads = join.ctx.get_settings().get_max_threads()? as usize;
51+
let build_rows = packet.build_rows;
8952
let runtime_filter_infos = build_runtime_filter_infos(
9053
packet,
9154
runtime_filter_descs,
@@ -98,11 +61,10 @@ pub async fn build_and_push_down_runtime_filter(
9861
let filter_count = runtime_filter_infos.len();
9962

10063
log::info!(
101-
"RUNTIME-FILTER: Built and deployed {} filters in {:?} (build: {:?}) for {} rows",
64+
"RUNTIME-FILTER: Built and deployed {} filters in {:?} for {} rows",
10265
filter_count,
10366
total_time,
104-
build_time,
105-
build_num_rows
67+
build_rows
10668
);
10769
log::info!(
10870
"RUNTIME-FILTER: runtime_filter_infos: {:?}",

0 commit comments

Comments
 (0)