Skip to content

Commit f187331

Browse files
committed
valid file client incoming headers
1 parent 4653d3d commit f187331

File tree

4 files changed

+113
-36
lines changed

4 files changed

+113
-36
lines changed

crates/cli/commands/src/import.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,13 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
8787
let mut total_decoded_blocks = 0;
8888
let mut total_decoded_txns = 0;
8989

90-
while let Some(file_client) = reader.next_chunk::<FileClient<_>>().await? {
90+
let mut sealed_header = provider_factory
91+
.sealed_header(provider_factory.last_block_number()?)?
92+
.expect("should have genesis");
93+
94+
while let Some(file_client) =
95+
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
96+
{
9197
// create a new FileClient from chunk read from file
9298
info!(target: "reth::cli",
9399
"Importing chain file chunk"
@@ -125,6 +131,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
125131
res = pipeline.run() => res?,
126132
_ = tokio::signal::ctrl_c() => {},
127133
}
134+
135+
sealed_header = provider_factory
136+
.sealed_header(provider_factory.last_block_number()?)?
137+
.expect("should have genesis");
128138
}
129139

130140
let provider = provider_factory.provider()?;

crates/consensus/consensus/src/noop.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@ use reth_primitives_traits::Block;
88
#[non_exhaustive]
99
pub struct NoopConsensus;
1010

11+
#[cfg(any(test, feature = "test-utils"))]
12+
impl NoopConsensus {
13+
/// Creates an Arc instance of Self.
14+
pub fn arc() -> std::sync::Arc<Self> {
15+
std::sync::Arc::new(Self::default())
16+
}
17+
}
18+
1119
impl<H> HeaderValidator<H> for NoopConsensus {
1220
fn validate_header(&self, _header: &SealedHeader<H>) -> Result<(), ConsensusError> {
1321
Ok(())

crates/net/downloaders/src/file_client.rs

Lines changed: 81 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
use std::{collections::HashMap, io, path::Path};
1+
use std::{collections::HashMap, io, path::Path, sync::Arc};
22

33
use alloy_consensus::BlockHeader;
44
use alloy_eips::BlockHashOrNumber;
55
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
66
use futures::Future;
77
use itertools::Either;
8+
use reth_consensus::{ConsensusError, HeaderValidator};
89
use reth_network_p2p::{
910
bodies::client::{BodiesClient, BodiesFut},
1011
download::DownloadClient,
@@ -56,6 +57,10 @@ pub struct FileClient<B: Block = reth_primitives::Block> {
5657
/// An error that can occur when constructing and using a [`FileClient`].
5758
#[derive(Debug, Error)]
5859
pub enum FileClientError {
60+
/// An error occurred when validating a header from file.
61+
#[error(transparent)]
62+
Consensus(#[from] ConsensusError),
63+
5964
/// An error occurred when opening or reading the file.
6065
#[error(transparent)]
6166
Io(#[from] std::io::Error),
@@ -77,21 +82,30 @@ impl From<&'static str> for FileClientError {
7782

7883
impl<B: FullBlock> FileClient<B> {
7984
/// Create a new file client from a file path.
80-
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
85+
pub async fn new<P: AsRef<Path>>(
86+
path: P,
87+
consensus: Arc<dyn HeaderValidator<B::Header>>,
88+
) -> Result<Self, FileClientError> {
8189
let file = File::open(path).await?;
82-
Self::from_file(file).await
90+
Self::from_file(file, consensus).await
8391
}
8492

8593
/// Initialize the [`FileClient`] with a file directly.
86-
pub(crate) async fn from_file(mut file: File) -> Result<Self, FileClientError> {
94+
pub(crate) async fn from_file(
95+
mut file: File,
96+
consensus: Arc<dyn HeaderValidator<B::Header>>,
97+
) -> Result<Self, FileClientError> {
8798
// get file len from metadata before reading
8899
let metadata = file.metadata().await?;
89100
let file_len = metadata.len();
90101

91102
let mut reader = vec![];
92103
file.read_to_end(&mut reader).await?;
93104

94-
Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
105+
Ok(FileClientBuilder { consensus, parent_header: None }
106+
.build(&reader[..], file_len)
107+
.await?
108+
.file_client)
95109
}
96110

97111
/// Get the tip hash of the chain.
@@ -183,14 +197,23 @@ impl<B: FullBlock> FileClient<B> {
183197
}
184198
}
185199

186-
impl<B: FullBlock> FromReader for FileClient<B> {
200+
struct FileClientBuilder<B: Block = reth_primitives::Block> {
201+
pub consensus: Arc<dyn HeaderValidator<B::Header>>,
202+
pub parent_header: Option<SealedHeader<B::Header>>,
203+
}
204+
205+
impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
206+
for FileClientBuilder<B>
207+
{
187208
type Error = FileClientError;
209+
type Output = FileClient<B>;
188210

189211
/// Initialize the [`FileClient`] from bytes that have been read from file.
190-
fn from_reader<R>(
212+
fn build<R>(
213+
&self,
191214
reader: R,
192215
num_bytes: u64,
193-
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
216+
) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
194217
where
195218
R: AsyncReadExt + Unpin,
196219
{
@@ -213,6 +236,8 @@ impl<B: FullBlock> FromReader for FileClient<B> {
213236
let mut log_interval = 0;
214237
let mut log_interval_start_block = 0;
215238

239+
let mut parent_header = self.parent_header.clone();
240+
216241
async move {
217242
while let Some(block_res) = stream.next().await {
218243
let block = match block_res {
@@ -231,6 +256,14 @@ impl<B: FullBlock> FromReader for FileClient<B> {
231256
let block_number = block.header().number();
232257
let block_hash = block.header().hash_slow();
233258

259+
// Validate incoming header
260+
let sealed = SealedHeader::new(block.header().clone(), block_hash);
261+
self.consensus.validate_header(&sealed)?;
262+
if let Some(parent) = &parent_header {
263+
self.consensus.validate_header_against_parent(&sealed, parent)?;
264+
parent_header = Some(sealed);
265+
}
266+
234267
// add to the internal maps
235268
headers.insert(block.header().number(), block.header().clone());
236269
hash_to_number.insert(block_hash, block.header().number());
@@ -255,7 +288,7 @@ impl<B: FullBlock> FromReader for FileClient<B> {
255288
trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
256289

257290
Ok(DecodedFileChunk {
258-
file_client: Self { headers, hash_to_number, bodies },
291+
file_client: FileClient { headers, hash_to_number, bodies },
259292
remaining_bytes,
260293
highest_block: None,
261294
})
@@ -452,15 +485,18 @@ impl ChunkedFileReader {
452485
}
453486

454487
/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
455-
pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
456-
where
457-
T: FromReader,
458-
{
488+
pub async fn next_chunk<B: FullBlock>(
489+
&mut self,
490+
consensus: Arc<dyn HeaderValidator<B::Header>>,
491+
parent_header: Option<SealedHeader<B::Header>>,
492+
) -> Result<Option<FileClient<B>>, FileClientError> {
459493
let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
460494

461495
// make new file client from chunk
462496
let DecodedFileChunk { file_client, remaining_bytes, .. } =
463-
T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;
497+
FileClientBuilder { consensus, parent_header }
498+
.build(&self.chunk[..], next_chunk_byte_len)
499+
.await?;
464500

465501
// save left over bytes
466502
self.chunk = remaining_bytes;
@@ -494,14 +530,18 @@ pub trait FromReader {
494530
/// Error returned by file client type.
495531
type Error: From<io::Error>;
496532

533+
/// Output returned by file client type.
534+
type Output;
535+
497536
/// Returns a file client
498-
fn from_reader<B>(
499-
reader: B,
537+
fn build<R>(
538+
&self,
539+
reader: R,
500540
num_bytes: u64,
501-
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
541+
) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
502542
where
503543
Self: Sized,
504-
B: AsyncReadExt + Unpin;
544+
R: AsyncReadExt + Unpin;
505545
}
506546

507547
/// Output from decoding a file chunk with [`FromReader::from_reader`].
@@ -530,11 +570,12 @@ mod tests {
530570
use assert_matches::assert_matches;
531571
use futures_util::stream::StreamExt;
532572
use rand::Rng;
533-
use reth_consensus::test_utils::TestConsensus;
573+
use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
534574
use reth_network_p2p::{
535575
bodies::downloader::BodyDownloader,
536576
headers::downloader::{HeaderDownloader, SyncTarget},
537577
};
578+
use reth_primitives::Block;
538579
use reth_provider::test_utils::create_test_provider_factory;
539580
use std::sync::Arc;
540581

@@ -549,8 +590,12 @@ mod tests {
549590
// create an empty file
550591
let file = tempfile::tempfile().unwrap();
551592

552-
let client: Arc<FileClient> =
553-
Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
593+
let client: Arc<FileClient> = Arc::new(
594+
FileClient::from_file(file.into(), NoopConsensus::arc())
595+
.await
596+
.unwrap()
597+
.with_bodies(bodies.clone()),
598+
);
554599
let mut downloader = BodiesDownloaderBuilder::default()
555600
.build::<reth_primitives::Block, _, _>(
556601
client.clone(),
@@ -576,12 +621,14 @@ mod tests {
576621

577622
let file = tempfile::tempfile().unwrap();
578623
let client: Arc<FileClient> = Arc::new(
579-
FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
580-
(0u64, p0.clone_header()),
581-
(1, p1.clone_header()),
582-
(2, p2.clone_header()),
583-
(3, p3.clone_header()),
584-
])),
624+
FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
625+
HashMap::from([
626+
(0u64, p0.clone_header()),
627+
(1, p1.clone_header()),
628+
(2, p2.clone_header()),
629+
(3, p3.clone_header()),
630+
]),
631+
),
585632
);
586633

587634
let mut downloader = ReverseHeadersDownloaderBuilder::default()
@@ -604,7 +651,8 @@ mod tests {
604651
// Generate some random blocks
605652
let (file, headers, _) = generate_bodies_file(0..=19).await;
606653
// now try to read them back
607-
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());
654+
let client: Arc<FileClient> =
655+
Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
608656

609657
// construct headers downloader and use first header
610658
let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
@@ -629,7 +677,8 @@ mod tests {
629677
let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;
630678

631679
// now try to read them back
632-
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());
680+
let client: Arc<FileClient> =
681+
Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
633682

634683
// insert headers in db for the bodies downloader
635684
insert_headers(factory.db_ref().db(), &headers);
@@ -668,7 +717,9 @@ mod tests {
668717
let mut local_header = headers.first().unwrap().clone();
669718

670719
// test
671-
while let Some(client) = reader.next_chunk::<FileClient>().await.unwrap() {
720+
while let Some(client) =
721+
reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
722+
{
672723
let sync_target = client.tip_header().unwrap();
673724

674725
let sync_target_hash = sync_target.hash();

crates/optimism/cli/src/commands/import.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,13 @@ use reth_cli_commands::{
99
use reth_consensus::noop::NoopConsensus;
1010
use reth_db::tables;
1111
use reth_db_api::transaction::DbTx;
12-
use reth_downloaders::file_client::{
13-
ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE,
14-
};
12+
use reth_downloaders::file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE};
1513
use reth_node_builder::BlockTy;
1614
use reth_node_core::version::SHORT_VERSION;
1715
use reth_optimism_chainspec::OpChainSpec;
1816
use reth_optimism_evm::OpExecutorProvider;
1917
use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives};
20-
use reth_provider::{ChainSpecProvider, StageCheckpointReader};
18+
use reth_provider::{BlockNumReader, ChainSpecProvider, HeaderProvider, StageCheckpointReader};
2119
use reth_prune::PruneModes;
2220
use reth_stages::StageId;
2321
use reth_static_file::StaticFileProducer;
@@ -70,7 +68,13 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportOpCommand<C> {
7068
let mut total_decoded_txns = 0;
7169
let mut total_filtered_out_dup_txns = 0;
7270

73-
while let Some(mut file_client) = reader.next_chunk::<FileClient<BlockTy<N>>>().await? {
71+
let mut sealed_header = provider_factory
72+
.sealed_header(provider_factory.last_block_number()?)?
73+
.expect("should have genesis");
74+
75+
while let Some(mut file_client) =
76+
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
77+
{
7478
// create a new FileClient from chunk read from file
7579
info!(target: "reth::cli",
7680
"Importing chain file chunk"
@@ -118,6 +122,10 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportOpCommand<C> {
118122
res = pipeline.run() => res?,
119123
_ = tokio::signal::ctrl_c() => {},
120124
}
125+
126+
sealed_header = provider_factory
127+
.sealed_header(provider_factory.last_block_number()?)?
128+
.expect("should have genesis");
121129
}
122130

123131
let provider = provider_factory.provider()?;

0 commit comments

Comments
 (0)