From dc697b0badfc908257525231203cde2acc65fe1f Mon Sep 17 00:00:00 2001 From: Jonathan <94441036+zeapoz@users.noreply.github.com> Date: Thu, 11 Apr 2024 10:07:15 +0200 Subject: [PATCH] ref: abstract away protobuf from snapshot (#84) Introduces a new trait, `Proto`, that defines methods for converting between native structs and ones generated by protobuf, and ones for encoding/decoding those structs into/from gzipped protobuf files. --- src/processor/snapshot/exporter.rs | 106 +++--------------- src/processor/snapshot/importer.rs | 24 +--- src/processor/snapshot/mod.rs | 6 +- src/processor/snapshot/types.rs | 172 ++++++++++++++++++++++++++--- src/processor/tree/tree_wrapper.rs | 12 +- 5 files changed, 184 insertions(+), 136 deletions(-) diff --git a/src/processor/snapshot/exporter.rs b/src/processor/snapshot/exporter.rs index 33b5c7c..b76a0db 100644 --- a/src/processor/snapshot/exporter.rs +++ b/src/processor/snapshot/exporter.rs @@ -1,23 +1,16 @@ -use std::{ - io::Write, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; -use bytes::BytesMut; use ethers::types::U64; use eyre::Result; -use flate2::{write::GzEncoder, Compression}; -use prost::Message; use super::{ database::{self, SnapshotDB}, - types::{self, SnapshotFactoryDependency, SnapshotHeader}, + types::{SnapshotFactoryDependency, SnapshotHeader}, DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME, }; - -pub mod protobuf { - include!(concat!(env!("OUT_DIR"), "/protobuf.rs")); -} +use crate::processor::snapshot::types::{ + Proto, SnapshotFactoryDependencies, SnapshotStorageLogsChunk, SnapshotStorageLogsChunkMetadata, +}; pub struct SnapshotExporter { basedir: PathBuf, @@ -68,26 +61,15 @@ impl SnapshotExporter { fn export_factory_deps(&self, header: &mut SnapshotHeader) -> Result<()> { tracing::info!("Exporting factory dependencies..."); - let mut buf = BytesMut::new(); - let storage_logs = self.database.cf_handle(database::FACTORY_DEPS).unwrap(); let mut iterator = self .database .iterator_cf(storage_logs, rocksdb::IteratorMode::Start); - let mut factory_deps = protobuf::SnapshotFactoryDependencies::default(); + let mut factory_deps = SnapshotFactoryDependencies::default(); while let Some(Ok((_, bs))) = iterator.next() { let factory_dep: SnapshotFactoryDependency = bincode::deserialize(&bs)?; - factory_deps - .factory_deps - .push(protobuf::SnapshotFactoryDependency { - bytecode: Some(factory_dep.bytecode), - }); - } - - let fd_len = factory_deps.encoded_len(); - if buf.capacity() < fd_len { - buf.reserve(fd_len - buf.capacity()); + factory_deps.factory_deps.push(factory_dep); } let path = self.basedir.join(format!( @@ -100,20 +82,7 @@ impl SnapshotExporter { .into_string() .expect("path to string"); - // Serialize chunk. - factory_deps.encode(&mut buf)?; - - let outfile = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(false) - .open(path)?; - - // Wrap in gzip compression before writing. - let mut encoder = GzEncoder::new(outfile, Compression::default()); - encoder.write_all(&buf)?; - encoder.finish()?; - + factory_deps.encode(&path)?; tracing::info!("All factory dependencies were successfully serialized!"); Ok(()) } @@ -121,62 +90,36 @@ impl SnapshotExporter { fn export_storage_logs(&self, chunk_size: u64, header: &mut SnapshotHeader) -> Result<()> { tracing::info!("Exporting storage logs..."); - let mut buf = BytesMut::new(); - let mut chunk_id = 0; - let num_logs = self.database.get_last_repeated_key_index()?; tracing::info!("Found {num_logs} logs."); - let total_num_chunks = (num_logs / chunk_size) + 1; - let index_to_key_map = self.database.cf_handle(database::INDEX_TO_KEY_MAP).unwrap(); let mut iterator = self .database .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start); - let mut has_more = true; - - while has_more { + let total_num_chunks = (num_logs / chunk_size) + 1; + for chunk_id in 0..total_num_chunks { tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, total_num_chunks); - let mut chunk = protobuf::SnapshotStorageLogsChunk { - storage_logs: vec![], - }; - + let mut chunk = SnapshotStorageLogsChunk::default(); for _ in 0..chunk_size { if let Some(Ok((_, key))) = iterator.next() { if let Ok(Some(entry)) = self.database.get_storage_log(key.as_ref()) { - let pb = protobuf::SnapshotStorageLog { - account_address: None, - storage_key: Some(key.to_vec()), - storage_value: Some(entry.value.0.to_vec()), - l1_batch_number_of_initial_write: Some( - entry.l1_batch_number_of_initial_write.as_u32(), - ), - enumeration_index: Some(entry.enumeration_index), - }; - - chunk.storage_logs.push(pb); + chunk.storage_logs.push(entry); } } else { - has_more = false; + break; } } - // Ensure that write buffer has enough capacity. - let chunk_len = chunk.encoded_len(); - if buf.capacity() < chunk_len { - buf.reserve(chunk_len - buf.capacity()); - } - - let path = &self.basedir.join(format!( + let path = self.basedir.join(format!( "snapshot_l1_batch_{}_storage_logs_part_{:0>4}.proto.gzip", header.l1_batch_number, chunk_id )); - header .storage_logs_chunks - .push(types::SnapshotStorageLogsChunkMetadata { + .push(SnapshotStorageLogsChunkMetadata { chunk_id, filepath: path .clone() @@ -185,25 +128,8 @@ impl SnapshotExporter { .expect("path to string"), }); - // Serialize chunk. - chunk.encode(&mut buf)?; - - let outfile = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(false) - .open(path)?; - - // Wrap in gzip compression before writing. - let mut encoder = GzEncoder::new(outfile, Compression::default()); - encoder.write_all(&buf)?; - encoder.finish()?; - - // Clear $tmp buffer. - buf.truncate(0); - + chunk.encode(&path)?; tracing::info!("Chunk {} was successfully serialized!", chunk_id + 1); - chunk_id += 1; } tracing::info!("All storage logs were successfully serialized!"); diff --git a/src/processor/snapshot/importer.rs b/src/processor/snapshot/importer.rs index e351e05..42428ab 100644 --- a/src/processor/snapshot/importer.rs +++ b/src/processor/snapshot/importer.rs @@ -1,19 +1,15 @@ use std::{ fs, - io::Read, path::{Path, PathBuf}, sync::Arc, }; use eyre::Result; -use flate2::read::GzDecoder; -use prost::Message; use state_reconstruct_fetcher::{constants::storage::INNER_DB_NAME, database::InnerDB}; use tokio::sync::Mutex; use super::{ - exporter::protobuf::{SnapshotFactoryDependencies, SnapshotStorageLogsChunk}, - types::SnapshotHeader, + types::{Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk}, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME, }; use crate::processor::tree::tree_wrapper::TreeWrapper; @@ -59,13 +55,7 @@ impl SnapshotImporter { header.l1_batch_number, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX )); let bytes = fs::read(factory_deps_path)?; - let mut decoder = GzDecoder::new(&bytes[..]); - - let mut decompressed_bytes = Vec::new(); - decoder.read_to_end(&mut decompressed_bytes)?; - - let factory_deps = SnapshotFactoryDependencies::decode(&decompressed_bytes[..])?; - Ok(factory_deps) + SnapshotFactoryDependencies::decode(&bytes) } fn read_storage_logs_chunks( @@ -85,15 +75,7 @@ impl SnapshotImporter { .directory .join(path.file_name().expect("path has no file name")); let bytes = fs::read(factory_deps_path)?; - let mut decoder = GzDecoder::new(&bytes[..]); - - let mut decompressed_bytes = Vec::new(); - decoder.read_to_end(&mut decompressed_bytes)?; - - // TODO: It would be nice to avoid the intermediary step of decoding. Something like - // implementing a method on the types::* that does it automatically. Will improve - // readabitly for the export code too as a bonus. - let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&decompressed_bytes[..])?; + let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&bytes)?; chunks.push(storage_logs_chunk); } Ok(chunks) diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs index 448593c..4c94425 100644 --- a/src/processor/snapshot/mod.rs +++ b/src/processor/snapshot/mod.rs @@ -3,9 +3,9 @@ use std::{fs, path::PathBuf, str::FromStr}; pub mod database; pub mod exporter; pub mod importer; +pub mod types; mod bytecode; -mod types; use async_trait::async_trait; use blake2::{Blake2s256, Digest}; @@ -28,6 +28,10 @@ pub const DEFAULT_DB_PATH: &str = "snapshot_db"; pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json"; pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip"; +pub mod protobuf { + include!(concat!(env!("OUT_DIR"), "/protobuf.rs")); +} + pub struct SnapshotBuilder { database: SnapshotDB, } diff --git a/src/processor/snapshot/types.rs b/src/processor/snapshot/types.rs index 0f4f779..60836c0 100644 --- a/src/processor/snapshot/types.rs +++ b/src/processor/snapshot/types.rs @@ -1,15 +1,74 @@ -use std::fmt; +use std::{ + io::{Read, Write}, + path::Path, +}; +use bytes::BytesMut; use chrono::{offset::Utc, DateTime}; use ethers::types::{H256, U256, U64}; +use eyre::Result; +use flate2::{read::GzDecoder, write::GzEncoder, Compression}; +use prost::Message; use serde::{Deserialize, Serialize}; +use super::{bytecode, protobuf}; + pub type L1BatchNumber = U64; pub type MiniblockNumber = U64; pub type StorageKey = U256; pub type StorageValue = H256; +pub trait Proto { + type ProtoStruct: Message + Default; + + /// Convert [`Self`] into its protobuf generated equivalent. + fn to_proto(&self) -> Self::ProtoStruct; + + /// Convert from a generated protobuf struct. + fn from_proto(proto: Self::ProtoStruct) -> Result + where + Self: Sized; + + /// Encode [`Self`] to file using gzip compression. + fn encode(&self, path: &Path) -> Result<()> { + let proto = Self::to_proto(self); + + // Ensure that write buffer has enough capacity. + let mut buf = BytesMut::new(); + let len = proto.encoded_len(); + if buf.capacity() < len { + buf.reserve(len - buf.capacity()); + } + + Self::ProtoStruct::encode(&proto, &mut buf)?; + let outfile = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(false) + .open(path)?; + + let mut encoder = GzEncoder::new(outfile, Compression::default()); + encoder.write_all(&buf)?; + encoder.finish()?; + + Ok(()) + } + + /// Decode a slice of gzip-compressed bytes into [`Self`]. + fn decode(bytes: &[u8]) -> Result + where + Self: Sized, + { + let mut decoder = GzDecoder::new(bytes); + let mut decompressed_bytes = Vec::new(); + decoder.read_to_end(&mut decompressed_bytes)?; + + let proto = Self::ProtoStruct::decode(&decompressed_bytes[..])?; + Self::from_proto(proto) + } +} + #[derive(Default, Debug, Serialize, Deserialize)] pub struct SnapshotHeader { pub l1_batch_number: L1BatchNumber, @@ -37,10 +96,32 @@ pub struct SnapshotStorageKey { } #[derive(Default, Debug, Serialize, Deserialize)] -pub struct SnapshotChunk { - // Sorted by `hashed_keys` interpreted as little-endian numbers +pub struct SnapshotStorageLogsChunk { pub storage_logs: Vec, - pub factory_deps: Vec, +} + +impl Proto for SnapshotStorageLogsChunk { + type ProtoStruct = protobuf::SnapshotStorageLogsChunk; + + fn to_proto(&self) -> Self::ProtoStruct { + Self::ProtoStruct { + storage_logs: self + .storage_logs + .iter() + .map(SnapshotStorageLog::to_proto) + .collect(), + } + } + + fn from_proto(proto: Self::ProtoStruct) -> Result { + Ok(Self { + storage_logs: proto + .storage_logs + .into_iter() + .map(SnapshotStorageLog::from_proto) + .collect::>>()?, + }) + } } // "most recent" for each key together with info when the key was first used @@ -53,17 +134,60 @@ pub struct SnapshotStorageLog { pub enumeration_index: u64, } -impl fmt::Display for SnapshotStorageLog { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{},{},{},{},{}", - self.key, - hex::encode(self.value), - self.miniblock_number_of_initial_write, - self.l1_batch_number_of_initial_write, - self.enumeration_index - ) +impl Proto for SnapshotStorageLog { + type ProtoStruct = protobuf::SnapshotStorageLog; + + fn to_proto(&self) -> Self::ProtoStruct { + let mut key = [0u8; 32]; + self.key.to_big_endian(&mut key); + + Self::ProtoStruct { + account_address: None, + storage_key: Some(key.to_vec()), + storage_value: Some(self.value.as_bytes().to_vec()), + l1_batch_number_of_initial_write: Some(self.l1_batch_number_of_initial_write.as_u32()), + enumeration_index: Some(self.enumeration_index), + } + } + + fn from_proto(proto: Self::ProtoStruct) -> Result { + let value_bytes: [u8; 32] = proto.storage_value().try_into()?; + Ok(Self { + key: U256::from_big_endian(proto.storage_key()), + value: StorageValue::from(&value_bytes), + miniblock_number_of_initial_write: U64::from(0), + l1_batch_number_of_initial_write: proto.l1_batch_number_of_initial_write().into(), + enumeration_index: proto.enumeration_index(), + }) + } +} + +#[derive(Default, Debug, Serialize, Deserialize)] +pub struct SnapshotFactoryDependencies { + pub factory_deps: Vec, +} + +impl Proto for SnapshotFactoryDependencies { + type ProtoStruct = protobuf::SnapshotFactoryDependencies; + + fn to_proto(&self) -> Self::ProtoStruct { + Self::ProtoStruct { + factory_deps: self + .factory_deps + .iter() + .map(SnapshotFactoryDependency::to_proto) + .collect(), + } + } + + fn from_proto(proto: Self::ProtoStruct) -> Result { + Ok(Self { + factory_deps: proto + .factory_deps + .into_iter() + .map(SnapshotFactoryDependency::from_proto) + .collect::>>()?, + }) } } @@ -72,3 +196,21 @@ pub struct SnapshotFactoryDependency { pub bytecode_hash: H256, pub bytecode: Vec, } + +impl Proto for SnapshotFactoryDependency { + type ProtoStruct = protobuf::SnapshotFactoryDependency; + + fn to_proto(&self) -> Self::ProtoStruct { + Self::ProtoStruct { + bytecode: Some(self.bytecode.clone()), + } + } + + fn from_proto(proto: Self::ProtoStruct) -> Result { + let bytecode = proto.bytecode(); + Ok(Self { + bytecode_hash: bytecode::hash_bytecode(bytecode), + bytecode: bytecode.to_vec(), + }) + } +} diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs index ba3ad16..08de511 100644 --- a/src/processor/tree/tree_wrapper.rs +++ b/src/processor/tree/tree_wrapper.rs @@ -14,7 +14,7 @@ use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper, TreeEntry}; use zksync_storage::{RocksDB, RocksDBOptions}; use super::RootHash; -use crate::processor::snapshot::exporter::protobuf::SnapshotStorageLogsChunk; +use crate::processor::snapshot::types::SnapshotStorageLogsChunk; #[derive(Error, Debug)] pub enum TreeError { @@ -135,17 +135,11 @@ impl TreeWrapper { tracing::info!("Importing chunk {}/{}...", i + 1, chunks.len()); for log in &chunk.storage_logs { - let key = U256::from_big_endian(log.storage_key()); - let index = log.enumeration_index(); - - let value_bytes: [u8; 32] = log.storage_value().try_into()?; - let value = H256::from(&value_bytes); - - tree_entries.push(TreeEntry::new(key, index, value)); + tree_entries.push(TreeEntry::new(log.key, log.enumeration_index, log.value)); self.snapshot .lock() .await - .add_key(&key) + .add_key(&log.key) .expect("cannot add key"); }