
Hierarchical state diffs #5978

Open: wants to merge 52 commits into base: unstable

Commits (52)
ec974b8
Start extracting freezer changes for tree-states
michaelsproul Jun 21, 2024
df5e716
Remove unused config args
dapplion Jun 21, 2024
17ce7d0
Add comments
dapplion Jun 21, 2024
3c5d722
Remove unwraps
dapplion Jun 21, 2024
31bcd84
Subjective more clear implementation
dapplion Jun 21, 2024
394abba
Clean up hdiff
michaelsproul Jul 2, 2024
cac7672
Update xdelta3
michaelsproul Jul 4, 2024
b87c6bb
Tree states archive metrics (#6040)
dapplion Jul 4, 2024
e578f5d
Port and clean up forwards iterator changes
michaelsproul Jul 5, 2024
bdcc818
Add and polish hierarchy-config flag
michaelsproul Jul 5, 2024
1501ba5
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Jul 5, 2024
aba6b8b
Cleaner errors
michaelsproul Jul 5, 2024
420e524
Fix beacon_chain test compilation
michaelsproul Jul 5, 2024
3bec78b
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Jul 9, 2024
0500e64
Patch a few more freezer block roots
michaelsproul Jul 9, 2024
2715f60
Fix genesis block root bug
michaelsproul Jul 11, 2024
fa1a941
Fix test failing due to pending updates
michaelsproul Jul 12, 2024
ee032df
Beacon chain tests passing
michaelsproul Jul 16, 2024
8b7b362
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Jul 16, 2024
3f87cd8
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Jul 29, 2024
d2049ca
Fix doc lint
michaelsproul Jul 29, 2024
57b73df
Implement DB schema upgrade for hierarchical state diffs (#6193)
dapplion Aug 19, 2024
71738b8
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Aug 19, 2024
17985a6
Fix test compilation
michaelsproul Aug 19, 2024
b2f785a
Update schema downgrade test
michaelsproul Aug 19, 2024
7789725
Fix tests
michaelsproul Aug 19, 2024
a4582c5
Fix null anchor migration
michaelsproul Aug 26, 2024
c8cea79
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 2, 2024
47afa49
Fix tree states upgrade migration (#6328)
michaelsproul Sep 3, 2024
1e6b2d6
Clean hdiff CLI flag and metrics
michaelsproul Sep 5, 2024
45a0762
Fix "staged reconstruction"
michaelsproul Sep 9, 2024
ca7a7d7
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 9, 2024
e00f639
Fix alloy issues
michaelsproul Sep 9, 2024
907a7c0
Fix staged reconstruction logic
michaelsproul Sep 9, 2024
024843e
Prevent weird slot drift
michaelsproul Sep 9, 2024
bdf04c8
Remove "allow" flag
michaelsproul Sep 9, 2024
2d9ce8f
Update CLI help
michaelsproul Sep 9, 2024
9de88fd
Remove FIXME about downgrade
michaelsproul Sep 9, 2024
05f93dd
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 11, 2024
8a50f2a
Remove some unnecessary error variants
michaelsproul Sep 11, 2024
bcbf9b8
Fix new test
michaelsproul Sep 11, 2024
cf75901
Tree states archive - review comments and metrics (#6386)
dapplion Sep 16, 2024
dbd52f3
Update beacon_node/store/src/hot_cold_store.rs
michaelsproul Sep 16, 2024
5d3a83d
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 16, 2024
3d90ac6
Clarify comment and remove anchor_slot garbage
michaelsproul Sep 16, 2024
1890278
Simplify database anchor (#6397)
michaelsproul Sep 19, 2024
f6118d2
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 19, 2024
5d69f9c
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 24, 2024
b66aa9a
More metrics
michaelsproul Sep 27, 2024
a33130a
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Oct 8, 2024
ab9c275
New historic state cache (#6475)
michaelsproul Oct 16, 2024
e87a618
Update database docs
michaelsproul Oct 18, 2024
399 changes: 318 additions & 81 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -240,6 +240,8 @@ unused_port = { path = "common/unused_port" }
validator_client = { path = "validator_client" }
validator_dir = { path = "common/validator_dir" }
warp_utils = { path = "common/warp_utils" }
xdelta3 = { git = "http://github.com/sigp/xdelta3-rs", rev = "2a06390cd5b61b44ca3eaa89632b4ba3410d3d7f" }
zstd = "0.13"

[profile.maxperf]
inherits = "release"
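The new `xdelta3` and `zstd` dependencies are the building blocks of the hierarchical state diffs: the freezer stores a sparse set of full state snapshots plus binary diffs against ancestor layers, compressed before hitting disk. The snapshot-plus-diff-chain idea can be sketched with a naive byte-wise diff standing in for xdelta3 (the `Diff` type and helper functions below are illustrative, not Lighthouse's API):

```rust
/// A hypothetical diff: byte positions that changed and their new values.
struct Diff {
    changes: Vec<(usize, u8)>,
}

/// Record which bytes differ between two equal-length states.
fn compute_diff(base: &[u8], target: &[u8]) -> Diff {
    let changes = base
        .iter()
        .zip(target)
        .enumerate()
        .filter(|(_, (a, b))| a != b)
        .map(|(i, (_, b))| (i, *b))
        .collect();
    Diff { changes }
}

/// Apply a diff on top of a base state to recover the target state.
fn apply_diff(base: &[u8], diff: &Diff) -> Vec<u8> {
    let mut out = base.to_vec();
    for &(i, b) in &diff.changes {
        out[i] = b;
    }
    out
}

fn main() {
    // A full snapshot at slot 0, then per-slot diffs up to slot 3.
    let snapshot = vec![0u8; 8];
    let s1 = vec![0, 0, 1, 0, 0, 0, 0, 0];
    let s2 = vec![0, 0, 1, 0, 2, 0, 0, 0];
    let s3 = vec![9, 0, 1, 0, 2, 0, 0, 0];
    let diffs = [
        compute_diff(&snapshot, &s1),
        compute_diff(&s1, &s2),
        compute_diff(&s2, &s3),
    ];

    // Reconstruction: load the nearest snapshot, replay the diff chain.
    let mut state = snapshot.clone();
    for d in &diffs {
        state = apply_diff(&state, d);
    }
    assert_eq!(state, s3);
}
```

The real scheme arranges diff layers hierarchically (diffs at several exponentially-spaced frequencies) so reconstruction replays a short chain rather than one diff per slot.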
24 changes: 10 additions & 14 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -769,7 +769,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
start_slot,
local_head.beacon_state.clone(),
local_head.beacon_block_root,
&self.spec,
)?;

Ok(iter.map(|result| result.map_err(Into::into)))
@@ -794,12 +793,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

self.with_head(move |head| {
let iter = self.store.forwards_block_roots_iterator_until(
start_slot,
end_slot,
|| Ok((head.beacon_state.clone(), head.beacon_block_root)),
&self.spec,
)?;
let iter =
self.store
.forwards_block_roots_iterator_until(start_slot, end_slot, || {
Ok((head.beacon_state.clone(), head.beacon_block_root))
})?;
Ok(iter
.map(|result| result.map_err(Into::into))
.take_while(move |result| {
@@ -869,7 +867,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
start_slot,
local_head.beacon_state_root(),
local_head.beacon_state.clone(),
&self.spec,
)?;

Ok(iter.map(|result| result.map_err(Into::into)))
@@ -886,12 +883,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
end_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
self.with_head(move |head| {
let iter = self.store.forwards_state_roots_iterator_until(
start_slot,
end_slot,
|| Ok((head.beacon_state.clone(), head.beacon_state_root())),
&self.spec,
)?;
let iter =
self.store
.forwards_state_roots_iterator_until(start_slot, end_slot, || {
Ok((head.beacon_state.clone(), head.beacon_state_root()))
})?;
Ok(iter
.map(|result| result.map_err(Into::into))
.take_while(move |result| {
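The iterator constructors in this hunk now receive the head state via a fallible closure rather than an eager argument, so the potentially expensive `head.beacon_state.clone()` only runs if the iterator actually has to fall back to the head. A rough illustration of that lazy pattern, with an invented builder function and threshold:

```rust
use std::cell::Cell;

// Hypothetical iterator builder: serves short ranges from the "database"
// and only invokes the closure (which would trigger the state clone)
// when the range extends past what is on disk.
fn roots_until<F>(start: u64, end: u64, get_head: F) -> Result<Vec<u64>, String>
where
    F: FnOnce() -> Result<u64, String>,
{
    const DB_LIMIT: u64 = 100; // pretend slots below this are on disk
    if end < DB_LIMIT {
        Ok((start..=end).collect())
    } else {
        let head_slot = get_head()?; // the expensive clone happens only here
        Ok((start..=end.min(head_slot)).collect())
    }
}

fn main() {
    let clones = Cell::new(0u32);

    let head = || {
        clones.set(clones.get() + 1);
        Ok(150u64)
    };
    roots_until(0, 10, head).unwrap();
    assert_eq!(clones.get(), 0); // cheap path: closure never called

    let head = || {
        clones.set(clones.get() + 1);
        Ok(150u64)
    };
    roots_until(0, 200, head).unwrap();
    assert_eq!(clones.get(), 1); // clone paid only when required
}
```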
19 changes: 0 additions & 19 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -838,9 +838,6 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {

let block_root = get_block_header_root(block_header);

// Disallow blocks that conflict with the anchor (weak subjectivity checkpoint), if any.
check_block_against_anchor_slot(block.message(), chain)?;

// Do not gossip a block from a finalized slot.
check_block_against_finalized_slot(block.message(), block_root, chain)?;

@@ -1073,9 +1070,6 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
.fork_name(&chain.spec)
.map_err(BlockError::InconsistentFork)?;

// Check the anchor slot before loading the parent, to avoid spurious lookups.
check_block_against_anchor_slot(block.message(), chain)?;

let (mut parent, block) = load_parent(block, chain)?;

let state = cheap_state_advance_to_obtain_committees::<_, BlockError>(
@@ -1710,19 +1704,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
}
}

/// Returns `Ok(())` if the block's slot is greater than the anchor block's slot (if any).
fn check_block_against_anchor_slot<T: BeaconChainTypes>(
block: BeaconBlockRef<'_, T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<(), BlockError> {
if let Some(anchor_slot) = chain.store.get_anchor_slot() {
if block.slot() <= anchor_slot {
return Err(BlockError::WeakSubjectivityConflict);
}
}
Ok(())
}

/// Returns `Ok(())` if the block is later than the finalized slot on `chain`.
///
/// Returns an error if the block is earlier or equal to the finalized slot, or there was an error
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
@@ -363,6 +363,10 @@ where
store
.put_block(&beacon_block_root, beacon_block.clone())
.map_err(|e| format!("Failed to store genesis block: {:?}", e))?;
store
.store_frozen_block_root_at_skip_slots(Slot::new(0), Slot::new(1), beacon_block_root)
.and_then(|ops| store.cold_db.do_atomically(ops))
.map_err(|e| format!("Failed to store genesis block root: {e:?}"))?;

// Store the genesis block under the `ZERO_HASH` key.
store
28 changes: 13 additions & 15 deletions beacon_node/beacon_chain/src/historical_blocks.rs
@@ -10,7 +10,7 @@ use std::borrow::Cow;
use std::iter;
use std::time::Duration;
use store::metadata::DataColumnInfo;
use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore};
use store::{get_key_for_col, AnchorInfo, BlobInfo, DBColumn, KeyValueStore, KeyValueStoreOp};
use types::{FixedBytesExtended, Hash256, Slot};

/// Use a longer timeout on the pubkey cache.
@@ -33,8 +33,6 @@ pub enum HistoricalBlockError {
InvalidSignature,
/// Transitory error, caller should retry with the same blocks.
ValidatorPubkeyCacheTimeout,
/// No historical sync needed.
NoAnchorInfo,
/// Logic error: should never occur.
IndexOutOfBounds,
}
@@ -62,10 +60,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
mut blocks: Vec<AvailableBlock<T::EthSpec>>,
) -> Result<usize, Error> {
let anchor_info = self
.store
.get_anchor_info()
.ok_or(HistoricalBlockError::NoAnchorInfo)?;
let anchor_info = self.store.get_anchor_info();
let blob_info = self.store.get_blob_info();
let data_column_info = self.store.get_data_column_info();

@@ -109,8 +104,6 @@

let mut expected_block_root = anchor_info.oldest_block_parent;
let mut prev_block_slot = anchor_info.oldest_block_slot;
let mut chunk_writer =
ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot;

@@ -149,8 +142,11 @@
}

// Store block roots, including at all skip slots in the freezer DB.
for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() {
chunk_writer.set(slot, block_root, &mut cold_batch)?;
for slot in (block.slot().as_u64()..prev_block_slot.as_u64()).rev() {
cold_batch.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()),
block_root.as_slice().to_vec(),
));
}

prev_block_slot = block.slot();
@@ -162,15 +158,17 @@
// completion.
if expected_block_root == self.genesis_block_root {
let genesis_slot = self.spec.genesis_slot;
for slot in genesis_slot.as_usize()..prev_block_slot.as_usize() {
chunk_writer.set(slot, self.genesis_block_root, &mut cold_batch)?;
for slot in genesis_slot.as_u64()..prev_block_slot.as_u64() {
cold_batch.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()),
self.genesis_block_root.as_slice().to_vec(),
));
}
prev_block_slot = genesis_slot;
expected_block_root = Hash256::zero();
break;
}
}
chunk_writer.write(&mut cold_batch)?;
// these were pushed in reverse order so we reverse again
signed_blocks.reverse();

@@ -262,7 +260,7 @@
let backfill_complete = new_anchor.block_backfill_complete(self.genesis_backfill_slot);
anchor_and_blob_batch.push(
self.store
.compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?,
.compare_and_set_anchor_info(anchor_info, new_anchor)?,
);
self.store.hot_db.do_atomically(anchor_and_blob_batch)?;

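The block-root writes in this hunk key each entry of the `BeaconBlockRoots` column by `slot.to_be_bytes()`, replacing the old chunked `ChunkWriter` layout with one root per slot. Big-endian encoding matters here: it makes lexicographic byte order agree with numeric slot order, which ordered range iteration over the column depends on. A quick demonstration (the key helper is just a sketch of the layout, not Lighthouse's code):

```rust
// One key per slot, big-endian, mirroring the freezer-DB layout above.
fn slot_key(slot: u64) -> [u8; 8] {
    slot.to_be_bytes()
}

fn main() {
    let slots = [0u64, 1, 255, 256, 65_536, u64::MAX];
    let keys: Vec<[u8; 8]> = slots.iter().map(|s| slot_key(*s)).collect();

    // Lexicographic comparison of the keys preserves numeric slot order,
    // so an ordered scan of the column walks slots low-to-high.
    for pair in keys.windows(2) {
        assert!(pair[0] < pair[1]);
    }

    // Little-endian would not: 255 sorts *after* 256 byte-wise.
    assert!(255u64.to_le_bytes() > 256u64.to_le_bytes());
}
```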
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
@@ -1928,6 +1928,7 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
let attestation_stats = beacon_chain.op_pool.attestation_stats();
let chain_metrics = beacon_chain.metrics();

// Kept duplicated for backwards compatibility
set_gauge_by_usize(
&BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE,
beacon_chain.store.state_cache_len(),
@@ -1991,6 +1992,8 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
.canonical_head
.fork_choice_read_lock()
.scrape_for_metrics();

beacon_chain.store.register_metrics();
}

/// Scrape the given `state` assuming it's the head state, updating the `DEFAULT_REGISTRY`.
63 changes: 46 additions & 17 deletions beacon_node/beacon_chain/src/migrate.rs
@@ -24,6 +24,10 @@ const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800;
const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200;
/// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`.
const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
/// Maximum number of blocks applied in each reconstruction burst.
///
/// This limits the amount of time that the finalization migration is paused for.
const BLOCKS_PER_RECONSTRUCTION: usize = 8192 * 4;

/// Default number of epochs to wait between finalization migrations.
pub const DEFAULT_EPOCHS_PER_MIGRATION: u64 = 1;
@@ -188,7 +192,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
if let Some(Notification::Reconstruction) =
self.send_background_notification(Notification::Reconstruction)
{
Self::run_reconstruction(self.db.clone(), &self.log);
// If we are running in foreground mode (as in tests), then this will just run a single
// batch. We may need to tweak this in future.
Self::run_reconstruction(self.db.clone(), None, &self.log);
}
}

@@ -200,13 +206,34 @@
}
}

pub fn run_reconstruction(db: Arc<HotColdDB<E, Hot, Cold>>, log: &Logger) {
if let Err(e) = db.reconstruct_historic_states() {
error!(
log,
"State reconstruction failed";
"error" => ?e,
);
pub fn run_reconstruction(
db: Arc<HotColdDB<E, Hot, Cold>>,
opt_tx: Option<mpsc::Sender<Notification>>,
log: &Logger,
) {
match db.reconstruct_historic_states(Some(BLOCKS_PER_RECONSTRUCTION)) {
Ok(()) => {
// Schedule another reconstruction batch if required and we have access to the
// channel for requeueing.
if let Some(tx) = opt_tx {
if !db.get_anchor_info().all_historic_states_stored() {
if let Err(e) = tx.send(Notification::Reconstruction) {
error!(
log,
"Unable to requeue reconstruction notification";
"error" => ?e
);
}
}
}
}
Err(e) => {
error!(
log,
"State reconstruction failed";
"error" => ?e,
);
}
}
}

@@ -388,6 +415,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
log: Logger,
) -> (mpsc::Sender<Notification>, thread::JoinHandle<()>) {
let (tx, rx) = mpsc::channel();
let inner_tx = tx.clone();
let thread = thread::spawn(move || {
while let Ok(notif) = rx.recv() {
let mut reconstruction_notif = None;
@@ -418,16 +446,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
}
}
// If reconstruction is on-going, ignore finalization migration and blob pruning.
// Run finalization and blob pruning migrations first, then a reconstruction batch.
// This prevents finalization from being starved while reconstruction runs (a
// problem in previous LH versions).
if let Some(fin) = finalization_notif {
Self::run_migration(db.clone(), fin, &log);
}
if let Some(dab) = prune_blobs_notif {
Self::run_prune_blobs(db.clone(), dab, &log);
}
if reconstruction_notif.is_some() {
Self::run_reconstruction(db.clone(), &log);
} else {
if let Some(fin) = finalization_notif {
Self::run_migration(db.clone(), fin, &log);
}
if let Some(dab) = prune_blobs_notif {
Self::run_prune_blobs(db.clone(), dab, &log);
}
Self::run_reconstruction(db.clone(), Some(inner_tx.clone()), &log);
}
}
});
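The migrator change above caps each reconstruction run at `BLOCKS_PER_RECONSTRUCTION` and, if states remain, requeues itself on the notification channel instead of looping. That lets finalization and blob-pruning notifications interleave between batches. The requeue pattern, reduced to a toy with invented types and counts:

```rust
use std::sync::mpsc;

// Illustrative stand-ins for Lighthouse's migrator types.
enum Notification {
    Reconstruction,
}

/// Replay up to `batch` "blocks"; return true once reconstruction is complete.
fn run_batch(remaining: &mut u32, batch: u32) -> bool {
    let done = (*remaining).min(batch);
    *remaining -= done;
    *remaining == 0
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut remaining = 10u32; // pretend 10 blocks need replaying
    let mut batches = 0;

    tx.send(Notification::Reconstruction).unwrap();
    while let Ok(Notification::Reconstruction) = rx.try_recv() {
        batches += 1;
        if !run_batch(&mut remaining, 4) {
            // Work remains: requeue rather than looping, so other
            // notifications (finalization, blob pruning) get a turn first.
            tx.send(Notification::Reconstruction).unwrap();
        }
    }
    assert_eq!(batches, 3); // 10 blocks at 4 per batch -> 3 batches
}
```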
34 changes: 12 additions & 22 deletions beacon_node/beacon_chain/src/schema_change.rs
@@ -1,53 +1,38 @@
//! Utilities for managing database schema changes.
mod migration_schema_v20;
mod migration_schema_v21;
mod migration_schema_v22;

use crate::beacon_chain::BeaconChainTypes;
use crate::types::ChainSpec;
use slog::Logger;
use std::sync::Arc;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::Error as StoreError;
use types::Hash256;

/// Migrate the database from one schema version to another, applying all requisite mutations.
#[allow(clippy::only_used_in_recursion)] // spec is not used but likely to be used in future
pub fn migrate_schema<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
deposit_contract_deploy_block: u64,
genesis_state_root: Option<Hash256>,
from: SchemaVersion,
to: SchemaVersion,
log: Logger,
spec: &ChainSpec,
) -> Result<(), StoreError> {
match (from, to) {
// Migrating from the current schema version to itself is always OK, a no-op.
(_, _) if from == to && to == CURRENT_SCHEMA_VERSION => Ok(()),
// Upgrade across multiple versions by recursively migrating one step at a time.
(_, _) if from.as_u64() + 1 < to.as_u64() => {
let next = SchemaVersion(from.as_u64() + 1);
migrate_schema::<T>(
db.clone(),
deposit_contract_deploy_block,
from,
next,
log.clone(),
spec,
)?;
migrate_schema::<T>(db, deposit_contract_deploy_block, next, to, log, spec)
migrate_schema::<T>(db.clone(), genesis_state_root, from, next, log.clone())?;
migrate_schema::<T>(db, genesis_state_root, next, to, log)
}
// Downgrade across multiple versions by recursively migrating one step at a time.
(_, _) if to.as_u64() + 1 < from.as_u64() => {
let next = SchemaVersion(from.as_u64() - 1);
migrate_schema::<T>(
db.clone(),
deposit_contract_deploy_block,
from,
next,
log.clone(),
spec,
)?;
migrate_schema::<T>(db, deposit_contract_deploy_block, next, to, log, spec)
migrate_schema::<T>(db.clone(), genesis_state_root, from, next, log.clone())?;
migrate_schema::<T>(db, genesis_state_root, next, to, log)
}

//
@@ -69,6 +54,11 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v21::downgrade_from_v21::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(21), SchemaVersion(22)) => {
// This migration needs to sync data between hot and cold DBs. The schema version is
// bumped inside the upgrade_to_v22 fn
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
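The rewritten `migrate_schema` keeps only single-step handlers (such as the new `(21, 22)` arm) and reaches distant versions by recursing one hop at a time, in either direction. The control flow can be sketched independently of the database types; the version numbers and step log below are purely illustrative:

```rust
// Sketch of stepwise schema migration: multi-version jumps recurse through
// adjacent versions, so only v(n) <-> v(n+1) transitions need handlers.
fn migrate(from: u64, to: u64, steps: &mut Vec<(u64, u64)>) -> Result<(), String> {
    if from == to {
        return Ok(()); // migrating to the current version is a no-op
    }
    if from.abs_diff(to) > 1 {
        // Recurse one step toward `to`, then handle the remainder.
        let next = if to > from { from + 1 } else { from - 1 };
        migrate(from, next, steps)?;
        return migrate(next, to, steps);
    }
    // Single-step transition: the real code applies the concrete
    // upgrade/downgrade for this version pair here.
    steps.push((from, to));
    Ok(())
}

fn main() {
    let mut steps = Vec::new();
    migrate(20, 22, &mut steps).unwrap();
    assert_eq!(steps, vec![(20, 21), (21, 22)]);

    steps.clear();
    migrate(22, 20, &mut steps).unwrap();
    assert_eq!(steps, vec![(22, 21), (21, 20)]);
}
```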