Skip to content

Commit

Permalink
avoid cloning event filter bitmap
Browse files Browse the repository at this point in the history
  • Loading branch information
sistemd committed Dec 13, 2024
1 parent 9fb30f0 commit 82ff7ca
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 18 deletions.
40 changes: 26 additions & 14 deletions crates/storage/src/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
//! filter.
use std::collections::BTreeSet;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};

use bloomfilter::Bloom;
use cached::{Cached, SizedCache};
Expand Down Expand Up @@ -238,7 +238,9 @@ struct CacheKey {
to_block: BlockNumber,
}

pub(crate) struct AggregateBloomCache(Mutex<SizedCache<CacheKey, AggregateBloom>>);
/// A cache for [`AggregateBloom`] filters. It is very expensive to clone these
/// filters, so we store them in an [`Arc`] and clone it instead.
pub(crate) struct AggregateBloomCache(Mutex<SizedCache<CacheKey, Arc<AggregateBloom>>>);

impl AggregateBloomCache {
pub fn with_size(size: usize) -> Self {
Expand All @@ -249,7 +251,11 @@ impl AggregateBloomCache {
self.0.lock().unwrap().cache_reset();
}

pub fn get_many(&self, from_block: BlockNumber, to_block: BlockNumber) -> Vec<AggregateBloom> {
pub fn get_many(
&self,
from_block: BlockNumber,
to_block: BlockNumber,
) -> Vec<Arc<AggregateBloom>> {
let mut cache = self.0.lock().unwrap();

let from_block = from_block.get();
Expand Down Expand Up @@ -277,19 +283,19 @@ impl AggregateBloomCache {
from_block,
to_block,
};
cache.cache_get(&k).cloned()
cache.cache_get(&k).map(Arc::clone)
})
.collect()
}

pub fn set_many(&self, filters: &[AggregateBloom]) {
pub fn set_many(&self, filters: &[Arc<AggregateBloom>]) {
let mut cache = self.0.lock().unwrap();
filters.iter().for_each(|filter| {
let k = CacheKey {
from_block: filter.from_block,
to_block: filter.to_block,
};
cache.cache_set(k, filter.clone());
cache.cache_set(k, Arc::clone(filter));
});
}
}
Expand Down Expand Up @@ -503,8 +509,8 @@ mod tests {
let second_range_end = second_range_start + AggregateBloom::BLOCK_RANGE_LEN - 1;

let filters = vec![
AggregateBloom::new(first_range_start),
AggregateBloom::new(second_range_start),
Arc::new(AggregateBloom::new(first_range_start)),
Arc::new(AggregateBloom::new(second_range_start)),
];

cache.set_many(&filters);
Expand All @@ -522,8 +528,8 @@ mod tests {
let second_range_start = BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN;

let filters = vec![
AggregateBloom::new(first_range_start),
AggregateBloom::new(second_range_start),
Arc::new(AggregateBloom::new(first_range_start)),
Arc::new(AggregateBloom::new(second_range_start)),
];

let start = first_range_start + 15;
Expand All @@ -541,10 +547,16 @@ mod tests {
let cache = AggregateBloomCache::with_size(4);

let filters = vec![
AggregateBloom::new(BlockNumber::GENESIS),
AggregateBloom::new(BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN),
AggregateBloom::new(BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN),
AggregateBloom::new(BlockNumber::GENESIS + 3 * AggregateBloom::BLOCK_RANGE_LEN),
Arc::new(AggregateBloom::new(BlockNumber::GENESIS)),
Arc::new(AggregateBloom::new(
BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN,
)),
Arc::new(AggregateBloom::new(
BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN,
)),
Arc::new(AggregateBloom::new(
BlockNumber::GENESIS + 3 * AggregateBloom::BLOCK_RANGE_LEN,
)),
];

cache.set_many(&filters);
Expand Down
10 changes: 6 additions & 4 deletions crates/storage/src/connection/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::BTreeSet;
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;

use anyhow::{Context, Result};
Expand Down Expand Up @@ -392,7 +393,7 @@ impl Transaction<'_> {
start_block: BlockNumber,
end_block: BlockNumber,
max_event_filters_to_load: Option<NonZeroUsize>,
) -> anyhow::Result<(Vec<AggregateBloom>, bool)> {
) -> anyhow::Result<(Vec<Arc<AggregateBloom>>, bool)> {
let mut total_filters_stmt = self.inner().prepare_cached(
r"
SELECT COUNT(*)
Expand Down Expand Up @@ -439,6 +440,7 @@ impl Transaction<'_> {

let mut event_filters = load_stmt
.query_map(
// Cannot use crate::params::named_params![] here because of the rarray.
rusqlite::named_params![
":end_block": &end_block.get(),
":start_block": &start_block.get(),
Expand All @@ -450,11 +452,11 @@ impl Transaction<'_> {
let to_block = row.get_block_number(1)?;
let compressed_bitmap: Vec<u8> = row.get(2)?;

Ok(AggregateBloom::from_existing_compressed(
Ok(Arc::new(AggregateBloom::from_existing_compressed(
from_block,
to_block,
compressed_bitmap,
))
)))
},
)
.context("Querying event filter range")?
Expand All @@ -475,7 +477,7 @@ impl Transaction<'_> {

if should_include_running && !load_limit_reached {
let running_event_filter = self.running_event_filter.lock().unwrap();
event_filters.push(running_event_filter.filter.clone());
event_filters.push(Arc::new(running_event_filter.filter.clone()));
}

Ok((event_filters, load_limit_reached))
Expand Down

0 comments on commit 82ff7ca

Please sign in to comment.