diff --git a/crates/pathfinder/src/bin/pathfinder/config.rs b/crates/pathfinder/src/bin/pathfinder/config.rs
index 3fb9d92cee..6475c3a9ce 100644
--- a/crates/pathfinder/src/bin/pathfinder/config.rs
+++ b/crates/pathfinder/src/bin/pathfinder/config.rs
@@ -256,33 +256,39 @@ This should only be enabled for debugging purposes as it adds substantial proces
     feeder_gateway_fetch_concurrency: std::num::NonZeroUsize,
 
     #[arg(
-        long = "storage.event-bloom-filter-cache-size",
-        long_help = "The number of blocks whose event bloom filters are cached in memory. This \
-                     cache speeds up event related RPC queries at the cost of using extra memory. \
-                     Each cached filter takes 2 KiB of memory.",
-        env = "PATHFINDER_STORAGE_BLOOM_FILTER_CACHE_SIZE",
-        default_value = "524288"
+        long = "storage.event-filter-cache-size",
+        long_help = format!(
+            "The number of aggregate event bloom filters to cache in memory. Each filter covers a {} block range.
+            This cache speeds up event related RPC queries at the cost of using extra memory.
+            Each cached filter takes 16 MiB of memory.",
+            pathfinder_storage::BLOCK_RANGE_LEN
+        ),
+        env = "PATHFINDER_STORAGE_EVENT_FILTER_CACHE_SIZE",
+        default_value = "64"
     )]
-    event_bloom_filter_cache_size: std::num::NonZeroUsize,
+    event_filter_cache_size: std::num::NonZeroUsize,
 
     #[arg(
         long = "rpc.get-events-max-blocks-to-scan",
-        long_help = "The number of blocks to scan for events when querying for events. This limit \
-                     is used to prevent queries from taking too long.",
+        long_help = "The number of blocks to scan when querying for events. This limit is used to \
+                     prevent queries from taking too long.",
         env = "PATHFINDER_RPC_GET_EVENTS_MAX_BLOCKS_TO_SCAN",
         default_value = "500"
     )]
     get_events_max_blocks_to_scan: std::num::NonZeroUsize,
 
     #[arg(
-        long = "rpc.get-events-max-event-filters-to-load",
-        long_help = format!("The number of aggregate Bloom filters to load for events when querying for events. \
-            Each filter covers a {} block range. \
-            This limit is used to prevent queries from taking too long.", pathfinder_storage::BLOCK_RANGE_LEN),
-        env = "PATHFINDER_RPC_GET_EVENTS_MAX_EVENT_FILTERS_TO_LOAD",
-        default_value = "3"
+        long = "rpc.get-events-max-uncached-event-filters-to-load",
+        long_help = format!(
+            "The number of uncached aggregate Bloom filters to load when querying for events.
+            Each filter covers a {} block range.
+            This limit is used to prevent queries from taking too long.",
+            pathfinder_storage::BLOCK_RANGE_LEN
+        ),
+        env = "PATHFINDER_RPC_GET_EVENTS_MAX_UNCACHED_EVENT_FILTERS_TO_LOAD",
+        default_value = "12"
     )]
-    get_events_max_event_filters_to_load: std::num::NonZeroUsize,
+    get_events_max_uncached_event_filters_to_load: std::num::NonZeroUsize,
 
     #[arg(
         long = "storage.state-tries",
@@ -711,9 +717,9 @@ pub struct Config {
     pub is_rpc_enabled: bool,
     pub gateway_api_key: Option<String>,
     pub gateway_timeout: Duration,
-    pub event_bloom_filter_cache_size: NonZeroUsize,
+    pub event_filter_cache_size: NonZeroUsize,
     pub get_events_max_blocks_to_scan: NonZeroUsize,
-    pub get_events_max_event_filters_to_load: NonZeroUsize,
+    pub get_events_max_uncached_event_filters_to_load: NonZeroUsize,
     pub state_tries: Option<StateTries>,
     pub custom_versioned_constants: Option<VersionedConstants>,
     pub feeder_gateway_fetch_concurrency: NonZeroUsize,
@@ -1001,9 +1007,10 @@ impl Config {
             is_sync_enabled: cli.is_sync_enabled,
             is_rpc_enabled: cli.is_rpc_enabled,
             gateway_api_key: cli.gateway_api_key,
-            event_bloom_filter_cache_size: cli.event_bloom_filter_cache_size,
+            event_filter_cache_size: cli.event_filter_cache_size,
             get_events_max_blocks_to_scan: cli.get_events_max_blocks_to_scan,
-            get_events_max_event_filters_to_load: cli.get_events_max_event_filters_to_load,
+            get_events_max_uncached_event_filters_to_load: cli
+                .get_events_max_uncached_event_filters_to_load,
             gateway_timeout: Duration::from_secs(cli.gateway_timeout.get()),
             feeder_gateway_fetch_concurrency: cli.feeder_gateway_fetch_concurrency,
             state_tries: cli.state_tries,
diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs
index cda893bb3e..8252b7d8be 100644
--- a/crates/pathfinder/src/bin/pathfinder/main.rs
+++ b/crates/pathfinder/src/bin/pathfinder/main.rs
@@ -136,7 +136,7 @@ async fn async_main() -> anyhow::Result<()> {
     let storage_manager =
         pathfinder_storage::StorageBuilder::file(pathfinder_context.database.clone())
             .journal_mode(config.sqlite_wal)
-            .bloom_filter_cache_size(config.event_bloom_filter_cache_size.get())
+            .event_filter_cache_size(config.event_filter_cache_size.get())
             .trie_prune_mode(match config.state_tries {
                 Some(StateTries::Pruned(num_blocks_kept)) => {
                     Some(pathfinder_storage::TriePruneMode::Prune { num_blocks_kept })
@@ -217,7 +217,8 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
     let rpc_config = pathfinder_rpc::context::RpcConfig {
         batch_concurrency_limit: config.rpc_batch_concurrency_limit,
         get_events_max_blocks_to_scan: config.get_events_max_blocks_to_scan,
-        get_events_max_event_filters_to_load: config.get_events_max_event_filters_to_load,
+        get_events_max_uncached_event_filters_to_load: config
+            .get_events_max_uncached_event_filters_to_load,
         custom_versioned_constants: config.custom_versioned_constants.take(),
     };
 
diff --git a/crates/pathfinder/src/state/sync.rs b/crates/pathfinder/src/state/sync.rs
index 1b90f06117..9fb4ac02be 100644
--- a/crates/pathfinder/src/state/sync.rs
+++ b/crates/pathfinder/src/state/sync.rs
@@ -1081,8 +1081,8 @@ async fn l2_reorg(
     }
 
     transaction
-        .reconstruct_running_event_filter()
-        .context("Reconstructing running event filter after purge")?;
+        .reset()
+        .context("Resetting local DB state after reorg")?;
 
     // Track combined L1 and L2 state.
     let l1_l2_head = transaction.l1_l2_pointer().context("Query L1-L2 head")?;
diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs
index 7decbff4cf..60af655b26 100644
--- a/crates/pathfinder/src/sync/checkpoint.rs
+++ b/crates/pathfinder/src/sync/checkpoint.rs
@@ -668,8 +668,8 @@ async fn rollback_to_anchor(
         }
 
         transaction
-            .reconstruct_running_event_filter()
-            .context("Reconstructing running event filter after purge")?;
+            .reset()
+            .context("Resetting local DB state after reorg")?;
 
         Ok(())
     })
diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml
index a5a1b33b5e..8d2fd4f1e9 100644
--- a/crates/rpc/Cargo.toml
+++ b/crates/rpc/Cargo.toml
@@ -7,8 +7,6 @@ license = { workspace = true }
rust-version = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

-[features]
-
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
diff --git a/crates/rpc/src/context.rs b/crates/rpc/src/context.rs
index 3aba7090fd..ab3306d364 100644
--- a/crates/rpc/src/context.rs
+++ b/crates/rpc/src/context.rs
@@ -19,7 +19,7 @@ use tokio::sync::watch as tokio_watch;
 pub struct RpcConfig {
     pub batch_concurrency_limit: NonZeroUsize,
     pub get_events_max_blocks_to_scan: NonZeroUsize,
-    pub get_events_max_event_filters_to_load: NonZeroUsize,
+    pub get_events_max_uncached_event_filters_to_load: NonZeroUsize,
     pub custom_versioned_constants: Option<VersionedConstants>,
 }
 
@@ -120,7 +120,7 @@ impl RpcContext {
         let config = RpcConfig {
             batch_concurrency_limit: NonZeroUsize::new(8).unwrap(),
             get_events_max_blocks_to_scan: NonZeroUsize::new(1000).unwrap(),
-            get_events_max_event_filters_to_load: NonZeroUsize::new(1000).unwrap(),
+            get_events_max_uncached_event_filters_to_load: NonZeroUsize::new(1000).unwrap(),
             custom_versioned_constants: None,
         };
 
diff --git a/crates/rpc/src/jsonrpc/router/subscription.rs b/crates/rpc/src/jsonrpc/router/subscription.rs
index 5b20b17ed7..73a936c8f0 100644
--- a/crates/rpc/src/jsonrpc/router/subscription.rs
+++ b/crates/rpc/src/jsonrpc/router/subscription.rs
@@ -1026,7 +1026,7 @@ mod tests {
             config: RpcConfig {
                 batch_concurrency_limit: 1.try_into().unwrap(),
                 get_events_max_blocks_to_scan: 1.try_into().unwrap(),
-                get_events_max_event_filters_to_load: 1.try_into().unwrap(),
+                get_events_max_uncached_event_filters_to_load: 1.try_into().unwrap(),
                 custom_versioned_constants: None,
             },
         };
diff --git a/crates/rpc/src/method/get_events.rs b/crates/rpc/src/method/get_events.rs
index 944d412250..9b526d775b 100644
--- a/crates/rpc/src/method/get_events.rs
+++ b/crates/rpc/src/method/get_events.rs
@@ -221,7 +221,7 @@ pub async fn get_events(
         .events(
             &constraints,
             context.config.get_events_max_blocks_to_scan,
-            context.config.get_events_max_event_filters_to_load,
+            context.config.get_events_max_uncached_event_filters_to_load,
         )
         .map_err(|e| match e {
             EventFilterError::Internal(e) => GetEventsError::Internal(e),
diff --git a/crates/rpc/src/method/subscribe_events.rs b/crates/rpc/src/method/subscribe_events.rs
index d148eae5c0..5d6b15b4f2 100644
--- a/crates/rpc/src/method/subscribe_events.rs
+++ b/crates/rpc/src/method/subscribe_events.rs
@@ -738,7 +738,7 @@ mod tests {
             config: RpcConfig {
                 batch_concurrency_limit: 64.try_into().unwrap(),
                 get_events_max_blocks_to_scan: 1024.try_into().unwrap(),
-                get_events_max_event_filters_to_load: 1.try_into().unwrap(),
+                get_events_max_uncached_event_filters_to_load: 1.try_into().unwrap(),
                 custom_versioned_constants: None,
             },
         };
diff --git a/crates/rpc/src/method/subscribe_new_heads.rs b/crates/rpc/src/method/subscribe_new_heads.rs
index 182e2ae75d..ec6e84f794 100644
--- a/crates/rpc/src/method/subscribe_new_heads.rs
+++ b/crates/rpc/src/method/subscribe_new_heads.rs
@@ -547,7 +547,7 @@ mod tests {
             config: RpcConfig {
                 batch_concurrency_limit: 1.try_into().unwrap(),
                 get_events_max_blocks_to_scan: 1.try_into().unwrap(),
-                get_events_max_event_filters_to_load: 1.try_into().unwrap(),
+                get_events_max_uncached_event_filters_to_load: 1.try_into().unwrap(),
                 custom_versioned_constants: None,
             },
         };
diff --git a/crates/rpc/src/method/subscribe_pending_transactions.rs b/crates/rpc/src/method/subscribe_pending_transactions.rs
index d0528ad52b..ff6852fe34 100644
--- a/crates/rpc/src/method/subscribe_pending_transactions.rs
+++ b/crates/rpc/src/method/subscribe_pending_transactions.rs
@@ -492,7 +492,7 @@ mod tests {
             config: RpcConfig {
                 batch_concurrency_limit: 1.try_into().unwrap(),
                 get_events_max_blocks_to_scan: 1.try_into().unwrap(),
-                get_events_max_event_filters_to_load: 1.try_into().unwrap(),
+                get_events_max_uncached_event_filters_to_load: 1.try_into().unwrap(),
                 custom_versioned_constants: None,
             },
         };
diff --git a/crates/rpc/src/method/subscribe_transaction_status.rs b/crates/rpc/src/method/subscribe_transaction_status.rs
index 08d844d2f5..6b2b9b4aa4 100644
--- a/crates/rpc/src/method/subscribe_transaction_status.rs
+++ b/crates/rpc/src/method/subscribe_transaction_status.rs
@@ -1168,7 +1168,7 @@ mod tests {
             config: RpcConfig {
                 batch_concurrency_limit: 1.try_into().unwrap(),
                 get_events_max_blocks_to_scan: 1.try_into().unwrap(),
-                get_events_max_event_filters_to_load: 1.try_into().unwrap(),
+                get_events_max_uncached_event_filters_to_load: 1.try_into().unwrap(),
                 custom_versioned_constants: None,
             },
         };
diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml
index a2cbf3234f..4ce401146c 100644
--- a/crates/storage/Cargo.toml
+++ b/crates/storage/Cargo.toml
@@ -30,7 +30,12 @@ primitive-types = { workspace = true }
r2d2 = { workspace = true }
r2d2_sqlite = { workspace = true }
rand = { workspace = true }
-rusqlite = { workspace = true, features = ["bundled", "functions"] }
+rusqlite = { workspace = true, features = [
+    "bundled",
+    "functions",
+    "vtab",
+    "array",
+] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = [
    "arbitrary_precision",
diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs
index 953cab48e3..5e1da0c898 100644
--- a/crates/storage/src/bloom.rs
+++ b/crates/storage/src/bloom.rs
@@ -61,8 +61,10 @@
 //! filter.
 
 use std::collections::BTreeSet;
+use std::sync::{Arc, Mutex};
 
 use bloomfilter::Bloom;
+use cached::{Cached, SizedCache};
 use pathfinder_common::BlockNumber;
 use pathfinder_crypto::Felt;
 
@@ -71,9 +73,7 @@ pub const BLOCK_RANGE_LEN: u64 = AggregateBloom::BLOCK_RANGE_LEN;
 /// An aggregate of all Bloom filters for a given range of blocks.
 /// Before being added to `AggregateBloom`, each [`BloomFilter`] is
 /// rotated by 90 degrees (transposed).
-#[derive(Debug, Clone)]
-// TODO:
-#[allow(dead_code)]
+#[derive(Clone)]
 pub struct AggregateBloom {
     /// A [Self::BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix stored in
     /// a single array.
@@ -86,11 +86,9 @@ pub struct AggregateBloom {
     pub to_block: BlockNumber,
 }
 
-// TODO:
-#[allow(dead_code)]
 impl AggregateBloom {
     /// Maximum number of blocks to aggregate in a single `AggregateBloom`.
-    pub const BLOCK_RANGE_LEN: u64 = 32_768;
+    pub const BLOCK_RANGE_LEN: u64 = 8192;
     const BLOCK_RANGE_BYTES: u64 = Self::BLOCK_RANGE_LEN / 8;
 
     /// Create a new `AggregateBloom` for the (`from_block`, `from_block` +
@@ -218,6 +216,90 @@ impl AggregateBloom {
     }
 }
 
+impl std::fmt::Debug for AggregateBloom {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        use std::hash::{DefaultHasher, Hash, Hasher};
+
+        let mut hasher = DefaultHasher::new();
+        self.bitmap.hash(&mut hasher);
+        let bitmap_hash = hasher.finish();
+
+        f.debug_struct("AggregateBloom")
+            .field("from_block", &self.from_block)
+            .field("to_block", &self.to_block)
+            .field("bitmap_hash", &format!("{:#x}", bitmap_hash))
+            .finish()
+    }
+}
+
+#[derive(Debug, Clone, Hash, PartialEq, Eq)]
+struct CacheKey {
+    from_block: BlockNumber,
+    to_block: BlockNumber,
+}
+
+/// A cache for [`AggregateBloom`] filters. It is very expensive to clone these
+/// filters, so we store them in an [`Arc`] and clone it instead.
+pub(crate) struct AggregateBloomCache(Mutex<SizedCache<CacheKey, Arc<AggregateBloom>>>);
+
+impl AggregateBloomCache {
+    pub fn with_size(size: usize) -> Self {
+        Self(Mutex::new(SizedCache::with_size(size)))
+    }
+
+    pub fn reset(&self) {
+        self.0.lock().unwrap().cache_reset();
+    }
+
+    pub fn get_many(
+        &self,
+        from_block: BlockNumber,
+        to_block: BlockNumber,
+    ) -> Vec<Arc<AggregateBloom>> {
+        let mut cache = self.0.lock().unwrap();
+
+        let from_block = from_block.get();
+        let to_block = to_block.get();
+
+        // Align to the nearest lower multiple of BLOCK_RANGE_LEN.
+        let from_block_aligned = from_block - from_block % AggregateBloom::BLOCK_RANGE_LEN;
+        // Align to the nearest higher multiple of BLOCK_RANGE_LEN, then subtract 1
+        // (zero based indexing).
+        let to_block_aligned = to_block + AggregateBloom::BLOCK_RANGE_LEN
+            - (to_block % AggregateBloom::BLOCK_RANGE_LEN)
+            - 1;
+
+        (from_block_aligned..=to_block_aligned)
+            .step_by(AggregateBloom::BLOCK_RANGE_LEN as usize)
+            .map(|from| {
+                let to = from + AggregateBloom::BLOCK_RANGE_LEN - 1;
+                (
+                    BlockNumber::new_or_panic(from),
+                    BlockNumber::new_or_panic(to),
+                )
+            })
+            .filter_map(|(from_block, to_block)| {
+                let k = CacheKey {
+                    from_block,
+                    to_block,
+                };
+                cache.cache_get(&k).map(Arc::clone)
+            })
+            .collect()
+    }
+
+    pub fn set_many(&self, filters: &[Arc<AggregateBloom>]) {
+        let mut cache = self.0.lock().unwrap();
+        filters.iter().for_each(|filter| {
+            let k = CacheKey {
+                from_block: filter.from_block,
+                to_block: filter.to_block,
+            };
+            cache.cache_set(k, Arc::clone(filter));
+        });
+    }
+}
+
 #[derive(Clone)]
 pub(crate) struct BloomFilter(Bloom<Felt>);
 
@@ -310,98 +392,181 @@ mod tests {
     use super::*;
 
     const KEY: Felt = felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69ea");
-    #[allow(dead_code)]
     const KEY1: Felt = felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69eb");
     const KEY_NOT_IN_FILTER: Felt =
         felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69ec");
 
-    #[test]
-    fn add_bloom_and_check_single_block_found() {
-        let from_block = BlockNumber::new_or_panic(0);
-        let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
+    mod filters {
+        use super::*;
 
-        let mut bloom = BloomFilter::new();
-        bloom.set(&KEY);
-        bloom.set(&KEY1);
+        #[test]
+        fn add_bloom_and_check_single_block_found() {
+            let from_block = BlockNumber::new_or_panic(0);
+            let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
 
-        aggregate_bloom_filter.add_bloom(&bloom, from_block);
+            let mut bloom = BloomFilter::new();
+            bloom.set(&KEY);
+            bloom.set(&KEY1);
 
-        let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]);
-        let expected = BTreeSet::from_iter(vec![from_block]);
-        assert_eq!(block_matches, expected);
-    }
+            aggregate_bloom_filter.add_bloom(&bloom, from_block);
 
-    #[test]
-    fn add_blooms_and_check_multiple_blocks_found() {
-        let from_block = BlockNumber::new_or_panic(0);
-        let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
+            let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]);
+            let expected = BTreeSet::from_iter(vec![from_block]);
+            assert_eq!(block_matches, expected);
+        }
 
-        let mut bloom = BloomFilter::new();
-        bloom.set(&KEY);
+        #[test]
+        fn add_blooms_and_check_multiple_blocks_found() {
+            let from_block = BlockNumber::new_or_panic(0);
+            let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
 
-        aggregate_bloom_filter.add_bloom(&bloom, from_block);
-        aggregate_bloom_filter.add_bloom(&bloom, from_block + 1);
+            let mut bloom = BloomFilter::new();
+            bloom.set(&KEY);
 
-        let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]);
-        let expected = BTreeSet::from_iter(vec![from_block, from_block + 1]);
-        assert_eq!(block_matches, expected);
-    }
+            aggregate_bloom_filter.add_bloom(&bloom, from_block);
+            aggregate_bloom_filter.add_bloom(&bloom, from_block + 1);
 
-    #[test]
-    fn key_not_in_filter_returns_empty_vec() {
-        let from_block = BlockNumber::new_or_panic(0);
-        let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
+            let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]);
+            let expected = BTreeSet::from_iter(vec![from_block, from_block + 1]);
+            assert_eq!(block_matches, expected);
+        }
 
-        let mut bloom = BloomFilter::new();
-        bloom.set(&KEY);
-        bloom.set(&KEY1);
+        #[test]
+        fn key_not_in_filter_returns_empty_vec() {
+            let from_block = BlockNumber::new_or_panic(0);
+            let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
 
-        aggregate_bloom_filter.add_bloom(&bloom, from_block);
-        aggregate_bloom_filter.add_bloom(&bloom, from_block + 1);
+            let mut bloom = BloomFilter::new();
+            bloom.set(&KEY);
+            bloom.set(&KEY1);
 
-        let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(&[KEY_NOT_IN_FILTER]);
-        assert_eq!(block_matches_empty, BTreeSet::new());
-    }
+            aggregate_bloom_filter.add_bloom(&bloom, from_block);
+            aggregate_bloom_filter.add_bloom(&bloom, from_block + 1);
 
-    #[test]
-    fn serialize_aggregate_roundtrip() {
-        let from_block = BlockNumber::new_or_panic(0);
-        let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
+            let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(&[KEY_NOT_IN_FILTER]);
+            assert_eq!(block_matches_empty, BTreeSet::new());
+        }
 
-        let mut bloom = BloomFilter::new();
-        bloom.set(&KEY);
+        #[test]
+        fn serialize_aggregate_roundtrip() {
+            let from_block = BlockNumber::new_or_panic(0);
+            let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
 
-        aggregate_bloom_filter.add_bloom(&bloom, from_block);
-        aggregate_bloom_filter.add_bloom(&bloom, from_block + 1);
+            let mut bloom = BloomFilter::new();
+            bloom.set(&KEY);
 
-        let compressed_bitmap = aggregate_bloom_filter.compress_bitmap();
-        let mut decompressed = AggregateBloom::from_existing_compressed(
-            aggregate_bloom_filter.from_block,
-            aggregate_bloom_filter.to_block,
-            compressed_bitmap,
-        );
-        decompressed.add_bloom(&bloom, from_block + 2);
+            aggregate_bloom_filter.add_bloom(&bloom, from_block);
+            aggregate_bloom_filter.add_bloom(&bloom, from_block + 1);
+
+            let compressed_bitmap = aggregate_bloom_filter.compress_bitmap();
+            let mut decompressed = AggregateBloom::from_existing_compressed(
+                aggregate_bloom_filter.from_block,
+                aggregate_bloom_filter.to_block,
+                compressed_bitmap,
+            );
+            decompressed.add_bloom(&bloom, from_block + 2);
 
-        let block_matches = decompressed.blocks_for_keys(&[KEY]);
-        let expected = BTreeSet::from_iter(vec![from_block, from_block + 1, from_block + 2]);
-        assert_eq!(block_matches, expected,);
+            let block_matches = decompressed.blocks_for_keys(&[KEY]);
+            let expected = BTreeSet::from_iter(vec![from_block, from_block + 1, from_block + 2]);
+            assert_eq!(block_matches, expected,);
 
-        let block_matches_empty = decompressed.blocks_for_keys(&[KEY_NOT_IN_FILTER]);
-        assert_eq!(block_matches_empty, BTreeSet::new());
+            let block_matches_empty = decompressed.blocks_for_keys(&[KEY_NOT_IN_FILTER]);
+            assert_eq!(block_matches_empty, BTreeSet::new());
+        }
+
+        #[test]
+        #[should_panic]
+        fn invalid_insert_pos() {
+            let from_block = BlockNumber::new_or_panic(0);
+            let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
+
+            let mut bloom = BloomFilter::new();
+            bloom.set(&KEY);
+
+            aggregate_bloom_filter.add_bloom(&bloom, from_block);
+
+            let invalid_insert_pos = from_block + AggregateBloom::BLOCK_RANGE_LEN;
+            aggregate_bloom_filter.add_bloom(&bloom, invalid_insert_pos);
+        }
     }
 
-    #[test]
-    #[should_panic]
-    fn invalid_insert_pos() {
-        let from_block = BlockNumber::new_or_panic(0);
-        let mut aggregate_bloom_filter = AggregateBloom::new(from_block);
+    mod cache {
+        use super::*;
+
+        // Tests only use ranges so no need to compare bitmap.
+        impl PartialEq for AggregateBloom {
+            fn eq(&self, other: &Self) -> bool {
+                self.from_block == other.from_block && self.to_block == other.to_block
+            }
+        }
+
+        #[test]
+        fn set_then_get_many_aligned() {
+            let cache = AggregateBloomCache::with_size(2);
+
+            let first_range_start = BlockNumber::GENESIS;
+            let second_range_start = BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN;
+            let second_range_end = second_range_start + AggregateBloom::BLOCK_RANGE_LEN - 1;
+
+            let filters = vec![
+                Arc::new(AggregateBloom::new(first_range_start)),
+                Arc::new(AggregateBloom::new(second_range_start)),
+            ];
+
+            cache.set_many(&filters);
+
+            let retrieved = cache.get_many(first_range_start, second_range_end);
 
-        let mut bloom = BloomFilter::new();
-        bloom.set(&KEY);
+            assert_eq!(retrieved, filters);
+        }
+
+        #[test]
+        fn set_then_get_many_unaligned() {
+            let cache = AggregateBloomCache::with_size(2);
+
+            let first_range_start = BlockNumber::GENESIS;
+            let second_range_start = BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN;
+
+            let filters = vec![
+                Arc::new(AggregateBloom::new(first_range_start)),
+                Arc::new(AggregateBloom::new(second_range_start)),
+            ];
+
+            let start = first_range_start + 15;
+            let end = second_range_start + 15;
+
+            cache.set_many(&filters);
+
+            let retrieved = cache.get_many(start, end);
 
-        aggregate_bloom_filter.add_bloom(&bloom, from_block);
+            assert_eq!(retrieved, filters);
+        }
+
+        #[test]
+        fn filters_outside_of_range_not_returned() {
+            let cache = AggregateBloomCache::with_size(4);
+
+            let filters = vec![
+                Arc::new(AggregateBloom::new(BlockNumber::GENESIS)),
+                Arc::new(AggregateBloom::new(
+                    BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN,
+                )),
+                Arc::new(AggregateBloom::new(
+                    BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN,
+                )),
+                Arc::new(AggregateBloom::new(
+                    BlockNumber::GENESIS + 3 * AggregateBloom::BLOCK_RANGE_LEN,
+                )),
+            ];
+
+            cache.set_many(&filters);
 
-        let invalid_insert_pos = from_block + AggregateBloom::BLOCK_RANGE_LEN;
-        aggregate_bloom_filter.add_bloom(&bloom, invalid_insert_pos);
+            let first_range_start = BlockNumber::GENESIS;
+            let second_range_end = BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN - 1;
+
+            let retrieved = cache.get_many(first_range_start, second_range_end);
+
+            assert_eq!(retrieved, filters[0..2].to_vec());
+        }
     }
 }
diff --git a/crates/storage/src/connection.rs b/crates/storage/src/connection.rs
index 9974455d6e..bd1c9f35e0 100644
--- a/crates/storage/src/connection.rs
+++ b/crates/storage/src/connection.rs
@@ -28,10 +28,13 @@
 pub(crate) use reorg_counter::ReorgCounter;
 pub use rusqlite::TransactionBehavior;
 pub use trie::{Node, NodeRef, RootIndexUpdate, StoredNode, TrieUpdate};
 
+use crate::bloom::AggregateBloomCache;
+
 type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
 
 pub struct Connection {
     connection: PooledConnection,
+    event_filter_cache: Arc<AggregateBloomCache>,
     running_event_filter: Arc<Mutex<RunningEventFilter>>,
     trie_prune_mode: TriePruneMode,
 }
@@ -39,11 +42,13 @@ impl Connection {
     pub(crate) fn new(
         connection: PooledConnection,
+        event_filter_cache: Arc<AggregateBloomCache>,
         running_event_filter: Arc<Mutex<RunningEventFilter>>,
         trie_prune_mode: TriePruneMode,
     ) -> Self {
         Self {
             connection,
+            event_filter_cache,
             running_event_filter,
             trie_prune_mode,
         }
     }
@@ -53,6 +58,7 @@ impl Connection {
         let tx = self.connection.transaction()?;
         Ok(Transaction {
             transaction: tx,
+            event_filter_cache: self.event_filter_cache.clone(),
             running_event_filter: self.running_event_filter.clone(),
             trie_prune_mode: self.trie_prune_mode,
         })
@@ -65,6 +71,7 @@ impl Connection {
         let tx = self.connection.transaction_with_behavior(behavior)?;
         Ok(Transaction {
             transaction: tx,
+            event_filter_cache: self.event_filter_cache.clone(),
             running_event_filter: self.running_event_filter.clone(),
             trie_prune_mode: self.trie_prune_mode,
         })
@@ -73,6 +80,7 @@
 
 pub struct Transaction<'inner> {
     transaction: rusqlite::Transaction<'inner>,
+    event_filter_cache: Arc<AggregateBloomCache>,
     running_event_filter: Arc<Mutex<RunningEventFilter>>,
     trie_prune_mode: TriePruneMode,
 }
@@ -108,4 +116,12 @@ impl Transaction<'_> {
     pub fn trie_pruning_enabled(&self) -> bool {
         matches!(self.trie_prune_mode, TriePruneMode::Prune { .. })
     }
+
+    /// Resets the [`Storage`](crate::Storage) state. Required after each reorg.
+    pub fn reset(&self) -> anyhow::Result<()> {
+        self.rebuild_running_event_filter()?;
+        self.event_filter_cache.reset();
+
+        Ok(())
+    }
 }
diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs
index c705fde068..e0ec7a08d6 100644
--- a/crates/storage/src/connection/event.rs
+++ b/crates/storage/src/connection/event.rs
@@ -1,5 +1,8 @@
 use std::collections::BTreeSet;
 use std::num::NonZeroUsize;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::time::Instant;
 
 use anyhow::{Context, Result};
 use pathfinder_common::event::Event;
@@ -11,6 +14,7 @@ use pathfinder_common::{
     EventKey,
     TransactionHash,
 };
+use rusqlite::types::Value;
 
 use crate::bloom::{AggregateBloom, BloomFilter};
 use crate::prelude::*;
@@ -69,8 +73,8 @@ pub struct PageOfEvents {
 }
 
 impl Transaction<'_> {
-    pub fn reconstruct_running_event_filter(&self) -> anyhow::Result<()> {
-        let event_filter = reconstruct_running_event_filter(self.inner())?;
+    pub fn rebuild_running_event_filter(&self) -> anyhow::Result<()> {
+        let event_filter = rebuild_running_event_filter(self.inner())?;
 
         let mut running_event_filter = self.running_event_filter.lock().unwrap();
         *running_event_filter = event_filter;
@@ -88,7 +92,7 @@
         block_number: BlockNumber,
         events: impl Iterator<Item = &Event>,
     ) -> anyhow::Result<()> {
-        let mut stmt = self.inner().prepare_cached(
+        let mut insert_stmt = self.inner().prepare_cached(
             r"
             INSERT INTO event_filters
             (from_block, to_block, bitmap)
@@ -111,7 +115,7 @@
         // This check is the reason that blocks cannot be skipped, if they were we would
         // risk missing the last block of the running event filter's range.
         if block_number == running_event_filter.filter.to_block {
-            stmt.execute(params![
+            insert_stmt.execute(params![
                 &running_event_filter.filter.from_block,
                 &running_event_filter.filter.to_block,
                 &running_event_filter.filter.compress_bitmap()
@@ -138,12 +142,13 @@ impl Transaction<'_> {
         keys: Vec<Vec<EventKey>>,
     ) -> anyhow::Result<(Vec<EmittedEvent>, Option<BlockNumber>)> {
         let Some(latest_block) = self.block_number(crate::BlockId::Latest)? else {
-            // No blocks in the database
+            // No blocks in the database.
            return Ok((vec![], None));
        };
 
         if from_block > latest_block {
             return Ok((vec![], None));
         }
+        let to_block = std::cmp::min(to_block, latest_block);
 
         let constraints = EventConstraints {
             contract_address,
             keys,
             ..Default::default()
         };
 
-        let event_filters = self.load_event_filter_range(from_block, to_block)?;
+        let (event_filters, _) = self.load_event_filter_range(from_block, to_block, None)?;
 
         let blocks_to_scan = event_filters
             .iter()
             .flat_map(|filter| filter.check(&constraints))
             .filter(|&block| (from_block..=to_block).contains(&block));
 
-        let key_filter_is_empty = constraints.keys.iter().flatten().count() == 0;
+        let no_key_constraints = constraints.keys.iter().flatten().count() == 0;
         let keys: Vec<std::collections::HashSet<_>> = constraints
             .keys
             .iter()
@@ -189,7 +194,7 @@
                 None => true,
             })
             .filter(|(event, _)| {
-                if key_filter_is_empty {
+                if no_key_constraints {
                     return true;
                 }
 
@@ -215,13 +220,7 @@
             emitted_events.extend(events);
         }
 
-        let last_scanned_block = if latest_block > to_block {
-            to_block
-        } else {
-            latest_block
-        };
-
-        Ok((emitted_events, Some(last_scanned_block)))
+        Ok((emitted_events, Some(to_block)))
     }
 
     #[tracing::instrument(skip(self))]
@@ -235,11 +234,22 @@
             return Err(EventFilterError::PageSizeTooSmall);
         }
 
+        let Some(latest_block) = self.block_number(crate::BlockId::Latest)? else {
+            // No blocks in the database.
+            return Ok(PageOfEvents {
+                events: vec![],
+                continuation_token: None,
+            });
+        };
+
         let from_block = constraints.from_block.unwrap_or(BlockNumber::GENESIS);
-        let to_block = constraints.to_block.unwrap_or(BlockNumber::MAX);
+        let to_block = match constraints.to_block {
+            Some(to_block) => std::cmp::min(to_block, latest_block),
+            None => latest_block,
+        };
 
         let (event_filters, load_limit_reached) =
-            self.load_limited_event_filter_range(from_block, to_block, max_event_filters_to_load)?;
+            self.load_event_filter_range(from_block, to_block, Some(max_event_filters_to_load))?;
 
         let blocks_to_scan = event_filters
             .iter()
@@ -252,7 +262,7 @@
             .map(|keys| keys.iter().collect())
             .collect();
 
-        let key_filter_is_empty = constraints.keys.iter().flatten().count() == 0;
+        let no_key_constraints = constraints.keys.iter().flatten().count() == 0;
 
         let mut offset = constraints.offset;
         let mut emitted_events = vec![];
@@ -272,9 +282,9 @@
             let events_required = constraints.page_size + 1 - emitted_events.len();
             tracing::trace!(%block, %events_required, "Processing block");
 
-            let Some(block_header) = self.block_header(crate::BlockId::Number(block))? else {
-                break;
-            };
+            let block_header = self
+                .block_header(crate::BlockId::Number(block))?
+                .expect("to_block <= BlockId::Latest");
 
             let events = match self.events_for_block(block.into())? {
                 Some(events) => events,
@@ -297,7 +307,7 @@
                     None => true,
                 })
                 .filter(|(event, _)| {
-                    if key_filter_is_empty {
+                    if no_key_constraints {
                         return true;
                     }
 
@@ -374,118 +384,100 @@
             })
         }
     }
+
+    /// Load the event bloom filters (either from the cache or the database) for
+    /// the given block range with an optional database load limit. Returns the
+    /// loaded filters and a boolean indicating if the load limit was reached.
     fn load_event_filter_range(
         &self,
         start_block: BlockNumber,
         end_block: BlockNumber,
-    ) -> anyhow::Result<Vec<AggregateBloom>> {
-        let mut stmt = self.inner().prepare_cached(
+        max_event_filters_to_load: Option<NonZeroUsize>,
+    ) -> anyhow::Result<(Vec<Arc<AggregateBloom>>, bool)> {
+        let mut total_filters_stmt = self.inner().prepare_cached(
             r"
-            SELECT from_block, to_block, bitmap
+            SELECT COUNT(*)
             FROM event_filters
             WHERE from_block <= :end_block AND to_block >= :start_block
-            ORDER BY from_block
             ",
         )?;
+        let total_event_filters = total_filters_stmt.query_row(
+            named_params![
+                ":end_block": &end_block,
+                ":start_block": &start_block,
+            ],
+            |row| row.get::<_, u64>(0),
+        )?;
 
-        let mut event_filters = stmt
-            .query_map(
-                named_params![
-                    ":end_block": &end_block,
-                    ":start_block": &start_block,
-                ],
-                |row| {
-                    let from_block = row.get_block_number(0)?;
-                    let to_block = row.get_block_number(1)?;
-                    let compressed_bitmap: Vec<u8> = row.get(2)?;
-
-                    Ok(AggregateBloom::from_existing_compressed(
-                        from_block,
-                        to_block,
-                        compressed_bitmap,
-                    ))
-                },
-            )
-            .context("Querying event filter range")?
-            .collect::<Result<Vec<_>, _>>()?;
-
-        // There are no event filters in the database yet or the loaded ones
-        // don't cover the requested range.
-        let should_include_running = event_filters
-            .last()
-            .map_or(true, |a| end_block > a.to_block);
-
-        if should_include_running {
-            let running_event_filter = self.running_event_filter.lock().unwrap();
-            event_filters.push(running_event_filter.filter.clone());
-        }
-
-        Ok(event_filters)
-    }
+        let cached_filters = self.event_filter_cache.get_many(start_block, end_block);
+        let cache_hits = cached_filters.len() as u64;
+
+        let cached_filters_rarray = Rc::new(
+            cached_filters
+                .iter()
+                // The `event_filters` table has a unique constraint over (from_block, to_block)
+                // pairs but tuples cannot be used here. Technically, both columns individually
+                // _should_ also have unique elements.
+                .map(|filter| i64::try_from(filter.from_block.get()).unwrap())
+                .map(Value::from)
+                .collect::<Vec<Value>>(),
+        );
 
-    fn load_limited_event_filter_range(
-        &self,
-        start_block: BlockNumber,
-        end_block: BlockNumber,
-        max_event_filters_to_load: NonZeroUsize,
-    ) -> anyhow::Result<(Vec<AggregateBloom>, bool)> {
-        let mut event_filters_in_range_stmt = self.inner().prepare_cached(
+        let mut load_stmt = self.inner().prepare_cached(
             r"
             SELECT from_block, to_block, bitmap
             FROM event_filters
             WHERE from_block <= :end_block AND to_block >= :start_block
+            AND from_block NOT IN rarray(:cached_filters)
             ORDER BY from_block
             LIMIT :max_event_filters_to_load
             ",
         )?;
-        let mut total_filters_stmt = self.inner().prepare_cached(
-            r"
-            SELECT COUNT(*)
-            FROM event_filters
-            WHERE from_block <= :end_block AND to_block >= :start_block
-            ",
-        )?;
+        // Use limit if provided, otherwise set it to the number of filters that cover
+        // the entire requested range.
+        let max_event_filters_to_load =
+            max_event_filters_to_load.map_or(total_event_filters, |limit| limit.get() as u64);
 
-        let mut event_filters = event_filters_in_range_stmt
+        let mut event_filters = load_stmt
             .query_map(
-                named_params![
-                    ":end_block": &end_block,
-                    ":start_block": &start_block,
-                    ":max_event_filters_to_load": &max_event_filters_to_load.get(),
+                // Cannot use crate::params::named_params![] here because of the rarray.
+                rusqlite::named_params![
+                    ":end_block": &end_block.get(),
+                    ":start_block": &start_block.get(),
+                    ":cached_filters": &cached_filters_rarray,
+                    ":max_event_filters_to_load": &max_event_filters_to_load,
                 ],
                 |row| {
                     let from_block = row.get_block_number(0)?;
                     let to_block = row.get_block_number(1)?;
                     let compressed_bitmap: Vec<u8> = row.get(2)?;
 
-                    Ok(AggregateBloom::from_existing_compressed(
+                    Ok(Arc::new(AggregateBloom::from_existing_compressed(
                         from_block,
                         to_block,
                         compressed_bitmap,
-                    ))
+                    )))
                 },
             )
             .context("Querying event filter range")?
            .collect::<Result<Vec<_>, _>>()?;
 
+        self.event_filter_cache.set_many(&event_filters);
+        event_filters.extend(cached_filters);
+        event_filters.sort_by_key(|filter| filter.from_block);
+
+        let total_loaded_filters = total_event_filters - cache_hits;
+        let load_limit_reached = total_loaded_filters > max_event_filters_to_load;
+
         // There are no event filters in the database yet or the loaded ones
         // don't cover the requested range.
         let should_include_running = event_filters
             .last()
-            .map_or(true, |a| end_block > a.to_block);
-
-        let total_event_filters = total_filters_stmt.query_row(
-            named_params![
-                ":end_block": &end_block,
-                ":start_block": &start_block,
-            ],
-            |row| row.get::<_, u64>(0),
-        )?;
-        let load_limit_reached = total_event_filters > max_event_filters_to_load.get() as u64;
+            .map_or(true, |last| end_block > last.to_block);
 
         if should_include_running && !load_limit_reached {
             let running_event_filter = self.running_event_filter.lock().unwrap();
-            event_filters.push(running_event_filter.filter.clone());
+            event_filters.push(Arc::new(running_event_filter.filter.clone()));
         }
 
         Ok((event_filters, load_limit_reached))
@@ -552,24 +544,30 @@ impl BloomFilter {
     }
 }
 
-#[derive(Debug)]
 pub(crate) struct RunningEventFilter {
     filter: AggregateBloom,
     next_block: BlockNumber,
 }
 
-/// Reconstruct the [event filter](RunningEventFilter) for the
-/// range of blocks between the last stored `to_block` in the event
-/// filter table and the last overall block in the database. This is needed
-/// because the aggregate event filter for each [block
-/// range](crate::bloom::AggregateBloom::BLOCK_RANGE_LEN) is stored once the
-/// range is complete, before that it is kept in memory and can be lost upon
-/// shutdown.
-pub(crate) fn reconstruct_running_event_filter(
+/// Rebuild the [event filter](RunningEventFilter) for the range of blocks
+/// between the last stored `to_block` in the event filter table and the last
+/// overall block in the database. This is needed because the aggregate event
+/// filter for each [block range](crate::bloom::AggregateBloom::BLOCK_RANGE_LEN)
+/// is stored once the range is complete, before that it is kept in memory and
+/// can be lost upon shutdown.
+pub(crate) fn rebuild_running_event_filter(
     tx: &rusqlite::Transaction<'_>,
 ) -> anyhow::Result<RunningEventFilter> {
     use super::transaction;
 
+    let mut latest_stmt = tx.prepare(
+        r"
+        SELECT number
+        FROM canonical_blocks
+        ORDER BY number
+        DESC LIMIT 1
+        ",
+    )?;
     let mut last_to_block_stmt = tx.prepare(
         r"
         SELECT to_block
@@ -577,7 +575,7 @@
         ORDER BY from_block DESC LIMIT 1
         ",
     )?;
-    let mut events_to_reconstruct_stmt = tx.prepare(
+    let mut load_events_stmt = tx.prepare(
         r"
         SELECT events
         FROM transactions
@@ -585,23 +583,63 @@
         ",
     )?;
 
+    let Some(latest) = latest_stmt
+        .query_row([], |row| row.get_block_number(0))
+        .optional()
+        .context("Querying latest block number")?
+    else {
+        // Empty DB, there is nothing to rebuild.
+        return Ok(RunningEventFilter {
+            filter: AggregateBloom::new(BlockNumber::GENESIS),
+            next_block: BlockNumber::GENESIS,
+        });
+    };
+
     let last_to_block = last_to_block_stmt
         .query_row([], |row| row.get::<_, u64>(0))
         .optional()
         .context("Querying last stored event filter to_block")?;
 
     let first_running_event_filter_block = match last_to_block {
+        // Last stored block was at the end of the running event filter range, no need
+        // to rebuild.
+        Some(last_to_block) if last_to_block == latest.get() => {
+            let next_block = latest + 1;
+
+            return Ok(RunningEventFilter {
+                filter: AggregateBloom::new(next_block),
+                next_block,
+            });
+        }
         Some(last_to_block) => BlockNumber::new_or_panic(last_to_block + 1),
-        // Event filter table is empty -> reconstruct running filter
-        // from the genesis block.
+        // Event filter table is empty, rebuild running filter from the genesis block.
         None => BlockNumber::GENESIS,
     };
 
-    let events_to_reconstruct: Vec<Option<Vec<Vec<Event>>>> = events_to_reconstruct_stmt
+    let total_blocks_to_cover = latest.get() - first_running_event_filter_block.get();
+    let mut covered_blocks = 0;
+    let mut last_progress_report = Instant::now();
+
+    tracing::info!(
+        "Rebuilding running event filter: 0.00% (0/{}) blocks covered",
+        total_blocks_to_cover
+    );
+    let rebuilt_filters: Vec<Option<BloomFilter>> = load_events_stmt
         .query_and_then(
             named_params![":first_running_event_filter_block": &first_running_event_filter_block],
             |row| {
-                let events: Option<dto::EventsForBlock> = row
+                if last_progress_report.elapsed().as_secs() >= 3 {
+                    tracing::info!(
+                        "Rebuilding running event filter: {:.2}% ({}/{}) blocks covered",
+                        covered_blocks as f64 / total_blocks_to_cover as f64 * 100.0,
+                        covered_blocks,
+                        total_blocks_to_cover
+                    );
+                    last_progress_report = Instant::now();
+                }
+
+                covered_blocks += 1;
+
+                let Some(events) = row
                     .get_optional_blob(0)?
                     .map(|events_blob| -> anyhow::Result<_> {
                         let events = transaction::compression::decompress_events(events_blob)
                             .context("Decompressing events")?;
 
                         Ok(events)
                     })
-                    .transpose()?;
-
-                Ok(events.map(|events| {
-                    events
-                        .events()
-                        .into_iter()
-                        .map(|e| e.into_iter().map(Into::into).collect())
-                        .collect()
-                }))
+                    .transpose()?
+                    .map(|efb| {
+                        efb.events()
+                            .into_iter()
+                            .flatten()
+                            .map(Event::from)
+                            .collect::<Vec<_>>()
+                    })
+                else {
+                    return Ok(None);
+                };
+
+                let mut bloom = BloomFilter::new();
+                for event in events {
+                    bloom.set_keys(&event.keys);
+                    bloom.set_address(&event.from_address);
+                }
+
+                Ok(Some(bloom))
             },
         )
-        .context("Querying events to reconstruct")?
+        .context("Querying events to rebuild")?
        .collect::<anyhow::Result<_>>()?;
 
+    tracing::info!(
+        "Rebuilding running event filter: 100.00% ({total}/{total}) blocks covered",
+        total = total_blocks_to_cover,
+    );
     let mut filter = AggregateBloom::new(first_running_event_filter_block);
 
-    for (block, events_for_block) in events_to_reconstruct.iter().enumerate() {
-        let Some(events) = events_for_block else {
+    for (block, block_bloom_filter) in rebuilt_filters.iter().enumerate() {
+        let Some(bloom) = block_bloom_filter else {
+            // Reached the end of P2P (checkpoint) synced events.
             break;
         };
 
-        let mut bloom = BloomFilter::new();
-        for event in events.iter().flatten() {
-            bloom.set_keys(&event.keys);
-            bloom.set_address(&event.from_address);
-        }
-
         let block_number = first_running_event_filter_block + block as u64;
-        filter.add_bloom(&bloom, block_number);
+        filter.add_bloom(bloom, block_number);
     }
 
     Ok(RunningEventFilter {
         filter,
-        next_block: first_running_event_filter_block + events_to_reconstruct.len() as u64,
+        next_block: first_running_event_filter_block + rebuilt_filters.len() as u64,
     })
 }
diff --git a/crates/storage/src/connection/transaction.rs b/crates/storage/src/connection/transaction.rs
index 9e320bd6dd..a3b037a967 100644
--- a/crates/storage/src/connection/transaction.rs
+++ b/crates/storage/src/connection/transaction.rs
@@ -99,14 +99,11 @@ impl Transaction<'_> {
         transactions: &[(StarknetTransaction, Receipt)],
         events: Option<&[Vec<Event>]>,
     ) -> anyhow::Result<()> {
-        if transactions.is_empty() {
-            if let Some(events) = events {
-                // Advance the running event bloom filter even if there's nothing to add since
-                // it requires that no blocks are skipped.
-                self.upsert_block_event_filters(block_number, events.iter().flatten())
-                    .context("Inserting events into Bloom filter")?;
-            };
-
+        if let Some(events) = events {
+            self.upsert_block_event_filters(block_number, events.iter().flatten())
+                .context("Inserting events into Bloom filter")?;
+        }
+        if transactions.is_empty() && events.map_or(true, |evts| evts.is_empty()) {
             return Ok(());
         }
 
@@ -150,20 +147,19 @@
             compression::compress_transactions(&transactions_with_receipts)
                 .context("Compressing transaction")?;
 
-        let encoded_events = match events {
-            Some(events) => {
+        let encoded_events = events
+            .map(|evts| {
                 let events = dto::EventsForBlock::V0 {
-                    events: events
+                    events: evts
                         .iter()
-                        .map(|events| events.iter().cloned().map(Into::into).collect())
+                        .map(|evts| evts.iter().cloned().map(Into::into).collect())
                         .collect(),
                 };
                 let events = bincode::serde::encode_to_vec(events, bincode::config::standard())
                     .context("Serializing events")?;
-                Some(compression::compress_events(&events).context("Compressing events")?)
-            }
-            None => None,
-        };
+                compression::compress_events(&events).context("Compressing events")
+            })
+            .transpose()?;
 
         insert_transaction_stmt
             .execute(named_params![
@@ -173,12 +169,6 @@
             ])
             .context("Inserting transaction data")?;
 
-        if let Some(events) = events {
-            let events = events.iter().flatten();
-            self.upsert_block_event_filters(block_number, events)
-                .context("Inserting events into Bloom filter")?;
-        }
-
         Ok(())
     }
 
diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs
index 6ba98a8f5b..8ef4a945cb 100644
--- a/crates/storage/src/lib.rs
+++ b/crates/storage/src/lib.rs
@@ -6,6 +6,7 @@
 mod prelude;
 
 mod bloom;
+use bloom::AggregateBloomCache;
 pub use bloom::BLOCK_RANGE_LEN;
 mod connection;
 pub mod fake;
@@ -91,6 +92,7 @@ struct Inner {
     /// Uses [`Arc`] to allow _shallow_ [Storage] cloning
     database_path: Arc<PathBuf>,
     pool: Pool<SqliteConnectionManager>,
+    event_filter_cache: Arc<AggregateBloomCache>,
     running_event_filter: Arc<Mutex<RunningEventFilter>>,
     trie_prune_mode: TriePruneMode,
 }
@@ -98,6 +100,7 @@
 pub struct StorageManager {
     database_path: PathBuf,
     journal_mode: JournalMode,
+    event_filter_cache: Arc<AggregateBloomCache>,
     running_event_filter: Arc<Mutex<RunningEventFilter>>,
     trie_prune_mode: TriePruneMode,
 }
@@ -129,6 +132,7 @@ impl StorageManager {
         Ok(Storage(Inner {
             database_path: Arc::new(self.database_path.clone()),
             pool,
+            event_filter_cache: self.event_filter_cache.clone(),
             running_event_filter: self.running_event_filter.clone(),
             trie_prune_mode: self.trie_prune_mode,
         }))
@@ -149,7 +153,7 @@ impl StorageManager {
 pub struct StorageBuilder {
     database_path: PathBuf,
     journal_mode: JournalMode,
-    bloom_filter_cache_size: usize,
+    event_filter_cache_size: usize,
     trie_prune_mode: Option<TriePruneMode>,
 }
 
@@ -158,7 +162,7 @@ impl StorageBuilder {
         Self {
             database_path,
             journal_mode: JournalMode::WAL,
-            bloom_filter_cache_size: 16,
+            event_filter_cache_size: 16,
             trie_prune_mode: None,
         }
     }
 
@@ -168,8 +172,8 @@
         self
     }
 
-    pub fn bloom_filter_cache_size(mut self, bloom_filter_cache_size: usize) -> Self {
-        self.bloom_filter_cache_size = bloom_filter_cache_size;
+    pub fn event_filter_cache_size(mut self, event_filter_cache_size: usize) -> Self {
+        self.event_filter_cache_size = event_filter_cache_size;
         self
     }
 
@@ -309,9 +313,8 @@ impl StorageBuilder {
             tracing::info!("Merkle trie pruning disabled");
         }
 
-        let running_event_filter =
-            event::reconstruct_running_event_filter(&connection.transaction()?)
- .context("Reconstructing running event filter")?; + let running_event_filter = event::rebuild_running_event_filter(&connection.transaction()?) + .context("Rebuilding running event filter")?; connection .close() @@ -321,6 +324,9 @@ impl StorageBuilder { Ok(StorageManager { database_path: self.database_path, journal_mode: self.journal_mode, + event_filter_cache: Arc::new(AggregateBloomCache::with_size( + self.event_filter_cache_size, + )), running_event_filter: Arc::new(Mutex::new(running_event_filter)), trie_prune_mode, }) @@ -392,6 +398,7 @@ impl Storage { let conn = self.0.pool.get()?; Ok(Connection::new( conn, + self.0.event_filter_cache.clone(), self.0.running_event_filter.clone(), self.0.trie_prune_mode, )) @@ -446,6 +453,10 @@ fn setup_connection( } }; + // Register the rarray module on the connection. + // See: https://docs.rs/rusqlite/0.29.0/rusqlite/vtab/array/index.html + rusqlite::vtab::array::load_module(connection)?; + Ok(()) } @@ -666,7 +677,7 @@ mod tests { } #[test] - fn running_event_filter_reconstructed_after_shutdown() { + fn running_event_filter_rebuilt_after_shutdown() { use std::num::NonZeroUsize; use std::sync::LazyLock; diff --git a/crates/storage/src/schema/revision_0066.rs b/crates/storage/src/schema/revision_0066.rs index a748ed8622..6cfa73ea36 100644 --- a/crates/storage/src/schema/revision_0066.rs +++ b/crates/storage/src/schema/revision_0066.rs @@ -67,6 +67,10 @@ fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { let mut migrated_count: u64 = 0; let mut last_progress_report = Instant::now(); + tracing::info!( + "Migrating event Bloom filters: 0.00% (0/{})", + bloom_filter_count + ); while let Some(bloom_filter) = bloom_filters.next().transpose()? { let current_block = BlockNumber::new_or_panic(migrated_count); @@ -96,6 +100,10 @@ fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { last_progress_report = Instant::now(); } } + tracing::info!( + "Migrating event Bloom filters: 100.00% ({count}/{count})", + count = bloom_filter_count, + ); Ok(()) }