
remove individual bloom filter usage
- Remove `aggregate_bloom` feature flag.
- Remove old Bloom filter load limit CLI parameter.
- Rename aggregate Bloom event filter table to `event_filters`.
sistemd committed Dec 6, 2024
1 parent dc62f15 commit 36fea80
Showing 26 changed files with 366 additions and 1,270 deletions.
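For context, the aggregate scheme indexes events for a fixed-length run of blocks in a single filter, instead of keeping one Bloom filter per block. A self-contained sketch of the covering-range arithmetic; the constant here is a placeholder standing in for `pathfinder_storage::BLOCK_RANGE_LEN`, and the helper name is hypothetical:

```rust
/// Placeholder for `pathfinder_storage::BLOCK_RANGE_LEN` (illustrative value).
const BLOCK_RANGE_LEN: u64 = 8192;

/// Hypothetical helper: which aggregate filter covers `block`, and
/// which block range that filter spans.
fn covering_filter(block: u64) -> (u64, std::ops::RangeInclusive<u64>) {
    let index = block / BLOCK_RANGE_LEN;
    let start = index * BLOCK_RANGE_LEN;
    (index, start..=start + BLOCK_RANGE_LEN - 1)
}

fn main() {
    let (index, range) = covering_filter(100_000);
    println!("block 100000 lives in filter {index}, covering blocks {range:?}");
}
```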
1 change: 0 additions & 1 deletion crates/pathfinder/Cargo.toml
@@ -14,7 +14,6 @@ path = "src/lib.rs"
[features]
tokio-console = ["console-subscriber", "tokio/tracing"]
p2p = []
aggregate_bloom = ["pathfinder-storage/aggregate_bloom", "pathfinder-rpc/aggregate_bloom"]

[dependencies]
anyhow = { workspace = true }
27 changes: 6 additions & 21 deletions crates/pathfinder/src/bin/pathfinder/config.rs
@@ -275,24 +275,14 @@ This should only be enabled for debugging purposes as it adds substantial proces
get_events_max_blocks_to_scan: std::num::NonZeroUsize,

#[arg(
long = "rpc.get-events-max-uncached-bloom-filters-to-load",
long_help = "The number of Bloom filters to load for events when querying for events. \
This limit is used to prevent queries from taking too long.",
env = "PATHFINDER_RPC_GET_EVENTS_MAX_UNCACHED_BLOOM_FILTERS_TO_LOAD",
default_value = "100000"
)]
get_events_max_uncached_bloom_filters_to_load: std::num::NonZeroUsize,

#[cfg(feature = "aggregate_bloom")]
#[arg(
long = "rpc.get-events-max-bloom-filters-to-load",
long_help = format!("The number of Bloom filters to load for events when querying for events. \
long = "rpc.get-events-max-event-filters-to-load",
long_help = format!("The number of aggregate Bloom filters to load for events when querying for events. \
Each filter covers a {} block range. \
This limit is used to prevent queries from taking too long.", pathfinder_storage::BLOCK_RANGE_LEN),
env = "PATHFINDER_RPC_GET_EVENTS_MAX_BLOOM_FILTERS_TO_LOAD",
env = "PATHFINDER_RPC_GET_EVENTS_MAX_EVENT_FILTERS_TO_LOAD",
default_value = "3"
)]
get_events_max_bloom_filters_to_load: std::num::NonZeroUsize,
get_events_max_event_filters_to_load: std::num::NonZeroUsize,

#[arg(
long = "storage.state-tries",
@@ -724,9 +714,7 @@ pub struct Config {
pub gateway_timeout: Duration,
pub event_bloom_filter_cache_size: NonZeroUsize,
pub get_events_max_blocks_to_scan: NonZeroUsize,
pub get_events_max_uncached_bloom_filters_to_load: NonZeroUsize,
#[cfg(feature = "aggregate_bloom")]
pub get_events_max_bloom_filters_to_load: NonZeroUsize,
pub get_events_max_event_filters_to_load: NonZeroUsize,
pub state_tries: Option<StateTries>,
pub custom_versioned_constants: Option<VersionedConstants>,
pub feeder_gateway_fetch_concurrency: NonZeroUsize,
@@ -1016,10 +1004,7 @@ impl Config {
gateway_api_key: cli.gateway_api_key,
event_bloom_filter_cache_size: cli.event_bloom_filter_cache_size,
get_events_max_blocks_to_scan: cli.get_events_max_blocks_to_scan,
get_events_max_uncached_bloom_filters_to_load: cli
.get_events_max_uncached_bloom_filters_to_load,
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: cli.get_events_max_bloom_filters_to_load,
get_events_max_event_filters_to_load: cli.get_events_max_event_filters_to_load,
gateway_timeout: Duration::from_secs(cli.gateway_timeout.get()),
feeder_gateway_fetch_concurrency: cli.feeder_gateway_fetch_concurrency,
state_tries: cli.state_tries,
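Note how the default limit falls from 100000 to 3: the unit changed from per-block filters to aggregate filters, each of which covers a `BLOCK_RANGE_LEN`-block range (per the help text above). A minimal sketch of the resulting scan bound, again with a placeholder constant:

```rust
/// Placeholder for `pathfinder_storage::BLOCK_RANGE_LEN` (illustrative value).
const BLOCK_RANGE_LEN: u64 = 8192;

/// Upper bound on the block span a single getEvents query can touch
/// under the new limit.
fn max_blocks_covered(max_event_filters_to_load: u64) -> u64 {
    max_event_filters_to_load * BLOCK_RANGE_LEN
}

fn main() {
    // Even the new default of 3 filters still spans tens of thousands
    // of blocks per query.
    println!("default covers up to {} blocks", max_blocks_covered(3));
}
```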
5 changes: 1 addition & 4 deletions crates/pathfinder/src/bin/pathfinder/main.rs
@@ -217,10 +217,7 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
let rpc_config = pathfinder_rpc::context::RpcConfig {
batch_concurrency_limit: config.rpc_batch_concurrency_limit,
get_events_max_blocks_to_scan: config.get_events_max_blocks_to_scan,
get_events_max_uncached_bloom_filters_to_load: config
.get_events_max_uncached_bloom_filters_to_load,
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: config.get_events_max_bloom_filters_to_load,
get_events_max_event_filters_to_load: config.get_events_max_event_filters_to_load,
custom_versioned_constants: config.custom_versioned_constants.take(),
};

3 changes: 1 addition & 2 deletions crates/pathfinder/src/state/sync.rs
@@ -1080,10 +1080,9 @@ async fn l2_reorg(
head -= 1;
}

#[cfg(feature = "aggregate_bloom")]
transaction
.reconstruct_running_event_filter()
.context("Reconstructing running aggregate bloom")?;
.context("Reconstructing running event filter after purge")?;

// Track combined L1 and L2 state.
let l1_l2_head = transaction.l1_l2_pointer().context("Query L1-L2 head")?;
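Both reorg paths (here and in `rollback_to_anchor` below) follow the same purge-then-rebuild shape: after the head is walked back, the in-progress aggregate filter may describe blocks that no longer exist, so it is reconstructed from the surviving range. A toy model of that invariant; every type here is an illustrative stand-in, not pathfinder's real storage API:

```rust
use std::ops::Range;

/// Toy stand-in for the running (not yet finalized) aggregate event filter.
struct RunningEventFilter {
    covered: Range<u64>, // blocks whose events are folded into the filter
}

/// Toy stand-in for the storage transaction.
struct Db {
    head: u64,
    filter: RunningEventFilter,
}

impl Db {
    /// Mirrors the purge loop above: walk the head back to the reorg target.
    fn purge_to(&mut self, target: u64) {
        while self.head > target {
            self.head -= 1;
        }
    }

    /// Mirrors `reconstruct_running_event_filter`: clamp the filter's
    /// coverage to blocks that still exist after the purge.
    fn reconstruct_running_event_filter(&mut self) {
        let start = self.filter.covered.start;
        self.filter.covered = start..(self.head + 1).max(start);
    }
}

fn main() {
    let mut db = Db {
        head: 10_005,
        filter: RunningEventFilter { covered: 8_192..10_006 },
    };
    db.purge_to(10_000);
    db.reconstruct_running_event_filter();
    assert_eq!(db.filter.covered, 8_192..10_001);
}
```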
22 changes: 1 addition & 21 deletions crates/pathfinder/src/sync/checkpoint.rs
@@ -279,30 +279,11 @@ where
#[tracing::instrument(level = "debug", skip(self))]
async fn sync_events(&self, stop: BlockNumber) -> Result<(), SyncError> {
let Some(start) = events::next_missing(self.storage.clone(), stop)
.await
.context("Finding next block with missing events")?
else {
return Ok(());
};

// TODO:
// Replace `start` with the code below once individual aggregate filters
// are removed.
#[cfg(feature = "aggregate_bloom")]
{
if let Some(start_aggregate) =
events::next_missing_aggregate(self.storage.clone(), stop)?
{
if start_aggregate != start {
tracing::error!(
"Running event filter block mismatch. Expected: {}, got: {}",
start,
start_aggregate
);
}
}
}

let event_stream = self.p2p.clone().event_stream(
start,
stop,
@@ -686,10 +667,9 @@ async fn rollback_to_anchor(
head -= 1;
}

#[cfg(feature = "aggregate_bloom")]
transaction
.reconstruct_running_event_filter()
.context("Reconstructing running aggregate bloom")?;
.context("Reconstructing running event filter after purge")?;

Ok(())
})
26 changes: 1 addition & 25 deletions crates/pathfinder/src/sync/events.rs
@@ -27,31 +27,7 @@ use crate::sync::stream::ProcessStage;

/// Returns the first block number whose events are missing in storage, counting
/// from genesis
pub(super) async fn next_missing(
storage: Storage,
head: BlockNumber,
) -> anyhow::Result<Option<BlockNumber>> {
spawn_blocking(move || {
let mut db = storage
.connection()
.context("Creating database connection")?;
let db = db.transaction().context("Creating database transaction")?;

if let Some(highest) = db
.highest_block_with_all_events_downloaded()
.context("Querying highest block with events")?
{
Ok((highest < head).then_some(highest + 1))
} else {
Ok(Some(BlockNumber::GENESIS))
}
})
.await
.context("Joining blocking task")?
}

#[cfg(feature = "aggregate_bloom")]
pub(super) fn next_missing_aggregate(
pub(super) fn next_missing(
storage: Storage,
head: BlockNumber,
) -> anyhow::Result<Option<BlockNumber>> {
1 change: 0 additions & 1 deletion crates/rpc/Cargo.toml
@@ -8,7 +8,6 @@ rust-version = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
aggregate_bloom = []

[dependencies]
anyhow = { workspace = true }
Binary file modified crates/rpc/fixtures/mainnet.sqlite
8 changes: 2 additions & 6 deletions crates/rpc/src/context.rs
@@ -19,9 +19,7 @@ use tokio::sync::watch as tokio_watch;
pub struct RpcConfig {
pub batch_concurrency_limit: NonZeroUsize,
pub get_events_max_blocks_to_scan: NonZeroUsize,
pub get_events_max_uncached_bloom_filters_to_load: NonZeroUsize,
#[cfg(feature = "aggregate_bloom")]
pub get_events_max_bloom_filters_to_load: NonZeroUsize,
pub get_events_max_event_filters_to_load: NonZeroUsize,
pub custom_versioned_constants: Option<VersionedConstants>,
}

@@ -122,9 +120,7 @@ impl RpcContext {
let config = RpcConfig {
batch_concurrency_limit: NonZeroUsize::new(8).unwrap(),
get_events_max_blocks_to_scan: NonZeroUsize::new(1000).unwrap(),
get_events_max_uncached_bloom_filters_to_load: NonZeroUsize::new(1000).unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: NonZeroUsize::new(1000).unwrap(),
get_events_max_event_filters_to_load: NonZeroUsize::new(1000).unwrap(),
custom_versioned_constants: None,
};

4 changes: 1 addition & 3 deletions crates/rpc/src/jsonrpc/router/subscription.rs
@@ -1026,9 +1026,7 @@ mod tests {
config: RpcConfig {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
get_events_max_event_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
55 changes: 3 additions & 52 deletions crates/rpc/src/method/get_events.rs
@@ -208,7 +208,7 @@ pub async fn get_events(
None => (from_block, 0),
};

let filter = pathfinder_storage::EventFilter {
let constraints = pathfinder_storage::EventConstraints {
from_block,
to_block,
contract_address: request.address,
@@ -217,66 +217,17 @@
offset: requested_offset,
};

// TODO:
// Instrumentation and `AggregateBloom` version of fetching events
// for the given `EventFilter` are under a feature flag for now and
// we do not execute them during testing because they would only
// slow the tests down and would not have any impact on their outcome.
// Follow-up PR will use the `AggregateBloom` logic to create the output,
// then the conditions will be removed.

#[cfg(all(feature = "aggregate_bloom", not(test)))]
let start = std::time::Instant::now();

let page = transaction
.events(
&filter,
&constraints,
context.config.get_events_max_blocks_to_scan,
context.config.get_events_max_uncached_bloom_filters_to_load,
context.config.get_events_max_event_filters_to_load,
)
.map_err(|e| match e {
EventFilterError::Internal(e) => GetEventsError::Internal(e),
EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()),
})?;

#[cfg(all(feature = "aggregate_bloom", not(test)))]
{
let elapsed = start.elapsed();

tracing::info!(
"Getting events (individual Bloom filters) took {:?}",
elapsed
);

let start = std::time::Instant::now();
let page_from_aggregate = transaction
.events_from_aggregate(
&filter,
context.config.get_events_max_blocks_to_scan,
context.config.get_events_max_bloom_filters_to_load,
)
.map_err(|e| match e {
EventFilterError::Internal(e) => GetEventsError::Internal(e),
EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()),
})?;
let elapsed = start.elapsed();

tracing::info!(
"Getting events (aggregate Bloom filters) took {:?}",
elapsed
);

if page != page_from_aggregate {
tracing::error!(
"Page of events from individual and aggregate bloom filters does not match!"
);
tracing::error!("Individual: {:?}", page);
tracing::error!("Aggregate: {:?}", page_from_aggregate);
} else {
tracing::info!("Page of events from individual and aggregate bloom filters match!");
}
}

let mut events = GetEventsResult {
events: page.events.into_iter().map(|e| e.into()).collect(),
continuation_token: page.continuation_token.map(|token| {
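The deleted block was a shadow-comparison harness: it ran the old per-block path and the new aggregate path side by side, logged both timings, and logged an error on any output mismatch. A self-contained sketch of that pattern; both query functions are toy stand-ins for the real implementations:

```rust
use std::time::Instant;

// Toy stand-ins for the two code paths the removed harness compared.
fn events_individual() -> Vec<u64> {
    (0..4).collect()
}
fn events_aggregate() -> Vec<u64> {
    (0..4).collect()
}

fn main() {
    let start = Instant::now();
    let old = events_individual();
    println!("individual Bloom filters took {:?}", start.elapsed());

    let start = Instant::now();
    let new = events_aggregate();
    println!("aggregate Bloom filters took {:?}", start.elapsed());

    // Any divergence is logged loudly rather than returned, so the old
    // path's result stays authoritative while the new one is validated.
    if old == new {
        println!("pages match");
    } else {
        eprintln!("page mismatch: old={old:?} new={new:?}");
    }
}
```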
27 changes: 1 addition & 26 deletions crates/rpc/src/method/subscribe_events.rs
@@ -108,33 +108,10 @@ impl RpcSubscriptionFlow for SubscribeEvents {
from,
to,
params.from_address,
#[cfg(feature = "aggregate_bloom")]
params.keys.clone().unwrap_or_default(),
#[cfg(not(feature = "aggregate_bloom"))]
params.keys.unwrap_or_default(),
)
.map_err(RpcError::InternalError)?;

#[cfg(feature = "aggregate_bloom")]
{
let events_from_aggregate = db
.events_in_range_aggregate(
from,
to,
params.from_address,
params.keys.unwrap_or_default(),
)
.unwrap();

assert_eq!(events.0.len(), events_from_aggregate.0.len());
for (event, event_from_aggregate) in
events.0.iter().zip(events_from_aggregate.0.iter())
{
assert_eq!(event, event_from_aggregate);
}
assert_eq!(events.1, events_from_aggregate.1);
}

Ok(events)
})
.await
@@ -761,9 +738,7 @@ mod tests {
config: RpcConfig {
batch_concurrency_limit: 64.try_into().unwrap(),
get_events_max_blocks_to_scan: 1024.try_into().unwrap(),
get_events_max_uncached_bloom_filters_to_load: 1024.try_into().unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
get_events_max_event_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
4 changes: 1 addition & 3 deletions crates/rpc/src/method/subscribe_new_heads.rs
@@ -547,9 +547,7 @@ mod tests {
config: RpcConfig {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
get_events_max_event_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
4 changes: 1 addition & 3 deletions crates/rpc/src/method/subscribe_pending_transactions.rs
@@ -492,9 +492,7 @@ mod tests {
config: RpcConfig {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
get_events_max_event_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
4 changes: 1 addition & 3 deletions crates/rpc/src/method/subscribe_transaction_status.rs
@@ -1168,9 +1168,7 @@ mod tests {
config: RpcConfig {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(),
#[cfg(feature = "aggregate_bloom")]
get_events_max_bloom_filters_to_load: 1.try_into().unwrap(),
get_events_max_event_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
10 changes: 4 additions & 6 deletions crates/rpc/src/v06/method/trace_block_transactions.rs
@@ -643,13 +643,11 @@ pub(crate) mod tests {

// Need to avoid skipping blocks for `insert_transaction_data`.
(0..619596)
.collect::<Vec<_>>()
.chunks(pathfinder_storage::BLOCK_RANGE_LEN as usize)
.map(|range| *range.last().unwrap() as u64)
.for_each(|block| {
let block = BlockNumber::new_or_panic(block);
.step_by(pathfinder_storage::BLOCK_RANGE_LEN as usize)
.for_each(|block: u64| {
let block = BlockNumber::new_or_panic(block.saturating_sub(1));
transaction
.insert_transaction_data(block, &[], None)
.insert_transaction_data(block, &[], Some(&[]))
.unwrap();
});

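The fixture change above (repeated in `trace_transaction.rs` below) replaces collect-then-chunk with a direct `step_by`, avoiding an intermediate `Vec` of roughly 620k elements. A runnable sketch of what the new iteration yields, with a placeholder constant standing in for `pathfinder_storage::BLOCK_RANGE_LEN`:

```rust
/// Placeholder for `pathfinder_storage::BLOCK_RANGE_LEN` (illustrative value).
const BLOCK_RANGE_LEN: usize = 8192;

fn main() {
    let stepped: Vec<u64> = (0..619_596u64)
        .step_by(BLOCK_RANGE_LEN)
        .map(|block| block.saturating_sub(1))
        .collect();

    // The first entry saturates at block 0; every later entry lands on
    // the last block of the preceding BLOCK_RANGE_LEN-sized range.
    assert_eq!(stepped[0], 0);
    assert_eq!(stepped[1], BLOCK_RANGE_LEN as u64 - 1);
    assert_eq!(stepped[2], 2 * BLOCK_RANGE_LEN as u64 - 1);
    println!("{:?}", &stepped[..3]);
}
```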
10 changes: 4 additions & 6 deletions crates/rpc/src/v06/method/trace_transaction.rs
@@ -315,13 +315,11 @@ pub mod tests {

// Need to avoid skipping blocks for `insert_transaction_data`.
(0..619596)
.collect::<Vec<_>>()
.chunks(pathfinder_storage::BLOCK_RANGE_LEN as usize)
.map(|range| *range.last().unwrap() as u64)
.for_each(|block| {
let block = BlockNumber::new_or_panic(block);
.step_by(pathfinder_storage::BLOCK_RANGE_LEN as usize)
.for_each(|block: u64| {
let block = BlockNumber::new_or_panic(block.saturating_sub(1));
transaction
.insert_transaction_data(block, &[], None)
.insert_transaction_data(block, &[], Some(&[]))
.unwrap();
});

