Skip to content

Commit

Permalink
valid file client incoming headers
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed Jan 28, 2025
1 parent 4653d3d commit 3b2de27
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 37 deletions.
12 changes: 11 additions & 1 deletion crates/cli/commands/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
let mut total_decoded_blocks = 0;
let mut total_decoded_txns = 0;

while let Some(file_client) = reader.next_chunk::<FileClient<_>>().await? {
let mut sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");

while let Some(file_client) =
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
{
// create a new FileClient from chunk read from file
info!(target: "reth::cli",
"Importing chain file chunk"
Expand Down Expand Up @@ -125,6 +131,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {},
}

sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");
}

let provider = provider_factory.provider()?;
Expand Down
8 changes: 8 additions & 0 deletions crates/consensus/consensus/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ use reth_primitives_traits::Block;
#[non_exhaustive]
pub struct NoopConsensus;

#[cfg(any(test, feature = "test-utils"))]
impl NoopConsensus {
/// Creates an Arc instance of Self.
pub fn arc() -> std::sync::Arc<Self> {
std::sync::Arc::new(Self::default())
}
}

impl<H> HeaderValidator<H> for NoopConsensus {
fn validate_header(&self, _header: &SealedHeader<H>) -> Result<(), ConsensusError> {
Ok(())
Expand Down
113 changes: 82 additions & 31 deletions crates/net/downloaders/src/file_client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{collections::HashMap, io, path::Path};
use std::{collections::HashMap, io, path::Path, sync::Arc};

use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use futures::Future;
use itertools::Either;
use reth_consensus::{ConsensusError, HeaderValidator};
use reth_network_p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
Expand Down Expand Up @@ -56,6 +57,10 @@ pub struct FileClient<B: Block = reth_primitives::Block> {
/// An error that can occur when constructing and using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
/// An error occurred when validating a header from file.
#[error(transparent)]
Consensus(#[from] ConsensusError),

/// An error occurred when opening or reading the file.
#[error(transparent)]
Io(#[from] std::io::Error),
Expand All @@ -77,21 +82,30 @@ impl From<&'static str> for FileClientError {

impl<B: FullBlock> FileClient<B> {
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
pub async fn new<P: AsRef<Path>>(
path: P,
consensus: Arc<dyn HeaderValidator<B::Header>>,
) -> Result<Self, FileClientError> {
let file = File::open(path).await?;
Self::from_file(file).await
Self::from_file(file, consensus).await
}

/// Initialize the [`FileClient`] with a file directly.
pub(crate) async fn from_file(mut file: File) -> Result<Self, FileClientError> {
pub(crate) async fn from_file(
mut file: File,
consensus: Arc<dyn HeaderValidator<B::Header>>,
) -> Result<Self, FileClientError> {
// get file len from metadata before reading
let metadata = file.metadata().await?;
let file_len = metadata.len();

let mut reader = vec![];
file.read_to_end(&mut reader).await?;

Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
Ok(FileClientBuilder { consensus, parent_header: None }
.build(&reader[..], file_len)
.await?
.file_client)
}

/// Get the tip hash of the chain.
Expand Down Expand Up @@ -183,14 +197,23 @@ impl<B: FullBlock> FileClient<B> {
}
}

impl<B: FullBlock> FromReader for FileClient<B> {
struct FileClientBuilder<B: Block = reth_primitives::Block> {
pub consensus: Arc<dyn HeaderValidator<B::Header>>,
pub parent_header: Option<SealedHeader<B::Header>>,
}

impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
for FileClientBuilder<B>
{
type Error = FileClientError;
type Output = FileClient<B>;

/// Initialize the [`FileClient`] from bytes that have been read from file.
fn from_reader<R>(
fn build<R>(
&self,
reader: R,
num_bytes: u64,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
where
R: AsyncReadExt + Unpin,
{
Expand All @@ -213,6 +236,8 @@ impl<B: FullBlock> FromReader for FileClient<B> {
let mut log_interval = 0;
let mut log_interval_start_block = 0;

let mut parent_header = self.parent_header.clone();

async move {
while let Some(block_res) = stream.next().await {
let block = match block_res {
Expand All @@ -231,6 +256,14 @@ impl<B: FullBlock> FromReader for FileClient<B> {
let block_number = block.header().number();
let block_hash = block.header().hash_slow();

// Validate incoming header
let sealed = SealedHeader::new(block.header().clone(), block_hash);
self.consensus.validate_header(&sealed)?;
if let Some(parent) = &parent_header {
self.consensus.validate_header_against_parent(&sealed, parent)?;
parent_header = Some(sealed);
}

// add to the internal maps
headers.insert(block.header().number(), block.header().clone());
hash_to_number.insert(block_hash, block.header().number());
Expand All @@ -255,7 +288,7 @@ impl<B: FullBlock> FromReader for FileClient<B> {
trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

Ok(DecodedFileChunk {
file_client: Self { headers, hash_to_number, bodies },
file_client: FileClient { headers, hash_to_number, bodies },
remaining_bytes,
highest_block: None,
})
Expand Down Expand Up @@ -452,15 +485,18 @@ impl ChunkedFileReader {
}

/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
where
T: FromReader,
{
pub async fn next_chunk<B: FullBlock>(
&mut self,
consensus: Arc<dyn HeaderValidator<B::Header>>,
parent_header: Option<SealedHeader<B::Header>>,
) -> Result<Option<FileClient<B>>, FileClientError> {
let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

// make new file client from chunk
let DecodedFileChunk { file_client, remaining_bytes, .. } =
T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;
FileClientBuilder { consensus, parent_header }
.build(&self.chunk[..], next_chunk_byte_len)
.await?;

// save left over bytes
self.chunk = remaining_bytes;
Expand Down Expand Up @@ -494,17 +530,21 @@ pub trait FromReader {
/// Error returned by file client type.
type Error: From<io::Error>;

/// Output returned by file client type.
type Output;

/// Returns a file client
fn from_reader<B>(
reader: B,
fn build<R>(
&self,
reader: R,
num_bytes: u64,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
where
Self: Sized,
B: AsyncReadExt + Unpin;
R: AsyncReadExt + Unpin;
}

/// Output from decoding a file chunk with [`FromReader::from_reader`].
/// Output from decoding a file chunk with [`FromReader::build`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
/// File client, i.e. the decoded part of chunk.
Expand All @@ -530,11 +570,12 @@ mod tests {
use assert_matches::assert_matches;
use futures_util::stream::StreamExt;
use rand::Rng;
use reth_consensus::test_utils::TestConsensus;
use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
use reth_network_p2p::{
bodies::downloader::BodyDownloader,
headers::downloader::{HeaderDownloader, SyncTarget},
};
use reth_primitives::Block;
use reth_provider::test_utils::create_test_provider_factory;
use std::sync::Arc;

Expand All @@ -549,8 +590,12 @@ mod tests {
// create an empty file
let file = tempfile::tempfile().unwrap();

let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
let client: Arc<FileClient> = Arc::new(
FileClient::from_file(file.into(), NoopConsensus::arc())
.await
.unwrap()
.with_bodies(bodies.clone()),
);
let mut downloader = BodiesDownloaderBuilder::default()
.build::<reth_primitives::Block, _, _>(
client.clone(),
Expand All @@ -576,12 +621,14 @@ mod tests {

let file = tempfile::tempfile().unwrap();
let client: Arc<FileClient> = Arc::new(
FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
(0u64, p0.clone_header()),
(1, p1.clone_header()),
(2, p2.clone_header()),
(3, p3.clone_header()),
])),
FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
HashMap::from([
(0u64, p0.clone_header()),
(1, p1.clone_header()),
(2, p2.clone_header()),
(3, p3.clone_header()),
]),
),
);

let mut downloader = ReverseHeadersDownloaderBuilder::default()
Expand All @@ -604,7 +651,8 @@ mod tests {
// Generate some random blocks
let (file, headers, _) = generate_bodies_file(0..=19).await;
// now try to read them back
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

// construct headers downloader and use first header
let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
Expand All @@ -629,7 +677,8 @@ mod tests {
let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

// now try to read them back
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());

// insert headers in db for the bodies downloader
insert_headers(factory.db_ref().db(), &headers);
Expand Down Expand Up @@ -668,7 +717,9 @@ mod tests {
let mut local_header = headers.first().unwrap().clone();

// test
while let Some(client) = reader.next_chunk::<FileClient>().await.unwrap() {
while let Some(client) =
reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
{
let sync_target = client.tip_header().unwrap();

let sync_target_hash = sync_target.hash();
Expand Down
18 changes: 13 additions & 5 deletions crates/optimism/cli/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ use reth_cli_commands::{
use reth_consensus::noop::NoopConsensus;
use reth_db::tables;
use reth_db_api::transaction::DbTx;
use reth_downloaders::file_client::{
ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE,
};
use reth_downloaders::file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE};
use reth_node_builder::BlockTy;
use reth_node_core::version::SHORT_VERSION;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_evm::OpExecutorProvider;
use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives};
use reth_provider::{ChainSpecProvider, StageCheckpointReader};
use reth_provider::{BlockNumReader, ChainSpecProvider, HeaderProvider, StageCheckpointReader};
use reth_prune::PruneModes;
use reth_stages::StageId;
use reth_static_file::StaticFileProducer;
Expand Down Expand Up @@ -70,7 +68,13 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportOpCommand<C> {
let mut total_decoded_txns = 0;
let mut total_filtered_out_dup_txns = 0;

while let Some(mut file_client) = reader.next_chunk::<FileClient<BlockTy<N>>>().await? {
let mut sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");

while let Some(mut file_client) =
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
{
// create a new FileClient from chunk read from file
info!(target: "reth::cli",
"Importing chain file chunk"
Expand Down Expand Up @@ -118,6 +122,10 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportOpCommand<C> {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {},
}

sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");
}

let provider = provider_factory.provider()?;
Expand Down

0 comments on commit 3b2de27

Please sign in to comment.