Skip to content

Commit

Permalink
log event Bloom filter migration progress
Browse files Browse the repository at this point in the history
  • Loading branch information
sistemd committed Dec 6, 2024
1 parent 36fea80 commit 868a993
Showing 1 changed file with 62 additions and 44 deletions.
106 changes: 62 additions & 44 deletions crates/storage/src/schema/revision_0066.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::time::Instant;

use anyhow::Context;
use pathfinder_common::BlockNumber;

use crate::bloom::{AggregateBloom, BloomFilter};
use crate::params::params;

#[allow(dead_code)]
pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
tracing::info!("Creating event_filters table and migrating filters");
tracing::info!("Creating event_filters table and migrating event Bloom filters");

tx.execute(
r"
Expand All @@ -17,67 +18,84 @@ pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
UNIQUE(from_block, to_block)
)
",
params![],
[],
)
.context("Creating event_filters table")?;

migrate_individual_filters(tx)?;
migrate_event_filters(tx).context("Migrating event Bloom filters")?;

tx.execute("DROP TABLE starknet_events_filters", params![])?;
tx.execute("DROP TABLE starknet_events_filters", [])
.context("Dropping starknet_events_filters table")?;

Ok(())
}

/// Migrate individual bloom filters to the new aggregate table. We only need to
/// migrate all of the [BLOCK_RANGE_LEN](AggregateBloom::BLOCK_RANGE_LEN) sized
/// chunks. The remainder will be reconstructed by the
/// [StorageManager](crate::StorageManager) as the
/// Migrate individual event bloom filters to the new aggregate table. We only
/// need to migrate all of the [AggregateBloom::BLOCK_RANGE_LEN] sized chunks.
/// The remainder will be reconstructed by the [crate::StorageManager] as the
/// [RunningEventFilter](crate::connection::event::RunningEventFilter).
fn migrate_individual_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
let mut select_old_bloom_stmt =
tx.prepare("SELECT bloom FROM starknet_events_filters ORDER BY block_number")?;
let bloom_filters: Vec<BloomFilter> = select_old_bloom_stmt
.query_and_then(params![], |row| {
let bytes: Vec<u8> = row.get(0)?;
Ok(BloomFilter::from_compressed_bytes(&bytes))
fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
let bloom_filter_count = tx
.query_row("SELECT COUNT(*) FROM starknet_events_filters", [], |row| {
row.get::<_, u64>(0)
})
.context("Querying old Bloom filters")?
.collect::<anyhow::Result<_>>()?;
.context("Counting existing event Bloom filters")?;

if bloom_filters.is_empty() {
// There are no bloom filters to migrate.
if bloom_filter_count == 0 {
// No event Bloom filters to migrate.
return Ok(());
}

let mut insert_aggregate_stmt = tx.prepare(
let mut fetch_bloom_stmt =
tx.prepare("SELECT bloom FROM starknet_events_filters ORDER BY block_number")?;

let mut insert_aggregate_stmt = tx.prepare_cached(
r"
INSERT INTO event_filters
(from_block, to_block, bitmap)
INSERT INTO event_filters (from_block, to_block, bitmap)
VALUES (?, ?, ?)
",
)?;

let mut bloom_filters = fetch_bloom_stmt
.query_map([], |row| {
let bloom: Vec<u8> = row.get(0)?;
Ok(BloomFilter::from_compressed_bytes(&bloom))
})
.context("Querying old Bloom filters")?;

let mut aggregate = AggregateBloom::new(BlockNumber::GENESIS);
bloom_filters
.iter()
.enumerate()
.try_for_each(|(i, bloom_filter)| -> anyhow::Result<()> {
let block_number = BlockNumber::new_or_panic(i as u64);

aggregate.add_bloom(bloom_filter, block_number);
if block_number == aggregate.to_block {
insert_aggregate_stmt
.execute(params![
&aggregate.from_block,
&aggregate.to_block,
&aggregate.compress_bitmap()
])
.context("Inserting aggregate bloom filter")?;

aggregate = AggregateBloom::new(block_number + 1);
}

Ok(())
})?;
let mut migrated_count: u64 = 0;
let mut last_progress_report = Instant::now();

while let Some(bloom_filter) = bloom_filters.next().transpose()? {
let current_block = BlockNumber::new_or_panic(migrated_count);

aggregate.add_bloom(&bloom_filter, current_block);

if current_block == aggregate.to_block {
insert_aggregate_stmt
.execute(params![
&aggregate.from_block,
&aggregate.to_block,
&aggregate.compress_bitmap()
])
.context("Inserting aggregate bloom filter")?;

aggregate = AggregateBloom::new(current_block + 1);
}

migrated_count += 1;

if last_progress_report.elapsed().as_secs() >= 10 {
tracing::info!(
"Migrating event Bloom filters: {:.2}% ({}/{})",
migrated_count as f64 / bloom_filter_count as f64 * 100.0,
migrated_count,
bloom_filter_count
);
last_progress_report = Instant::now();
}
}

Ok(())
}

0 comments on commit 868a993

Please sign in to comment.