Skip to content

Commit

Permalink
log running event filter rebuild progress
Browse files Browse the repository at this point in the history
  • Loading branch information
sistemd committed Dec 13, 2024
1 parent 8490ba1 commit 9fb30f0
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
44 changes: 44 additions & 0 deletions crates/storage/src/connection/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::BTreeSet;
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::time::Instant;

use anyhow::{Context, Result};
use pathfinder_common::event::Event;
Expand Down Expand Up @@ -557,6 +558,14 @@ pub(crate) fn rebuild_running_event_filter(
) -> anyhow::Result<RunningEventFilter> {
use super::transaction;

let mut latest_stmt = tx.prepare(
r"
SELECT number
FROM canonical_blocks
ORDER BY number
DESC LIMIT 1
",
)?;
let mut last_to_block_stmt = tx.prepare(
r"
SELECT to_block
Expand All @@ -572,6 +581,17 @@ pub(crate) fn rebuild_running_event_filter(
",
)?;

let Some(latest) = latest_stmt
.query_row([], |row| row.get::<_, u64>(0))
.optional()
.context("Querying latest block number")?
else {
// Empty DB, there is nothing to rebuild.
return Ok(RunningEventFilter {
filter: AggregateBloom::new(BlockNumber::GENESIS),
next_block: BlockNumber::GENESIS,
});
};
let last_to_block = last_to_block_stmt
.query_row([], |row| row.get::<_, u64>(0))
.optional()
Expand All @@ -583,10 +603,30 @@ pub(crate) fn rebuild_running_event_filter(
None => BlockNumber::GENESIS,
};

let total_blocks_to_cover = latest - first_running_event_filter_block.get();
let mut covered_blocks = 0;
let mut last_progress_report = Instant::now();

tracing::info!(
"Rebuilding running event filter: 0.00% (0/{}) blocks covered",
total_blocks_to_cover
);
let rebuilt_filters: Vec<Option<BloomFilter>> = load_events_stmt
.query_and_then(
named_params![":first_running_event_filter_block": &first_running_event_filter_block],
|row| {
if last_progress_report.elapsed().as_secs() >= 3 {
tracing::info!(
"Rebuilding running event filter: {:.2}% ({}/{}) blocks covered",
covered_blocks as f64 / total_blocks_to_cover as f64 * 100.0,
covered_blocks,
total_blocks_to_cover
);
last_progress_report = Instant::now();
}

covered_blocks += 1;

let Some(events) = row
.get_optional_blob(0)?
.map(|events_blob| -> anyhow::Result<_> {
Expand Down Expand Up @@ -622,6 +662,10 @@ pub(crate) fn rebuild_running_event_filter(
)
.context("Querying events to rebuild")?
.collect::<anyhow::Result<_>>()?;
tracing::info!(
"Rebuilding running event filter: 100.00% ({total}/{total}) blocks covered",
total = total_blocks_to_cover,
);

let mut filter = AggregateBloom::new(first_running_event_filter_block);

Expand Down
8 changes: 8 additions & 0 deletions crates/storage/src/schema/revision_0066.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
let mut migrated_count: u64 = 0;
let mut last_progress_report = Instant::now();

tracing::info!(
"Migrating event Bloom filters: 0.00% (0/{})",
bloom_filter_count
);
while let Some(bloom_filter) = bloom_filters.next().transpose()? {
let current_block = BlockNumber::new_or_panic(migrated_count);

Expand Down Expand Up @@ -96,6 +100,10 @@ fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
last_progress_report = Instant::now();
}
}
tracing::info!(
"Migrating event Bloom filters: 100.00% ({count}/{count})",
count = bloom_filter_count,
);

Ok(())
}

0 comments on commit 9fb30f0

Please sign in to comment.