diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index bacd87dd6b..3479205441 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -1,6 +1,7 @@ use std::collections::BTreeSet; use std::num::NonZeroUsize; use std::rc::Rc; +use std::time::Instant; use anyhow::{Context, Result}; use pathfinder_common::event::Event; @@ -557,6 +558,14 @@ pub(crate) fn rebuild_running_event_filter( ) -> anyhow::Result { use super::transaction; + let mut latest_stmt = tx.prepare( + r" + SELECT number + FROM canonical_blocks + ORDER BY number + DESC LIMIT 1 + ", + )?; let mut last_to_block_stmt = tx.prepare( r" SELECT to_block @@ -572,6 +581,17 @@ pub(crate) fn rebuild_running_event_filter( ", )?; + let Some(latest) = latest_stmt + .query_row([], |row| row.get::<_, u64>(0)) + .optional() + .context("Querying latest block number")? + else { + // Empty DB, there is nothing to rebuild. + return Ok(RunningEventFilter { + filter: AggregateBloom::new(BlockNumber::GENESIS), + next_block: BlockNumber::GENESIS, + }); + }; let last_to_block = last_to_block_stmt .query_row([], |row| row.get::<_, u64>(0)) .optional() @@ -583,10 +603,30 @@ pub(crate) fn rebuild_running_event_filter( None => BlockNumber::GENESIS, }; + let total_blocks_to_cover = latest - first_running_event_filter_block.get(); + let mut covered_blocks = 0; + let mut last_progress_report = Instant::now(); + + tracing::info!( + "Rebuilding running event filter: 0.00% (0/{}) blocks covered", + total_blocks_to_cover + ); let rebuilt_filters: Vec> = load_events_stmt .query_and_then( named_params![":first_running_event_filter_block": &first_running_event_filter_block], |row| { + if last_progress_report.elapsed().as_secs() >= 3 { + tracing::info!( + "Rebuilding running event filter: {:.2}% ({}/{}) blocks covered", + covered_blocks as f64 / total_blocks_to_cover as f64 * 100.0, + covered_blocks, + total_blocks_to_cover + ); + last_progress_report = Instant::now(); + } + + covered_blocks += 1; + let Some(events) = row .get_optional_blob(0)? .map(|events_blob| -> anyhow::Result<_> { @@ -622,6 +662,10 @@ pub(crate) fn rebuild_running_event_filter( ) .context("Querying events to rebuild")? .collect::>()?; + tracing::info!( + "Rebuilding running event filter: 100.00% ({total}/{total}) blocks covered", + total = total_blocks_to_cover, + ); let mut filter = AggregateBloom::new(first_running_event_filter_block); diff --git a/crates/storage/src/schema/revision_0066.rs b/crates/storage/src/schema/revision_0066.rs index a748ed8622..6cfa73ea36 100644 --- a/crates/storage/src/schema/revision_0066.rs +++ b/crates/storage/src/schema/revision_0066.rs @@ -67,6 +67,10 @@ fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { let mut migrated_count: u64 = 0; let mut last_progress_report = Instant::now(); + tracing::info!( + "Migrating event Bloom filters: 0.00% (0/{})", + bloom_filter_count + ); while let Some(bloom_filter) = bloom_filters.next().transpose()? { let current_block = BlockNumber::new_or_panic(migrated_count); @@ -96,6 +100,10 @@ fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { last_progress_report = Instant::now(); } } + tracing::info!( + "Migrating event Bloom filters: 100.00% ({count}/{count})", + count = bloom_filter_count, + ); Ok(()) }