159 changes: 158 additions & 1 deletion src/query/catalog/src/sbbf.rs
@@ -75,6 +75,13 @@

use core::simd::cmp::SimdPartialEq;
use core::simd::Simd;
use std::mem::size_of;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::TrySpawn;

/// Salt values as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
@@ -165,16 +172,41 @@ impl std::ops::IndexMut<usize> for Block {
}
}

#[derive(Debug)]
#[repr(transparent)]
struct BlockAtomic([AtomicU32; 8]);

impl BlockAtomic {
fn new() -> Self {
Self(std::array::from_fn(|_| AtomicU32::new(0)))
}

fn insert(&self, hash: u32) {
let mask = Block::mask(hash);
for (slot, value) in self.0.iter().zip(mask.0.iter()) {
slot.fetch_or(*value, Ordering::Relaxed);
}
}
}
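
A note on the ordering choice above: Relaxed is sufficient for these fetch_or calls because the filter only needs atomicity, not cross-thread ordering. OR is commutative and idempotent, and nothing reads the bitset until every writer task has been joined, which itself establishes the happens-before edge. A minimal standalone sketch of the same pattern (illustrative only, not part of this PR):

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let word = Arc::new(AtomicU32::new(0));
    let handles: Vec<_> = (0..8)
        .map(|i| {
            let word = Arc::clone(&word);
            // Relaxed is enough: fetch_or only needs atomicity, and no
            // thread reads the result before join().
            thread::spawn(move || {
                word.fetch_or(1u32 << i, Ordering::Relaxed);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap(); // join() orders the writes before the load below
    }
    assert_eq!(word.load(Ordering::Relaxed), 0xFF);
}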

/// A split block Bloom filter.
///
/// The creation of this structure is based on the [`crate::file::properties::BloomFilterProperties`]
/// struct set via [`crate::file::properties::WriterProperties`] and is thus hidden by default.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

#[derive(Debug)]
pub struct SbbfAtomic(Vec<BlockAtomic>);

pub(crate) const BITSET_MIN_LENGTH: usize = 32;
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

#[inline]
fn hash_to_block_index_for_blocks(hash: u64, num_blocks: usize) -> usize {
unsafe { (((hash >> 32).unchecked_mul(num_blocks as u64)) >> 32) as usize }
}
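
hash_to_block_index_for_blocks is the standard multiplicative range reduction: for x uniform in [0, 2^32) and n blocks, (x * n) >> 32 lands uniformly in [0, n) without a modulo. The unchecked_mul is sound because hash >> 32 and num_blocks (at most BITSET_MAX_LENGTH / 32) both fit in 32 bits, so the product cannot overflow a u64. A checked sketch of the same mapping (illustrative, not part of the PR):

fn hash_to_block_index_checked(hash: u64, num_blocks: usize) -> usize {
    let x = hash >> 32; // top 32 bits of the hash, uniform in [0, 2^32)
    // x < 2^32 and num_blocks < 2^32, so this product fits in a u64.
    ((x * num_blocks as u64) >> 32) as usize // in [0, num_blocks)
}

fn main() {
    for hash in [0u64, 1 << 40, u64::MAX] {
        assert!(hash_to_block_index_checked(hash, 1024) < 1024);
    }
}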

#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
@@ -219,7 +251,7 @@ impl Sbbf {
fn hash_to_block_index(&self, hash: u64) -> usize {
// unchecked_mul is unstable, so saturating_mul is used instead; the
// multiplication can never actually saturate, since both operands fit in 32 bits
(((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
hash_to_block_index_for_blocks(hash, self.0.len())
}

/// Insert a hash into the filter. The caller must provide a well-distributed 64-bit hash.
@@ -277,6 +309,109 @@ impl Sbbf {
}
}
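
The "well-distributed 64-bit hash" contract above is load-bearing: the top 32 bits pick the block and the low 32 bits build the mask, so bias in either half inflates the false-positive rate. A splitmix64-style finalizer is one common way to meet that contract from a weak key (hypothetical helper for illustration; Databend feeds its own hashes into insert_hash):

fn mix64(mut x: u64) -> u64 {
    // splitmix64 finalizer: xor-shift and multiply until every input bit
    // influences every output bit.
    x = (x ^ (x >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
    x = (x ^ (x >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
    x ^ (x >> 31)
}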

impl SbbfAtomic {
pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, String> {
if !(0.0..1.0).contains(&fpp) {
return Err(format!(
"False positive probability must be between 0.0 and 1.0, got {fpp}"
));
}
let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
Ok(Self::new_with_num_of_bytes(num_bits / 8))
}

pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
assert_eq!(size_of::<BlockAtomic>(), size_of::<Block>());
assert_eq!(num_bytes % size_of::<BlockAtomic>(), 0);
let num_blocks = num_bytes / size_of::<BlockAtomic>();
let bitset = (0..num_blocks).map(|_| BlockAtomic::new()).collect();
Self(bitset)
}

#[inline]
fn hash_to_block_index(&self, hash: u64) -> usize {
hash_to_block_index_for_blocks(hash, self.0.len())
}

pub fn insert_hash(&self, hash: u64) {
let block_index = self.hash_to_block_index(hash);
self.0[block_index].insert(hash as u32)
}

pub fn insert_hash_batch(&self, hashes: &[u64]) {
for &hash in hashes {
self.insert_hash(hash);
}
}

pub fn insert_hash_batch_parallel(self, hashes: Vec<u64>, max_threads: usize) -> Self {
if hashes.is_empty() || max_threads <= 1 || self.0.len() < 2 {
self.insert_hash_batch(&hashes);
return self;
}

let worker_nums = max_threads.min(hashes.len()).max(1);
let chunk_size = hashes.len().div_ceil(worker_nums).max(1);
let runtime = Runtime::with_worker_threads(worker_nums, Some("sbbf-insert".to_string()))
.expect("failed to create runtime for inserting bloom filter hashes");

let hashes = Arc::new(hashes);
let builder = Arc::new(self);
let total = hashes.len();
let mut join_handlers = Vec::with_capacity(total.div_ceil(chunk_size));

for start in (0..total).step_by(chunk_size) {
let end = (start + chunk_size).min(total);
let hashes = hashes.clone();
let builder = builder.clone();

let handler = runtime
.try_spawn(
async move {
for hash in &hashes[start..end] {
builder.insert_hash(*hash);
}
},
None,
)
.expect("failed to spawn runtime task for inserting bloom filter hashes");
join_handlers.push(handler);
}

runtime
.block_on(async move {
for handler in join_handlers {
handler.await?;
}
Ok(())
})
.expect("runtime bloom filter insert tasks failed");

Arc::try_unwrap(builder)
.expect("unexpected extra references when finishing bloom filter insert")
}

pub fn finish(self) -> Sbbf {
let blocks: Vec<Block> = self
.0
.into_iter()
.map(|block| {
let mut arr = [0u32; 8];
for (dst, src) in arr.iter_mut().zip(block.0.iter()) {
*dst = src.load(Ordering::Relaxed);
}
Block(arr)
})
.collect();
Sbbf(blocks)
}

pub fn estimated_memory_size(&self) -> usize {
self.0.capacity() * size_of::<BlockAtomic>()
}
}
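
End to end, the intended call pattern for the new type is roughly the following (a sketch; the input, thread count, and ndv/fpp values are illustrative):

let hashes: Vec<u64> = (0..1_000_000u64)
    .map(|i| i.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407))
    .collect();

let builder = SbbfAtomic::new_with_ndv_fpp(hashes.len() as u64, 0.01)
    .expect("fpp is in (0.0, 1.0)");
// Wraps itself in an Arc, fans chunks out over a dedicated runtime,
// joins every task, then unwraps the Arc before returning.
let builder = builder.insert_hash_batch_parallel(hashes, 8);
// All writers have joined, so the Relaxed loads in finish() see every bit.
let filter: Sbbf = builder.finish();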

#[cfg(test)]
mod tests {
use super::*;
@@ -336,6 +471,28 @@ mod tests {
}
}

#[test]
fn test_sbbf_atomic_parallel_matches_serial() {
let hashes: Vec<u64> = (0..100_000)
.map(|i| {
let val = i as u64;
val.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407)
})
.collect();

let mut serial = Sbbf::new_with_ndv_fpp(hashes.len() as u64, 0.01).unwrap();
serial.insert_hash_batch(&hashes);

let builder = SbbfAtomic::new_with_ndv_fpp(hashes.len() as u64, 0.01).unwrap();
let builder = builder.insert_hash_batch_parallel(hashes.clone(), 8);
let atomic = builder.finish();

for hash in &hashes {
assert_eq!(serial.check_hash(*hash), atomic.check_hash(*hash));
}
}

#[test]
fn test_optimal_num_of_bytes() {
for (input, expected) in &[
2 changes: 1 addition & 1 deletion src/query/service/src/physical_plans/physical_hash_join.rs
@@ -459,7 +459,7 @@ impl HashJoin {
stage_sync_barrier.clone(),
self.projections.clone(),
rf_desc.clone(),
);
)?;

join_pipe_items.push(PipeItem::create(
hash_join,
@@ -59,6 +59,7 @@ use parking_lot::RwLock;

use super::concat_buffer::ConcatBuffer;
use super::desc::RuntimeFilterDesc;
use super::runtime_filter::JoinRuntimeFilterPacket;
use crate::pipelines::memory_settings::MemorySettingsExt;
use crate::pipelines::processors::transforms::hash_join::common::wrap_true_validity;
use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE;
@@ -104,6 +105,7 @@ pub struct HashJoinBuildState {
pub(crate) concat_buffer: Mutex<ConcatBuffer>,
pub(crate) broadcast_id: Option<u32>,
pub(crate) is_runtime_filter_added: AtomicBool,
runtime_filter_packets: Mutex<Vec<JoinRuntimeFilterPacket>>,
}

impl HashJoinBuildState {
@@ -154,6 +156,7 @@ impl HashJoinBuildState {
concat_buffer: Mutex::new(ConcatBuffer::new(concat_threshold)),
broadcast_id,
is_runtime_filter_added: AtomicBool::new(false),
runtime_filter_packets: Mutex::new(Vec::new()),
}))
}

@@ -875,6 +878,15 @@ impl HashJoinBuildState {
&self.hash_join_state.hash_join_desc.runtime_filter.filters
}

pub fn add_runtime_filter_packet(&self, packet: JoinRuntimeFilterPacket) {
self.runtime_filter_packets.lock().push(packet);
}

pub fn take_runtime_filter_packets(&self) -> Vec<JoinRuntimeFilterPacket> {
let mut guard = self.runtime_filter_packets.lock();
guard.drain(..).collect()
}
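
The new accumulator is a plain collect-then-drain pattern: every build processor pushes its packet, and whichever processor performs the final merge drains the lot. The same shape in isolation (hypothetical standalone type, using parking_lot as this file already does):

use parking_lot::Mutex;

struct PacketCollector<T>(Mutex<Vec<T>>);

impl<T> PacketCollector<T> {
    fn add(&self, packet: T) {
        self.0.lock().push(packet);
    }

    // drain(..) empties the Vec but keeps its allocation, so repeated
    // rounds do not reallocate.
    fn take_all(&self) -> Vec<T> {
        self.0.lock().drain(..).collect()
    }
}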

/// Only used for tests
pub fn get_enable_bloom_runtime_filter(&self) -> bool {
self.hash_join_state
@@ -20,6 +20,7 @@ use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
use databend_common_catalog::runtime_filter_info::RuntimeFilterStats;
use databend_common_catalog::sbbf::Sbbf;
use databend_common_catalog::sbbf::SbbfAtomic;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::type_check;
@@ -74,7 +75,10 @@ pub async fn build_runtime_filter_infos(
probe_expr: probe_key.clone(),
bloom: if enabled {
if let Some(ref bloom) = packet.bloom {
Some(build_bloom_filter(bloom.clone(), probe_key, max_threads).await?)
Some(
build_bloom_filter(bloom.clone(), probe_key, max_threads, desc.id)
.await?,
)
} else {
None
}
@@ -256,6 +260,7 @@ async fn build_bloom_filter(
bloom: Vec<u64>,
probe_key: &Expr<String>,
max_threads: usize,
filter_id: usize,
) -> Result<RuntimeFilterBloom> {
let probe_key = match probe_key {
Expr::ColumnRef(col) => col,
@@ -269,7 +274,7 @@
let column_name = probe_key.id.to_string();
let total_items = bloom.len();

if total_items < 50000 {
if total_items < 3_000_000 {
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
.map_err(|e| ErrorCode::Internal(e.to_string()))?;
filter.insert_hash_batch(&bloom);
@@ -279,65 +284,23 @@
});
}

let chunk_size = total_items.div_ceil(max_threads);

let chunks: Vec<Vec<u64>> = bloom
.chunks(chunk_size)
.map(|chunk| chunk.to_vec())
.collect();

let tasks: Vec<_> = chunks
.into_iter()
.map(|chunk| {
databend_common_base::runtime::spawn(async move {
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
.map_err(|e| ErrorCode::Internal(e.to_string()))?;

filter.insert_hash_batch(&chunk);
Ok::<Sbbf, ErrorCode>(filter)
})
})
.collect();

let task_results = futures::future::join_all(tasks).await;

let filters: Vec<Sbbf> = task_results
.into_iter()
.map(|r| r.expect("Task panicked"))
.collect::<Result<Vec<_>>>()?;

let merged_filter = merge_bloom_filters_tree(filters);
let start = std::time::Instant::now();
let builder = SbbfAtomic::new_with_ndv_fpp(total_items as u64, 0.01)
.map_err(|e| ErrorCode::Internal(e.to_string()))?
.insert_hash_batch_parallel(bloom, max_threads);
let filter = builder.finish();
log::info!(
"filter_id: {}, build_time: {:?}",
filter_id,
start.elapsed()
);

Ok(RuntimeFilterBloom {
column_name,
filter: merged_filter,
filter,
})
}

fn merge_bloom_filters_tree(mut filters: Vec<Sbbf>) -> Sbbf {
if filters.is_empty() {
return Sbbf::new_with_ndv_fpp(1, 0.01).unwrap();
}

while filters.len() > 1 {
let mut next_level = Vec::new();
let mut iter = filters.into_iter();

while let Some(mut left) = iter.next() {
if let Some(right) = iter.next() {
left.union(&right);
next_level.push(left);
} else {
next_level.push(left);
}
}

filters = next_level;
}

filters.pop().unwrap()
}
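
The deleted tree merge existed only because a plain Sbbf cannot be shared by concurrent writers: filters created with identical ndv/fpp have identical block counts, and inserting just ORs a mask into one block, so pairwise union reproduced exactly what a single shared atomic bitset now produces directly. A small equivalence sketch (assumes union and check_hash remain public, as the deleted code above suggests):

let (a_hashes, b_hashes) = (vec![1u64, 2, 3], vec![4u64, 5, 6]);
let ndv = (a_hashes.len() + b_hashes.len()) as u64;
// Same ndv/fpp means the same number of blocks, so the bitsets line up.
let mut a = Sbbf::new_with_ndv_fpp(ndv, 0.01).unwrap();
let mut b = Sbbf::new_with_ndv_fpp(ndv, 0.01).unwrap();
a.insert_hash_batch(&a_hashes);
b.insert_hash_batch(&b_hashes);
a.union(&b); // bitwise OR of equal-length bitsets
for h in a_hashes.iter().chain(b_hashes.iter()) {
    assert!(a.check_hash(*h));
}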

#[cfg(test)]
mod tests {
use std::collections::HashMap;