From 36fea801d80ff6eeffe9140b596405913a9562ce Mon Sep 17 00:00:00 2001 From: sistemd Date: Fri, 6 Dec 2024 12:49:08 +0100 Subject: [PATCH] remove individual bloom filter usage - Remove `aggregate_bloom` feature flag. - Remove old Bloom filter load limit CLI parameter. - Rename aggregate Bloom event filter table to `event_filters`. --- crates/pathfinder/Cargo.toml | 1 - .../pathfinder/src/bin/pathfinder/config.rs | 27 +- crates/pathfinder/src/bin/pathfinder/main.rs | 5 +- crates/pathfinder/src/state/sync.rs | 3 +- crates/pathfinder/src/sync/checkpoint.rs | 22 +- crates/pathfinder/src/sync/events.rs | 26 +- crates/rpc/Cargo.toml | 1 - crates/rpc/fixtures/mainnet.sqlite | Bin 417792 -> 417792 bytes crates/rpc/src/context.rs | 8 +- crates/rpc/src/jsonrpc/router/subscription.rs | 4 +- crates/rpc/src/method/get_events.rs | 55 +- crates/rpc/src/method/subscribe_events.rs | 27 +- crates/rpc/src/method/subscribe_new_heads.rs | 4 +- .../method/subscribe_pending_transactions.rs | 4 +- .../method/subscribe_transaction_status.rs | 4 +- .../v06/method/trace_block_transactions.rs | 10 +- .../rpc/src/v06/method/trace_transaction.rs | 10 +- crates/storage/Cargo.toml | 3 - crates/storage/src/bloom.rs | 170 +-- crates/storage/src/connection.rs | 20 +- crates/storage/src/connection/block.rs | 21 +- crates/storage/src/connection/event.rs | 1092 +++++------------ crates/storage/src/connection/transaction.rs | 31 +- crates/storage/src/lib.rs | 72 +- crates/storage/src/schema.rs | 2 - crates/storage/src/schema/revision_0066.rs | 14 +- 26 files changed, 366 insertions(+), 1270 deletions(-) diff --git a/crates/pathfinder/Cargo.toml b/crates/pathfinder/Cargo.toml index 2c5231b8fe..8f4468d4a1 100644 --- a/crates/pathfinder/Cargo.toml +++ b/crates/pathfinder/Cargo.toml @@ -14,7 +14,6 @@ path = "src/lib.rs" [features] tokio-console = ["console-subscriber", "tokio/tracing"] p2p = [] -aggregate_bloom = ["pathfinder-storage/aggregate_bloom", "pathfinder-rpc/aggregate_bloom"] [dependencies] anyhow = { workspace = true } diff --git a/crates/pathfinder/src/bin/pathfinder/config.rs b/crates/pathfinder/src/bin/pathfinder/config.rs index c8eb2f00b8..577b184006 100644 --- a/crates/pathfinder/src/bin/pathfinder/config.rs +++ b/crates/pathfinder/src/bin/pathfinder/config.rs @@ -275,24 +275,14 @@ This should only be enabled for debugging purposes as it adds substantial proces get_events_max_blocks_to_scan: std::num::NonZeroUsize, #[arg( - long = "rpc.get-events-max-uncached-bloom-filters-to-load", - long_help = "The number of Bloom filters to load for events when querying for events. \ - This limit is used to prevent queries from taking too long.", - env = "PATHFINDER_RPC_GET_EVENTS_MAX_UNCACHED_BLOOM_FILTERS_TO_LOAD", - default_value = "100000" - )] - get_events_max_uncached_bloom_filters_to_load: std::num::NonZeroUsize, - - #[cfg(feature = "aggregate_bloom")] - #[arg( - long = "rpc.get-events-max-bloom-filters-to-load", - long_help = format!("The number of Bloom filters to load for events when querying for events. \ + long = "rpc.get-events-max-event-filters-to-load", + long_help = format!("The number of aggregate Bloom filters to load for events when querying for events. \ Each filter covers a {} block range. \ This limit is used to prevent queries from taking too long.", pathfinder_storage::BLOCK_RANGE_LEN), - env = "PATHFINDER_RPC_GET_EVENTS_MAX_BLOOM_FILTERS_TO_LOAD", + env = "PATHFINDER_RPC_GET_EVENTS_MAX_EVENT_FILTERS_TO_LOAD", default_value = "3" )] - get_events_max_bloom_filters_to_load: std::num::NonZeroUsize, + get_events_max_event_filters_to_load: std::num::NonZeroUsize, #[arg( long = "storage.state-tries", @@ -724,9 +714,7 @@ pub struct Config { pub gateway_timeout: Duration, pub event_bloom_filter_cache_size: NonZeroUsize, pub get_events_max_blocks_to_scan: NonZeroUsize, - pub get_events_max_uncached_bloom_filters_to_load: NonZeroUsize, - #[cfg(feature = "aggregate_bloom")] - pub get_events_max_bloom_filters_to_load: NonZeroUsize, + pub get_events_max_event_filters_to_load: NonZeroUsize, pub state_tries: Option, pub custom_versioned_constants: Option, pub feeder_gateway_fetch_concurrency: NonZeroUsize, @@ -1016,10 +1004,7 @@ impl Config { gateway_api_key: cli.gateway_api_key, event_bloom_filter_cache_size: cli.event_bloom_filter_cache_size, get_events_max_blocks_to_scan: cli.get_events_max_blocks_to_scan, - get_events_max_uncached_bloom_filters_to_load: cli - .get_events_max_uncached_bloom_filters_to_load, - #[cfg(feature = "aggregate_bloom")] - get_events_max_bloom_filters_to_load: cli.get_events_max_bloom_filters_to_load, + get_events_max_event_filters_to_load: cli.get_events_max_event_filters_to_load, gateway_timeout: Duration::from_secs(cli.gateway_timeout.get()), feeder_gateway_fetch_concurrency: cli.feeder_gateway_fetch_concurrency, state_tries: cli.state_tries, diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index 641b71a07c..6e8c3afc5f 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -217,10 +217,7 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst let rpc_config = pathfinder_rpc::context::RpcConfig { batch_concurrency_limit: config.rpc_batch_concurrency_limit, get_events_max_blocks_to_scan: config.get_events_max_blocks_to_scan, - get_events_max_uncached_bloom_filters_to_load: config - .get_events_max_uncached_bloom_filters_to_load, - #[cfg(feature = "aggregate_bloom")] - get_events_max_bloom_filters_to_load: config.get_events_max_bloom_filters_to_load, + get_events_max_event_filters_to_load: config.get_events_max_event_filters_to_load, custom_versioned_constants: config.custom_versioned_constants.take(), }; diff --git a/crates/pathfinder/src/state/sync.rs b/crates/pathfinder/src/state/sync.rs index b2702eefba..9bd7f107a2 100644 --- a/crates/pathfinder/src/state/sync.rs +++ b/crates/pathfinder/src/state/sync.rs @@ -1080,10 +1080,9 @@ async fn l2_reorg( head -= 1; } - #[cfg(feature = "aggregate_bloom")] transaction .reconstruct_running_event_filter() - .context("Reconstructing running aggregate bloom")?; + .context("Reconstructing running event filter after purge")?; // Track combined L1 and L2 state. let l1_l2_head = transaction.l1_l2_pointer().context("Query L1-L2 head")?; diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index 740427f8e2..7decbff4cf 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -279,30 +279,11 @@ where #[tracing::instrument(level = "debug", skip(self))] async fn sync_events(&self, stop: BlockNumber) -> Result<(), SyncError> { let Some(start) = events::next_missing(self.storage.clone(), stop) - .await .context("Finding next block with missing events")? else { return Ok(()); }; - // TODO: - // Replace `start` with the code below once individual aggregate filters - // are removed. - #[cfg(feature = "aggregate_bloom")] - { - if let Some(start_aggregate) = - events::next_missing_aggregate(self.storage.clone(), stop)? - { - if start_aggregate != start { - tracing::error!( - "Running event filter block mismatch. Expected: {}, got: {}", - start, - start_aggregate - ); - } - } - } - let event_stream = self.p2p.clone().event_stream( start, stop, @@ -686,10 +667,9 @@ async fn rollback_to_anchor( head -= 1; } - #[cfg(feature = "aggregate_bloom")] transaction .reconstruct_running_event_filter() - .context("Reconstructing running aggregate bloom")?; + .context("Reconstructing running event filter after purge")?; Ok(()) }) diff --git a/crates/pathfinder/src/sync/events.rs b/crates/pathfinder/src/sync/events.rs index 6726f77f18..f9b11c20d2 100644 --- a/crates/pathfinder/src/sync/events.rs +++ b/crates/pathfinder/src/sync/events.rs @@ -27,31 +27,7 @@ use crate::sync::stream::ProcessStage; /// Returns the first block number whose events are missing in storage, counting /// from genesis -pub(super) async fn next_missing( - storage: Storage, - head: BlockNumber, -) -> anyhow::Result> { - spawn_blocking(move || { - let mut db = storage - .connection() - .context("Creating database connection")?; - let db = db.transaction().context("Creating database transaction")?; - - if let Some(highest) = db - .highest_block_with_all_events_downloaded() - .context("Querying highest block with events")? - { - Ok((highest < head).then_some(highest + 1)) - } else { - Ok(Some(BlockNumber::GENESIS)) - } - }) - .await - .context("Joining blocking task")? -} - -#[cfg(feature = "aggregate_bloom")] -pub(super) fn next_missing_aggregate( +pub(super) fn next_missing( storage: Storage, head: BlockNumber, ) -> anyhow::Result> { diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 228efaf67b..a5a1b33b5e 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -8,7 +8,6 @@ rust-version = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -aggregate_bloom = [] [dependencies] anyhow = { workspace = true } diff --git a/crates/rpc/fixtures/mainnet.sqlite b/crates/rpc/fixtures/mainnet.sqlite index 0a5316c3fbb3d6ae1168d27f454e80fa1c0e83f4..17b3c9348bc4dd0bfca08db64bc87853c353bf96 100644 GIT binary patch delta 254 zcmZoTAlYz0a)Pv=G6Mrc8W39oF(VLnOw=)ERBlYzn!vcQo}XtngBZgL2L1-V+kAGs z3Dbod7*!cxOkU`qxjnvt(T0ZwsN-n+{2s>b^Lv=ujak?eI6}5-9bn>NVPe+VywE{| z*`={CgI!!*ov~T5Brz!`HMJ}?uOvP#Gp8iAs2G*w9OUX4;;Inh=;Y(7fFd=$<~Y*^ zCdRRYWCj3k CxKJ$s delta 255 zcmZoTAlYz0a)Pv=JOcwm8W6)k>qH%6M)}5stqF_^>jioVd69$0_V@-y8y=QMCujEd`8|x==l3wR8?&(e;n3Nxb%2S7g^B6J_BRcTN0|7V zbTin+{rwr6Jxdaka#E)s>|qq=icd^WFG@{MEP?R35&YEYH+vXWkQFRB&a{CoF_TTa z)PK6&X(kD<2|NfL@rLMTZtpw6RJ5I)7wCL0et~xO24)~;+0Nd;%5q^dcfwbG01X~d ARsaA1 diff --git a/crates/rpc/src/context.rs b/crates/rpc/src/context.rs index 405f5a9e69..3aba7090fd 100644 --- a/crates/rpc/src/context.rs +++ b/crates/rpc/src/context.rs @@ -19,9 +19,7 @@ use tokio::sync::watch as tokio_watch; pub struct RpcConfig { pub batch_concurrency_limit: NonZeroUsize, pub get_events_max_blocks_to_scan: NonZeroUsize, - pub get_events_max_uncached_bloom_filters_to_load: NonZeroUsize, - #[cfg(feature = "aggregate_bloom")] - pub get_events_max_bloom_filters_to_load: NonZeroUsize, + pub get_events_max_event_filters_to_load: NonZeroUsize, pub custom_versioned_constants: Option, } @@ -122,9 +120,7 @@ impl RpcContext { let config = RpcConfig { batch_concurrency_limit: NonZeroUsize::new(8).unwrap(), get_events_max_blocks_to_scan: NonZeroUsize::new(1000).unwrap(), - get_events_max_uncached_bloom_filters_to_load: NonZeroUsize::new(1000).unwrap(), - #[cfg(feature = "aggregate_bloom")] - get_events_max_bloom_filters_to_load: NonZeroUsize::new(1000).unwrap(), + get_events_max_event_filters_to_load: NonZeroUsize::new(1000).unwrap(), custom_versioned_constants: None, }; diff --git a/crates/rpc/src/jsonrpc/router/subscription.rs b/crates/rpc/src/jsonrpc/router/subscription.rs index b293351d82..5b20b17ed7 100644 --- a/crates/rpc/src/jsonrpc/router/subscription.rs +++ b/crates/rpc/src/jsonrpc/router/subscription.rs @@ -1026,9 +1026,7 @@ mod tests { config: RpcConfig { batch_concurrency_limit: 1.try_into().unwrap(), get_events_max_blocks_to_scan: 1.try_into().unwrap(), - get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(), - #[cfg(feature = "aggregate_bloom")] - get_events_max_bloom_filters_to_load: 1.try_into().unwrap(), + get_events_max_event_filters_to_load: 1.try_into().unwrap(), custom_versioned_constants: None, }, }; diff --git a/crates/rpc/src/method/get_events.rs b/crates/rpc/src/method/get_events.rs index 2fc15ca540..944d412250 100644 --- a/crates/rpc/src/method/get_events.rs +++ b/crates/rpc/src/method/get_events.rs @@ -208,7 +208,7 @@ pub async fn get_events( None => (from_block, 0), }; - let filter = pathfinder_storage::EventFilter { + let constraints = pathfinder_storage::EventConstraints { from_block, to_block, contract_address: request.address, @@ -217,66 +217,17 @@ pub async fn get_events( offset: requested_offset, }; - // TODO: - // Instrumentation and `AggregateBloom` version of fetching events - // for the given `EventFilter` are under a feature flag for now and - // we do not execute them during testing because they would only - // slow the tests down and would not have any impact on their outcome. - // Follow-up PR will use the `AggregateBloom` logic to create the output, - // then the conditions will be removed. - - #[cfg(all(feature = "aggregate_bloom", not(test)))] - let start = std::time::Instant::now(); - let page = transaction .events( - &filter, + &constraints, context.config.get_events_max_blocks_to_scan, - context.config.get_events_max_uncached_bloom_filters_to_load, + context.config.get_events_max_event_filters_to_load, ) .map_err(|e| match e { EventFilterError::Internal(e) => GetEventsError::Internal(e), EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()), })?; - #[cfg(all(feature = "aggregate_bloom", not(test)))] - { - let elapsed = start.elapsed(); - - tracing::info!( - "Getting events (individual Bloom filters) took {:?}", - elapsed - ); - - let start = std::time::Instant::now(); - let page_from_aggregate = transaction - .events_from_aggregate( - &filter, - context.config.get_events_max_blocks_to_scan, - context.config.get_events_max_bloom_filters_to_load, - ) - .map_err(|e| match e { - EventFilterError::Internal(e) => GetEventsError::Internal(e), - EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()), - })?; - let elapsed = start.elapsed(); - - tracing::info!( - "Getting events (aggregate Bloom filters) took {:?}", - elapsed - ); - - if page != page_from_aggregate { - tracing::error!( - "Page of events from individual and aggregate bloom filters does not match!" - ); - tracing::error!("Individual: {:?}", page); - tracing::error!("Aggregate: {:?}", page_from_aggregate); - } else { - tracing::info!("Page of events from individual and aggregate bloom filters match!"); - } - } - let mut events = GetEventsResult { events: page.events.into_iter().map(|e| e.into()).collect(), continuation_token: page.continuation_token.map(|token| { diff --git a/crates/rpc/src/method/subscribe_events.rs b/crates/rpc/src/method/subscribe_events.rs index f1b9e9a8a7..d148eae5c0 100644 --- a/crates/rpc/src/method/subscribe_events.rs +++ b/crates/rpc/src/method/subscribe_events.rs @@ -108,33 +108,10 @@ impl RpcSubscriptionFlow for SubscribeEvents { from, to, params.from_address, - #[cfg(feature = "aggregate_bloom")] - params.keys.clone().unwrap_or_default(), - #[cfg(not(feature = "aggregate_bloom"))] params.keys.unwrap_or_default(), ) .map_err(RpcError::InternalError)?; - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = db - .events_in_range_aggregate( - from, - to, - params.from_address, - params.keys.unwrap_or_default(), - ) - .unwrap(); - - assert_eq!(events.0.len(), events_from_aggregate.0.len()); - for (event, event_from_aggregate) in - events.0.iter().zip(events_from_aggregate.0.iter()) - { - assert_eq!(event, event_from_aggregate); - } - assert_eq!(events.1, events_from_aggregate.1); - } - Ok(events) }) .await @@ -761,9 +738,7 @@ mod tests { config: RpcConfig { batch_concurrency_limit: 64.try_into().unwrap(), get_events_max_blocks_to_scan: 1024.try_into().unwrap(), - get_events_max_uncached_bloom_filters_to_load: 1024.try_into().unwrap(), - #[cfg(feature = "aggregate_bloom")] - get_events_max_bloom_filters_to_load: 1.try_into().unwrap(), + get_events_max_event_filters_to_load: 1.try_into().unwrap(), custom_versioned_constants: None, }, }; diff --git a/crates/rpc/src/method/subscribe_new_heads.rs b/crates/rpc/src/method/subscribe_new_heads.rs index 13536378f2..182e2ae75d 100644 --- a/crates/rpc/src/method/subscribe_new_heads.rs +++ b/crates/rpc/src/method/subscribe_new_heads.rs @@ -547,9 +547,7 @@ mod tests { config: RpcConfig { batch_concurrency_limit: 1.try_into().unwrap(), get_events_max_blocks_to_scan: 1.try_into().unwrap(), - get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(), - #[cfg(feature = "aggregate_bloom")] - get_events_max_bloom_filters_to_load: 1.try_into().unwrap(), + get_events_max_event_filters_to_load: 1.try_into().unwrap(), custom_versioned_constants: None, }, }; diff --git a/crates/rpc/src/method/subscribe_pending_transactions.rs b/crates/rpc/src/method/subscribe_pending_transactions.rs index 9fe150c366..d0528ad52b 100644 --- a/crates/rpc/src/method/subscribe_pending_transactions.rs +++ b/crates/rpc/src/method/subscribe_pending_transactions.rs @@ -492,9 +492,7 @@ mod tests { config: RpcConfig { batch_concurrency_limit: 1.try_into().unwrap(), get_events_max_blocks_to_scan: 1.try_into().unwrap(), - get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(), - #[cfg(feature = "aggregate_bloom")] - get_events_max_bloom_filters_to_load: 1.try_into().unwrap(), + get_events_max_event_filters_to_load: 1.try_into().unwrap(), custom_versioned_constants: None, }, }; diff --git a/crates/rpc/src/method/subscribe_transaction_status.rs b/crates/rpc/src/method/subscribe_transaction_status.rs index b8aca06d25..08d844d2f5 100644 --- a/crates/rpc/src/method/subscribe_transaction_status.rs +++ b/crates/rpc/src/method/subscribe_transaction_status.rs @@ -1168,9 +1168,7 @@ mod tests { config: RpcConfig { batch_concurrency_limit: 1.try_into().unwrap(), get_events_max_blocks_to_scan: 1.try_into().unwrap(), - get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(), - #[cfg(feature = "aggregate_bloom")] - get_events_max_bloom_filters_to_load: 1.try_into().unwrap(), + get_events_max_event_filters_to_load: 1.try_into().unwrap(), custom_versioned_constants: None, }, }; diff --git a/crates/rpc/src/v06/method/trace_block_transactions.rs b/crates/rpc/src/v06/method/trace_block_transactions.rs index fcfd508316..dc3b9bc8e6 100644 --- a/crates/rpc/src/v06/method/trace_block_transactions.rs +++ b/crates/rpc/src/v06/method/trace_block_transactions.rs @@ -643,13 +643,11 @@ pub(crate) mod tests { // Need to avoid skipping blocks for `insert_transaction_data`. (0..619596) - .collect::>() - .chunks(pathfinder_storage::BLOCK_RANGE_LEN as usize) - .map(|range| *range.last().unwrap() as u64) - .for_each(|block| { - let block = BlockNumber::new_or_panic(block); + .step_by(pathfinder_storage::BLOCK_RANGE_LEN as usize) + .for_each(|block: u64| { + let block = BlockNumber::new_or_panic(block.saturating_sub(1)); transaction - .insert_transaction_data(block, &[], None) + .insert_transaction_data(block, &[], Some(&[])) .unwrap(); }); diff --git a/crates/rpc/src/v06/method/trace_transaction.rs b/crates/rpc/src/v06/method/trace_transaction.rs index 3b8c7b565a..e10fc62829 100644 --- a/crates/rpc/src/v06/method/trace_transaction.rs +++ b/crates/rpc/src/v06/method/trace_transaction.rs @@ -315,13 +315,11 @@ pub mod tests { // Need to avoid skipping blocks for `insert_transaction_data`. (0..619596) - .collect::>() - .chunks(pathfinder_storage::BLOCK_RANGE_LEN as usize) - .map(|range| *range.last().unwrap() as u64) - .for_each(|block| { - let block = BlockNumber::new_or_panic(block); + .step_by(pathfinder_storage::BLOCK_RANGE_LEN as usize) + .for_each(|block: u64| { + let block = BlockNumber::new_or_panic(block.saturating_sub(1)); transaction - .insert_transaction_data(block, &[], None) + .insert_transaction_data(block, &[], Some(&[])) .unwrap(); }); diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 06244afd10..a2cbf3234f 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -7,9 +7,6 @@ license = { workspace = true } rust-version = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[features] -aggregate_bloom = [] - [dependencies] anyhow = { workspace = true } base64 = { workspace = true } diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs index 3886e311b8..f58afe92bd 100644 --- a/crates/storage/src/bloom.rs +++ b/crates/storage/src/bloom.rs @@ -60,22 +60,10 @@ //! specific set of keys without having to load and check each individual bloom //! filter. -use std::collections::BTreeSet; -use std::sync::{Mutex, MutexGuard}; - use bloomfilter::Bloom; -use cached::{Cached, SizedCache}; -use pathfinder_common::{BlockNumber, ContractAddress, EventKey}; +use pathfinder_common::BlockNumber; use pathfinder_crypto::Felt; -use crate::{EventFilter, ReorgCounter}; - -// We're using the upper 4 bits of the 32 byte representation of a felt -// to store the index of the key in the values set in the Bloom filter. -// This allows for the maximum of 16 keys per event to be stored in the -// filter. -pub const EVENT_KEY_FILTER_LIMIT: usize = 16; - pub const BLOCK_RANGE_LEN: u64 = AggregateBloom::BLOCK_RANGE_LEN; /// An aggregate of all Bloom filters for a given range of blocks. @@ -177,40 +165,7 @@ impl AggregateBloom { /// Returns a set of [block numbers](BlockNumber) for which the given keys /// are present in the aggregate. - pub fn blocks_for_filter(&self, filter: &EventFilter) -> BTreeSet { - // Empty filters are considered present in all blocks. - if filter.contract_address.is_none() && (filter.keys.iter().flatten().count() == 0) { - return (self.from_block.get()..=self.to_block.get()) - .map(BlockNumber::new_or_panic) - .collect(); - } - - let mut blocks: BTreeSet<_> = filter - .keys - .iter() - .enumerate() - .flat_map(|(idx, keys)| { - let keys: Vec<_> = keys - .iter() - .map(|key| { - let mut key_with_idx = key.0; - key_with_idx.as_mut_be_bytes()[0] |= (idx as u8) << 4; - key_with_idx - }) - .collect(); - - self.blocks_for_keys(&keys) - }) - .collect(); - - if let Some(contract_address) = filter.contract_address { - blocks.extend(self.blocks_for_keys(&[contract_address.0])); - } - - blocks - } - - fn blocks_for_keys(&self, keys: &[Felt]) -> Vec { + pub fn blocks_for_keys(&self, keys: &[Felt]) -> Vec { let mut block_matches = vec![]; for k in keys { @@ -313,55 +268,10 @@ impl BloomFilter { self.0.bitmap() } - fn set(&mut self, key: &Felt) { + pub fn set(&mut self, key: &Felt) { self.0.set(key); } - pub fn set_address(&mut self, address: &ContractAddress) { - self.set(&address.0); - } - - pub fn set_keys(&mut self, keys: &[EventKey]) { - for (i, key) in keys.iter().take(EVENT_KEY_FILTER_LIMIT).enumerate() { - let mut key = key.0; - key.as_mut_be_bytes()[0] |= (i as u8) << 4; - self.set(&key); - } - } - - fn check(&self, key: &Felt) -> bool { - self.0.check(key) - } - - fn check_address(&self, address: &ContractAddress) -> bool { - self.check(&address.0) - } - - fn check_keys(&self, keys: &[Vec]) -> bool { - keys.iter().enumerate().all(|(idx, keys)| { - if keys.is_empty() { - return true; - }; - - keys.iter().any(|key| { - let mut key = key.0; - key.as_mut_be_bytes()[0] |= (idx as u8) << 4; - tracing::trace!(%idx, %key, "Checking key in filter"); - self.check(&key) - }) - }) - } - - pub fn check_filter(&self, filter: &crate::EventFilter) -> bool { - if let Some(contract_address) = filter.contract_address { - if !self.check_address(&contract_address) { - return false; - } - } - - self.check_keys(&filter.keys) - } - // Workaround to get the indices of the keys in the filter. // Needed because the `bloomfilter` crate doesn't provide a // way to get this information. @@ -381,34 +291,6 @@ impl BloomFilter { } } -type CacheKey = (crate::ReorgCounter, BlockNumber); -pub(crate) struct Cache(Mutex>); - -impl Cache { - pub fn with_size(size: usize) -> Self { - Self(Mutex::new(SizedCache::with_size(size))) - } - - fn locked_cache(&self) -> MutexGuard<'_, SizedCache> { - self.0.lock().unwrap() - } - - pub fn get( - &self, - reorg_counter: ReorgCounter, - block_number: BlockNumber, - ) -> Option { - self.locked_cache() - .cache_get(&(reorg_counter, block_number)) - .cloned() - } - - pub fn set(&self, reorg_counter: ReorgCounter, block_number: BlockNumber, bloom: BloomFilter) { - self.locked_cache() - .cache_set((reorg_counter, block_number), bloom); - } -} - #[cfg(test)] mod tests { use pathfinder_common::felt; @@ -422,26 +304,6 @@ mod tests { felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69ec"); #[test] - fn set_and_check() { - let mut bloom = BloomFilter::new(); - bloom.set(&KEY); - assert!(bloom.check(&KEY)); - assert!(!bloom.check(&KEY_NOT_IN_FILTER)); - } - - #[test] - fn serialize_roundtrip() { - let mut bloom = BloomFilter::new(); - bloom.set(&KEY); - - let bytes = bloom.to_compressed_bytes(); - let bloom = BloomFilter::from_compressed_bytes(&bytes); - assert!(bloom.check(&KEY)); - assert!(!bloom.check(&KEY_NOT_IN_FILTER)); - } - - #[test] - #[cfg(feature = "aggregate_bloom")] fn add_bloom_and_check_single_block_found() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); @@ -454,21 +316,9 @@ mod tests { let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); assert_eq!(block_matches, vec![from_block]); - - let filter = EventFilter { - keys: vec![vec![EventKey(KEY)]], - contract_address: None, - ..Default::default() - }; - let block_matches: Vec<_> = aggregate_bloom_filter - .blocks_for_filter(&filter) - .into_iter() - .collect(); - assert_eq!(block_matches, vec![from_block]); } #[test] - #[cfg(feature = "aggregate_bloom")] fn add_blooms_and_check_multiple_blocks_found() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); @@ -481,21 +331,9 @@ mod tests { let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]); assert_eq!(block_matches, vec![from_block, from_block + 1]); - - let filter = EventFilter { - keys: vec![vec![EventKey(KEY)]], - contract_address: None, - ..Default::default() - }; - let block_matches: Vec<_> = aggregate_bloom_filter - .blocks_for_filter(&filter) - .into_iter() - .collect(); - assert_eq!(block_matches, vec![from_block, from_block + 1]); } #[test] - #[cfg(feature = "aggregate_bloom")] fn key_not_in_filter_returns_empty_vec() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); @@ -512,7 +350,6 @@ mod tests { } #[test] - #[cfg(feature = "aggregate_bloom")] fn serialize_aggregate_roundtrip() { let from_block = BlockNumber::new_or_panic(0); let mut aggregate_bloom_filter = AggregateBloom::new(from_block); @@ -541,7 +378,6 @@ mod tests { } #[test] - #[cfg(feature = "aggregate_bloom")] #[should_panic] fn invalid_insert_pos() { let from_block = BlockNumber::new_or_panic(0); diff --git a/crates/storage/src/connection.rs b/crates/storage/src/connection.rs index 56eeb467e9..9974455d6e 100644 --- a/crates/storage/src/connection.rs +++ b/crates/storage/src/connection.rs @@ -1,6 +1,4 @@ -use std::sync::Arc; -#[cfg(feature = "aggregate_bloom")] -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; mod block; mod class; @@ -13,11 +11,10 @@ mod state_update; pub(crate) mod transaction; mod trie; -#[cfg(feature = "aggregate_bloom")] use event::RunningEventFilter; pub use event::{ EmittedEvent, - EventFilter, + EventConstraints, EventFilterError, PageOfEvents, PAGE_SIZE_LIMIT as EVENT_PAGE_SIZE_LIMIT, @@ -35,8 +32,6 @@ type PooledConnection = r2d2::PooledConnection, - #[cfg(feature = "aggregate_bloom")] running_event_filter: Arc>, trie_prune_mode: TriePruneMode, } @@ -44,14 +39,11 @@ pub struct Connection { impl Connection { pub(crate) fn new( connection: PooledConnection, - bloom_filter_cache: Arc, - #[cfg(feature = "aggregate_bloom")] running_event_filter: Arc>, + running_event_filter: Arc>, trie_prune_mode: TriePruneMode, ) -> Self { Self { connection, - bloom_filter_cache, - #[cfg(feature = "aggregate_bloom")] running_event_filter, trie_prune_mode, } @@ -61,8 +53,6 @@ impl Connection { let tx = self.connection.transaction()?; Ok(Transaction { transaction: tx, - bloom_filter_cache: self.bloom_filter_cache.clone(), - #[cfg(feature = "aggregate_bloom")] running_event_filter: self.running_event_filter.clone(), trie_prune_mode: self.trie_prune_mode, }) @@ -75,8 +65,6 @@ impl Connection { let tx = self.connection.transaction_with_behavior(behavior)?; Ok(Transaction { transaction: tx, - bloom_filter_cache: self.bloom_filter_cache.clone(), - #[cfg(feature = "aggregate_bloom")] running_event_filter: self.running_event_filter.clone(), trie_prune_mode: self.trie_prune_mode, }) @@ -85,8 +73,6 @@ impl Connection { pub struct Transaction<'inner> { transaction: rusqlite::Transaction<'inner>, - bloom_filter_cache: Arc, - #[cfg(feature = "aggregate_bloom")] running_event_filter: Arc>, trie_prune_mode: TriePruneMode, } diff --git a/crates/storage/src/connection/block.rs b/crates/storage/src/connection/block.rs index 9e6dcf60c5..fa2bf0f9e9 100644 --- a/crates/storage/src/connection/block.rs +++ b/crates/storage/src/connection/block.rs @@ -116,24 +116,15 @@ impl Transaction<'_> { /// /// This includes block header, block body and state update information. pub fn purge_block(&self, block: BlockNumber) -> anyhow::Result<()> { - #[cfg(feature = "aggregate_bloom")] - { - self.inner() - .execute( - r" - DELETE FROM starknet_events_filters_aggregate - WHERE from_block <= :block AND to_block >= :block - ", - named_params![":block": &block], - ) - .context("Deleting aggregate bloom filter")?; - } self.inner() .execute( - "DELETE FROM starknet_events_filters WHERE block_number = ?", - params![&block], + r" + DELETE FROM event_filters + WHERE from_block <= :block AND to_block >= :block + ", + named_params![":block": &block], ) - .context("Deleting bloom filter")?; + .context("Deleting event bloom filter")?; self.inner() .execute( diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index 82f569dcc8..fc83481918 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -1,8 +1,7 @@ +use std::collections::BTreeSet; use std::num::NonZeroUsize; -#[cfg(feature = "aggregate_bloom")] -use anyhow::Context; -use anyhow::Result; +use anyhow::{Context, Result}; use pathfinder_common::event::Event; use pathfinder_common::{ BlockHash, @@ -13,16 +12,18 @@ use pathfinder_common::{ TransactionHash, }; -#[cfg(feature = "aggregate_bloom")] -use crate::bloom::AggregateBloom; -use crate::bloom::BloomFilter; +use crate::bloom::{AggregateBloom, BloomFilter}; use crate::prelude::*; -use crate::ReorgCounter; +// We're using the upper 4 bits of the 32 byte representation of a felt +// to store the index of the key in the values set in the Bloom filter. +// This allows for the maximum of 16 keys per event to be stored in the +// filter. +pub const EVENT_KEY_FILTER_LIMIT: usize = 16; pub const PAGE_SIZE_LIMIT: usize = 1_024; #[derive(Debug, Default)] -pub struct EventFilter { +pub struct EventConstraints { pub from_block: Option, pub to_block: Option, pub contract_address: Option, @@ -68,7 +69,6 @@ pub struct PageOfEvents { } impl Transaction<'_> { - #[cfg(feature = "aggregate_bloom")] pub fn reconstruct_running_event_filter(&self) -> anyhow::Result<()> { let event_filter = reconstruct_running_event_filter(self.inner())?; let mut running_event_filter = self.running_event_filter.lock().unwrap(); @@ -77,21 +77,20 @@ impl Transaction<'_> { Ok(()) } - /// Upsert the [aggregate event bloom filter](AggregateBloom) for the given - /// block number. This function operates under the assumption that + /// Upsert the [running event Bloom filter](RunningEventFilter) for the + /// given block number. This function operates under the assumption that /// blocks are _never_ skipped so even if there are no events for a /// block, this function should still be called with an empty iterator. /// When testing it is fine to skip blocks, as long as the block at the end /// of the current range is not skipped. - #[cfg(feature = "aggregate_bloom")] - pub(super) fn upsert_block_events_aggregate<'a>( + pub(super) fn upsert_block_event_filters<'a>( &self, block_number: BlockNumber, events: impl Iterator, ) -> anyhow::Result<()> { let mut stmt = self.inner().prepare_cached( r" - INSERT INTO starknet_events_filters_aggregate + INSERT INTO event_filters (from_block, to_block, bitmap) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET bitmap=excluded.bitmap @@ -110,7 +109,7 @@ impl Transaction<'_> { running_event_filter.next_block = block_number + 1; // This check is the reason that blocks cannot be skipped, if they were we would - // risk missing the last block of the current aggregate's range. + // risk missing the last block of the running event filter's range. if block_number == running_event_filter.filter.to_block { stmt.execute(params![ &running_event_filter.filter.from_block, @@ -127,28 +126,6 @@ impl Transaction<'_> { Ok(()) } - pub(super) fn upsert_block_events<'a>( - &self, - block_number: BlockNumber, - events: impl Iterator, - ) -> anyhow::Result<()> { - #[rustfmt::skip] - let mut stmt = self.inner().prepare_cached( - "INSERT INTO starknet_events_filters (block_number, bloom) VALUES (?, ?) \ - ON CONFLICT DO UPDATE SET bloom=excluded.bloom", - )?; - - let mut bloom = BloomFilter::new(); - for event in events { - bloom.set_keys(&event.keys); - bloom.set_address(&event.from_address); - } - - stmt.execute(params![&block_number, &bloom.to_compressed_bytes()])?; - - Ok(()) - } - /// Return all of the events in the given block range, filtered by the given /// keys and contract address. Along with the events, return the last /// block number that was scanned, which may be smaller than `to_block` @@ -159,72 +136,6 @@ impl Transaction<'_> { to_block: BlockNumber, contract_address: Option, keys: Vec>, - ) -> anyhow::Result<(Vec, Option)> { - let key_filter_is_empty = keys.iter().flatten().count() == 0; - let reorg_counter = self.reorg_counter()?; - let mut emitted_events = Vec::new(); - let mut block_number = from_block; - let filter = EventFilter { - contract_address, - keys, - page_size: usize::MAX - 1, - ..Default::default() - }; - loop { - // Stop if we're past the last block. - if block_number > to_block { - return Ok((emitted_events, Some(to_block))); - } - - // Check bloom filter - if !key_filter_is_empty || contract_address.is_some() { - let bloom = self.load_bloom(reorg_counter, block_number)?; - match bloom { - Filter::Missing => {} - Filter::Cached(bloom) => { - if !bloom.check_filter(&filter) { - tracing::trace!("Bloom filter did not match"); - block_number += 1; - continue; - } - } - Filter::Loaded(bloom) => { - if !bloom.check_filter(&filter) { - tracing::trace!("Bloom filter did not match"); - block_number += 1; - continue; - } - } - } - } - - match self.scan_block_into( - block_number, - &filter, - key_filter_is_empty, - 0, - &mut emitted_events, - )? { - BlockScanResult::NoSuchBlock if block_number == from_block => { - return Ok((emitted_events, None)); - } - BlockScanResult::NoSuchBlock => { - return Ok((emitted_events, Some(block_number.parent().unwrap()))); - } - BlockScanResult::Done { .. } => {} - } - - block_number += 1; - } - } - - #[cfg(feature = "aggregate_bloom")] - pub fn events_in_range_aggregate( - &self, - from_block: BlockNumber, - to_block: BlockNumber, - contract_address: Option, - keys: Vec>, ) -> anyhow::Result<(Vec, Option)> { let Some(latest_block) = self.block_number(crate::BlockId::Latest)? else { // No blocks in the database @@ -234,22 +145,22 @@ impl Transaction<'_> { return Ok((vec![], None)); } - let filter = EventFilter { + let constraints = EventConstraints { contract_address, keys, page_size: usize::MAX - 1, ..Default::default() }; - let aggregates = self.load_aggregate_bloom_range(from_block, to_block)?; + let event_filters = self.load_event_filter_range(from_block, to_block)?; - let blocks_to_scan = aggregates + let blocks_to_scan = event_filters .iter() - .flat_map(|aggregate| aggregate.blocks_for_filter(&filter)) + .flat_map(|filter| filter.check(&constraints)) .filter(|&block| (from_block..=to_block).contains(&block)); - let key_filter_is_empty = filter.keys.iter().flatten().count() == 0; - let keys: Vec> = filter + let key_filter_is_empty = constraints.keys.iter().flatten().count() == 0; + let keys: Vec> = constraints .keys .iter() .map(|keys| keys.iter().collect()) @@ -273,7 +184,7 @@ impl Transaction<'_> { .flat_map(|(transaction_hash, events)| { events.into_iter().zip(std::iter::repeat(transaction_hash)) }) - .filter(|(event, _)| match filter.contract_address { + .filter(|(event, _)| match constraints.contract_address { Some(address) => event.from_address == address, None => true, }) @@ -316,171 +227,33 @@ impl Transaction<'_> { #[tracing::instrument(skip(self))] pub fn events( &self, - filter: &EventFilter, + constraints: &EventConstraints, max_blocks_to_scan: NonZeroUsize, - max_uncached_bloom_filters_to_load: NonZeroUsize, + max_event_filters_to_load: NonZeroUsize, ) -> Result { - if filter.page_size < 1 { + if constraints.page_size < 1 { return Err(EventFilterError::PageSizeTooSmall); } - let reorg_counter = self.reorg_counter()?; - - let from_block = filter.from_block.unwrap_or(BlockNumber::GENESIS); - let to_block = filter.to_block.unwrap_or(BlockNumber::MAX); - let key_filter_is_empty = filter.keys.iter().flatten().count() == 0; - - let mut emitted_events = Vec::new(); - let mut bloom_filters_loaded: usize = 0; - let mut blocks_scanned: usize = 0; - let mut block_number = from_block; - let mut offset = filter.offset; - - enum ScanResult { - Done, - PageFull, - ContinueFrom(BlockNumber), - } - - let result = loop { - // Stop if we're past the last block. - if block_number > to_block { - break ScanResult::Done; - } - - // Check if we've reached our Bloom filter load limit - if bloom_filters_loaded >= max_uncached_bloom_filters_to_load.get() { - tracing::trace!("Bloom filter limit reached"); - break ScanResult::ContinueFrom(block_number); - } + let from_block = constraints.from_block.unwrap_or(BlockNumber::GENESIS); + let to_block = constraints.to_block.unwrap_or(BlockNumber::MAX); - // Check bloom filter - if !key_filter_is_empty || filter.contract_address.is_some() { - let bloom = self.load_bloom(reorg_counter, block_number)?; - match bloom { - Filter::Missing => {} - Filter::Cached(bloom) => { - if !bloom.check_filter(filter) { - tracing::trace!("Bloom filter did not match"); - block_number += 1; - continue; - } - } - Filter::Loaded(bloom) => { - bloom_filters_loaded += 1; - if !bloom.check_filter(filter) { - tracing::trace!("Bloom filter did not match"); - block_number += 1; - continue; - } - } - } - } - - // Check if we've reached our block scan limit - blocks_scanned += 1; - if blocks_scanned > max_blocks_to_scan.get() { - tracing::trace!("Block scan limit reached"); - break ScanResult::ContinueFrom(block_number); - } - - match self.scan_block_into( - block_number, - filter, - key_filter_is_empty, - offset, - &mut emitted_events, - )? { - BlockScanResult::NoSuchBlock => break ScanResult::Done, - BlockScanResult::Done { new_offset } => { - offset = new_offset; - } - } - - // Stop if we have a page of events plus an extra one to decide if we're on - // the last page. - if emitted_events.len() > filter.page_size { - break ScanResult::PageFull; - } - - block_number += 1; - }; - - match result { - ScanResult::Done => { - return Ok(PageOfEvents { - events: emitted_events, - continuation_token: None, - }) - } - ScanResult::PageFull => { - assert!(emitted_events.len() > filter.page_size); - let continuation_token = continuation_token( - &emitted_events, - ContinuationToken { - block_number: from_block, - offset: filter.offset, - }, - ) - .unwrap(); - emitted_events.truncate(filter.page_size); - - return Ok(PageOfEvents { - events: emitted_events, - continuation_token: Some(ContinuationToken { - block_number: continuation_token.block_number, - // account for the extra event - offset: continuation_token.offset - 1, - }), - }); - } - ScanResult::ContinueFrom(block_number) => { - // We've reached a search limit without filling the page. - // We'll need to continue from the next block. - return Ok(PageOfEvents { - events: emitted_events, - continuation_token: Some(ContinuationToken { - block_number, - offset: 0, - }), - }); - } - } - } - - #[cfg(feature = "aggregate_bloom")] - pub fn events_from_aggregate( - &self, - filter: &EventFilter, - max_blocks_to_scan: NonZeroUsize, - max_bloom_filters_to_load: NonZeroUsize, - ) -> Result { - if filter.page_size < 1 { - return Err(EventFilterError::PageSizeTooSmall); - } + let (event_filters, load_limit_reached) = + self.load_limited_event_filter_range(from_block, to_block, max_event_filters_to_load)?; - let from_block = filter.from_block.unwrap_or(BlockNumber::GENESIS); - let to_block = filter.to_block.unwrap_or(BlockNumber::MAX); - - let (aggregates, load_limit_reached) = self.load_limited_aggregate_bloom_range( - from_block, - to_block, - max_bloom_filters_to_load, - )?; - - let blocks_to_scan = aggregates + let blocks_to_scan = event_filters .iter() - .flat_map(|aggregate| aggregate.blocks_for_filter(filter)) + .flat_map(|filter| filter.check(constraints)) .filter(|&block| (from_block..=to_block).contains(&block)); - let keys: Vec> = filter + let keys: Vec> = constraints .keys .iter() .map(|keys| keys.iter().collect()) .collect(); - let key_filter_is_empty = filter.keys.iter().flatten().count() == 0; - let mut offset = filter.offset; + let key_filter_is_empty = constraints.keys.iter().flatten().count() == 0; + let mut offset = constraints.offset; let mut emitted_events = vec![]; @@ -496,7 +269,7 @@ impl Transaction<'_> { }); } - let events_required = filter.page_size + 1 - emitted_events.len(); + let events_required = constraints.page_size + 1 - emitted_events.len(); tracing::trace!(%block, %events_required, "Processing block"); let Some(block_header) = self.block_header(crate::BlockId::Number(block))? else { @@ -519,7 +292,7 @@ impl Transaction<'_> { .flat_map(|(transaction_hash, events)| { events.into_iter().zip(std::iter::repeat(transaction_hash)) }) - .filter(|(event, _)| match filter.contract_address { + .filter(|(event, _)| match constraints.contract_address { Some(address) => event.from_address == address, None => true, }) @@ -557,17 +330,17 @@ impl Transaction<'_> { // Stop if we have a page of events plus an extra one to decide if we're on // the last page. - if emitted_events.len() > filter.page_size { + if emitted_events.len() > constraints.page_size { let continuation_token = continuation_token( &emitted_events, ContinuationToken { block_number: from_block, - offset: filter.offset, + offset: constraints.offset, }, ) .unwrap(); - emitted_events.truncate(filter.page_size); + emitted_events.truncate(constraints.page_size); return Ok(PageOfEvents { events: emitted_events, @@ -581,7 +354,7 @@ impl Transaction<'_> { } if load_limit_reached { - let last_loaded_block = aggregates + let last_loaded_block = event_filters .last() .expect("At least one filter is present") .to_block; @@ -589,7 +362,7 @@ impl Transaction<'_> { Ok(PageOfEvents { events: emitted_events, continuation_token: Some(ContinuationToken { - // Bloom filter range is inclusive so + 1. + // Event filter block range is inclusive so + 1. block_number: last_loaded_block + 1, offset: 0, }), @@ -601,111 +374,7 @@ impl Transaction<'_> { }) } } - - fn scan_block_into( - &self, - block_number: BlockNumber, - filter: &EventFilter, - key_filter_is_empty: bool, - mut offset: usize, - emitted_events: &mut Vec, - ) -> Result { - let events_required = filter.page_size + 1 - emitted_events.len(); - - tracing::trace!(%block_number, %events_required, "Processing block"); - - let block_header = self.block_header(crate::BlockId::Number(block_number))?; - let Some(block_header) = block_header else { - return Ok(BlockScanResult::NoSuchBlock); - }; - - let events = self.events_for_block(block_number.into())?; - let Some(events) = events else { - return Ok(BlockScanResult::NoSuchBlock); - }; - - let keys: Vec> = filter - .keys - .iter() - .map(|keys| keys.iter().collect()) - .collect(); - - let events = events - .into_iter() - .flat_map(|(transaction_hash, events)| { - events.into_iter().zip(std::iter::repeat(transaction_hash)) - }) - .filter(|(event, _)| match filter.contract_address { - Some(address) => event.from_address == address, - None => true, - }) - .filter(|(event, _)| { - if key_filter_is_empty { - return true; - } - - if event.keys.len() < keys.len() { - return false; - } - - event - .keys - .iter() - .zip(keys.iter()) - .all(|(key, filter)| filter.is_empty() || filter.contains(key)) - }) - .skip_while(|_| { - let skip = offset > 0; - offset = offset.saturating_sub(1); - skip - }) - .take(events_required) - .map(|(event, tx_hash)| EmittedEvent { - data: event.data.clone(), - keys: event.keys.clone(), - from_address: event.from_address, - block_hash: block_header.hash, - block_number: block_header.number, - transaction_hash: tx_hash, - }); - - emitted_events.extend(events); - - Ok(BlockScanResult::Done { new_offset: offset }) - } - - fn load_bloom( - &self, - reorg_counter: ReorgCounter, - block_number: BlockNumber, - ) -> Result { - if let Some(bloom) = self.bloom_filter_cache.get(reorg_counter, block_number) { - return Ok(Filter::Cached(bloom)); - } - - let mut stmt = self - .inner() - .prepare_cached("SELECT bloom FROM starknet_events_filters WHERE block_number = ?")?; - - let bloom = stmt - .query_row(params![&block_number], |row| { - let bytes: Vec = row.get(0)?; - Ok(BloomFilter::from_compressed_bytes(&bytes)) - }) - .optional()?; - - Ok(match bloom { - Some(bloom) => { - self.bloom_filter_cache - .set(reorg_counter, block_number, bloom.clone()); - Filter::Loaded(bloom) - } - None => Filter::Missing, - }) - } - - #[cfg(feature = "aggregate_bloom")] - fn load_aggregate_bloom_range( + fn load_event_filter_range( &self, start_block: BlockNumber, end_block: BlockNumber, @@ -713,13 +382,13 @@ impl Transaction<'_> { let mut stmt = self.inner().prepare_cached( r" SELECT from_block, to_block, bitmap - FROM starknet_events_filters_aggregate + FROM event_filters WHERE from_block <= :end_block AND to_block >= :start_block ORDER BY from_block ", )?; - let mut aggregates = stmt + let mut event_filters = stmt .query_map( named_params![ ":end_block": &end_block, @@ -737,51 +406,52 @@ impl Transaction<'_> { )) }, ) - .context("Querying bloom filter range")? + .context("Querying event filter range")? .collect::, _>>()?; - // There are no aggregates in the database yet or the loaded aggregates + // There are no event filters in the database yet or the loaded ones // don't cover the requested range. - let should_include_running = aggregates.last().map_or(true, |a| end_block > a.to_block); + let should_include_running = event_filters + .last() + .map_or(true, |a| end_block > a.to_block); if should_include_running { let running_event_filter = self.running_event_filter.lock().unwrap(); - aggregates.push(running_event_filter.filter.clone()); + event_filters.push(running_event_filter.filter.clone()); } - Ok(aggregates) + Ok(event_filters) } - #[cfg(feature = "aggregate_bloom")] - fn load_limited_aggregate_bloom_range( + fn load_limited_event_filter_range( &self, start_block: BlockNumber, end_block: BlockNumber, - max_bloom_filters_to_load: NonZeroUsize, + max_event_filters_to_load: NonZeroUsize, ) -> anyhow::Result<(Vec, bool)> { - let mut select_filters_stmt = self.inner().prepare_cached( + let mut event_filters_in_range_stmt = self.inner().prepare_cached( r" SELECT from_block, to_block, bitmap - FROM starknet_events_filters_aggregate + FROM event_filters WHERE from_block <= :end_block AND to_block >= :start_block ORDER BY from_block - LIMIT :max_bloom_filters_to_load + LIMIT :max_event_filters_to_load ", )?; let mut total_filters_stmt = self.inner().prepare_cached( r" SELECT COUNT(*) - FROM starknet_events_filters_aggregate + FROM event_filters WHERE from_block <= :end_block AND to_block >= :start_block ", )?; - let mut aggregates = select_filters_stmt + let mut event_filters = event_filters_in_range_stmt .query_map( named_params![ ":end_block": &end_block, ":start_block": &start_block, - ":max_bloom_filters_to_load": &max_bloom_filters_to_load.get(), + ":max_event_filters_to_load": &max_event_filters_to_load.get(), ], |row| { let from_block = row.get_block_number(0)?; @@ -795,51 +465,102 @@ impl Transaction<'_> { )) }, ) - .context("Querying bloom filter range")? + .context("Querying event filter range")? .collect::, _>>()?; - // There are no aggregates in the database yet or the loaded aggregates + // There are no event filters in the database yet or the loaded ones // don't cover the requested range. - let should_include_running = aggregates.last().map_or(true, |a| end_block > a.to_block); + let should_include_running = event_filters + .last() + .map_or(true, |a| end_block > a.to_block); - let total_aggregate_filters = total_filters_stmt.query_row( + let total_event_filters = total_filters_stmt.query_row( named_params![ ":end_block": &end_block, ":start_block": &start_block, ], |row| row.get::<_, u64>(0), )?; - let load_limit_reached = total_aggregate_filters > max_bloom_filters_to_load.get() as u64; + let load_limit_reached = total_event_filters > max_event_filters_to_load.get() as u64; if should_include_running && !load_limit_reached { let running_event_filter = self.running_event_filter.lock().unwrap(); - aggregates.push(running_event_filter.filter.clone()); + event_filters.push(running_event_filter.filter.clone()); } - Ok((aggregates, load_limit_reached)) + Ok((event_filters, load_limit_reached)) } - #[cfg(feature = "aggregate_bloom")] pub fn next_block_without_events(&self) -> BlockNumber { self.running_event_filter.lock().unwrap().next_block } } -#[cfg(feature = "aggregate_bloom")] +impl AggregateBloom { + /// Returns the block numbers that match the given constraints. + pub fn check(&self, constraints: &EventConstraints) -> BTreeSet { + if constraints.contract_address.is_none() + && (constraints.keys.iter().flatten().count() == 0) + { + // No constraints, return filter's entire block range. + return (self.from_block.get()..=self.to_block.get()) + .map(BlockNumber::new_or_panic) + .collect(); + } + + let mut blocks: BTreeSet<_> = constraints + .keys + .iter() + .enumerate() + .flat_map(|(idx, keys)| { + let keys: Vec<_> = keys + .iter() + .map(|key| { + let mut key_with_idx = key.0; + key_with_idx.as_mut_be_bytes()[0] |= (idx as u8) << 4; + key_with_idx + }) + .collect(); + + self.blocks_for_keys(&keys) + }) + .collect(); + + if let Some(contract_address) = constraints.contract_address { + blocks.extend(self.blocks_for_keys(&[contract_address.0])); + } + + blocks + } +} + +impl BloomFilter { + pub fn set_address(&mut self, address: &ContractAddress) { + self.set(&address.0); + } + + pub fn set_keys(&mut self, keys: &[EventKey]) { + for (i, key) in keys.iter().take(EVENT_KEY_FILTER_LIMIT).enumerate() { + let mut key = key.0; + key.as_mut_be_bytes()[0] |= (i as u8) << 4; + self.set(&key); + } + } +} + #[derive(Debug)] pub(crate) struct RunningEventFilter { filter: AggregateBloom, next_block: BlockNumber, } -/// Reconstruct the [aggregate](crate::bloom::AggregateBloom) for the range of -/// blocks between the last stored `to_block` in the aggregate Bloom filter -/// table and the last overall block in the database. This is needed because the -/// aggregate Bloom filter for each [block +/// Reconstruct the [event filter](RunningEventFilter) for the +/// range of blocks between the last stored `to_block` in the event +/// filter table and the last overall block in the database. This is needed +/// because the aggregate event filter for each [block /// range](crate::bloom::AggregateBloom::BLOCK_RANGE_LEN) is stored once the /// range is complete, before that it is kept in memory and can be lost upon /// shutdown. -#[cfg(feature = "aggregate_bloom")] pub(crate) fn reconstruct_running_event_filter( tx: &rusqlite::Transaction<'_>, ) -> anyhow::Result { @@ -848,7 +569,7 @@ pub(crate) fn reconstruct_running_event_filter( let mut last_to_block_stmt = tx.prepare( r" SELECT to_block - FROM starknet_events_filters_aggregate + FROM event_filters ORDER BY from_block DESC LIMIT 1 ", )?; @@ -856,25 +577,25 @@ pub(crate) fn reconstruct_running_event_filter( r" SELECT events FROM transactions - WHERE block_number >= :first_runnining_event_filter_block + WHERE block_number >= :first_running_event_filter_block ", )?; let last_to_block = last_to_block_stmt .query_row([], |row| row.get::<_, u64>(0)) .optional() - .context("Querying last stored aggregate to_block")?; + .context("Querying last stored event filter to_block")?; - let first_runnining_event_filter_block = match last_to_block { + let first_running_event_filter_block = match last_to_block { Some(last_to_block) => BlockNumber::new_or_panic(last_to_block + 1), - // Aggregate Bloom filter table is empty -> reconstruct running aggregate + // Event filter table is empty -> reconstruct running filter // from the genesis block. None => BlockNumber::GENESIS, }; let events_to_reconstruct: Vec>>> = events_to_reconstruct_stmt .query_and_then( - named_params![":first_runnining_event_filter_block": &first_runnining_event_filter_block], + named_params![":first_running_event_filter_block": &first_running_event_filter_block], |row| { let events: Option = row .get_optional_blob(0)? @@ -902,7 +623,7 @@ pub(crate) fn reconstruct_running_event_filter( .context("Querying events to reconstruct")? .collect::>()?; - let mut aggregate = AggregateBloom::new(first_runnining_event_filter_block); + let mut filter = AggregateBloom::new(first_running_event_filter_block); for (block, events_for_block) in events_to_reconstruct.iter().enumerate() { let Some(events) = events_for_block else { @@ -915,13 +636,13 @@ pub(crate) fn reconstruct_running_event_filter( bloom.set_address(&event.from_address); } - let block_number = first_runnining_event_filter_block + block as u64; - aggregate.add_bloom(&bloom, block_number); + let block_number = first_running_event_filter_block + block as u64; + filter.add_bloom(&bloom, block_number); } Ok(RunningEventFilter { - filter: aggregate, - next_block: first_runnining_event_filter_block + events_to_reconstruct.len() as u64, + filter, + next_block: first_running_event_filter_block + events_to_reconstruct.len() as u64, }) } @@ -960,17 +681,6 @@ fn continuation_token( Some(token) } -enum BlockScanResult { - NoSuchBlock, - Done { new_offset: usize }, -} - -enum Filter { - Missing, - Cached(BloomFilter), - Loaded(BloomFilter), -} - #[cfg(test)] mod tests { use std::sync::LazyLock; @@ -987,9 +697,6 @@ mod tests { static MAX_BLOCKS_TO_SCAN: LazyLock = LazyLock::new(|| NonZeroUsize::new(100).unwrap()); static MAX_BLOOM_FILTERS_TO_LOAD: LazyLock = - LazyLock::new(|| NonZeroUsize::new(100).unwrap()); - #[cfg(feature = "aggregate_bloom")] - static MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD: LazyLock = LazyLock::new(|| NonZeroUsize::new(3).unwrap()); #[test_log::test(test)] @@ -1000,7 +707,7 @@ mod tests { let tx = connection.transaction().unwrap(); let expected_event = &emitted_events[1]; - let filter = EventFilter { + let constraints = EventConstraints { from_block: Some(expected_event.block_number), to_block: Some(expected_event.block_number), contract_address: Some(expected_event.from_address), @@ -1011,7 +718,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1020,18 +731,6 @@ mod tests { continuation_token: None, } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1118,7 +817,7 @@ mod tests { let addresses = tx .events( - &EventFilter { + &EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1151,7 +850,7 @@ mod tests { let tx = connection.transaction().unwrap(); const BLOCK_NUMBER: usize = 2; - let filter = EventFilter { + let constraints = EventConstraints { from_block: Some(BlockNumber::new_or_panic(BLOCK_NUMBER as u64)), to_block: Some(BlockNumber::new_or_panic(BLOCK_NUMBER as u64)), contract_address: None, @@ -1163,7 +862,11 @@ mod tests { let expected_events = &emitted_events[test_utils::EVENTS_PER_BLOCK * BLOCK_NUMBER ..test_utils::EVENTS_PER_BLOCK * (BLOCK_NUMBER + 1)]; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1172,18 +875,6 @@ mod tests { continuation_token: None, } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1194,7 +885,7 @@ mod tests { let tx = connection.transaction().unwrap(); const UNTIL_BLOCK_NUMBER: usize = 2; - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: Some(BlockNumber::new_or_panic(UNTIL_BLOCK_NUMBER as u64)), contract_address: None, @@ -1206,7 +897,11 @@ mod tests { let expected_events = &emitted_events[..test_utils::EVENTS_PER_BLOCK * (UNTIL_BLOCK_NUMBER + 1)]; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1215,18 +910,6 @@ mod tests { continuation_token: None, } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1236,7 +919,7 @@ mod tests { let mut connection = storage.connection().unwrap(); let tx = connection.transaction().unwrap(); - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: Some(BlockNumber::new_or_panic(1)), contract_address: None, @@ -1247,7 +930,11 @@ mod tests { let expected_events = &emitted_events[..test_utils::EVENTS_PER_BLOCK + 1]; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); pretty_assertions_sorted::assert_eq!( events, @@ -1260,20 +947,8 @@ mod tests { } ); - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } - // test continuation token - let filter = EventFilter { + let constraints = EventConstraints { from_block: Some(events.continuation_token.unwrap().block_number), to_block: Some(BlockNumber::new_or_panic(1)), contract_address: None, @@ -1285,7 +960,11 @@ mod tests { let expected_events = &emitted_events[test_utils::EVENTS_PER_BLOCK + 1..test_utils::EVENTS_PER_BLOCK * 2]; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); pretty_assertions_sorted::assert_eq!( events, @@ -1294,18 +973,6 @@ mod tests { continuation_token: None, } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1316,7 +983,7 @@ mod tests { let tx = connection.transaction().unwrap(); const FROM_BLOCK_NUMBER: usize = 2; - let filter = EventFilter { + let constraints = EventConstraints { from_block: Some(BlockNumber::new_or_panic(FROM_BLOCK_NUMBER as u64)), to_block: None, contract_address: None, @@ -1327,7 +994,11 @@ mod tests { let expected_events = &emitted_events[test_utils::EVENTS_PER_BLOCK * FROM_BLOCK_NUMBER..]; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1336,18 +1007,6 @@ mod tests { continuation_token: None, } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1359,7 +1018,7 @@ mod tests { let expected_event = &emitted_events[33]; - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: Some(expected_event.from_address), @@ -1369,7 +1028,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1378,18 +1041,6 @@ mod tests { continuation_token: None, } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1400,7 +1051,7 @@ mod tests { let tx = connection.transaction().unwrap(); let expected_event = &emitted_events[27]; - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1410,7 +1061,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1420,26 +1075,18 @@ mod tests { } ); - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } - // try event keys in the wrong order, should not match - let filter = EventFilter { + let constraints = EventConstraints { keys: vec![vec![expected_event.keys[1]], vec![expected_event.keys[0]]], - ..filter + ..constraints }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1448,18 +1095,6 @@ mod tests { continuation_token: None, } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1469,7 +1104,7 @@ mod tests { let mut connection = storage.connection().unwrap(); let tx = connection.transaction().unwrap(); - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1479,7 +1114,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1488,18 +1127,6 @@ mod tests { continuation_token: None, } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1509,7 +1136,7 @@ mod tests { let mut connection = storage.connection().unwrap(); let tx = connection.transaction().unwrap(); - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1519,7 +1146,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1532,19 +1163,7 @@ mod tests { } ); - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } - - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1554,7 +1173,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1567,19 +1190,7 @@ mod tests { } ); - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } - - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1589,7 +1200,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1598,18 +1213,6 @@ mod tests { continuation_token: None } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1619,7 +1222,7 @@ mod tests { let tx = connection.transaction().unwrap(); const PAGE_SIZE: usize = 10; - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1630,7 +1233,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1639,18 +1246,6 @@ mod tests { continuation_token: None, } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1666,7 +1261,7 @@ mod tests { expected_events.iter().map(|e| e.keys[1]).collect(), ]; - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1676,7 +1271,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1689,20 +1288,8 @@ mod tests { } ); - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } - // increase offset - let filter: EventFilter = EventFilter { + let constraints: EventConstraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1712,7 +1299,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1725,20 +1316,8 @@ mod tests { } ); - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } - // using the continuation token should be equivalent to the previous query - let filter: EventFilter = EventFilter { + let constraints: EventConstraints = EventConstraints { from_block: Some(BlockNumber::new_or_panic(0)), to_block: None, contract_address: None, @@ -1748,7 +1327,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1761,20 +1344,8 @@ mod tests { } ); - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } - // increase offset by two - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1784,7 +1355,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1794,20 +1369,8 @@ mod tests { } ); - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } - // using the continuation token should be equivalent to the previous query - let filter = EventFilter { + let constraints = EventConstraints { from_block: Some(BlockNumber::new_or_panic(3)), to_block: None, contract_address: None, @@ -1817,7 +1380,11 @@ mod tests { }; let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + *MAX_BLOCKS_TO_SCAN, + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1826,18 +1393,6 @@ mod tests { continuation_token: None, } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] @@ -1847,7 +1402,7 @@ mod tests { let mut connection = storage.connection().unwrap(); let tx = connection.transaction().unwrap(); - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -1857,7 +1412,11 @@ mod tests { }; let events = tx - .events(&filter, 1.try_into().unwrap(), *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + 1.try_into().unwrap(), + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1870,19 +1429,7 @@ mod tests { } ); - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - 1.try_into().unwrap(), - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } - - let filter = EventFilter { + let constraints = EventConstraints { from_block: Some(BlockNumber::new_or_panic(1)), to_block: None, contract_address: None, @@ -1892,7 +1439,11 @@ mod tests { }; let events = tx - .events(&filter, 1.try_into().unwrap(), *MAX_BLOOM_FILTERS_TO_LOAD) + .events( + &constraints, + 1.try_into().unwrap(), + *MAX_BLOOM_FILTERS_TO_LOAD, + ) .unwrap(); assert_eq!( events, @@ -1904,39 +1455,26 @@ mod tests { }), } ); - - #[cfg(feature = "aggregate_bloom")] - { - let events_from_aggregate = tx - .events_from_aggregate( - &filter, - 1.try_into().unwrap(), - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, - ) - .unwrap(); - assert_eq!(events_from_aggregate, events); - } } #[test] - #[cfg(feature = "aggregate_bloom")] - fn crossing_aggregate_filter_range_stores_and_updates_running() { + fn crossing_event_filter_range_stores_and_updates_running() { let blocks: Vec = [ - // First aggregate filter start. + // First event filter start. BlockNumber::GENESIS, BlockNumber::GENESIS + 1, BlockNumber::GENESIS + 2, BlockNumber::GENESIS + 3, // End. BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN - 1, - // Second aggregate filter start. + // Second event filter start. BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN, BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 1, BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 2, BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 3, // End. BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN - 1, - // Third aggregate filter start. + // Third event filter start. BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN, BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN + 1, ] @@ -1948,16 +1486,16 @@ mod tests { let mut connection = storage.connection().unwrap(); let tx = connection.transaction().unwrap(); - let inserted_aggregate_filter_count = tx + let inserted_event_filter_count = tx .inner() - .prepare("SELECT COUNT(*) FROM starknet_events_filters_aggregate") + .prepare("SELECT COUNT(*) FROM event_filters") .unwrap() .query_row([], |row| row.get::<_, u64>(0)) .unwrap(); - assert_eq!(inserted_aggregate_filter_count, 2); + assert_eq!(inserted_event_filter_count, 2); let running_event_filter = tx.running_event_filter.lock().unwrap(); - // Running aggregate starts from next block range. + // Running event filter starts from next block range. assert_eq!( running_event_filter.filter.from_block, 2 * AggregateBloom::BLOCK_RANGE_LEN @@ -1965,24 +1503,23 @@ mod tests { } #[test] - #[cfg(feature = "aggregate_bloom")] - fn aggregate_bloom_filter_load_limit() { + fn event_filter_filter_load_limit() { let blocks: Vec = [ - // First aggregate filter start. + // First event filter start. BlockNumber::GENESIS, BlockNumber::GENESIS + 1, BlockNumber::GENESIS + 2, BlockNumber::GENESIS + 3, // End. BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN - 1, - // Second aggregate filter start. + // Second event filter start. BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN, BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 1, BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 2, BlockNumber::GENESIS + AggregateBloom::BLOCK_RANGE_LEN + 3, // End. BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN - 1, - // Third aggregate filter start. + // Third event filter start. BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN, BlockNumber::GENESIS + 2 * AggregateBloom::BLOCK_RANGE_LEN + 1, ] @@ -1995,7 +1532,7 @@ mod tests { let mut connection = storage.connection().unwrap(); let tx = connection.transaction().unwrap(); - let filter = EventFilter { + let constraints = EventConstraints { from_block: None, to_block: None, contract_address: None, @@ -2006,24 +1543,23 @@ mod tests { }; let events = tx - .events_from_aggregate(&filter, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap()) + .events(&constraints, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap()) .unwrap(); - let first_aggregate_filter_range = - BlockNumber::GENESIS.get()..AggregateBloom::BLOCK_RANGE_LEN; + let first_event_filter_range = BlockNumber::GENESIS.get()..AggregateBloom::BLOCK_RANGE_LEN; for event in events.events { // ...but only events from the first bloom filter range are returned. assert!( - first_aggregate_filter_range.contains(&event.block_number.get()), + first_event_filter_range.contains(&event.block_number.get()), "Event block number: {} should have been in the range: {:?}", event.block_number.get(), - first_aggregate_filter_range + first_event_filter_range ); } let continue_from_block = events.continuation_token.unwrap().block_number; - assert_eq!(continue_from_block, first_aggregate_filter_range.end); + assert_eq!(continue_from_block, first_event_filter_range.end); - let filter_with_offset = EventFilter { + let constraints_with_offset = EventConstraints { from_block: Some(events.continuation_token.unwrap().block_number), to_block: None, contract_address: None, @@ -2034,79 +1570,25 @@ mod tests { }; let events = tx - .events_from_aggregate( - &filter_with_offset, + .events( + &constraints_with_offset, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap(), ) .unwrap(); assert!(events.continuation_token.is_none()); - let second_aggregate_filter_range = + let second_event_filter_range = AggregateBloom::BLOCK_RANGE_LEN..(2 * AggregateBloom::BLOCK_RANGE_LEN); - let third_aggregate_filter_range = + let third_event_filter_range = 2 * AggregateBloom::BLOCK_RANGE_LEN..(3 * AggregateBloom::BLOCK_RANGE_LEN); for event in events.events { - // ...but only events from the second (loaded) and third (running) bloom filter + // ...but only events from the second (loaded) and third (running) event filter // range are returned. assert!( - (second_aggregate_filter_range.start..third_aggregate_filter_range.end) + (second_event_filter_range.start..third_event_filter_range.end) .contains(&event.block_number.get()) ); } } - - #[test] - fn bloom_filter_load_limit() { - let (storage, test_data) = test_utils::setup_test_storage(); - let emitted_events = test_data.events; - let mut connection = storage.connection().unwrap(); - let tx = connection.transaction().unwrap(); - - let filter = EventFilter { - from_block: None, - to_block: None, - contract_address: None, - keys: vec![vec![], vec![emitted_events[0].keys[1]]], - page_size: emitted_events.len(), - offset: 0, - }; - - let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap()) - .unwrap(); - assert_eq!( - events, - PageOfEvents { - events: emitted_events[..10].to_vec(), - continuation_token: Some(ContinuationToken { - block_number: BlockNumber::new_or_panic(1), - offset: 0 - }), - } - ); - - let filter = EventFilter { - from_block: Some(BlockNumber::new_or_panic(1)), - to_block: None, - contract_address: None, - keys: vec![vec![], vec![emitted_events[0].keys[1]]], - page_size: emitted_events.len(), - offset: 0, - }; - - let events = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, 1.try_into().unwrap()) - .unwrap(); - assert_eq!( - events, - PageOfEvents { - events: emitted_events[10..20].to_vec(), - continuation_token: Some(ContinuationToken { - block_number: BlockNumber::new_or_panic(2), - offset: 0 - }), - } - ); - } } diff --git a/crates/storage/src/connection/transaction.rs b/crates/storage/src/connection/transaction.rs index 3c60c51de4..9e320bd6dd 100644 --- a/crates/storage/src/connection/transaction.rs +++ b/crates/storage/src/connection/transaction.rs @@ -99,11 +99,14 @@ impl Transaction<'_> { transactions: &[(StarknetTransaction, Receipt)], events: Option<&[Vec]>, ) -> anyhow::Result<()> { - if transactions.is_empty() && events.map_or(true, |x| x.is_empty()) { - // Advance the running event bloom filter even if there's nothing to add since - // it requires that no blocks are skipped. - #[cfg(feature = "aggregate_bloom")] - self.upsert_block_events_aggregate(block_number, std::iter::empty())?; + if transactions.is_empty() { + if let Some(events) = events { + // Advance the running event bloom filter even if there's nothing to add since + // it requires that no blocks are skipped. + self.upsert_block_event_filters(block_number, events.iter().flatten()) + .context("Inserting events into Bloom filter")?; + }; + return Ok(()); } @@ -170,16 +173,9 @@ impl Transaction<'_> { ]) .context("Inserting transaction data")?; - #[cfg(feature = "aggregate_bloom")] - { - let events = events.unwrap_or_default().iter().flatten(); - self.upsert_block_events_aggregate(block_number, events) - .context("Inserting events into Bloom filter aggregate")?; - } - if let Some(events) = events { let events = events.iter().flatten(); - self.upsert_block_events(block_number, events) + self.upsert_block_event_filters(block_number, events) .context("Inserting events into Bloom filter")?; } @@ -221,13 +217,8 @@ impl Transaction<'_> { ]) .context("Updating events")?; - #[cfg(feature = "aggregate_bloom")] - { - let events = events.iter().flatten(); - self.upsert_block_events_aggregate(block_number, events) - .context("Inserting events into Bloom filter aggregate")?; - } - self.upsert_block_events(block_number, events.iter().flatten()) + let events = events.iter().flatten(); + self.upsert_block_event_filters(block_number, events) .context("Inserting events into Bloom filter")?; Ok(()) diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index c91178a6be..6ba98a8f5b 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -15,15 +15,12 @@ pub mod test_utils; use std::num::NonZeroU32; use std::path::{Path, PathBuf}; -use std::sync::Arc; -#[cfg(feature = "aggregate_bloom")] -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use anyhow::Context; -pub use bloom::EVENT_KEY_FILTER_LIMIT; pub use connection::*; -#[cfg(feature = "aggregate_bloom")] use event::RunningEventFilter; +pub use event::EVENT_KEY_FILTER_LIMIT; use pathfinder_common::{BlockHash, BlockNumber}; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; @@ -94,8 +91,6 @@ struct Inner { /// Uses [`Arc`] to allow _shallow_ [Storage] cloning database_path: Arc, pool: Pool, - bloom_filter_cache: Arc, - #[cfg(feature = "aggregate_bloom")] running_event_filter: Arc>, trie_prune_mode: TriePruneMode, } @@ -103,8 +98,6 @@ struct Inner { pub struct StorageManager { database_path: PathBuf, journal_mode: JournalMode, - bloom_filter_cache: Arc, - #[cfg(feature = "aggregate_bloom")] running_event_filter: Arc>, trie_prune_mode: TriePruneMode, } @@ -136,8 +129,6 @@ impl StorageManager { Ok(Storage(Inner { database_path: Arc::new(self.database_path.clone()), pool, - bloom_filter_cache: self.bloom_filter_cache.clone(), - #[cfg(feature = "aggregate_bloom")] running_event_filter: self.running_event_filter.clone(), trie_prune_mode: self.trie_prune_mode, })) @@ -318,10 +309,9 @@ impl StorageBuilder { tracing::info!("Merkle trie pruning disabled"); } - #[cfg(feature = "aggregate_bloom")] let running_event_filter = event::reconstruct_running_event_filter(&connection.transaction()?) - .context("Reconstructing running aggregate bloom filter")?; + .context("Reconstructing running event filter")?; connection .close() @@ -331,8 +321,6 @@ impl StorageBuilder { Ok(StorageManager { database_path: self.database_path, journal_mode: self.journal_mode, - bloom_filter_cache: Arc::new(bloom::Cache::with_size(self.bloom_filter_cache_size)), - #[cfg(feature = "aggregate_bloom")] running_event_filter: Arc::new(Mutex::new(running_event_filter)), trie_prune_mode, }) @@ -404,8 +392,6 @@ impl Storage { let conn = self.0.pool.get()?; Ok(Connection::new( conn, - self.0.bloom_filter_cache.clone(), - #[cfg(feature = "aggregate_bloom")] self.0.running_event_filter.clone(), self.0.trie_prune_mode, )) @@ -680,7 +666,6 @@ mod tests { } #[test] - #[cfg(feature = "aggregate_bloom")] fn running_event_filter_reconstructed_after_shutdown() { use std::num::NonZeroUsize; use std::sync::LazyLock; @@ -689,10 +674,7 @@ mod tests { static MAX_BLOCKS_TO_SCAN: LazyLock = LazyLock::new(|| NonZeroUsize::new(10).unwrap()); - static MAX_BLOOM_FILTERS_TO_LOAD: LazyLock = - LazyLock::new(|| NonZeroUsize::new(1000).unwrap()); - #[cfg(feature = "aggregate_bloom")] - static MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD: LazyLock = + static MAX_EVENT_FILTERS_TO_LOAD: LazyLock = LazyLock::new(|| NonZeroUsize::new(3).unwrap()); let blocks = [0, 1, 2, 3, 4, 5]; @@ -740,7 +722,7 @@ mod tests { insert_block_data(&tx, i); } - let filter = EventFilter { + let constraints = EventConstraints { keys: vec![ vec![], // Key present in all events as the 2nd key. @@ -750,20 +732,14 @@ mod tests { ..Default::default() }; - let events_from_aggregate_before = tx - .events_from_aggregate( - &filter, + let events_before = tx + .events( + &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap() .events; - let events_before = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) - .unwrap() - .events; - - assert_eq!(events_before, events_from_aggregate_before); // Pretend like we shut down by dropping these. tx.commit().unwrap(); @@ -786,35 +762,29 @@ mod tests { insert_block_data(&tx, i); } - let events_from_aggregate_after = tx - .events_from_aggregate( - &filter, + let events_after = tx + .events( + &constraints, *MAX_BLOCKS_TO_SCAN, - *MAX_AGGREGATE_BLOOM_FILTERS_TO_LOAD, + *MAX_EVENT_FILTERS_TO_LOAD, ) .unwrap() .events; - let events_after = tx - .events(&filter, *MAX_BLOCKS_TO_SCAN, *MAX_BLOOM_FILTERS_TO_LOAD) - .unwrap() - .events; - - assert_eq!(events_after, events_from_aggregate_after); - let inserted_aggregate_filter_count = rsqlite_conn + let inserted_event_filter_count = rsqlite_conn .transaction() .unwrap() - .prepare("SELECT COUNT(*) FROM starknet_events_filters_aggregate") + .prepare("SELECT COUNT(*) FROM event_filters") .unwrap() .query_row([], |row| row.get::<_, u64>(0)) .unwrap(); - // We are using only the running aggregate. - assert!(inserted_aggregate_filter_count == 0); - assert!(events_from_aggregate_after.len() > events_from_aggregate_before.len()); - // Events added in the first run are present in the running aggregate. - for e in events_from_aggregate_before { - assert!(events_from_aggregate_after.contains(&e)); + // We are using only the running event filter. + assert!(inserted_event_filter_count == 0); + assert!(events_after.len() > events_before.len()); + // Events added in the first run are present in the running event filter. + for e in events_before { + assert!(events_after.contains(&e)); } } } diff --git a/crates/storage/src/schema.rs b/crates/storage/src/schema.rs index ffe92147ec..eca98647e4 100644 --- a/crates/storage/src/schema.rs +++ b/crates/storage/src/schema.rs @@ -25,7 +25,6 @@ mod revision_0062; mod revision_0063; mod revision_0064; mod revision_0065; -#[cfg(feature = "aggregate_bloom")] mod revision_0066; pub(crate) use base::base_schema; @@ -60,7 +59,6 @@ pub fn migrations() -> &'static [MigrationFn] { revision_0063::migrate, revision_0064::migrate, revision_0065::migrate, - #[cfg(feature = "aggregate_bloom")] revision_0066::migrate, ] } diff --git a/crates/storage/src/schema/revision_0066.rs b/crates/storage/src/schema/revision_0066.rs index 99440ca0ab..544ff6f323 100644 --- a/crates/storage/src/schema/revision_0066.rs +++ b/crates/storage/src/schema/revision_0066.rs @@ -6,11 +6,11 @@ use crate::params::params; #[allow(dead_code)] pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { - tracing::info!("Creating starknet_events_filters_aggregate table and migrating filters"); + tracing::info!("Creating event_filters table and migrating filters"); tx.execute( r" - CREATE TABLE starknet_events_filters_aggregate ( + CREATE TABLE event_filters ( from_block INTEGER NOT NULL, to_block INTEGER NOT NULL, bitmap BLOB NOT NULL, @@ -19,12 +19,11 @@ pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { ", params![], ) - .context("Creating starknet_events_filters_aggregate table")?; + .context("Creating event_filters table")?; migrate_individual_filters(tx)?; - // TODO: - // Delete old filters table + tx.execute("DROP TABLE starknet_events_filters", params![])?; Ok(()) } @@ -32,7 +31,8 @@ pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { /// Migrate individual bloom filters to the new aggregate table. We only need to /// migrate all of the [BLOCK_RANGE_LEN](AggregateBloom::BLOCK_RANGE_LEN) sized /// chunks. The remainder will be reconstructed by the -/// [StorageManager](crate::StorageManager) as the running aggregate. +/// [StorageManager](crate::StorageManager) as the +/// [RunningEventFilter](crate::connection::event::RunningEventFilter). fn migrate_individual_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { let mut select_old_bloom_stmt = tx.prepare("SELECT bloom FROM starknet_events_filters ORDER BY block_number")?; @@ -51,7 +51,7 @@ fn migrate_individual_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result< let mut insert_aggregate_stmt = tx.prepare( r" - INSERT INTO starknet_events_filters_aggregate + INSERT INTO event_filters (from_block, to_block, bitmap) VALUES (?, ?, ?) ",