From 3b2de277235e0fdae3e852935816a52d189de9ec Mon Sep 17 00:00:00 2001
From: joshie <93316087+joshieDo@users.noreply.github.com>
Date: Tue, 28 Jan 2025 19:10:04 +0000
Subject: [PATCH] valid file client incoming headers

---
 crates/cli/commands/src/import.rs | 12 ++-
 crates/consensus/consensus/src/noop.rs | 8 ++
 crates/net/downloaders/src/file_client.rs | 113 +++++++++++++++------
 crates/optimism/cli/src/commands/import.rs | 18 +++-
 4 files changed, 114 insertions(+), 37 deletions(-)

diff --git a/crates/cli/commands/src/import.rs b/crates/cli/commands/src/import.rs
index 1f297ad33bdde..13b3e3f6c662f 100644
--- a/crates/cli/commands/src/import.rs
+++ b/crates/cli/commands/src/import.rs
@@ -87,7 +87,13 @@ impl> ImportComm
         let mut total_decoded_blocks = 0;
         let mut total_decoded_txns = 0;
 
-        while let Some(file_client) = reader.next_chunk::>().await? {
+        let mut sealed_header = provider_factory
+            .sealed_header(provider_factory.last_block_number()?)?
+            .expect("should have genesis");
+
+        while let Some(file_client) =
+            reader.next_chunk::>(consensus.clone(), Some(sealed_header)).await?
+        {
             // create a new FileClient from chunk read from file
             info!(target: "reth::cli",
                 "Importing chain file chunk"
@@ -125,6 +131,10 @@ impl> ImportComm
             res = pipeline.run() => res?,
             _ = tokio::signal::ctrl_c() => {},
         }
+
+        sealed_header = provider_factory
+            .sealed_header(provider_factory.last_block_number()?)?
+            .expect("should have genesis");
     }
 
     let provider = provider_factory.provider()?;
diff --git a/crates/consensus/consensus/src/noop.rs b/crates/consensus/consensus/src/noop.rs
index 3df809ebe1081..79c873a7f6085 100644
--- a/crates/consensus/consensus/src/noop.rs
+++ b/crates/consensus/consensus/src/noop.rs
@@ -8,6 +8,14 @@ use reth_primitives_traits::Block;
 #[non_exhaustive]
 pub struct NoopConsensus;
 
+#[cfg(any(test, feature = "test-utils"))]
+impl NoopConsensus {
+    /// Creates an Arc instance of Self.
+    pub fn arc() -> std::sync::Arc {
+        std::sync::Arc::new(Self::default())
+    }
+}
+
 impl HeaderValidator for NoopConsensus {
     fn validate_header(&self, _header: &SealedHeader) -> Result<(), ConsensusError> {
         Ok(())
diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs
index 4cc9f0eaa1447..b17b8cf6a4c7e 100644
--- a/crates/net/downloaders/src/file_client.rs
+++ b/crates/net/downloaders/src/file_client.rs
@@ -1,10 +1,11 @@
-use std::{collections::HashMap, io, path::Path};
+use std::{collections::HashMap, io, path::Path, sync::Arc};
 use alloy_consensus::BlockHeader;
 use alloy_eips::BlockHashOrNumber;
 use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
 use futures::Future;
 use itertools::Either;
+use reth_consensus::{ConsensusError, HeaderValidator};
 use reth_network_p2p::{
     bodies::client::{BodiesClient, BodiesFut},
     download::DownloadClient,
@@ -56,6 +57,10 @@ pub struct FileClient {
 /// An error that can occur when constructing and using a [`FileClient`].
 #[derive(Debug, Error)]
 pub enum FileClientError {
+    /// An error occurred when validating a header from file.
+    #[error(transparent)]
+    Consensus(#[from] ConsensusError),
+
     /// An error occurred when opening or reading the file.
     #[error(transparent)]
     Io(#[from] std::io::Error),
@@ -77,13 +82,19 @@ impl From<&'static str> for FileClientError {
 
 impl FileClient {
     /// Create a new file client from a file path.
-    pub async fn new>(path: P) -> Result {
+    pub async fn new>(
+        path: P,
+        consensus: Arc>,
+    ) -> Result {
         let file = File::open(path).await?;
-        Self::from_file(file).await
+        Self::from_file(file, consensus).await
     }
 
     /// Initialize the [`FileClient`] with a file directly.
-    pub(crate) async fn from_file(mut file: File) -> Result {
+    pub(crate) async fn from_file(
+        mut file: File,
+        consensus: Arc>,
+    ) -> Result {
         // get file len from metadata before reading
         let metadata = file.metadata().await?;
         let file_len = metadata.len();
@@ -91,7 +102,10 @@ impl FileClient {
         let mut reader = vec![];
         file.read_to_end(&mut reader).await?;
 
-        Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
+        Ok(FileClientBuilder { consensus, parent_header: None }
+            .build(&reader[..], file_len)
+            .await?
+            .file_client)
     }
 
     /// Get the tip hash of the chain.
@@ -183,14 +197,23 @@ impl FileClient {
     }
 }
 
-impl FromReader for FileClient {
+struct FileClientBuilder {
+    pub consensus: Arc>,
+    pub parent_header: Option>,
+}
+
+impl> FromReader
+    for FileClientBuilder
+{
     type Error = FileClientError;
+    type Output = FileClient;
 
     /// Initialize the [`FileClient`] from bytes that have been read from file.
-    fn from_reader(
+    fn build(
+        &self,
         reader: R,
         num_bytes: u64,
-    ) -> impl Future, Self::Error>>
+    ) -> impl Future, Self::Error>>
     where
         R: AsyncReadExt + Unpin,
     {
@@ -213,6 +236,8 @@ impl FromReader for FileClient {
         let mut log_interval = 0;
         let mut log_interval_start_block = 0;
 
+        let mut parent_header = self.parent_header.clone();
+
         async move {
             while let Some(block_res) = stream.next().await {
                 let block = match block_res {
@@ -231,6 +256,14 @@ impl FromReader for FileClient {
                 let block_number = block.header().number();
                 let block_hash = block.header().hash_slow();
 
+                // Validate incoming header
+                let sealed = SealedHeader::new(block.header().clone(), block_hash);
+                self.consensus.validate_header(&sealed)?;
+                if let Some(parent) = &parent_header {
+                    self.consensus.validate_header_against_parent(&sealed, parent)?;
+                    parent_header = Some(sealed);
+                }
+
                 // add to the internal maps
                 headers.insert(block.header().number(), block.header().clone());
                 hash_to_number.insert(block_hash, block.header().number());
@@ -255,7 +288,7 @@ impl FromReader for FileClient {
             trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
 
             Ok(DecodedFileChunk {
-                file_client: Self { headers, hash_to_number, bodies },
+                file_client: FileClient { headers, hash_to_number, bodies },
                 remaining_bytes,
                 highest_block: None,
             })
@@ -452,15 +485,18 @@ impl ChunkedFileReader {
     }
 
     /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
-    pub async fn next_chunk(&mut self) -> Result, T::Error>
-    where
-        T: FromReader,
-    {
+    pub async fn next_chunk(
+        &mut self,
+        consensus: Arc>,
+        parent_header: Option>,
+    ) -> Result>, FileClientError> {
         let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
 
         // make new file client from chunk
         let DecodedFileChunk { file_client, remaining_bytes, .. } =
-            T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;
+            FileClientBuilder { consensus, parent_header }
+                .build(&self.chunk[..], next_chunk_byte_len)
+                .await?;
 
         // save left over bytes
         self.chunk = remaining_bytes;
@@ -494,17 +530,21 @@ pub trait FromReader {
     /// Error returned by file client type.
     type Error: From;
 
+    /// Output returned by file client type.
+    type Output;
+
     /// Returns a file client
-    fn from_reader(
-        reader: B,
+    fn build(
+        &self,
+        reader: R,
         num_bytes: u64,
-    ) -> impl Future, Self::Error>>
+    ) -> impl Future, Self::Error>>
     where
         Self: Sized,
-        B: AsyncReadExt + Unpin;
+        R: AsyncReadExt + Unpin;
 }
 
-/// Output from decoding a file chunk with [`FromReader::from_reader`].
+/// Output from decoding a file chunk with [`FromReader::build`].
 #[derive(Debug)]
 pub struct DecodedFileChunk {
     /// File client, i.e. the decoded part of chunk.
@@ -530,11 +570,12 @@ mod tests {
     use assert_matches::assert_matches;
     use futures_util::stream::StreamExt;
     use rand::Rng;
-    use reth_consensus::test_utils::TestConsensus;
+    use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
    use reth_network_p2p::{
         bodies::downloader::BodyDownloader,
         headers::downloader::{HeaderDownloader, SyncTarget},
     };
+    use reth_primitives::Block;
     use reth_provider::test_utils::create_test_provider_factory;
     use std::sync::Arc;
@@ -549,8 +590,12 @@ mod tests {
         // create an empty file
         let file = tempfile::tempfile().unwrap();
 
-        let client: Arc =
-            Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
+        let client: Arc = Arc::new(
+            FileClient::from_file(file.into(), NoopConsensus::arc())
+                .await
+                .unwrap()
+                .with_bodies(bodies.clone()),
+        );
         let mut downloader = BodiesDownloaderBuilder::default()
             .build::(
                 client.clone(),
@@ -576,12 +621,14 @@ mod tests {
         let file = tempfile::tempfile().unwrap();
 
         let client: Arc = Arc::new(
-            FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
-                (0u64, p0.clone_header()),
-                (1, p1.clone_header()),
-                (2, p2.clone_header()),
-                (3, p3.clone_header()),
-            ])),
+            FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
+                HashMap::from([
+                    (0u64, p0.clone_header()),
+                    (1, p1.clone_header()),
+                    (2, p2.clone_header()),
+                    (3, p3.clone_header()),
+                ]),
+            ),
         );
 
         let mut downloader = ReverseHeadersDownloaderBuilder::default()
@@ -604,7 +651,8 @@ mod tests {
         // Generate some random blocks
         let (file, headers, _) = generate_bodies_file(0..=19).await;
         // now try to read them back
-        let client: Arc = Arc::new(FileClient::from_file(file).await.unwrap());
+        let client: Arc =
+            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
 
         // construct headers downloader and use first header
         let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
@@ -629,7 +677,8 @@ mod tests {
         let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;
 
         // now try to read them back
-        let client: Arc = Arc::new(FileClient::from_file(file).await.unwrap());
+        let client: Arc =
+            Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
 
         // insert headers in db for the bodies downloader
         insert_headers(factory.db_ref().db(), &headers);
@@ -668,7 +717,9 @@ mod tests {
         let mut local_header = headers.first().unwrap().clone();
 
         // test
-        while let Some(client) = reader.next_chunk::().await.unwrap() {
+        while let Some(client) =
+            reader.next_chunk::(NoopConsensus::arc(), None).await.unwrap()
+        {
             let sync_target = client.tip_header().unwrap();
             let sync_target_hash = sync_target.hash();
 
diff --git a/crates/optimism/cli/src/commands/import.rs b/crates/optimism/cli/src/commands/import.rs
index 90a2acdec0d53..a2d10a10a3dd4 100644
--- a/crates/optimism/cli/src/commands/import.rs
+++ b/crates/optimism/cli/src/commands/import.rs
@@ -9,15 +9,13 @@ use reth_cli_commands::{
 use reth_consensus::noop::NoopConsensus;
 use reth_db::tables;
 use reth_db_api::transaction::DbTx;
-use reth_downloaders::file_client::{
-    ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE,
-};
+use reth_downloaders::file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE};
 use reth_node_builder::BlockTy;
 use reth_node_core::version::SHORT_VERSION;
 use reth_optimism_chainspec::OpChainSpec;
 use reth_optimism_evm::OpExecutorProvider;
 use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives};
-use reth_provider::{ChainSpecProvider, StageCheckpointReader};
+use reth_provider::{BlockNumReader, ChainSpecProvider, HeaderProvider, StageCheckpointReader};
 use reth_prune::PruneModes;
 use reth_stages::StageId;
 use reth_static_file::StaticFileProducer;
@@ -70,7 +68,13 @@ impl> ImportOpCommand {
         let mut total_decoded_txns = 0;
         let mut total_filtered_out_dup_txns = 0;
 
-        while let Some(mut file_client) = reader.next_chunk::>>().await? {
+        let mut sealed_header = provider_factory
+            .sealed_header(provider_factory.last_block_number()?)?
+            .expect("should have genesis");
+
+        while let Some(mut file_client) =
+            reader.next_chunk::>(consensus.clone(), Some(sealed_header)).await?
+        {
             // create a new FileClient from chunk read from file
             info!(target: "reth::cli",
                 "Importing chain file chunk"
@@ -118,6 +122,10 @@ impl> ImportOpCommand {
             res = pipeline.run() => res?,
             _ = tokio::signal::ctrl_c() => {},
         }
+
+        sealed_header = provider_factory
+            .sealed_header(provider_factory.last_block_number()?)?
+            .expect("should have genesis");
     }
 
     let provider = provider_factory.provider()?;
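
[Editor's note, not part of the patch] The core check added in `FileClientBuilder::build` is: seal each decoded header, validate it on its own, then validate it against the previously seen header before it becomes the new parent. The sketch below summarizes that logic under assumptions: the angle-bracketed type parameters were stripped from the diff above, so the generic parameter `H`, the `reth_primitives_traits::SealedHeader` import path, and the helper name `validate_file_headers` are illustrative, not code from this change.

    use reth_consensus::{ConsensusError, HeaderValidator};
    use reth_primitives_traits::SealedHeader;

    /// Validate a run of headers decoded from a chain file, carrying the
    /// parent forward, mirroring the check added in `FileClientBuilder::build`.
    fn validate_file_headers<H, V: HeaderValidator<H>>(
        consensus: &V,
        mut parent: Option<SealedHeader<H>>,
        headers: impl IntoIterator<Item = SealedHeader<H>>,
    ) -> Result<(), ConsensusError> {
        for sealed in headers {
            // Standalone header checks, whatever the validator implements.
            consensus.validate_header(&sealed)?;
            if let Some(p) = &parent {
                // Parent/child checks (e.g. number and parent-hash linkage).
                consensus.validate_header_against_parent(&sealed, p)?;
                // As in the patch, the parent only advances when one was
                // supplied, so passing `None` skips the parent/child checks.
                parent = Some(sealed);
            }
        }
        Ok(())
    }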
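On the CLI side, both import commands seed this check with the highest header already persisted (`provider_factory.sealed_header(provider_factory.last_block_number()?)`, i.e. genesis on a fresh datadir), pass it to `next_chunk` as `Some(sealed_header)`, and reload it after each pipeline run so the first header of the following chunk is validated against what was actually committed. The downloader tests keep their previous behaviour by passing `NoopConsensus::arc()` and `None`, which skips the parent/child check while still exercising the standalone header validation (a no-op for `NoopConsensus`).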