diff --git a/src/main.rs b/src/main.rs
index 12657f7..8ae1e0c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -18,7 +18,11 @@ use eyre::Result;
 use processor::snapshot::{
     exporter::SnapshotExporter, importer::SnapshotImporter, SnapshotBuilder,
 };
-use state_reconstruct_fetcher::{constants::storage, l1_fetcher::L1Fetcher, types::CommitBlock};
+use state_reconstruct_fetcher::{
+    constants::{ethereum, storage},
+    l1_fetcher::{L1Fetcher, L1FetcherOptions},
+    types::CommitBlock,
+};
 use tikv_jemallocator::Jemalloc;
 use tokio::sync::mpsc;
 use tracing_subscriber::{filter::LevelFilter, EnvFilter};
@@ -153,10 +157,20 @@ async fn main() -> Result<()> {
             l1_fetcher_options,
             db_path,
         } => {
-            let fetcher_options = l1_fetcher_options.into();
-            let fetcher = L1Fetcher::new(fetcher_options, None)?;
             let processor = SnapshotBuilder::new(db_path);
 
+            let mut fetcher_options: L1FetcherOptions = l1_fetcher_options.into();
+            if let Ok(Some(batch_number)) = processor.get_last_l1_batch_number() {
+                if batch_number > ethereum::GENESIS_BLOCK {
+                    tracing::info!(
+                        "Found a preexisting snapshot db, continuing from L1 block: {batch_number}"
+                    );
+                    fetcher_options.start_block = batch_number + 1;
+                }
+            }
+
+            let fetcher = L1Fetcher::new(fetcher_options, None)?;
+
             let (tx, rx) = mpsc::channel::<CommitBlock>(5);
             let processor_handle = tokio::spawn(async move {
                 processor.run(rx).await;
diff --git a/src/processor/snapshot/database.rs b/src/processor/snapshot/database.rs
index e45ac56..80f1c47 100644
--- a/src/processor/snapshot/database.rs
+++ b/src/processor/snapshot/database.rs
@@ -14,6 +14,7 @@ pub const FACTORY_DEPS: &str = "factory_deps";
 
 const METADATA: &str = "metadata";
 const LAST_REPEATED_KEY_INDEX: &str = "LAST_REPEATED_KEY_INDEX";
+const LAST_L1_BATCH_NUMBER: &str = "LAST_L1_BATCH_NUMBER";
 
 #[allow(clippy::enum_variant_names)]
 #[derive(Error, Debug)]
@@ -118,6 +119,27 @@ impl SnapshotDB {
             .map_err(Into::into)
     }
 
+    pub fn get_last_l1_batch_number(&self) -> Result<Option<u64>> {
+        // Unwrapping column family handle here is safe because presence of
+        // those CFs is ensured in construction of this DB.
+        let metadata = self.cf_handle(METADATA).unwrap();
+        let batch = self.get_cf(metadata, LAST_L1_BATCH_NUMBER)?.map(|bytes| {
+            u64::from_be_bytes([
+                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
+            ])
+        });
+
+        Ok(batch)
+    }
+
+    pub fn set_last_l1_batch_number(&self, batch_number: u64) -> Result<()> {
+        // Unwrapping column family handle here is safe because presence of
+        // those CFs is ensured in construction of this DB.
+        let metadata = self.cf_handle(METADATA).unwrap();
+        self.put_cf(metadata, LAST_L1_BATCH_NUMBER, batch_number.to_be_bytes())
+            .map_err(Into::into)
+    }
+
     pub fn get_storage_log(&self, key: &[u8]) -> Result<Option<SnapshotStorageLog>> {
         // Unwrapping column family handle here is safe because presence of
         // those CFs is ensured in construction of this DB.
diff --git a/src/processor/snapshot/exporter.rs b/src/processor/snapshot/exporter.rs
index 7fd35a1..33b5c7c 100644
--- a/src/processor/snapshot/exporter.rs
+++ b/src/processor/snapshot/exporter.rs
@@ -4,6 +4,7 @@ use std::{
 };
 
 use bytes::BytesMut;
+use ethers::types::U64;
 use eyre::Result;
 use flate2::{write::GzEncoder, Compression};
 use prost::Message;
@@ -38,7 +39,17 @@ impl SnapshotExporter {
     }
 
     pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> {
-        let mut header = SnapshotHeader::default();
+        let l1_batch_number = U64::from(
+            self.database
+                .get_last_l1_batch_number()?
+                .expect("snapshot db contains no L1 batch number"),
+        );
+
+        let mut header = SnapshotHeader {
+            l1_batch_number,
+            ..Default::default()
+        };
+
         self.export_storage_logs(chunk_size, &mut header)?;
         self.export_factory_deps(&mut header)?;
 
@@ -146,7 +157,6 @@
                     };
 
                     chunk.storage_logs.push(pb);
-                    header.l1_batch_number = entry.l1_batch_number_of_initial_write;
                 }
             } else {
                 has_more = false;
@@ -159,7 +169,7 @@
                 buf.reserve(chunk_len - buf.capacity());
             }
 
-            let path = PathBuf::new().join(&self.basedir).join(format!(
+            let path = &self.basedir.join(format!(
                 "snapshot_l1_batch_{}_storage_logs_part_{:0>4}.proto.gzip",
                 header.l1_batch_number, chunk_id
            ));
diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs
index 9f21c75..448593c 100644
--- a/src/processor/snapshot/mod.rs
+++ b/src/processor/snapshot/mod.rs
@@ -53,6 +53,11 @@ impl SnapshotBuilder {
 
         Self { database }
     }
+
+    // Gets the last L1 batch number that was processed, for use in state recovery.
+    pub fn get_last_l1_batch_number(&self) -> Result<Option<u64>> {
+        self.database.get_last_l1_batch_number()
+    }
 }
 
 #[async_trait]
@@ -109,6 +114,10 @@ impl Processor for SnapshotBuilder {
                 })
                 .expect("failed to save factory dep");
             }
+
+            if let Some(number) = block.l1_block_number {
+                let _ = self.database.set_last_l1_batch_number(number);
+            };
         }
     }
 }
diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs
index fe0336c..ba3ad16 100644
--- a/src/processor/tree/tree_wrapper.rs
+++ b/src/processor/tree/tree_wrapper.rs
@@ -131,7 +131,9 @@
     ) -> Result<()> {
         let mut tree_entries = Vec::new();
 
-        for chunk in &chunks {
+        for (i, chunk) in chunks.iter().enumerate() {
+            tracing::info!("Importing chunk {}/{}...", i + 1, chunks.len());
+
             for log in &chunk.storage_logs {
                 let key = U256::from_big_endian(log.storage_key());
                 let index = log.enumeration_index();
@@ -146,15 +148,18 @@
                 .add_key(&key)
                 .expect("cannot add key");
             }
+
+            tracing::info!("Chunk {} was successfully imported!", i + 1);
         }
 
+        tracing::info!("Extending merkle tree with imported storage logs...");
         let num_tree_entries = tree_entries.len();
         self.tree.extend(tree_entries);
 
         tracing::info!("Succesfully imported snapshot containing {num_tree_entries} storage logs!",);
 
         let snapshot = self.snapshot.lock().await;
-        snapshot.set_latest_l1_block_number(l1_batch_number.as_u64())?;
+        snapshot.set_latest_l1_block_number(l1_batch_number.as_u64() + 1)?;
 
         Ok(())
     }