diff --git a/crates/storage/src/schema/revision_0066.rs b/crates/storage/src/schema/revision_0066.rs index 544ff6f323..a748ed8622 100644 --- a/crates/storage/src/schema/revision_0066.rs +++ b/crates/storage/src/schema/revision_0066.rs @@ -1,12 +1,13 @@ +use std::time::Instant; + use anyhow::Context; use pathfinder_common::BlockNumber; use crate::bloom::{AggregateBloom, BloomFilter}; use crate::params::params; -#[allow(dead_code)] pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { - tracing::info!("Creating event_filters table and migrating filters"); + tracing::info!("Creating event_filters table and migrating event Bloom filters"); tx.execute( r" @@ -17,67 +18,84 @@ pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { UNIQUE(from_block, to_block) ) ", - params![], + [], ) .context("Creating event_filters table")?; - migrate_individual_filters(tx)?; + migrate_event_filters(tx).context("Migrating event Bloom filters")?; - tx.execute("DROP TABLE starknet_events_filters", params![])?; + tx.execute("DROP TABLE starknet_events_filters", []) + .context("Dropping starknet_events_filters table")?; Ok(()) } -/// Migrate individual bloom filters to the new aggregate table. We only need to -/// migrate all of the [BLOCK_RANGE_LEN](AggregateBloom::BLOCK_RANGE_LEN) sized -/// chunks. The remainder will be reconstructed by the -/// [StorageManager](crate::StorageManager) as the +/// Migrate individual event bloom filters to the new aggregate table. We only +/// need to migrate all of the [AggregateBloom::BLOCK_RANGE_LEN] sized chunks. +/// The remainder will be reconstructed by the [crate::StorageManager] as the /// [RunningEventFilter](crate::connection::event::RunningEventFilter). -fn migrate_individual_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { - let mut select_old_bloom_stmt = - tx.prepare("SELECT bloom FROM starknet_events_filters ORDER BY block_number")?; - let bloom_filters: Vec = select_old_bloom_stmt - .query_and_then(params![], |row| { - let bytes: Vec = row.get(0)?; - Ok(BloomFilter::from_compressed_bytes(&bytes)) +fn migrate_event_filters(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> { + let bloom_filter_count = tx + .query_row("SELECT COUNT(*) FROM starknet_events_filters", [], |row| { + row.get::<_, u64>(0) }) - .context("Querying old Bloom filters")? - .collect::>()?; + .context("Counting existing event Bloom filters")?; - if bloom_filters.is_empty() { - // There are no bloom filters to migrate. + if bloom_filter_count == 0 { + // No event Bloom filters to migrate. return Ok(()); } - let mut insert_aggregate_stmt = tx.prepare( + let mut fetch_bloom_stmt = + tx.prepare("SELECT bloom FROM starknet_events_filters ORDER BY block_number")?; + + let mut insert_aggregate_stmt = tx.prepare_cached( r" - INSERT INTO event_filters - (from_block, to_block, bitmap) + INSERT INTO event_filters (from_block, to_block, bitmap) VALUES (?, ?, ?) ", )?; + + let mut bloom_filters = fetch_bloom_stmt + .query_map([], |row| { + let bloom: Vec = row.get(0)?; + Ok(BloomFilter::from_compressed_bytes(&bloom)) + }) + .context("Querying old Bloom filters")?; + let mut aggregate = AggregateBloom::new(BlockNumber::GENESIS); - bloom_filters - .iter() - .enumerate() - .try_for_each(|(i, bloom_filter)| -> anyhow::Result<()> { - let block_number = BlockNumber::new_or_panic(i as u64); - - aggregate.add_bloom(bloom_filter, block_number); - if block_number == aggregate.to_block { - insert_aggregate_stmt - .execute(params![ - &aggregate.from_block, - &aggregate.to_block, - &aggregate.compress_bitmap() - ]) - .context("Inserting aggregate bloom filter")?; - - aggregate = AggregateBloom::new(block_number + 1); - } - - Ok(()) - })?; + let mut migrated_count: u64 = 0; + let mut last_progress_report = Instant::now(); + + while let Some(bloom_filter) = bloom_filters.next().transpose()? { + let current_block = BlockNumber::new_or_panic(migrated_count); + + aggregate.add_bloom(&bloom_filter, current_block); + + if current_block == aggregate.to_block { + insert_aggregate_stmt + .execute(params![ + &aggregate.from_block, + &aggregate.to_block, + &aggregate.compress_bitmap() + ]) + .context("Inserting aggregate bloom filter")?; + + aggregate = AggregateBloom::new(current_block + 1); + } + + migrated_count += 1; + + if last_progress_report.elapsed().as_secs() >= 10 { + tracing::info!( + "Migrating event Bloom filters: {:.2}% ({}/{})", + migrated_count as f64 / bloom_filter_count as f64 * 100.0, + migrated_count, + bloom_filter_count + ); + last_progress_report = Instant::now(); + } + } Ok(()) }