Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: writes sidecars to static files when saving blocks #145

Merged
merged 3 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions crates/stages/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// Write sidecars
let sidecars = block.sidecars.unwrap_or_default();
static_file_producer_sc.append_sidecars(
sidecars,
&sidecars,
block_number,
block.header.hash(),
&block.header.hash(),
)?;

// Write ommers if any
Expand All @@ -284,9 +284,9 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
BlockResponse::Empty(header) => {
// Write empty sidecars
static_file_producer_sc.append_sidecars(
Default::default(),
&Default::default(),
block_number,
header.hash(),
&header.hash(),
)?;
}
};
Expand Down Expand Up @@ -827,7 +827,10 @@ mod tests {
static_file_producer_sc.append_sidecars(
Default::default(),
block_number,
blocks.get(block_number as usize).map(|b| b.header.hash()).unwrap(),
&blocks
.get(block_number as usize)
.map(|b| b.header.hash())
.unwrap(),
)?;
tx.put::<tables::Sidecars>(block_number, Default::default())?;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/stages/src/test_utils/test_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ impl TestStageDB {
let segment_header = writer.user_header();
if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
for block_number in 0..block_number {
writer.append_sidecars(Default::default(), block_number, B256::ZERO)?;
writer.append_sidecars(Default::default(), block_number, &B256::ZERO)?;
}
}

writer.append_sidecars(Default::default(), block_number, hash)?;
writer.append_sidecars(Default::default(), block_number, &hash)?;
} else {
tx.put::<tables::Sidecars>(block_number, Default::default())?;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/static-file/static-file/src/segments/sidecars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl<DB: Database> Segment<DB> for Sidecars {
debug_assert_eq!(header_block, canonical_header_block);

let _static_file_block =
static_file_writer.append_sidecars(sidecar, header_block, canonical_header)?;
static_file_writer.append_sidecars(&sidecar, header_block, &canonical_header)?;
debug_assert_eq!(_static_file_block, header_block);
}

Expand Down
2 changes: 1 addition & 1 deletion crates/storage/db-common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ pub fn insert_genesis_header<DB: Database>(

// skip the zero block index
let mut writer = static_file_provider.latest_writer(StaticFileSegment::Sidecars)?;
writer.append_sidecars(Default::default(), 0, B256::ZERO)?;
writer.append_sidecars(&Default::default(), 0, &B256::ZERO)?;
}
Ok(Some(_)) => {}
Err(e) => return Err(e),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ impl StaticFileProvider {
// append empty sidecars
for block_number in range_start..=checkpoint_block_number {
let hash = provider.block_hash(block_number)?.unwrap_or_default();
writer.append_sidecars(Default::default(), block_number, hash)?;
writer.append_sidecars(&Default::default(), block_number, &hash)?;
}
writer.commit()?;
self.writers.set_writer(segment, Some(writer));
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/provider/src/providers/static_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ mod tests {
for sidecars in sidecars_set.clone() {
let block_number = sidecars[0].block_number.to();
let hash = sidecars[0].block_hash;
writer.append_sidecars(sidecars, block_number, hash).unwrap();
writer.append_sidecars(&sidecars, block_number, &hash).unwrap();
}
writer.commit().unwrap();
}
Expand Down
4 changes: 2 additions & 2 deletions crates/storage/provider/src/providers/static_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,9 @@ impl StaticFileProviderRW {
/// Returns the current [`BlockNumber`] as seen in the static file.
pub fn append_sidecars(
&mut self,
sidecars: BlobSidecars,
sidecars: &BlobSidecars,
block_number: BlockNumber,
hash: BlockHash,
hash: &BlockHash,
) -> ProviderResult<BlockNumber> {
let start = Instant::now();
self.ensure_no_queued_prune()?;
Expand Down
41 changes: 37 additions & 4 deletions crates/storage/provider/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use reth_db::{
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::ExecutionOutcome;
use reth_primitives::{
parlia::Snapshot, BlockNumber, Header, SealedBlock, StaticFileSegment, TransactionSignedNoHash,
B256, U256,
parlia::Snapshot, BlobSidecars, BlockNumber, Header, SealedBlock, StaticFileSegment,
TransactionSignedNoHash, B256, U256,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{
Expand Down Expand Up @@ -209,8 +209,8 @@ where
Ok(())
}

/// Writes the header & transactions to static files, and updates their respective checkpoints
/// on database.
/// Writes the header & transactions & sidecars to static files, and updates their respective
/// checkpoints on database.
#[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "storage")]
fn save_header_and_transactions(&self, block: Arc<SealedBlock>) -> ProviderResult<()> {
debug!(target: "provider::storage_writer", "Writing headers and transactions.");
Expand Down Expand Up @@ -243,6 +243,16 @@ where
block.header().number,
std::iter::once(&no_hash_transactions),
)?;

let sidecar_writer =
self.static_file().get_writer(block.number, StaticFileSegment::Sidecars)?;
let mut storage_writer = UnifiedStorageWriter::from(self.database(), sidecar_writer);
storage_writer.append_sidecars_from_blocks(std::iter::once((
&block.sidecars.clone().unwrap_or_default(),
block.number,
block.hash(),
)))?;

self.database()
.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?;
}
Expand Down Expand Up @@ -400,6 +410,29 @@ where
}
Ok(())
}

/// Appends sidecars to static files
///
/// NOTE: The static file writer used to construct this [`UnifiedStorageWriter`] MUST be a
/// writer for the Sidecars segment.
pub fn append_sidecars_from_blocks<SC, I>(
&mut self,
sidecars: impl Iterator<Item = I>,
) -> ProviderResult<()>
where
I: Borrow<(SC, u64, B256)>,
SC: Borrow<BlobSidecars>,
{
self.ensure_static_file_segment(StaticFileSegment::Sidecars)?;

for pair in sidecars {
let (sc, block_number, hash) = pair.borrow();
let sc = sc.borrow();
self.static_file_mut().append_sidecars(sc, *block_number, hash)?;
}

Ok(())
}
}

impl<'a, 'b, TX> UnifiedStorageWriter<'a, TX, StaticFileProviderRWRefMut<'b>>
Expand Down