Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hierarchical state diffs #5978

Open
wants to merge 52 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
ec974b8
Start extracting freezer changes for tree-states
michaelsproul Jun 21, 2024
df5e716
Remove unused config args
dapplion Jun 21, 2024
17ce7d0
Add comments
dapplion Jun 21, 2024
3c5d722
Remove unwraps
dapplion Jun 21, 2024
31bcd84
Subjective more clear implementation
dapplion Jun 21, 2024
394abba
Clean up hdiff
michaelsproul Jul 2, 2024
cac7672
Update xdelta3
michaelsproul Jul 4, 2024
b87c6bb
Tree states archive metrics (#6040)
dapplion Jul 4, 2024
e578f5d
Port and clean up forwards iterator changes
michaelsproul Jul 5, 2024
bdcc818
Add and polish hierarchy-config flag
michaelsproul Jul 5, 2024
1501ba5
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Jul 5, 2024
aba6b8b
Cleaner errors
michaelsproul Jul 5, 2024
420e524
Fix beacon_chain test compilation
michaelsproul Jul 5, 2024
3bec78b
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Jul 9, 2024
0500e64
Patch a few more freezer block roots
michaelsproul Jul 9, 2024
2715f60
Fix genesis block root bug
michaelsproul Jul 11, 2024
fa1a941
Fix test failing due to pending updates
michaelsproul Jul 12, 2024
ee032df
Beacon chain tests passing
michaelsproul Jul 16, 2024
8b7b362
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Jul 16, 2024
3f87cd8
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Jul 29, 2024
d2049ca
Fix doc lint
michaelsproul Jul 29, 2024
57b73df
Implement DB schema upgrade for hierarchical state diffs (#6193)
dapplion Aug 19, 2024
71738b8
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Aug 19, 2024
17985a6
Fix test compilation
michaelsproul Aug 19, 2024
b2f785a
Update schema downgrade test
michaelsproul Aug 19, 2024
7789725
Fix tests
michaelsproul Aug 19, 2024
a4582c5
Fix null anchor migration
michaelsproul Aug 26, 2024
c8cea79
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 2, 2024
47afa49
Fix tree states upgrade migration (#6328)
michaelsproul Sep 3, 2024
1e6b2d6
Clean hdiff CLI flag and metrics
michaelsproul Sep 5, 2024
45a0762
Fix "staged reconstruction"
michaelsproul Sep 9, 2024
ca7a7d7
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 9, 2024
e00f639
Fix alloy issues
michaelsproul Sep 9, 2024
907a7c0
Fix staged reconstruction logic
michaelsproul Sep 9, 2024
024843e
Prevent weird slot drift
michaelsproul Sep 9, 2024
bdf04c8
Remove "allow" flag
michaelsproul Sep 9, 2024
2d9ce8f
Update CLI help
michaelsproul Sep 9, 2024
9de88fd
Remove FIXME about downgrade
michaelsproul Sep 9, 2024
05f93dd
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 11, 2024
8a50f2a
Remove some unnecessary error variants
michaelsproul Sep 11, 2024
bcbf9b8
Fix new test
michaelsproul Sep 11, 2024
cf75901
Tree states archive - review comments and metrics (#6386)
dapplion Sep 16, 2024
dbd52f3
Update beacon_node/store/src/hot_cold_store.rs
michaelsproul Sep 16, 2024
5d3a83d
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 16, 2024
3d90ac6
Clarify comment and remove anchor_slot garbage
michaelsproul Sep 16, 2024
1890278
Simplify database anchor (#6397)
michaelsproul Sep 19, 2024
f6118d2
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 19, 2024
5d69f9c
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Sep 24, 2024
b66aa9a
More metrics
michaelsproul Sep 27, 2024
a33130a
Merge remote-tracking branch 'origin/unstable' into tree-states-archive
michaelsproul Oct 8, 2024
ab9c275
New historic state cache (#6475)
michaelsproul Oct 16, 2024
e87a618
Update database docs
michaelsproul Oct 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
403 changes: 320 additions & 83 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ unused_port = { path = "common/unused_port" }
validator_client = { path = "validator_client" }
validator_dir = { path = "common/validator_dir" }
warp_utils = { path = "common/warp_utils" }
xdelta3 = { git = "http://github.com/sigp/xdelta3-rs", rev = "2a06390cd5b61b44ca3eaa89632b4ba3410d3d7f" }
zstd = "0.13"

[profile.maxperf]
inherits = "release"
Expand Down
24 changes: 10 additions & 14 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
start_slot,
local_head.beacon_state.clone(),
local_head.beacon_block_root,
&self.spec,
)?;

Ok(iter.map(|result| result.map_err(Into::into)))
Expand All @@ -794,12 +793,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

self.with_head(move |head| {
let iter = self.store.forwards_block_roots_iterator_until(
start_slot,
end_slot,
|| Ok((head.beacon_state.clone(), head.beacon_block_root)),
&self.spec,
)?;
let iter =
self.store
.forwards_block_roots_iterator_until(start_slot, end_slot, || {
Ok((head.beacon_state.clone(), head.beacon_block_root))
})?;
Ok(iter
.map(|result| result.map_err(Into::into))
.take_while(move |result| {
Expand Down Expand Up @@ -869,7 +867,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
start_slot,
local_head.beacon_state_root(),
local_head.beacon_state.clone(),
&self.spec,
)?;

Ok(iter.map(|result| result.map_err(Into::into)))
Expand All @@ -886,12 +883,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
end_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
self.with_head(move |head| {
let iter = self.store.forwards_state_roots_iterator_until(
start_slot,
end_slot,
|| Ok((head.beacon_state.clone(), head.beacon_state_root())),
&self.spec,
)?;
let iter =
self.store
.forwards_state_roots_iterator_until(start_slot, end_slot, || {
Ok((head.beacon_state.clone(), head.beacon_state_root()))
})?;
Ok(iter
.map(|result| result.map_err(Into::into))
.take_while(move |result| {
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ where
store
.put_block(&beacon_block_root, beacon_block.clone())
.map_err(|e| format!("Failed to store genesis block: {:?}", e))?;
store
.store_frozen_block_root_at_skip_slots(Slot::new(0), Slot::new(1), beacon_block_root)
.and_then(|ops| store.cold_db.do_atomically(ops))
.map_err(|e| format!("Failed to store genesis block root: {e:?}"))?;

// Store the genesis block under the `ZERO_HASH` key.
store
Expand Down
19 changes: 11 additions & 8 deletions beacon_node/beacon_chain/src/historical_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::borrow::Cow;
use std::iter;
use std::time::Duration;
use store::metadata::DataColumnInfo;
use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore};
use store::{get_key_for_col, AnchorInfo, BlobInfo, DBColumn, KeyValueStore, KeyValueStoreOp};
use types::{FixedBytesExtended, Hash256, Slot};

/// Use a longer timeout on the pubkey cache.
Expand Down Expand Up @@ -107,8 +107,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let mut expected_block_root = anchor_info.oldest_block_parent;
let mut prev_block_slot = anchor_info.oldest_block_slot;
let mut chunk_writer =
ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot;

Expand Down Expand Up @@ -147,8 +145,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

// Store block roots, including at all skip slots in the freezer DB.
for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() {
chunk_writer.set(slot, block_root, &mut cold_batch)?;
for slot in (block.slot().as_u64()..prev_block_slot.as_u64()).rev() {
cold_batch.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()),
block_root.as_slice().to_vec(),
));
}

prev_block_slot = block.slot();
Expand All @@ -160,15 +161,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// completion.
if expected_block_root == self.genesis_block_root {
let genesis_slot = self.spec.genesis_slot;
for slot in genesis_slot.as_usize()..prev_block_slot.as_usize() {
chunk_writer.set(slot, self.genesis_block_root, &mut cold_batch)?;
for slot in genesis_slot.as_u64()..prev_block_slot.as_u64() {
cold_batch.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()),
self.genesis_block_root.as_slice().to_vec(),
));
}
prev_block_slot = genesis_slot;
expected_block_root = Hash256::zero();
break;
}
}
chunk_writer.write(&mut cold_batch)?;
// these were pushed in reverse order so we reverse again
signed_blocks.reverse();

Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1928,6 +1928,7 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
let attestation_stats = beacon_chain.op_pool.attestation_stats();
let chain_metrics = beacon_chain.metrics();

// Kept duplicated for backwards compatibility
set_gauge_by_usize(
&BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE,
beacon_chain.store.state_cache_len(),
Expand Down
63 changes: 46 additions & 17 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800;
const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200;
/// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`.
const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
/// Maximum number of blocks applied in each reconstruction burst.
///
/// This limits the amount of time that the finalization migration is paused for.
const BLOCKS_PER_RECONSTRUCTION: usize = 8192 * 4;

/// Default number of epochs to wait between finalization migrations.
pub const DEFAULT_EPOCHS_PER_MIGRATION: u64 = 1;
Expand Down Expand Up @@ -188,7 +192,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
if let Some(Notification::Reconstruction) =
self.send_background_notification(Notification::Reconstruction)
{
Self::run_reconstruction(self.db.clone(), &self.log);
// If we are running in foreground mode (as in tests), then this will just run a single
// batch. We may need to tweak this in future.
Self::run_reconstruction(self.db.clone(), None, &self.log);
}
}

Expand All @@ -200,13 +206,34 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
}

pub fn run_reconstruction(db: Arc<HotColdDB<E, Hot, Cold>>, log: &Logger) {
if let Err(e) = db.reconstruct_historic_states() {
error!(
log,
"State reconstruction failed";
"error" => ?e,
);
pub fn run_reconstruction(
db: Arc<HotColdDB<E, Hot, Cold>>,
opt_tx: Option<mpsc::Sender<Notification>>,
log: &Logger,
) {
match db.reconstruct_historic_states(Some(BLOCKS_PER_RECONSTRUCTION)) {
Ok(()) => {
// Schedule another reconstruction batch if required and we have access to the
// channel for requeueing.
if let Some(tx) = opt_tx {
if db.get_anchor_info().is_some() {
if let Err(e) = tx.send(Notification::Reconstruction) {
error!(
log,
"Unable to requeue reconstruction notification";
"error" => ?e
);
}
}
}
}
Err(e) => {
error!(
log,
"State reconstruction failed";
"error" => ?e,
);
}
}
}

Expand Down Expand Up @@ -388,6 +415,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
log: Logger,
) -> (mpsc::Sender<Notification>, thread::JoinHandle<()>) {
let (tx, rx) = mpsc::channel();
let inner_tx = tx.clone();
let thread = thread::spawn(move || {
while let Ok(notif) = rx.recv() {
let mut reconstruction_notif = None;
Expand Down Expand Up @@ -418,16 +446,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
}
}
// If reconstruction is on-going, ignore finalization migration and blob pruning.
// Run finalization and blob pruning migrations first, then a reconstruction batch.
// This prevents finalization from being starved while reconstruction runs (a
// problem in previous LH versions).
if let Some(fin) = finalization_notif {
Self::run_migration(db.clone(), fin, &log);
}
if let Some(dab) = prune_blobs_notif {
Self::run_prune_blobs(db.clone(), dab, &log);
}
if reconstruction_notif.is_some() {
Self::run_reconstruction(db.clone(), &log);
} else {
if let Some(fin) = finalization_notif {
Self::run_migration(db.clone(), fin, &log);
}
if let Some(dab) = prune_blobs_notif {
Self::run_prune_blobs(db.clone(), dab, &log);
}
Self::run_reconstruction(db.clone(), Some(inner_tx.clone()), &log);
}
}
});
Expand Down
32 changes: 10 additions & 22 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,38 @@
//! Utilities for managing database schema changes.
mod migration_schema_v20;
mod migration_schema_v21;
mod migration_schema_v22;

use crate::beacon_chain::BeaconChainTypes;
use crate::types::ChainSpec;
use slog::Logger;
use std::sync::Arc;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::Error as StoreError;
use types::Hash256;

/// Migrate the database from one schema version to another, applying all requisite mutations.
#[allow(clippy::only_used_in_recursion)] // spec is not used but likely to be used in future
pub fn migrate_schema<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
deposit_contract_deploy_block: u64,
genesis_state_root: Option<Hash256>,
from: SchemaVersion,
to: SchemaVersion,
log: Logger,
spec: &ChainSpec,
) -> Result<(), StoreError> {
match (from, to) {
// Migrating from the current schema version to itself is always OK, a no-op.
(_, _) if from == to && to == CURRENT_SCHEMA_VERSION => Ok(()),
// Upgrade across multiple versions by recursively migrating one step at a time.
(_, _) if from.as_u64() + 1 < to.as_u64() => {
let next = SchemaVersion(from.as_u64() + 1);
migrate_schema::<T>(
db.clone(),
deposit_contract_deploy_block,
from,
next,
log.clone(),
spec,
)?;
migrate_schema::<T>(db, deposit_contract_deploy_block, next, to, log, spec)
migrate_schema::<T>(db.clone(), genesis_state_root, from, next, log.clone())?;
migrate_schema::<T>(db, genesis_state_root, next, to, log)
}
// Downgrade across multiple versions by recursively migrating one step at a time.
(_, _) if to.as_u64() + 1 < from.as_u64() => {
let next = SchemaVersion(from.as_u64() - 1);
migrate_schema::<T>(
db.clone(),
deposit_contract_deploy_block,
from,
next,
log.clone(),
spec,
)?;
migrate_schema::<T>(db, deposit_contract_deploy_block, next, to, log, spec)
migrate_schema::<T>(db.clone(), genesis_state_root, from, next, log.clone())?;
migrate_schema::<T>(db, genesis_state_root, next, to, log)
}

//
Expand All @@ -69,6 +54,9 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v21::downgrade_from_v21::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(21), SchemaVersion(22)) => {
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
Expand Down
Loading
Loading