Skip to content

Commit ec00b10

Browse files
committed
use seperate runtime
1 parent b861aca commit ec00b10

File tree

1 file changed

+17
-18
lines changed
  • src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter

1 file changed

+17
-18
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::collections::HashMap;
1616
use std::sync::Arc;
1717

18+
use databend_common_base::runtime::execute_futures_in_parallel;
1819
use databend_common_catalog::runtime_filter_info::RuntimeFilterBloom;
1920
use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
2021
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
@@ -286,25 +287,23 @@ async fn build_bloom_filter(
286287
.map(|chunk| chunk.to_vec())
287288
.collect();
288289

289-
let tasks: Vec<_> = chunks
290-
.into_iter()
291-
.map(|chunk| {
292-
databend_common_base::runtime::spawn(async move {
293-
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
294-
.map_err(|e| ErrorCode::Internal(e.to_string()))?;
295-
296-
filter.insert_hash_batch(&chunk);
297-
Ok::<Sbbf, ErrorCode>(filter)
298-
})
299-
})
300-
.collect();
301-
302-
let task_results = futures::future::join_all(tasks).await;
290+
let tasks = chunks.into_iter().map(|chunk| async move {
291+
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
292+
.map_err(|e| ErrorCode::Internal(e.to_string()))?;
303293

304-
let filters: Vec<Sbbf> = task_results
305-
.into_iter()
306-
.map(|r| r.expect("Task panicked"))
307-
.collect::<Result<Vec<_>>>()?;
294+
filter.insert_hash_batch(&chunk);
295+
Ok::<Sbbf, ErrorCode>(filter)
296+
});
297+
298+
let filters: Vec<Sbbf> = execute_futures_in_parallel(
299+
tasks,
300+
max_threads,
301+
max_threads,
302+
"runtime-filter-bloom-worker".to_owned(),
303+
)
304+
.await?
305+
.into_iter()
306+
.collect::<Result<Vec<_>>>()?;
308307

309308
let merged_filter = merge_bloom_filters_tree(filters);
310309

0 commit comments

Comments
 (0)