Skip to content

Commit

Permalink
Boiler plate code for blobs pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane committed Jan 5, 2023
1 parent 8a77b05 commit 6b1b6aa
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 0 deletions.
140 changes: 140 additions & 0 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map(|payload| payload.is_some())
}

/// Check if the blobs sidecar for a block exists on disk.
pub fn blobs_sidecar_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
self.get_item::<BlobInfo>(block_root)
.map(|blobs| blobs.is_some())
}

/// Determine whether a block exists in the database.
pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
self.hot_db
Expand Down Expand Up @@ -777,6 +783,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}

StoreOp::DeleteBlobs(block_root) => {
let key = get_key_for_col(DBColumn::BeaconBlob.into(), block_root.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}

StoreOp::DeleteState(state_root, slot) => {
let state_summary_key =
get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes());
Expand Down Expand Up @@ -826,6 +837,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
guard.pop(block_root);
}

StoreOp::DeleteBlobs(block_root) => {
guard_blob.pop(block_root);
}

StoreOp::DeleteState(_, _) => (),

StoreOp::DeleteExecutionPayload(_) => (),
Expand All @@ -835,6 +850,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.hot_db
.do_atomically(self.convert_to_kv_batch(batch)?)?;
drop(guard);
drop(guard_blob);

Ok(())
}
Expand Down Expand Up @@ -1667,6 +1683,130 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
);
Ok(())
}

pub fn try_prune_blobs(&self, force: bool) -> Result<(), Error> {
let split = self.get_split_info();

if split.slot == 0 {
return Ok(());
}

let eip4844_fork_slot = if let Some(epoch) = self.spec.eip4844_fork_epoch {
epoch.start_slot(E::slots_per_epoch())
} else {
return Ok(());
};

// Load the split state so we can backtrack to find blobs sidecars.
// todo(emhane): MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS
let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
HotColdDBError::MissingSplitState(split.state_root, split.slot),
)?;

// The finalized block may or may not have its blobs sidecar stored, depending on
// whether it was at a skipped slot. However for a fully pruned database its parent
// should *always* have been pruned. In case of a long split (no parent found) we
// continue as if the payloads are pruned, as the node probably has other things to worry
// about.
let split_block_root = split_state.get_latest_block_root(split.state_root);

let already_pruned =
process_results(split_state.rev_iter_block_roots(&self.spec), |mut iter| {
iter.find(|(_, block_root)| {
move || -> bool {
if *block_root != split_block_root {
if let Ok(Some(split_parent_block)) =
self.get_blinded_block(&block_root)
{
if let Ok(expected_kzg_commitments) =
split_parent_block.message().body().blob_kzg_commitments()
{
if expected_kzg_commitments.len() > 0 {
return true;
}
}
}
}
false
}()
})
.map_or(Ok(true), |(_, split_parent_root)| {
self.blobs_sidecar_exists(&split_parent_root)
.map(|exists| !exists)
})
})??;

if already_pruned && !force {
info!(self.log, "Blobs sidecars are pruned");
return Ok(());
}

// Iterate block roots backwards to the Eip48444 fork or the latest blob slot, whichever
// comes first.
warn!(
self.log,
"Pruning finalized blobs sidecars";
"info" => "you may notice degraded I/O performance while this runs"
);
let latest_blob_slot = self.get_blob_info().map(|info| info.latest_blob_slot);

let mut ops = vec![];
let mut last_pruned_block_root = None;

for res in std::iter::once(Ok((split_block_root, split.slot)))
.chain(BlockRootsIterator::new(self, &split_state))
{
let (block_root, slot) = match res {
Ok(tuple) => tuple,
Err(e) => {
warn!(
self.log,
"Stopping blobs sidecar pruning early";
"error" => ?e,
);
break;
}
};

if slot < eip4844_fork_slot {
info!(
self.log,
"Blobs sidecar pruning reached Eip4844 boundary";
);
break;
}

if Some(block_root) != last_pruned_block_root
&& self.blobs_sidecar_exists(&block_root)?
{
debug!(
self.log,
"Pruning blobs sidecar";
"slot" => slot,
"block_root" => ?block_root,
);
last_pruned_block_root = Some(block_root);
ops.push(StoreOp::DeleteBlobs(block_root));
}

if Some(slot) == latest_blob_slot {
info!(
self.log,
"Blobs sidecar pruning reached anchor state";
"slot" => slot
);
break;
}
}
let blobs_sidecars_pruned = ops.len();
self.do_atomically(ops)?;
info!(
self.log,
"Blobs sidecar pruning complete";
"blobs_sidecars_pruned" => blobs_sidecars_pruned,
);
Ok(())
}
}

/// Advance the split point of the store, moving new finalized states to the freezer.
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ pub enum StoreOp<'a, E: EthSpec> {
PutStateTemporaryFlag(Hash256),
DeleteStateTemporaryFlag(Hash256),
DeleteBlock(Hash256),
DeleteBlobs(Hash256),
DeleteState(Hash256, Option<Slot>),
DeleteExecutionPayload(Hash256),
}
Expand Down

0 comments on commit 6b1b6aa

Please sign in to comment.