Merge pull request #3852 from emhane/prune_blobs
Prune blobs
realbigsean authored Feb 8, 2023
2 parents a42d075 + 6a37e84 commit 4156719
Showing 20 changed files with 525 additions and 78 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -12,3 +12,6 @@ genesis.ssz
 
 # IntelliJ
 /*.iml
+
+# VSCode
+/.vscode
75 changes: 54 additions & 21 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -611,10 +611,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             start_slot,
             end_slot,
             || {
-                (
+                Ok((
                     head.beacon_state.clone_with_only_committee_caches(),
                     head.beacon_block_root,
-                )
+                ))
             },
             &self.spec,
         )?;
@@ -708,10 +708,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             start_slot,
             end_slot,
             || {
-                (
+                Ok((
                     head.beacon_state.clone_with_only_committee_caches(),
                     head.beacon_state_root(),
-                )
+                ))
             },
             &self.spec,
         )?;
@@ -2878,7 +2878,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // is so we don't have to think about lock ordering with respect to the fork choice lock.
         // There are a bunch of places where we lock both fork choice and the pubkey cache and it
         // would be difficult to check that they all lock fork choice first.
-        let mut kv_store_ops = self
+        let mut ops = self
             .validator_pubkey_cache
             .try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
             .ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
@@ -2981,9 +2981,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
         // Most blocks are now capable of being attested to thanks to the `early_attester_cache`
         // cache above. Resume non-essential processing.
+        //
+        // It is important NOT to return errors here before the database commit, because the block
+        // has already been added to fork choice and the database would be left in an inconsistent
+        // state if we returned early without committing. In other words, an error here would
+        // corrupt the node's database permanently.
         // -----------------------------------------------------------------------------------------
 
-        self.import_block_update_shuffling_cache(block_root, &mut state)?;
+        self.import_block_update_shuffling_cache(block_root, &mut state);
         self.import_block_observe_attestations(
             block,
             &state,
@@ -3008,25 +3013,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // See https://github.com/sigp/lighthouse/issues/2028
         let (signed_block, blobs) = signed_block.deconstruct();
         let block = signed_block.message();
-        let mut ops: Vec<_> = confirmed_state_roots
-            .into_iter()
-            .map(StoreOp::DeleteStateTemporaryFlag)
-            .collect();
+        ops.extend(
+            confirmed_state_roots
+                .into_iter()
+                .map(StoreOp::DeleteStateTemporaryFlag),
+        );
         ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
         ops.push(StoreOp::PutState(block.state_root(), &state));
 
-        if let Some(blobs) = blobs {
-            if blobs.blobs.len() > 0 {
-                //FIXME(sean) using this for debugging for now
-                info!(self.log, "Writing blobs to store"; "block_root" => ?block_root);
-                ops.push(StoreOp::PutBlobs(block_root, blobs));
+        // Only consider blobs if the eip4844 fork is enabled.
+        if let Some(data_availability_boundary) = self.data_availability_boundary() {
+            let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
+            let margin_epochs = self.store.get_config().blob_prune_margin_epochs;
+            let import_boundary = data_availability_boundary - margin_epochs;
+
+            // Only store blobs at the data availability boundary, minus any configured epochs
+            // margin, or younger (of higher epoch number).
+            if block_epoch >= import_boundary {
+                if let Some(blobs) = blobs {
+                    if blobs.blobs.len() > 0 {
+                        //FIXME(sean) using this for debugging for now
+                        info!(
+                            self.log, "Writing blobs to store";
+                            "block_root" => ?block_root
+                        );
+                        ops.push(StoreOp::PutBlobs(block_root, blobs));
+                    }
+                }
             }
-        };
-        let txn_lock = self.store.hot_db.begin_rw_transaction();
+        }
 
-        kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?);
+        let txn_lock = self.store.hot_db.begin_rw_transaction();
 
-        if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
+        if let Err(e) = self.store.do_atomically(ops) {
             error!(
                 self.log,
                 "Database write failed!";
@@ -3455,13 +3474,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         }
     }
 
+    // For the current and next epoch of this state, ensure we have the shuffling from this
+    // block in our cache.
     fn import_block_update_shuffling_cache(
         &self,
         block_root: Hash256,
         state: &mut BeaconState<T::EthSpec>,
+    ) {
+        if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) {
+            warn!(
+                self.log,
+                "Failed to prime shuffling cache";
+                "error" => ?e
+            );
+        }
+    }
+
+    fn import_block_update_shuffling_cache_fallible(
+        &self,
+        block_root: Hash256,
+        state: &mut BeaconState<T::EthSpec>,
     ) -> Result<(), BlockError<T::EthSpec>> {
-        // For the current and next epoch of this state, ensure we have the shuffling from this
-        // block in our cache.
         for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
             let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;

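The blob import boundary arithmetic above is easy to check by hand. Here is a minimal standalone sketch, using plain u64 epoch numbers in place of this crate's Epoch type, with assumed values (data availability boundary at epoch 100, prune margin of 2 epochs); it is an illustration of the check, not code from this PR:

    // Blobs are stored only if the block's epoch is at or past the import
    // boundary, i.e. the data availability boundary minus the prune margin.
    fn should_store_blobs(block_epoch: u64, data_availability_boundary: u64, margin_epochs: u64) -> bool {
        let import_boundary = data_availability_boundary - margin_epochs;
        block_epoch >= import_boundary
    }

    fn main() {
        assert!(should_store_blobs(99, 100, 2)); // inside the margin: stored
        assert!(should_store_blobs(150, 100, 2)); // younger than the boundary: stored
        assert!(!should_store_blobs(97, 100, 2)); // older than boundary minus margin: skipped
    }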
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
@@ -914,6 +914,11 @@ where
             );
         }
 
+        // Prune blobs sidecars older than the blob data availability boundary in the background.
+        beacon_chain
+            .store_migrator
+            .process_prune_blobs(beacon_chain.data_availability_boundary());
+
         Ok(beacon_chain)
     }
 }
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/canonical_head.rs
@@ -751,6 +751,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // Drop the old cache head nice and early to try and free the memory as soon as possible.
         drop(old_cached_head);
 
+        // Prune blobs in the background.
+        self.store_migrator
+            .process_prune_blobs(self.data_availability_boundary());
+
         // If the finalized checkpoint changed, perform some updates.
         //
         // The `after_finalization` function will take a write-lock on `fork_choice`, therefore it
45 changes: 43 additions & 2 deletions beacon_node/beacon_chain/src/migrate.rs
@@ -86,6 +86,7 @@ pub enum PruningError {
 pub enum Notification {
     Finalization(FinalizationNotification),
     Reconstruction,
+    PruneBlobs(Option<Epoch>),
 }
 
 pub struct FinalizationNotification {
@@ -152,6 +153,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
         }
     }
 
+    pub fn process_prune_blobs(&self, data_availability_boundary: Option<Epoch>) {
+        if let Some(Notification::PruneBlobs(data_availability_boundary)) =
+            self.send_background_notification(Notification::PruneBlobs(data_availability_boundary))
+        {
+            Self::run_prune_blobs(self.db.clone(), data_availability_boundary, &self.log);
+        }
+    }
+
     pub fn run_reconstruction(db: Arc<HotColdDB<E, Hot, Cold>>, log: &Logger) {
         if let Err(e) = db.reconstruct_historic_states() {
             error!(
@@ -162,6 +171,20 @@
             }
         }
 
+    pub fn run_prune_blobs(
+        db: Arc<HotColdDB<E, Hot, Cold>>,
+        data_availability_boundary: Option<Epoch>,
+        log: &Logger,
+    ) {
+        if let Err(e) = db.try_prune_blobs(false, data_availability_boundary) {
+            error!(
+                log,
+                "Blobs pruning failed";
+                "error" => ?e,
+            );
+        }
+    }
+
     /// If configured to run in the background, send `notif` to the background thread.
     ///
     /// Return `None` if the message was sent to the background thread, `Some(notif)` otherwise.
@@ -320,11 +343,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
                                 best
                             }
                         }
+                        (Notification::Finalization(_), Notification::PruneBlobs(_)) => best,
+                        (Notification::PruneBlobs(_), Notification::Finalization(_)) => other,
+                        (Notification::PruneBlobs(dab1), Notification::PruneBlobs(dab2)) => {
+                            if dab2 > dab1 {
+                                other
+                            } else {
+                                best
+                            }
+                        }
                     });
 
                 match notif {
                     Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log),
                     Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log),
+                    Notification::PruneBlobs(dab) => Self::run_prune_blobs(db.clone(), dab, &log),
                 }
             }
         });
@@ -569,10 +602,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
             .into_iter()
             .map(Into::into)
             .flat_map(|block_root: Hash256| {
-                [
+                let mut store_ops = vec![
                     StoreOp::DeleteBlock(block_root),
                     StoreOp::DeleteExecutionPayload(block_root),
-                ]
+                ];
+                if let Ok(true) = store.blobs_sidecar_exists(&block_root) {
+                    // Keep track of non-empty orphaned blobs sidecars.
+                    store_ops.extend([
+                        StoreOp::DeleteBlobs(block_root),
+                        StoreOp::PutOrphanedBlobsKey(block_root),
+                    ]);
+                }
+                store_ops
             })
             .chain(
                 abandoned_states
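The match arms added above coalesce queued prune notifications by keeping the one with the highest data availability boundary. A standalone sketch of that rule, assuming plain Option<u64> epochs in place of Option<Epoch> (not this crate's code): Option's derived ordering treats None, meaning the fork is not yet enabled, as lower than any Some(epoch), which is exactly the behavior the diff relies on.

    // Of two queued prune requests, keep the higher boundary.
    fn coalesce(best: Option<u64>, other: Option<u64>) -> Option<u64> {
        if other > best {
            other
        } else {
            best
        }
    }

    fn main() {
        assert_eq!(coalesce(Some(10), Some(12)), Some(12)); // newer boundary wins
        assert_eq!(coalesce(Some(10), None), Some(10)); // None never replaces Some
        assert_eq!(coalesce(None, Some(7)), Some(7));
    }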
17 changes: 11 additions & 6 deletions beacon_node/beacon_chain/src/validator_pubkey_cache.rs
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
 use std::collections::HashMap;
 use std::convert::TryInto;
 use std::marker::PhantomData;
-use store::{DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreItem};
+use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
 use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
 
 /// Provides a mapping of `validator_index -> validator_publickey`.
/// Provides a mapping of `validator_index -> validator_publickey`.
@@ -38,7 +38,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
         };
 
         let store_ops = cache.import_new_pubkeys(state)?;
-        store.hot_db.do_atomically(store_ops)?;
+        store.do_atomically(store_ops)?;
 
         Ok(cache)
     }
@@ -79,7 +79,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
     pub fn import_new_pubkeys(
         &mut self,
         state: &BeaconState<T::EthSpec>,
-    ) -> Result<Vec<KeyValueStoreOp>, BeaconChainError> {
+    ) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError> {
         if state.validators().len() > self.pubkeys.len() {
             self.import(
                 state.validators()[self.pubkeys.len()..]
@@ -92,7 +92,10 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
     }
 
     /// Adds zero or more validators to `self`.
-    fn import<I>(&mut self, validator_keys: I) -> Result<Vec<KeyValueStoreOp>, BeaconChainError>
+    fn import<I>(
+        &mut self,
+        validator_keys: I,
+    ) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError>
     where
         I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
     {
@@ -112,7 +115,9 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
             // It will be committed atomically when the block that introduced it is written to disk.
             // Notably it is NOT written while the write lock on the cache is held.
             // See: https://github.com/sigp/lighthouse/issues/2327
-            store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)));
+            store_ops.push(StoreOp::KeyValueOp(
+                DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)),
+            ));
 
             self.pubkeys.push(
                 (&pubkey)
@@ -294,7 +299,7 @@ mod test {
         let ops = cache
             .import_new_pubkeys(&state)
             .expect("should import pubkeys");
-        store.hot_db.do_atomically(ops).unwrap();
+        store.do_atomically(ops).unwrap();
         check_cache_get(&cache, &keypairs[..]);
         drop(cache);

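The switch from KeyValueStoreOp to StoreOp lets the staged pubkey writes ride in the same atomic batch as the block import, as the in-code comment describes. A self-contained toy sketch of that deferred-write pattern, where every type is an assumed stand-in rather than a Lighthouse API:

    // The cache only *stages* writes; the caller commits them in one batch.
    #[derive(Debug)]
    enum Op {
        PutPubkey(usize),
        PutBlock(u64),
    }

    fn import_new_pubkeys(next_validator_index: usize) -> Vec<Op> {
        // Nothing touches the database here; the write is only staged.
        vec![Op::PutPubkey(next_validator_index)]
    }

    fn do_atomically(ops: Vec<Op>) {
        // Stand-in for a single transactional commit.
        for op in &ops {
            println!("commit {:?}", op);
        }
    }

    fn main() {
        let mut ops = import_new_pubkeys(0);
        ops.push(Op::PutBlock(42));
        do_atomically(ops); // pubkey and block land (or fail) together
    }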
10 changes: 4 additions & 6 deletions beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
@@ -688,12 +688,10 @@ impl<T: BeaconChainTypes> Worker<T> {
         let serve_blobs_from_slot = if start_epoch < data_availability_boundary {
             // Attempt to serve from the earliest block in our database, falling back to the data
             // availability boundary
-            let oldest_blob_slot = self
-                .chain
-                .store
-                .get_blob_info()
-                .map(|blob_info| blob_info.oldest_blob_slot)
-                .unwrap_or(data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()));
+            let oldest_blob_slot =
+                self.chain.store.get_blob_info().oldest_blob_slot.unwrap_or(
+                    data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()),
+                );
 
             debug!(
                 self.log,
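A standalone sketch of the fallback above, with plain u64 slots and assumed values in place of the Slot type: serve from the oldest blob slot recorded in the blob info, or from the first slot of the data availability boundary epoch when no oldest slot has been recorded yet.

    fn serve_blobs_from(oldest_blob_slot: Option<u64>, boundary_start_slot: u64) -> u64 {
        oldest_blob_slot.unwrap_or(boundary_start_slot)
    }

    fn main() {
        assert_eq!(serve_blobs_from(Some(320_000), 321_024), 320_000); // have older blobs
        assert_eq!(serve_blobs_from(None, 321_024), 321_024); // fall back to the boundary
    }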
25 changes: 25 additions & 0 deletions beacon_node/src/cli.rs
@@ -551,6 +551,31 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 .takes_value(true)
                 .default_value("true")
         )
+        .arg(
+            Arg::with_name("prune-blobs")
+                .long("prune-blobs")
+                .help("Prune blobs from Lighthouse's database when they are older than the data \
+                    availability boundary relative to the current epoch.")
+                .takes_value(true)
+                .default_value("true")
+        )
+        .arg(
+            Arg::with_name("epochs-per-blob-prune")
+                .long("epochs-per-blob-prune")
+                .help("The epoch interval with which to prune blobs from Lighthouse's \
+                    database when they are older than the data availability boundary \
+                    relative to the current epoch.")
+                .takes_value(true)
+                .default_value("1")
+        )
+        .arg(
+            Arg::with_name("blob-prune-margin-epochs")
+                .long("blob-prune-margin-epochs")
+                .help("The margin for blob pruning in epochs. The oldest blobs are pruned \
+                    up until data_availability_boundary - blob_prune_margin_epochs.")
+                .takes_value(true)
+                .default_value("0")
+        )
 
     /*
      * Misc.
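Taken together, the three new flags might be exercised like this; an illustrative invocation only, where the flag names come from the diff above and the values are arbitrary examples:

    lighthouse bn \
        --prune-blobs true \
        --epochs-per-blob-prune 2 \
        --blob-prune-margin-epochs 1

Read alongside the help text, these values would run the pruning routine at most once every 2 epochs and retain blobs down to data_availability_boundary - 1.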
16 changes: 16 additions & 0 deletions beacon_node/src/config.rs
@@ -411,6 +411,22 @@ pub fn get_config<E: EthSpec>(
         client_config.store.prune_payloads = prune_payloads;
     }
 
+    if let Some(prune_blobs) = clap_utils::parse_optional(cli_args, "prune-blobs")? {
+        client_config.store.prune_blobs = prune_blobs;
+    }
+
+    if let Some(epochs_per_blob_prune) =
+        clap_utils::parse_optional(cli_args, "epochs-per-blob-prune")?
+    {
+        client_config.store.epochs_per_blob_prune = epochs_per_blob_prune;
+    }
+
+    if let Some(blob_prune_margin_epochs) =
+        clap_utils::parse_optional(cli_args, "blob-prune-margin-epochs")?
+    {
+        client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs;
+    }
+
     /*
      * Zero-ports
      *