From 6b1b6aaec47c627c20e3d3c5e9aefb7be81ecdd1 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 5 Jan 2023 18:28:49 +0100 Subject: [PATCH] Boiler plate code for blobs pruning --- beacon_node/store/src/hot_cold_store.rs | 140 ++++++++++++++++++++++++ beacon_node/store/src/lib.rs | 1 + 2 files changed, 141 insertions(+) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 00aa0b2af13..58331bd2b4e 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -477,6 +477,12 @@ impl, Cold: ItemStore> HotColdDB .map(|payload| payload.is_some()) } + /// Check if the blobs sidecar for a block exists on disk. + pub fn blobs_sidecar_exists(&self, block_root: &Hash256) -> Result { + self.get_item::(block_root) + .map(|blobs| blobs.is_some()) + } + /// Determine whether a block exists in the database. pub fn block_exists(&self, block_root: &Hash256) -> Result { self.hot_db @@ -777,6 +783,11 @@ impl, Cold: ItemStore> HotColdDB key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); } + StoreOp::DeleteBlobs(block_root) => { + let key = get_key_for_col(DBColumn::BeaconBlob.into(), block_root.as_bytes()); + key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); + } + StoreOp::DeleteState(state_root, slot) => { let state_summary_key = get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes()); @@ -826,6 +837,10 @@ impl, Cold: ItemStore> HotColdDB guard.pop(block_root); } + StoreOp::DeleteBlobs(block_root) => { + guard_blob.pop(block_root); + } + StoreOp::DeleteState(_, _) => (), StoreOp::DeleteExecutionPayload(_) => (), @@ -835,6 +850,7 @@ impl, Cold: ItemStore> HotColdDB self.hot_db .do_atomically(self.convert_to_kv_batch(batch)?)?; drop(guard); + drop(guard_blob); Ok(()) } @@ -1667,6 +1683,130 @@ impl, Cold: ItemStore> HotColdDB ); Ok(()) } + + pub fn try_prune_blobs(&self, force: bool) -> Result<(), Error> { + let split = self.get_split_info(); + + if split.slot == 0 { + 
return Ok(()); + } + + let eip4844_fork_slot = if let Some(epoch) = self.spec.eip4844_fork_epoch { + epoch.start_slot(E::slots_per_epoch()) + } else { + return Ok(()); + }; + + // Load the split state so we can backtrack to find blobs sidecars. + // todo(emhane): MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS + let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or( + HotColdDBError::MissingSplitState(split.state_root, split.slot), + )?; + + // The finalized block may or may not have its blobs sidecar stored, depending on + // whether it was at a skipped slot. However for a fully pruned database its parent + // should *always* have been pruned. In case of a long split (no parent found) we + // continue as if the sidecars are pruned, as the node probably has other things to worry + // about. + let split_block_root = split_state.get_latest_block_root(split.state_root); + + let already_pruned = + process_results(split_state.rev_iter_block_roots(&self.spec), |mut iter| { + iter.find(|(_, block_root)| { + move || -> bool { + if *block_root != split_block_root { + if let Ok(Some(split_parent_block)) = + self.get_blinded_block(&block_root) + { + if let Ok(expected_kzg_commitments) = + split_parent_block.message().body().blob_kzg_commitments() + { + if expected_kzg_commitments.len() > 0 { + return true; + } + } + } + } + false + }() + }) + .map_or(Ok(true), |(_, split_parent_root)| { + self.blobs_sidecar_exists(&split_parent_root) + .map(|exists| !exists) + }) + })??; + + if already_pruned && !force { + info!(self.log, "Blobs sidecars are pruned"); + return Ok(()); + } + + // Iterate block roots backwards to the Eip4844 fork or the latest blob slot, whichever + // comes first. 
+ warn!( + self.log, + "Pruning finalized blobs sidecars"; + "info" => "you may notice degraded I/O performance while this runs" + ); + let latest_blob_slot = self.get_blob_info().map(|info| info.latest_blob_slot); + + let mut ops = vec![]; + let mut last_pruned_block_root = None; + + for res in std::iter::once(Ok((split_block_root, split.slot))) + .chain(BlockRootsIterator::new(self, &split_state)) + { + let (block_root, slot) = match res { + Ok(tuple) => tuple, + Err(e) => { + warn!( + self.log, + "Stopping blobs sidecar pruning early"; + "error" => ?e, + ); + break; + } + }; + + if slot < eip4844_fork_slot { + info!( + self.log, + "Blobs sidecar pruning reached Eip4844 boundary"; + ); + break; + } + + if Some(block_root) != last_pruned_block_root + && self.blobs_sidecar_exists(&block_root)? + { + debug!( + self.log, + "Pruning blobs sidecar"; + "slot" => slot, + "block_root" => ?block_root, + ); + last_pruned_block_root = Some(block_root); + ops.push(StoreOp::DeleteBlobs(block_root)); + } + + if Some(slot) == latest_blob_slot { + info!( + self.log, + "Blobs sidecar pruning reached anchor state"; + "slot" => slot + ); + break; + } + } + let blobs_sidecars_pruned = ops.len(); + self.do_atomically(ops)?; + info!( + self.log, + "Blobs sidecar pruning complete"; + "blobs_sidecars_pruned" => blobs_sidecars_pruned, + ); + Ok(()) + } } /// Advance the split point of the store, moving new finalized states to the freezer. diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index e940c0f25ec..545bdf7fb03 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -161,6 +161,7 @@ pub enum StoreOp<'a, E: EthSpec> { PutStateTemporaryFlag(Hash256), DeleteStateTemporaryFlag(Hash256), DeleteBlock(Hash256), + DeleteBlobs(Hash256), DeleteState(Hash256, Option), DeleteExecutionPayload(Hash256), }