Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: validate headers loaded from file on reth import #14050

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion crates/cli/commands/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
let mut total_decoded_blocks = 0;
let mut total_decoded_txns = 0;

while let Some(file_client) = reader.next_chunk::<FileClient<_>>().await? {
let mut sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");

while let Some(file_client) =
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
Comment on lines +94 to +95
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works, but I wonder whether a nicer solution would be a custom stream type, so the consensus does not have to be passed on every `next_chunk` call.

but this entire type is weird so seems fine

Copy link
Collaborator Author

@joshieDo joshieDo Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — I would rather track that separately, to be honest; for now I just want this to work for the hive tests.

{
// create a new FileClient from chunk read from file
info!(target: "reth::cli",
"Importing chain file chunk"
Expand Down Expand Up @@ -125,6 +131,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {},
}

sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");
}

let provider = provider_factory.provider()?;
Expand Down
8 changes: 8 additions & 0 deletions crates/consensus/consensus/src/noop.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator, PostExecutionInput};
use alloc::sync::Arc;
use alloy_primitives::U256;
use reth_primitives::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_primitives_traits::Block;
Expand All @@ -8,6 +9,13 @@ use reth_primitives_traits::Block;
#[non_exhaustive]
pub struct NoopConsensus;

impl NoopConsensus {
    /// Creates an Arc instance of Self.
    pub fn arc() -> Arc<Self> {
        let consensus = Self::default();
        Arc::new(consensus)
    }
}

impl<H> HeaderValidator<H> for NoopConsensus {
fn validate_header(&self, _header: &SealedHeader<H>) -> Result<(), ConsensusError> {
Ok(())
Expand Down
113 changes: 82 additions & 31 deletions crates/net/downloaders/src/file_client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{collections::HashMap, io, path::Path};
use std::{collections::HashMap, io, path::Path, sync::Arc};

use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use futures::Future;
use itertools::Either;
use reth_consensus::{ConsensusError, HeaderValidator};
use reth_network_p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
Expand Down Expand Up @@ -56,6 +57,10 @@ pub struct FileClient<B: Block = reth_primitives::Block> {
/// An error that can occur when constructing and using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
/// An error occurred when validating a header from file.
#[error(transparent)]
Consensus(#[from] ConsensusError),

/// An error occurred when opening or reading the file.
#[error(transparent)]
Io(#[from] std::io::Error),
Expand All @@ -77,21 +82,30 @@ impl From<&'static str> for FileClientError {

impl<B: FullBlock> FileClient<B> {
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
pub async fn new<P: AsRef<Path>>(
path: P,
consensus: Arc<dyn HeaderValidator<B::Header>>,
) -> Result<Self, FileClientError> {
let file = File::open(path).await?;
Self::from_file(file).await
Self::from_file(file, consensus).await
}

/// Initialize the [`FileClient`] with a file directly.
pub(crate) async fn from_file(mut file: File) -> Result<Self, FileClientError> {
pub(crate) async fn from_file(
mut file: File,
consensus: Arc<dyn HeaderValidator<B::Header>>,
) -> Result<Self, FileClientError> {
// get file len from metadata before reading
let metadata = file.metadata().await?;
let file_len = metadata.len();

let mut reader = vec![];
file.read_to_end(&mut reader).await?;

Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
Ok(FileClientBuilder { consensus, parent_header: None }
.build(&reader[..], file_len)
.await?
.file_client)
}

/// Get the tip hash of the chain.
Expand Down Expand Up @@ -183,14 +197,23 @@ impl<B: FullBlock> FileClient<B> {
}
}

impl<B: FullBlock> FromReader for FileClient<B> {
/// Builder that decodes a [`FileClient`] from raw file bytes, running every
/// decoded header through the supplied consensus validator.
struct FileClientBuilder<B: Block = reth_primitives::Block> {
    /// Validator applied to each header decoded from the file
    /// (`validate_header`, and `validate_header_against_parent` when a
    /// parent is known).
    pub consensus: Arc<dyn HeaderValidator<B::Header>>,
    /// Parent of the first expected header, if known, enabling
    /// parent-child validation across chunk boundaries. `None` skips the
    /// against-parent check for the first header.
    ///
    /// NOTE(review): in `build`, the tracked parent only advances inside the
    /// `if let Some(parent)` branch — when starting from `None`, later headers
    /// are never parent-validated either; confirm this is intended.
    pub parent_header: Option<SealedHeader<B::Header>>,
}

impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
for FileClientBuilder<B>
{
type Error = FileClientError;
type Output = FileClient<B>;

/// Initialize the [`FileClient`] from bytes that have been read from file.
fn from_reader<R>(
fn build<R>(
&self,
reader: R,
num_bytes: u64,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
where
R: AsyncReadExt + Unpin,
{
Expand All @@ -213,6 +236,8 @@ impl<B: FullBlock> FromReader for FileClient<B> {
let mut log_interval = 0;
let mut log_interval_start_block = 0;

let mut parent_header = self.parent_header.clone();

async move {
while let Some(block_res) = stream.next().await {
let block = match block_res {
Expand All @@ -231,6 +256,14 @@ impl<B: FullBlock> FromReader for FileClient<B> {
let block_number = block.header().number();
let block_hash = block.header().hash_slow();

// Validate incoming header
let sealed = SealedHeader::new(block.header().clone(), block_hash);
self.consensus.validate_header(&sealed)?;
if let Some(parent) = &parent_header {
self.consensus.validate_header_against_parent(&sealed, parent)?;
parent_header = Some(sealed);
}

// add to the internal maps
headers.insert(block.header().number(), block.header().clone());
hash_to_number.insert(block_hash, block.header().number());
Expand All @@ -255,7 +288,7 @@ impl<B: FullBlock> FromReader for FileClient<B> {
trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

Ok(DecodedFileChunk {
file_client: Self { headers, hash_to_number, bodies },
file_client: FileClient { headers, hash_to_number, bodies },
remaining_bytes,
highest_block: None,
})
Expand Down Expand Up @@ -452,15 +485,18 @@ impl ChunkedFileReader {
}

/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
where
T: FromReader,
{
pub async fn next_chunk<B: FullBlock>(
&mut self,
consensus: Arc<dyn HeaderValidator<B::Header>>,
parent_header: Option<SealedHeader<B::Header>>,
) -> Result<Option<FileClient<B>>, FileClientError> {
let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

// make new file client from chunk
let DecodedFileChunk { file_client, remaining_bytes, .. } =
T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;
FileClientBuilder { consensus, parent_header }
.build(&self.chunk[..], next_chunk_byte_len)
.await?;

// save left over bytes
self.chunk = remaining_bytes;
Expand Down Expand Up @@ -494,17 +530,21 @@ pub trait FromReader {
/// Error returned by file client type.
type Error: From<io::Error>;

/// Output returned by file client type.
type Output;

/// Returns a file client
fn from_reader<B>(
reader: B,
fn build<R>(
&self,
reader: R,
num_bytes: u64,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
where
Self: Sized,
B: AsyncReadExt + Unpin;
R: AsyncReadExt + Unpin;
}

/// Output from decoding a file chunk with [`FromReader::from_reader`].
/// Output from decoding a file chunk with [`FromReader::build`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
/// File client, i.e. the decoded part of chunk.
Expand All @@ -530,11 +570,12 @@ mod tests {
use assert_matches::assert_matches;
use futures_util::stream::StreamExt;
use rand::Rng;
use reth_consensus::test_utils::TestConsensus;
use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
use reth_network_p2p::{
bodies::downloader::BodyDownloader,
headers::downloader::{HeaderDownloader, SyncTarget},
};
use reth_primitives::Block;
use reth_provider::test_utils::create_test_provider_factory;
use std::sync::Arc;

Expand All @@ -549,8 +590,12 @@ mod tests {
// create an empty file
let file = tempfile::tempfile().unwrap();

let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
let client: Arc<FileClient> = Arc::new(
FileClient::from_file(file.into(), NoopConsensus::arc())
.await
.unwrap()
.with_bodies(bodies.clone()),
);
let mut downloader = BodiesDownloaderBuilder::default()
.build::<reth_primitives::Block, _, _>(
client.clone(),
Expand All @@ -576,12 +621,14 @@ mod tests {

let file = tempfile::tempfile().unwrap();
let client: Arc<FileClient> = Arc::new(
FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
(0u64, p0.clone_header()),
(1, p1.clone_header()),
(2, p2.clone_header()),
(3, p3.clone_header()),
])),
FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
HashMap::from([
(0u64, p0.clone_header()),
(1, p1.clone_header()),
(2, p2.clone_header()),
(3, p3.clone_header()),
]),
),
);

let mut downloader = ReverseHeadersDownloaderBuilder::default()
Expand All @@ -604,7 +651,8 @@ mod tests {
// Generate some random blocks
let (file, headers, _) = generate_bodies_file(0..=19).await;
// now try to read them back
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

// construct headers downloader and use first header
let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
Expand All @@ -629,7 +677,8 @@ mod tests {
let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

// now try to read them back
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

// insert headers in db for the bodies downloader
insert_headers(factory.db_ref().db(), &headers);
Expand Down Expand Up @@ -668,7 +717,9 @@ mod tests {
let mut local_header = headers.first().unwrap().clone();

// test
while let Some(client) = reader.next_chunk::<FileClient>().await.unwrap() {
while let Some(client) =
reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
{
let sync_target = client.tip_header().unwrap();

let sync_target_hash = sync_target.hash();
Expand Down
18 changes: 13 additions & 5 deletions crates/optimism/cli/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ use reth_cli_commands::{
use reth_consensus::noop::NoopConsensus;
use reth_db::tables;
use reth_db_api::transaction::DbTx;
use reth_downloaders::file_client::{
ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE,
};
use reth_downloaders::file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE};
use reth_node_builder::BlockTy;
use reth_node_core::version::SHORT_VERSION;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_evm::OpExecutorProvider;
use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives};
use reth_provider::{ChainSpecProvider, StageCheckpointReader};
use reth_provider::{BlockNumReader, ChainSpecProvider, HeaderProvider, StageCheckpointReader};
use reth_prune::PruneModes;
use reth_stages::StageId;
use reth_static_file::StaticFileProducer;
Expand Down Expand Up @@ -70,7 +68,13 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportOpCommand<C> {
let mut total_decoded_txns = 0;
let mut total_filtered_out_dup_txns = 0;

while let Some(mut file_client) = reader.next_chunk::<FileClient<BlockTy<N>>>().await? {
let mut sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");

while let Some(mut file_client) =
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
{
// create a new FileClient from chunk read from file
info!(target: "reth::cli",
"Importing chain file chunk"
Expand Down Expand Up @@ -118,6 +122,10 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportOpCommand<C> {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {},
}

sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");
}

let provider = provider_factory.provider()?;
Expand Down
Loading