diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index f120e18..efd58ef 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::time::{Duration, SystemTime}; use ethlambda_state_transition::is_proposer; +use ethlambda_storage::Store; use ethlambda_types::{ attestation::{Attestation, AttestationData, SignedAttestation}, block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, @@ -12,7 +13,6 @@ use ethlambda_types::{ use spawned_concurrency::tasks::{ CallResponse, CastResponse, GenServer, GenServerHandle, send_after, }; -use store::Store; use tokio::sync::mpsc; use tracing::{error, info, warn}; @@ -109,8 +109,7 @@ impl BlockChainServer { .flatten(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal - self.store - .on_tick(timestamp, proposer_validator_id.is_some()); + store::on_tick(&mut self.store, timestamp, proposer_validator_id.is_some()); // Now build and publish the block (after attestations have been accepted) if let Some(validator_id) = proposer_validator_id { @@ -144,7 +143,7 @@ impl BlockChainServer { let num_validators = head_state.validators.len() as u64; // Produce attestation data once for all validators - let attestation_data = self.store.produce_attestation_data(slot); + let attestation_data = store::produce_attestation_data(&self.store, slot); // For each registered validator, produce and publish attestation for validator_id in self.key_manager.validator_ids() { @@ -191,10 +190,9 @@ impl BlockChainServer { info!(%slot, %validator_id, "We are the proposer for this slot"); // Build the block with attestation signatures - let Ok((block, attestation_signatures)) = self - .store - .produce_block_with_signatures(slot, validator_id) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) + let Ok((block, attestation_signatures)) = + store::produce_block_with_signatures(&mut self.store, slot, validator_id) + .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) else { return; }; @@ -208,8 +206,8 @@ impl BlockChainServer { root: block.tree_hash_root(), slot: block.slot, }, - target: self.store.get_attestation_target(), - source: *self.store.latest_justified(), + target: store::get_attestation_target(&self.store), + source: self.store.latest_justified(), }, }; @@ -261,7 +259,7 @@ impl BlockChainServer { signed_block: SignedBlockWithAttestation, ) -> Result<(), StoreError> { let slot = signed_block.message.block.slot; - self.store.on_block(signed_block)?; + store::on_block(&mut self.store, signed_block)?; metrics::update_head_slot(slot); metrics::update_latest_justified_slot(self.store.latest_justified().slot); metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); @@ -277,7 +275,7 @@ impl BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: SignedAttestation) { - if let Err(err) = self.store.on_gossip_attestation(attestation) { + if let Err(err) = store::on_gossip_attestation(&mut self.store, attestation) { warn!(%err, "Failed to process gossiped attestation"); } } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 180354a..42b891d 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -4,6 +4,7 @@ use ethlambda_crypto::aggregate_signatures; use ethlambda_state_transition::{ is_proposer, process_block, process_slots, slot_is_justifiable_after, }; +use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store}; use ethlambda_types::{ attestation::{AggregatedAttestation, Attestation, AttestationData, SignedAttestation}, block::{ @@ -12,7 +13,7 @@ use ethlambda_types::{ }, primitives::{H256, TreeHash}, signature::ValidatorSignature, - state::{ChainConfig, Checkpoint, State, Validator}, + state::{Checkpoint, State, Validator}, }; use tracing::{info, trace, warn}; @@ -20,755 +21,530 @@ use crate::{SECONDS_PER_SLOT, metrics}; const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; -/// Key for looking up individual validator signatures. -/// Used to index signature caches by (validator, message) pairs. -/// -/// Values are (validator_index, attestation_data_root). -type SignatureKey = (u64, H256); - -/// Forkchoice store tracking chain state and validator attestations. -/// -/// This is the "local view" that a node uses to run LMD GHOST. It contains: -/// -/// - which blocks and states are known, -/// - which checkpoints are justified and finalized, -/// - which block is currently considered the head, -/// - and, for each validator, their latest attestation that should influence fork choice. -/// -/// The `Store` is updated whenever: -/// - a new block is processed, -/// - an attestation is received (via a block or gossip), -/// - an interval tick occurs (activating new attestations), -/// - or when the head is recomputed. -#[derive(Clone)] -pub struct Store { - /// Current time in intervals since genesis. - time: u64, - - /// Chain configuration parameters. - config: ChainConfig, - - /// Root of the current canonical chain head block. - /// - /// This is the result of running the fork choice algorithm on the current contents of the `Store`. - head: H256, - - /// Root of the current safe target for attestation. - /// - /// This can be used by higher-level logic to restrict which blocks are - /// considered safe to attest to, based on additional safety conditions. - /// - safe_target: H256, - - /// Highest slot justified checkpoint known to the store. - /// - /// LMD GHOST starts from this checkpoint when computing the head. - /// - /// Only descendants of this checkpoint are considered viable. - latest_justified: Checkpoint, - - /// Highest slot finalized checkpoint known to the store. - /// - /// Everything strictly before this checkpoint can be considered immutable. - /// - /// Fork choice will never revert finalized history. - latest_finalized: Checkpoint, - - /// Mapping from block root to Block objects. - /// - /// This is the set of blocks that the node currently knows about. - /// - /// Every block that might participate in fork choice must appear here. - blocks: HashMap, - - /// Mapping from block root to State objects. - /// - /// For each known block, we keep its post-state. - /// - /// These states carry justified and finalized checkpoints that we use to update the - /// `Store`'s latest justified and latest finalized checkpoints. - states: HashMap, - - /// Latest signed attestations by validator that have been processed. - /// - /// - These attestations are "known" and contribute to fork choice weights. - /// - Keyed by validator index to enforce one attestation per validator. - latest_known_attestations: HashMap, - - /// Latest signed attestations by validator that are pending processing. - /// - /// - These attestations are "new" and do not yet contribute to fork choice. - /// - They migrate to `latest_known_attestations` via interval ticks. - /// - Keyed by validator index to enforce one attestation per validator. - latest_new_attestations: HashMap, - - /// Per-validator XMSS signatures learned from gossip. - /// - /// Keyed by SignatureKey(validator_id, attestation_data_root). - gossip_signatures: HashMap, - - /// Aggregated signature proofs learned from blocks. - /// - Keyed by SignatureKey(validator_id, attestation_data_root). - /// - Values are lists of AggregatedSignatureProof, each containing the participants - /// bitfield indicating which validators signed. - /// - Used for recursive signature aggregation when building blocks. - /// - Populated by on_block. - aggregated_payloads: HashMap>, +/// Accept new attestations, moving them from pending to known. +fn accept_new_attestations(store: &mut Store) { + store.promote_new_attestations(); + update_head(store); } -impl Store { - pub fn from_genesis(mut genesis_state: State) -> Self { - // Ensure the header state root is zero before computing the state root - genesis_state.latest_block_header.state_root = H256::ZERO; - - let genesis_state_root = genesis_state.tree_hash_root(); - let genesis_block = Block { - slot: 0, - proposer_index: 0, - parent_root: H256::ZERO, - state_root: genesis_state_root, - body: Default::default(), - }; - Self::get_forkchoice_store(genesis_state, genesis_block) +/// Update the head based on the fork choice rule. +fn update_head(store: &mut Store) { + let blocks: HashMap = store.iter_blocks().collect(); + let attestations: HashMap = store.iter_known_attestations().collect(); + let old_head = store.head(); + let new_head = ethlambda_fork_choice::compute_lmd_ghost_head( + store.latest_justified().root, + &blocks, + &attestations, + 0, + ); + if is_reorg(old_head, new_head, store) { + metrics::inc_fork_choice_reorgs(); + info!(%old_head, %new_head, "Fork choice reorg detected"); } + store.update_checkpoints(ForkCheckpoints::head_only(new_head)); +} - pub fn get_forkchoice_store(anchor_state: State, anchor_block: Block) -> Self { - let anchor_state_root = anchor_state.tree_hash_root(); - let anchor_block_root = anchor_block.tree_hash_root(); - - let mut blocks = HashMap::new(); - blocks.insert(anchor_block_root, anchor_block.clone()); - - let mut states = HashMap::new(); - states.insert(anchor_block_root, anchor_state.clone()); - - let anchor_checkpoint = Checkpoint { - root: anchor_block_root, - slot: 0, - }; +/// Update the safe target for attestation. +fn update_safe_target(store: &mut Store) { + let head_state = store.get_state(&store.head()).expect("head state exists"); + let num_validators = head_state.validators.len() as u64; + + let min_target_score = (num_validators * 2).div_ceil(3); + + let blocks: HashMap = store.iter_blocks().collect(); + let attestations: HashMap = store.iter_new_attestations().collect(); + let safe_target = ethlambda_fork_choice::compute_lmd_ghost_head( + store.latest_justified().root, + &blocks, + &attestations, + min_target_score, + ); + store.set_safe_target(safe_target); +} - info!(%anchor_state_root, %anchor_block_root, "Initialized store"); - - Self { - time: 0, - config: anchor_state.config.clone(), - head: anchor_block_root, - safe_target: anchor_block_root, - latest_justified: anchor_checkpoint, - latest_finalized: anchor_checkpoint, - blocks, - states, - latest_known_attestations: HashMap::new(), - latest_new_attestations: HashMap::new(), - gossip_signatures: HashMap::new(), - aggregated_payloads: HashMap::new(), - } +/// Validate incoming attestation before processing. +/// +/// Ensures the vote respects the basic laws of time and topology: +/// 1. The blocks voted for must exist in our store. +/// 2. A vote cannot span backwards in time (source > target). +/// 3. A vote cannot be for a future slot. +fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), StoreError> { + let data = &attestation.data; + + // Availability Check - We cannot count a vote if we haven't seen the blocks involved. + let source_block = store + .get_block(&data.source.root) + .ok_or(StoreError::UnknownSourceBlock(data.source.root))?; + let target_block = store + .get_block(&data.target.root) + .ok_or(StoreError::UnknownTargetBlock(data.target.root))?; + + if !store.contains_block(&data.head.root) { + return Err(StoreError::UnknownHeadBlock(data.head.root)); } - pub fn accept_new_attestations(&mut self) { - let mut latest_new_attestations = std::mem::take(&mut self.latest_new_attestations); - self.latest_known_attestations - .extend(latest_new_attestations.drain()); - self.latest_new_attestations = latest_new_attestations; - - self.update_head(); + // Topology Check - Source must be older than Target. + if data.source.slot > data.target.slot { + return Err(StoreError::SourceExceedsTarget); } - pub fn update_head(&mut self) { - let old_head = self.head; - let new_head = ethlambda_fork_choice::compute_lmd_ghost_head( - self.latest_justified.root, - &self.blocks, - &self.latest_known_attestations, - 0, - ); - - if is_reorg(old_head, new_head, &self.blocks) { - metrics::inc_fork_choice_reorgs(); - info!(%old_head, %new_head, "Fork choice reorg detected"); - } - - self.head = new_head; + // Consistency Check - Validate checkpoint slots match block slots. + if source_block.slot != data.source.slot { + return Err(StoreError::SourceSlotMismatch { + checkpoint_slot: data.source.slot, + block_slot: source_block.slot, + }); } - - pub fn update_safe_target(&mut self) { - let head_state = &self.states[&self.head]; - let num_validators = head_state.validators.len() as u64; - - let min_target_score = (num_validators * 2).div_ceil(3); - - let safe_target = ethlambda_fork_choice::compute_lmd_ghost_head( - self.latest_justified.root, - &self.blocks, - &self.latest_new_attestations, - min_target_score, - ); - self.safe_target = safe_target; + if target_block.slot != data.target.slot { + return Err(StoreError::TargetSlotMismatch { + checkpoint_slot: data.target.slot, + block_slot: target_block.slot, + }); } - /// Validate incoming attestation before processing. - /// - /// Ensures the vote respects the basic laws of time and topology: - /// 1. The blocks voted for must exist in our store. - /// 2. A vote cannot span backwards in time (source > target). - /// 3. A vote cannot be for a future slot. - pub fn validate_attestation(&self, attestation: &Attestation) -> Result<(), StoreError> { - let data = &attestation.data; - - // Availability Check - We cannot count a vote if we haven't seen the blocks involved. - let source_block = self - .blocks - .get(&data.source.root) - .ok_or(StoreError::UnknownSourceBlock(data.source.root))?; - let target_block = self - .blocks - .get(&data.target.root) - .ok_or(StoreError::UnknownTargetBlock(data.target.root))?; - - if !self.blocks.contains_key(&data.head.root) { - return Err(StoreError::UnknownHeadBlock(data.head.root)); - } - - // Topology Check - Source must be older than Target. - if data.source.slot > data.target.slot { - return Err(StoreError::SourceExceedsTarget); - } + // Time Check - Validate attestation is not too far in the future. + // We allow a small margin for clock disparity (1 slot), but no further. + let current_slot = store.time() / SECONDS_PER_SLOT; + if data.slot > current_slot + 1 { + return Err(StoreError::AttestationTooFarInFuture { + attestation_slot: data.slot, + current_slot, + }); + } - // Consistency Check - Validate checkpoint slots match block slots. - if source_block.slot != data.source.slot { - return Err(StoreError::SourceSlotMismatch { - checkpoint_slot: data.source.slot, - block_slot: source_block.slot, - }); - } - if target_block.slot != data.target.slot { - return Err(StoreError::TargetSlotMismatch { - checkpoint_slot: data.target.slot, - block_slot: target_block.slot, - }); - } + Ok(()) +} - // Time Check - Validate attestation is not too far in the future. - // We allow a small margin for clock disparity (1 slot), but no further. - let current_slot = self.time / SECONDS_PER_SLOT; - if data.slot > current_slot + 1 { - return Err(StoreError::AttestationTooFarInFuture { - attestation_slot: data.slot, - current_slot, - }); - } +/// Process a tick event. +pub fn on_tick(store: &mut Store, timestamp: u64, has_proposal: bool) { + let time = timestamp - store.config().genesis_time; - Ok(()) + // If we're more than a slot behind, fast-forward to a slot before. + // Operations are idempotent, so this should be fine. + if time.saturating_sub(store.time()) > SECONDS_PER_SLOT { + store.set_time(time - SECONDS_PER_SLOT); } - pub fn on_tick(&mut self, timestamp: u64, has_proposal: bool) { - let time = timestamp - self.config.genesis_time; - - // If we're more than a slot behind, fast-forward to a slot before. - // Operations are idempotent, so this should be fine. - if time.saturating_sub(self.time) > SECONDS_PER_SLOT { - self.time = time - SECONDS_PER_SLOT; - } - - while self.time < time { - self.time += 1; + while store.time() < time { + store.set_time(store.time() + 1); - let slot = self.time / SECONDS_PER_SLOT; - let interval = self.time % SECONDS_PER_SLOT; + let slot = store.time() / SECONDS_PER_SLOT; + let interval = store.time() % SECONDS_PER_SLOT; - trace!(%slot, %interval, "processing tick"); + trace!(%slot, %interval, "processing tick"); - // has_proposal is only signaled for the final tick (matching Python spec behavior) - let is_final_tick = self.time == time; - let should_signal_proposal = has_proposal && is_final_tick; + // has_proposal is only signaled for the final tick (matching Python spec behavior) + let is_final_tick = store.time() == time; + let should_signal_proposal = has_proposal && is_final_tick; - // NOTE: here we assume on_tick never skips intervals - match interval { - 0 => { - // Start of slot - process attestations if proposal exists - if should_signal_proposal { - self.accept_new_attestations(); - } - } - 1 => { - // Second interval - no action - } - 2 => { - // Mid-slot - update safe target for validators - self.update_safe_target(); - } - 3 => { - // End of slot - accept accumulated attestations - self.accept_new_attestations(); + // NOTE: here we assume on_tick never skips intervals + match interval { + 0 => { + // Start of slot - process attestations if proposal exists + if should_signal_proposal { + accept_new_attestations(store); } - _ => unreachable!("slots only have 4 intervals"), - } - } - } - - pub fn on_gossip_attestation( - &mut self, - signed_attestation: SignedAttestation, - ) -> Result<(), StoreError> { - let validator_id = signed_attestation.validator_id; - let attestation = Attestation { - validator_id, - data: signed_attestation.message, - }; - if let Err(err) = self.validate_attestation(&attestation) { - metrics::inc_attestations_invalid("gossip"); - return Err(err); - } - let target = attestation.data.target; - let target_state = self - .states - .get(&target.root) - .ok_or(StoreError::MissingTargetState(target.root))?; - if validator_id >= target_state.validators.len() as u64 { - return Err(StoreError::InvalidValidatorIndex); - } - let validator_pubkey = target_state.validators[validator_id as usize] - .get_pubkey() - .map_err(|_| StoreError::PubkeyDecodingFailed(validator_id))?; - let message = attestation.data.tree_hash_root(); - if cfg!(not(feature = "skip-signature-verification")) { - use ethlambda_types::signature::ValidatorSignature; - // Use attestation.data.slot as epoch (matching what Zeam and ethlambda use for signing) - let epoch: u32 = attestation.data.slot.try_into().expect("slot exceeds u32"); - let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) - .map_err(|_| StoreError::SignatureDecodingFailed)?; - if !signature.is_valid(&validator_pubkey, epoch, &message) { - return Err(StoreError::SignatureVerificationFailed); } - } - self.on_attestation(attestation, false)?; - - if cfg!(not(feature = "skip-signature-verification")) { - // Store signature for later lookup during block building - let signature_key = (validator_id, message); - let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) - .map_err(|_| StoreError::SignatureDecodingFailed)?; - self.gossip_signatures.insert(signature_key, signature); - metrics::inc_attestations_valid("gossip"); - } - Ok(()) - } - - /// Process a new attestation and place it into the correct attestation stage. - /// - /// Attestations can come from: - /// - a block body (on-chain, `is_from_block=true`), or - /// - the gossip network (off-chain, `is_from_block=false`). - /// - /// The Attestation Pipeline: - /// - Stage 1 (latest_new_attestations): Pending attestations not yet counted in fork choice. - /// - Stage 2 (latest_known_attestations): Active attestations used by LMD-GHOST. - fn on_attestation( - &mut self, - attestation: Attestation, - is_from_block: bool, - ) -> Result<(), StoreError> { - // First, ensure the attestation is structurally and temporally valid. - self.validate_attestation(&attestation)?; - - let validator_id = attestation.validator_id; - let attestation_data = attestation.data; - let attestation_slot = attestation_data.slot; - - if is_from_block { - // On-chain attestation processing - // These are historical attestations from other validators included by the proposer. - // They are processed immediately as "known" attestations. - - let should_update = self - .latest_known_attestations - .get(&validator_id) - .is_none_or(|latest| latest.slot < attestation_slot); - - if should_update { - self.latest_known_attestations - .insert(validator_id, attestation_data.clone()); - } - - // Remove pending attestation if superseded by on-chain attestation - if let Some(existing_new) = self.latest_new_attestations.get(&validator_id) - && existing_new.slot <= attestation_slot - { - self.latest_new_attestations.remove(&validator_id); + 1 => { + // Second interval - no action } - } else { - // Network gossip attestation processing - // These enter the "new" stage and must wait for interval tick acceptance. - - // Reject attestations from future slots - let current_slot = self.time / SECONDS_PER_SLOT; - if attestation_slot > current_slot { - return Err(StoreError::AttestationTooFarInFuture { - attestation_slot, - current_slot, - }); + 2 => { + // Mid-slot - update safe target for validators + update_safe_target(store); } - - let should_update = self - .latest_new_attestations - .get(&validator_id) - .is_none_or(|latest| latest.slot < attestation_slot); - - if should_update { - self.latest_new_attestations - .insert(validator_id, attestation_data); + 3 => { + // End of slot - accept accumulated attestations + accept_new_attestations(store); } + _ => unreachable!("slots only have 4 intervals"), } - - Ok(()) } +} - /// Process a new block and update the forkchoice state. - /// - /// This method integrates a block into the forkchoice store by: - /// 1. Validating the block's parent exists - /// 2. Computing the post-state via the state transition function - /// 3. Processing attestations included in the block body (on-chain) - /// 4. Updating the forkchoice head - /// 5. Processing the proposer's attestation (as if gossiped) - pub fn on_block(&mut self, signed_block: SignedBlockWithAttestation) -> Result<(), StoreError> { - // Unpack block components - let block = signed_block.message.block.clone(); - let proposer_attestation = signed_block.message.proposer_attestation.clone(); - let block_root = block.tree_hash_root(); - let slot = block.slot; - - // Skip duplicate blocks (idempotent operation) - if self.blocks.contains_key(&block_root) { - return Ok(()); +/// Process a gossiped attestation. +pub fn on_gossip_attestation( + store: &mut Store, + signed_attestation: SignedAttestation, +) -> Result<(), StoreError> { + let validator_id = signed_attestation.validator_id; + let attestation = Attestation { + validator_id, + data: signed_attestation.message, + }; + validate_attestation(store, &attestation) + .inspect_err(|_| metrics::inc_attestations_invalid("gossip"))?; + let target = attestation.data.target; + let target_state = store + .get_state(&target.root) + .ok_or(StoreError::MissingTargetState(target.root))?; + if validator_id >= target_state.validators.len() as u64 { + return Err(StoreError::InvalidValidatorIndex); + } + let validator_pubkey = target_state.validators[validator_id as usize] + .get_pubkey() + .map_err(|_| StoreError::PubkeyDecodingFailed(validator_id))?; + let message = attestation.data.tree_hash_root(); + if cfg!(not(feature = "skip-signature-verification")) { + use ethlambda_types::signature::ValidatorSignature; + // Use attestation.data.slot as epoch (matching what Zeam and ethlambda use for signing) + let epoch: u32 = attestation.data.slot.try_into().expect("slot exceeds u32"); + let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) + .map_err(|_| StoreError::SignatureDecodingFailed)?; + if !signature.is_valid(&validator_pubkey, epoch, &message) { + return Err(StoreError::SignatureVerificationFailed); } + } + on_attestation(store, attestation, false)?; + + if cfg!(not(feature = "skip-signature-verification")) { + // Store signature for later lookup during block building + let signature_key = (validator_id, message); + let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) + .map_err(|_| StoreError::SignatureDecodingFailed)?; + store.insert_gossip_signature(signature_key, signature); + } + metrics::inc_attestations_valid("gossip"); + Ok(()) +} - // Verify parent chain is available - // TODO: sync parent chain if parent is missing - let parent_state = - self.states - .get(&block.parent_root) - .ok_or(StoreError::MissingParentState { - parent_root: block.parent_root, - slot, - })?; - - // Validate cryptographic signatures - // TODO: extract signature verification to a pre-checks function - // to avoid the need for this - if cfg!(not(feature = "skip-signature-verification")) { - verify_signatures(parent_state, &signed_block)?; - } +/// Process a new attestation and place it into the correct attestation stage. +/// +/// Attestations can come from: +/// - a block body (on-chain, `is_from_block=true`), or +/// - the gossip network (off-chain, `is_from_block=false`). +/// +/// The Attestation Pipeline: +/// - Stage 1 (latest_new_attestations): Pending attestations not yet counted in fork choice. +/// - Stage 2 (latest_known_attestations): Active attestations used by LMD-GHOST. +fn on_attestation( + store: &mut Store, + attestation: Attestation, + is_from_block: bool, +) -> Result<(), StoreError> { + // First, ensure the attestation is structurally and temporally valid. + validate_attestation(store, &attestation)?; - // Execute state transition function to compute post-block state - let mut post_state = parent_state.clone(); - ethlambda_state_transition::state_transition(&mut post_state, &block)?; + let validator_id = attestation.validator_id; + let attestation_data = attestation.data; + let attestation_slot = attestation_data.slot; - // Cache the state root in the latest block header - let state_root = block.state_root; - post_state.latest_block_header.state_root = state_root; + if is_from_block { + // On-chain attestation processing + // These are historical attestations from other validators included by the proposer. + // They are processed immediately as "known" attestations. - // If post-state has a higher justified checkpoint, update the store - if post_state.latest_justified.slot > self.latest_justified.slot { - self.latest_justified = post_state.latest_justified; - } + let should_update = store + .get_known_attestation(&validator_id) + .is_none_or(|latest| latest.slot < attestation_slot); - // If post-state has a higher finalized checkpoint, update the store - if post_state.latest_finalized.slot > self.latest_finalized.slot { - self.latest_finalized = post_state.latest_finalized; + if should_update { + store.insert_known_attestation(validator_id, attestation_data.clone()); } - // Store block and state - self.blocks.insert(block_root, block.clone()); - self.states.insert(block_root, post_state); - - // Process block body attestations and their signatures - let aggregated_attestations = &block.body.attestations; - let attestation_signatures = &signed_block.signature.attestation_signatures; - - // Process block body attestations. - // TODO: fail the block if an attestation is invalid. Right now we - // just log a warning. - for (att, proof) in aggregated_attestations - .iter() - .zip(attestation_signatures.iter()) + // Remove pending attestation if superseded by on-chain attestation + if let Some(existing_new) = store.get_new_attestation(&validator_id) + && existing_new.slot <= attestation_slot { - let validator_ids = aggregation_bits_to_validator_indices(&att.aggregation_bits); - let data_root = att.data.tree_hash_root(); - - for validator_id in validator_ids { - // Update Proof Map - Store the proof so future block builders can reuse this aggregation - let key: SignatureKey = (validator_id, data_root); - self.aggregated_payloads - .entry(key) - .or_default() - .push(proof.clone()); - - // Update Fork Choice - Register the vote immediately (historical/on-chain) - let attestation = Attestation { - validator_id, - data: att.data.clone(), - }; - // TODO: validate attestations before processing - if let Err(err) = self.on_attestation(attestation, true) { - warn!(%slot, %validator_id, %err, "Invalid attestation in block"); - metrics::inc_attestations_invalid("block"); - } else { - metrics::inc_attestations_valid("block"); - } - } + store.remove_new_attestation(&validator_id); } + } else { + // Network gossip attestation processing + // These enter the "new" stage and must wait for interval tick acceptance. - // Update forkchoice head based on new block and attestations - // IMPORTANT: This must happen BEFORE processing proposer attestation - // to prevent the proposer from gaining circular weight advantage. - self.update_head(); - - // Process proposer attestation as if received via gossip - // The proposer's attestation should NOT affect this block's fork choice position. - // It is treated as pending until interval 3 (end of slot). - - if cfg!(not(feature = "skip-signature-verification")) { - // Store the proposer's signature for potential future block building - let proposer_sig_key: SignatureKey = ( - proposer_attestation.validator_id, - proposer_attestation.data.tree_hash_root(), - ); - let proposer_sig = - ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) - .map_err(|_| StoreError::SignatureDecodingFailed)?; - self.gossip_signatures - .insert(proposer_sig_key, proposer_sig); + // Reject attestations from future slots + let current_slot = store.time() / SECONDS_PER_SLOT; + if attestation_slot > current_slot { + return Err(StoreError::AttestationTooFarInFuture { + attestation_slot, + current_slot, + }); } - // Process proposer attestation (enters "new" stage, not "known") - // TODO: validate attestations before processing - if let Err(err) = self.on_attestation(proposer_attestation, false) { - warn!(%slot, %err, "Invalid proposer attestation in block"); - } + let should_update = store + .get_new_attestation(&validator_id) + .is_none_or(|latest| latest.slot < attestation_slot); - info!(%slot, %block_root, %state_root, "Processed new block"); - Ok(()) + if should_update { + store.insert_new_attestation(validator_id, attestation_data); + } } - /// Calculate target checkpoint for validator attestations. - /// - /// NOTE: this assumes that we have all the blocks from the head back to the latest finalized. - pub fn get_attestation_target(&self) -> Checkpoint { - // Start from current head - let mut target_block_root = self.head; - let mut target_block = &self.blocks[&target_block_root]; - - let safe_target_block_slot = self.blocks[&self.safe_target].slot; - - // Walk back toward safe target (up to `JUSTIFICATION_LOOKBACK_SLOTS` steps) - // - // This ensures the target doesn't advance too far ahead of safe target, - // providing a balance between liveness and safety. - for _ in 0..JUSTIFICATION_LOOKBACK_SLOTS { - if target_block.slot > safe_target_block_slot { - target_block_root = target_block.parent_root; - target_block = &self.blocks[&target_block_root]; - } else { - break; - } - } + Ok(()) +} - // Ensure target is in justifiable slot range - // - // Walk back until we find a slot that satisfies justifiability rules - // relative to the latest finalized checkpoint. - while !slot_is_justifiable_after(target_block.slot, self.latest_finalized.slot) { - target_block_root = target_block.parent_root; - target_block = &self.blocks[&target_block_root]; - } - Checkpoint { - root: target_block_root, - slot: target_block.slot, - } +/// Process a new block and update the forkchoice state. +/// +/// This method integrates a block into the forkchoice store by: +/// 1. Validating the block's parent exists +/// 2. Computing the post-state via the state transition function +/// 3. Processing attestations included in the block body (on-chain) +/// 4. Updating the forkchoice head +/// 5. Processing the proposer's attestation (as if gossiped) +pub fn on_block( + store: &mut Store, + signed_block: SignedBlockWithAttestation, +) -> Result<(), StoreError> { + // Unpack block components + let block = signed_block.message.block.clone(); + let proposer_attestation = signed_block.message.proposer_attestation.clone(); + let block_root = block.tree_hash_root(); + let slot = block.slot; + + // Skip duplicate blocks (idempotent operation) + if store.contains_block(&block_root) { + return Ok(()); } - /// Produce attestation data for the given slot. - pub fn produce_attestation_data(&self, slot: u64) -> AttestationData { - // Get the head block the validator sees for this slot - let head_checkpoint = Checkpoint { - root: self.head, - slot: self.blocks[&self.head].slot, - }; - - // Calculate the target checkpoint for this attestation - let target_checkpoint = self.get_attestation_target(); + // Verify parent chain is available + // TODO: sync parent chain if parent is missing + let parent_state = + store + .get_state(&block.parent_root) + .ok_or(StoreError::MissingParentState { + parent_root: block.parent_root, + slot, + })?; - // Construct attestation data - AttestationData { - slot, - head: head_checkpoint, - target: target_checkpoint, - source: self.latest_justified, - } + // Validate cryptographic signatures + // TODO: extract signature verification to a pre-checks function + // to avoid the need for this + if cfg!(not(feature = "skip-signature-verification")) { + verify_signatures(&parent_state, &signed_block)?; } - /// Get the head for block proposal at the given slot. - /// - /// Ensures store is up-to-date and processes any pending attestations - /// before returning the canonical head. - pub fn get_proposal_head(&mut self, slot: u64) -> H256 { - // Calculate time corresponding to this slot - let slot_time = self.config.genesis_time + slot * SECONDS_PER_SLOT; + // Execute state transition function to compute post-block state + let mut post_state = parent_state.clone(); + ethlambda_state_transition::state_transition(&mut post_state, &block)?; - // Advance time to current slot (ticking intervals) - self.on_tick(slot_time, true); + // Cache the state root in the latest block header + let state_root = block.state_root; + post_state.latest_block_header.state_root = state_root; - // Process any pending attestations before proposal - self.accept_new_attestations(); + // Update justified/finalized checkpoints if they have higher slots + let justified = (post_state.latest_justified.slot > store.latest_justified().slot) + .then_some(post_state.latest_justified); + let finalized = (post_state.latest_finalized.slot > store.latest_finalized().slot) + .then_some(post_state.latest_finalized); - self.head + if justified.is_some() || finalized.is_some() { + store.update_checkpoints(ForkCheckpoints::new(store.head(), justified, finalized)); } - /// Produce a block and per-aggregated-attestation signature payloads for the target slot. - /// - /// Returns the finalized block and attestation signature payloads aligned - /// with `block.body.attestations`. - pub fn produce_block_with_signatures( - &mut self, - slot: u64, - validator_index: u64, - ) -> Result<(Block, Vec), StoreError> { - // Get parent block and state to build upon - let head_root = self.get_proposal_head(slot); - let head_state = self - .states - .get(&head_root) - .ok_or(StoreError::MissingParentState { - parent_root: head_root, - slot, - })? - .clone(); - - // Validate proposer authorization for this slot - let num_validators = head_state.validators.len() as u64; - if !is_proposer(validator_index, slot, num_validators) { - return Err(StoreError::NotProposer { - validator_index, - slot, - }); - } + // Store block and state + store.insert_block(block_root, block.clone()); + store.insert_state(block_root, post_state); - // Convert AttestationData to Attestation objects for build_block - let available_attestations: Vec = self - .latest_known_attestations - .iter() - .map(|(&validator_id, data)| Attestation { + // Process block body attestations and their signatures + let aggregated_attestations = &block.body.attestations; + let attestation_signatures = &signed_block.signature.attestation_signatures; + + // Process block body attestations. + // TODO: fail the block if an attestation is invalid. Right now we + // just log a warning. + for (att, proof) in aggregated_attestations + .iter() + .zip(attestation_signatures.iter()) + { + let validator_ids = aggregation_bits_to_validator_indices(&att.aggregation_bits); + let data_root = att.data.tree_hash_root(); + + for validator_id in validator_ids { + // Update Proof Map - Store the proof so future block builders can reuse this aggregation + let key: SignatureKey = (validator_id, data_root); + store.push_aggregated_payload(key, proof.clone()); + + // Update Fork Choice - Register the vote immediately (historical/on-chain) + let attestation = Attestation { validator_id, - data: data.clone(), - }) - .collect(); + data: att.data.clone(), + }; + // TODO: validate attestations before processing + if let Err(err) = on_attestation(store, attestation, true) { + warn!(%slot, %validator_id, %err, "Invalid attestation in block"); + metrics::inc_attestations_invalid("block"); + } else { + metrics::inc_attestations_valid("block"); + } + } + } - // Get known block roots for attestation validation - let known_block_roots: HashSet = self.blocks.keys().copied().collect(); + // Update forkchoice head based on new block and attestations + // IMPORTANT: This must happen BEFORE processing proposer attestation + // to prevent the proposer from gaining circular weight advantage. + update_head(store); - // Build the block using fixed-point attestation collection - let (block, _post_state, signatures) = build_block( - &head_state, - slot, - validator_index, - head_root, - &available_attestations, - &known_block_roots, - &self.gossip_signatures, - &self.aggregated_payloads, - )?; - - Ok((block, signatures)) - } + // Process proposer attestation as if received via gossip + // The proposer's attestation should NOT affect this block's fork choice position. + // It is treated as pending until interval 3 (end of slot). - /// Returns the root of the current canonical chain head block. - pub fn head(&self) -> H256 { - self.head + if cfg!(not(feature = "skip-signature-verification")) { + // Store the proposer's signature for potential future block building + let proposer_sig_key: SignatureKey = ( + proposer_attestation.validator_id, + proposer_attestation.data.tree_hash_root(), + ); + let proposer_sig = + ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) + .map_err(|_| StoreError::SignatureDecodingFailed)?; + store.insert_gossip_signature(proposer_sig_key, proposer_sig); } - /// Returns a reference to all known blocks. - pub fn blocks(&self) -> &HashMap { - &self.blocks + // Process proposer attestation (enters "new" stage, not "known") + // TODO: validate attestations before processing + if let Err(err) = on_attestation(store, proposer_attestation, false) { + metrics::inc_attestations_invalid("block"); + warn!(%slot, %err, "Invalid proposer attestation in block"); + } else { + metrics::inc_attestations_valid("gossip"); } - /// Returns a reference to the latest known attestations by validator. - pub fn latest_known_attestations(&self) -> &HashMap { - &self.latest_known_attestations - } + info!(%slot, %block_root, %state_root, "Processed new block"); + Ok(()) +} - /// Returns a reference to the latest new (pending) attestations by validator. - pub fn latest_new_attestations(&self) -> &HashMap { - &self.latest_new_attestations +/// Calculate target checkpoint for validator attestations. +/// +/// NOTE: this assumes that we have all the blocks from the head back to the latest finalized. +pub fn get_attestation_target(store: &Store) -> Checkpoint { + // Start from current head + let mut target_block_root = store.head(); + let mut target_block = store + .get_block(&target_block_root) + .expect("head block exists"); + + let safe_target_block_slot = store + .get_block(&store.safe_target()) + .expect("safe target exists") + .slot; + + // Walk back toward safe target (up to `JUSTIFICATION_LOOKBACK_SLOTS` steps) + // + // This ensures the target doesn't advance too far ahead of safe target, + // providing a balance between liveness and safety. + for _ in 0..JUSTIFICATION_LOOKBACK_SLOTS { + if target_block.slot > safe_target_block_slot { + target_block_root = target_block.parent_root; + target_block = store + .get_block(&target_block_root) + .expect("parent block exists"); + } else { + break; + } } - /// Returns a reference to the latest justified checkpoint. - pub fn latest_justified(&self) -> &Checkpoint { - &self.latest_justified + // Ensure target is in justifiable slot range + // + // Walk back until we find a slot that satisfies justifiability rules + // relative to the latest finalized checkpoint. + while !slot_is_justifiable_after(target_block.slot, store.latest_finalized().slot) { + target_block_root = target_block.parent_root; + target_block = store + .get_block(&target_block_root) + .expect("parent block exists"); } - - /// Returns a reference to the latest finalized checkpoint. - pub fn latest_finalized(&self) -> &Checkpoint { - &self.latest_finalized + Checkpoint { + root: target_block_root, + slot: target_block.slot, } +} - /// Returns a reference to the chain configuration. - pub fn config(&self) -> &ChainConfig { - &self.config - } +/// Produce attestation data for the given slot. +pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { + // Get the head block the validator sees for this slot + let head_checkpoint = Checkpoint { + root: store.head(), + slot: store + .get_block(&store.head()) + .expect("head block exists") + .slot, + }; - /// Returns a reference to the head state if it exists. - pub fn head_state(&self) -> &State { - self.states - .get(&self.head) - .expect("head state is always available") - } + // Calculate the target checkpoint for this attestation + let target_checkpoint = get_attestation_target(store); - /// Returns the slot of the current safe target block. - pub fn safe_target_slot(&self) -> u64 { - self.blocks[&self.safe_target].slot + // Construct attestation data + AttestationData { + slot, + head: head_checkpoint, + target: target_checkpoint, + source: store.latest_justified(), } } -/// Check if a head change represents a reorg. +/// Get the head for block proposal at the given slot. /// -/// A reorg occurs when the chains diverge - i.e., when walking back from the higher -/// slot head to the lower slot head's slot, we don't arrive at the lower slot head. -fn is_reorg(old_head: H256, new_head: H256, blocks: &HashMap) -> bool { - if new_head == old_head { - return false; - } +/// Ensures store is up-to-date and processes any pending attestations +/// before returning the canonical head. +fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { + // Calculate time corresponding to this slot + let slot_time = store.config().genesis_time + slot * SECONDS_PER_SLOT; - let Some(old_head_block) = blocks.get(&old_head) else { - return false; - }; + // Advance time to current slot (ticking intervals) + on_tick(store, slot_time, true); - let Some(new_head_block) = blocks.get(&new_head) else { - return false; - }; + // Process any pending attestations before proposal + accept_new_attestations(store); - let old_slot = old_head_block.slot; - let new_slot = new_head_block.slot; + store.head() +} - // Determine which head has the higher slot and walk back from it - let (mut current_root, target_slot, target_root) = if new_slot >= old_slot { - (new_head, old_slot, old_head) - } else { - (old_head, new_slot, new_head) - }; +/// Produce a block and per-aggregated-attestation signature payloads for the target slot. +/// +/// Returns the finalized block and attestation signature payloads aligned +/// with `block.body.attestations`. +pub fn produce_block_with_signatures( + store: &mut Store, + slot: u64, + validator_index: u64, +) -> Result<(Block, Vec), StoreError> { + // Get parent block and state to build upon + let head_root = get_proposal_head(store, slot); + let head_state = store + .get_state(&head_root) + .ok_or(StoreError::MissingParentState { + parent_root: head_root, + slot, + })? + .clone(); - // Walk back through the chain until we reach the target slot - while let Some(current_block) = blocks.get(¤t_root) { - if current_block.slot <= target_slot { - // We've reached the target slot - check if we're at the target block - return current_root != target_root; - } - current_root = current_block.parent_root; + // Validate proposer authorization for this slot + let num_validators = head_state.validators.len() as u64; + if !is_proposer(validator_index, slot, num_validators) { + return Err(StoreError::NotProposer { + validator_index, + slot, + }); } - // Couldn't walk back far enough (missing blocks in chain) - // Conservative: assume no reorg if we can't determine - false + // Convert AttestationData to Attestation objects for build_block + let available_attestations: Vec = store + .iter_known_attestations() + .map(|(validator_id, data)| Attestation { validator_id, data }) + .collect(); + + // Get known block roots for attestation validation + let known_block_roots: HashSet = store.iter_blocks().map(|(root, _)| root).collect(); + + // Collect signature data for block building + let gossip_signatures: HashMap = + store.iter_gossip_signatures().collect(); + let aggregated_payloads: HashMap> = + store.iter_aggregated_payloads().collect(); + + // Build the block using fixed-point attestation collection + let (block, _post_state, signatures) = build_block( + &head_state, + slot, + validator_index, + head_root, + &available_attestations, + &known_block_roots, + &gossip_signatures, + &aggregated_payloads, + )?; + + Ok((block, signatures)) } /// Errors that can occur during Store operations. @@ -1213,3 +989,44 @@ fn verify_signatures( } Ok(()) } + +/// Check if a head change represents a reorg. +/// +/// A reorg occurs when the chains diverge - i.e., when walking back from the higher +/// slot head to the lower slot head's slot, we don't arrive at the lower slot head. +fn is_reorg(old_head: H256, new_head: H256, store: &Store) -> bool { + if new_head == old_head { + return false; + } + + let Some(old_head_block) = store.get_block(&old_head) else { + return false; + }; + + let Some(new_head_block) = store.get_block(&new_head) else { + return false; + }; + + let old_slot = old_head_block.slot; + let new_slot = new_head_block.slot; + + // Determine which head has the higher slot and walk back from it + let (mut current_root, target_slot, target_root) = if new_slot >= old_slot { + (new_head, old_slot, old_head) + } else { + (old_head, new_slot, new_head) + }; + + // Walk back through the chain until we reach the target slot + while let Some(current_block) = store.get_block(¤t_root) { + if current_block.slot <= target_slot { + // We've reached the target slot - check if we're at the target block + return current_root != target_root; + } + current_root = current_block.parent_root; + } + + // Couldn't walk back far enough (missing blocks in chain) + // Conservative: assume no reorg if we can't determine + false +} diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index 718ced4..a0ee5d8 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -3,7 +3,8 @@ use std::{ path::Path, }; -use ethlambda_blockchain::{SECONDS_PER_SLOT, store::Store}; +use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; +use ethlambda_storage::Store; use ethlambda_types::{ attestation::Attestation, block::{Block, BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, @@ -58,8 +59,8 @@ fn run(path: &Path) -> datatest_stable::Result<()> { signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time; // NOTE: the has_proposal argument is set to true, following the spec - store.on_tick(block_time, true); - let result = store.on_block(signed_block); + store::on_tick(&mut store, block_time, true); + let result = store::on_block(&mut store, signed_block); match (result.is_ok(), step.valid) { (true, false) => { @@ -83,7 +84,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { "tick" => { let timestamp = step.time.expect("tick step missing time"); // NOTE: the has_proposal argument is set to false, following the spec - store.on_tick(timestamp, false); + store::on_tick(&mut store, timestamp, false); } other => { // Fail for unsupported step types for now @@ -117,7 +118,7 @@ fn build_signed_block(block_data: types::BlockStepData) -> SignedBlockWithAttest } fn validate_checks( - store: &Store, + st: &Store, checks: &StoreChecks, step_idx: usize, block_registry: &HashMap, @@ -152,7 +153,7 @@ fn validate_checks( } // Validate attestationTargetSlot if let Some(expected_slot) = checks.attestation_target_slot { - let target = store.get_attestation_target(); + let target = store::get_attestation_target(st); if target.slot != expected_slot { return Err(format!( "Step {}: attestationTargetSlot mismatch: expected {}, got {}", @@ -162,14 +163,13 @@ fn validate_checks( } // Also validate the root matches a block at this slot - let block_found = store - .blocks() + let blocks: HashMap = st.iter_blocks().collect(); + let block_found = blocks .iter() .any(|(root, block)| block.slot == expected_slot && *root == target.root); if !block_found { - let available: Vec<_> = store - .blocks() + let available: Vec<_> = blocks .iter() .filter(|(_, block)| block.slot == expected_slot) .map(|(root, _)| format!("{:?}", root)) @@ -184,10 +184,9 @@ fn validate_checks( // Validate headSlot if let Some(expected_slot) = checks.head_slot { - let head_root = store.head(); - let head_block = store - .blocks() - .get(&head_root) + let head_root = st.head(); + let head_block = st + .get_block(&head_root) .ok_or_else(|| format!("Step {}: head block not found", step_idx))?; if head_block.slot != expected_slot { return Err(format!( @@ -200,7 +199,7 @@ fn validate_checks( // Validate headRoot if let Some(ref expected_root) = checks.head_root { - let head_root = store.head(); + let head_root = st.head(); if head_root != *expected_root { return Err(format!( "Step {}: headRoot mismatch: expected {:?}, got {:?}", @@ -212,7 +211,7 @@ fn validate_checks( // Validate latestJustifiedSlot if let Some(expected_slot) = checks.latest_justified_slot { - let justified = store.latest_justified(); + let justified = st.latest_justified(); if justified.slot != expected_slot { return Err(format!( "Step {}: latestJustifiedSlot mismatch: expected {}, got {}", @@ -224,7 +223,7 @@ fn validate_checks( // Validate latestJustifiedRoot if let Some(ref expected_root) = checks.latest_justified_root { - let justified = store.latest_justified(); + let justified = st.latest_justified(); if justified.root != *expected_root { return Err(format!( "Step {}: latestJustifiedRoot mismatch: expected {:?}, got {:?}", @@ -236,7 +235,7 @@ fn validate_checks( // Validate latestFinalizedSlot if let Some(expected_slot) = checks.latest_finalized_slot { - let finalized = store.latest_finalized(); + let finalized = st.latest_finalized(); if finalized.slot != expected_slot { return Err(format!( "Step {}: latestFinalizedSlot mismatch: expected {}, got {}", @@ -248,7 +247,7 @@ fn validate_checks( // Validate latestFinalizedRoot if let Some(ref expected_root) = checks.latest_finalized_root { - let finalized = store.latest_finalized(); + let finalized = st.latest_finalized(); if finalized.root != *expected_root { return Err(format!( "Step {}: latestFinalizedRoot mismatch: expected {:?}, got {:?}", @@ -261,29 +260,31 @@ fn validate_checks( // Validate attestationChecks if let Some(ref att_checks) = checks.attestation_checks { for att_check in att_checks { - validate_attestation_check(store, att_check, step_idx)?; + validate_attestation_check(st, att_check, step_idx)?; } } // Validate lexicographicHeadAmong if let Some(ref fork_labels) = checks.lexicographic_head_among { - validate_lexicographic_head_among(store, fork_labels, step_idx, block_registry)?; + validate_lexicographic_head_among(st, fork_labels, step_idx, block_registry)?; } Ok(()) } fn validate_attestation_check( - store: &Store, + st: &Store, check: &types::AttestationCheck, step_idx: usize, ) -> datatest_stable::Result<()> { + use ethlambda_types::attestation::AttestationData; + let validator_id = check.validator; let location = check.location.as_str(); - let attestations = match location { - "new" => store.latest_new_attestations(), - "known" => store.latest_known_attestations(), + let attestations: HashMap = match location { + "new" => st.iter_new_attestations().collect(), + "known" => st.iter_known_attestations().collect(), other => { return Err( format!("Step {}: unknown attestation location: {}", step_idx, other).into(), @@ -345,11 +346,13 @@ fn validate_attestation_check( } fn validate_lexicographic_head_among( - store: &Store, + st: &Store, fork_labels: &[String], step_idx: usize, block_registry: &HashMap, ) -> datatest_stable::Result<()> { + use ethlambda_types::attestation::AttestationData; + // Require at least 2 forks to test tiebreaker if fork_labels.len() < 2 { return Err(format!( @@ -360,7 +363,8 @@ fn validate_lexicographic_head_among( .into()); } - let blocks = store.blocks(); + let blocks: HashMap = st.iter_blocks().collect(); + let known_attestations: HashMap = st.iter_known_attestations().collect(); // Resolve all fork labels to roots and compute their weights // Map: label -> (root, slot, weight) @@ -385,7 +389,7 @@ fn validate_lexicographic_head_among( // Calculate attestation weight: count attestations voting for this fork // An attestation votes for this fork if its head is this block or a descendant let mut weight = 0; - for attestation in store.latest_known_attestations().values() { + for attestation in known_attestations.values() { let att_head_root = attestation.head.root; // Check if attestation head is this block or a descendant if att_head_root == *root { @@ -451,7 +455,7 @@ fn validate_lexicographic_head_among( .expect("fork_data is not empty"); // Verify the current head matches the lexicographically highest root - let actual_head_root = store.head(); + let actual_head_root = st.head(); if actual_head_root != expected_head_root { let highest_label = fork_data .iter() diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index 0318d35..a0fb24c 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -1,6 +1,7 @@ use std::path::Path; -use ethlambda_blockchain::{SECONDS_PER_SLOT, store::Store}; +use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; +use ethlambda_storage::Store; use ethlambda_types::{ block::{Block, SignedBlockWithAttestation}, primitives::TreeHash, @@ -40,17 +41,17 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Initialize the store with the anchor state and block let genesis_time = anchor_state.config.genesis_time; - let mut store = Store::get_forkchoice_store(anchor_state, anchor_block); + let mut st = Store::get_forkchoice_store(anchor_state, anchor_block); // Step 2: Run the state transition function with the block fixture let signed_block: SignedBlockWithAttestation = test.signed_block_with_attestation.into(); // Advance time to the block's slot let block_time = signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time; - store.on_tick(block_time, true); + store::on_tick(&mut st, block_time, true); // Process the block (this includes signature verification) - let result = store.on_block(signed_block); + let result = store::on_block(&mut st, signed_block); // Step 3: Check that it succeeded or failed as expected match (result.is_ok(), test.expect_exception.as_ref()) { diff --git a/crates/storage/src/api/mod.rs b/crates/storage/src/api/mod.rs new file mode 100644 index 0000000..f93e9d1 --- /dev/null +++ b/crates/storage/src/api/mod.rs @@ -0,0 +1,5 @@ +mod tables; +mod traits; + +pub use tables::{ALL_TABLES, Table}; +pub use traits::{Error, PrefixResult, StorageBackend, StorageReadView, StorageWriteBatch}; diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs new file mode 100644 index 0000000..0b99940 --- /dev/null +++ b/crates/storage/src/api/tables.rs @@ -0,0 +1,29 @@ +/// Tables in the storage layer. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Table { + /// Block storage: H256 -> Block + Blocks, + /// State storage: H256 -> State + States, + /// Known attestations: u64 -> AttestationData + LatestKnownAttestations, + /// Pending attestations: u64 -> AttestationData + LatestNewAttestations, + /// Gossip signatures: SignatureKey -> ValidatorSignature + GossipSignatures, + /// Aggregated proofs: SignatureKey -> Vec + AggregatedPayloads, + /// Metadata: string keys -> various scalar values + Metadata, +} + +/// All table variants. +pub const ALL_TABLES: [Table; 7] = [ + Table::Blocks, + Table::States, + Table::LatestKnownAttestations, + Table::LatestNewAttestations, + Table::GossipSignatures, + Table::AggregatedPayloads, + Table::Metadata, +]; diff --git a/crates/storage/src/api/traits.rs b/crates/storage/src/api/traits.rs new file mode 100644 index 0000000..6014dda --- /dev/null +++ b/crates/storage/src/api/traits.rs @@ -0,0 +1,41 @@ +use super::Table; + +/// Storage error type. +pub type Error = Box; + +/// Result type for prefix iterator operations. +pub type PrefixResult = Result<(Box<[u8]>, Box<[u8]>), Error>; + +/// A storage backend that can create read views and write batches. +pub trait StorageBackend: Send + Sync { + /// Begin a read-only transaction. + fn begin_read(&self) -> Result, Error>; + + /// Begin a write batch. + fn begin_write(&self) -> Result, Error>; +} + +/// A read-only view of the storage. +pub trait StorageReadView { + /// Get a value by key from a table. + fn get(&self, table: Table, key: &[u8]) -> Result>, Error>; + + /// Iterate over all entries with a given key prefix. + fn prefix_iterator( + &self, + table: Table, + prefix: &[u8], + ) -> Result + '_>, Error>; +} + +/// A write batch that can be committed atomically. +pub trait StorageWriteBatch: Send { + /// Put multiple key-value pairs into a table. + fn put_batch(&mut self, table: Table, batch: Vec<(Vec, Vec)>) -> Result<(), Error>; + + /// Delete multiple keys from a table. + fn delete_batch(&mut self, table: Table, keys: Vec>) -> Result<(), Error>; + + /// Commit the batch, consuming it. + fn commit(self: Box) -> Result<(), Error>; +} diff --git a/crates/storage/src/backend/in_memory.rs b/crates/storage/src/backend/in_memory.rs new file mode 100644 index 0000000..914cc21 --- /dev/null +++ b/crates/storage/src/backend/in_memory.rs @@ -0,0 +1,316 @@ +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use crate::api::{ + ALL_TABLES, Error, PrefixResult, StorageBackend, StorageReadView, StorageWriteBatch, Table, +}; + +type TableData = HashMap, Vec>; +type StorageData = HashMap; + +/// Pending operation for a key - last operation wins. +enum PendingOp { + Put(Vec), + Delete, +} + +type PendingOps = HashMap, PendingOp>>; + +/// In-memory storage backend using HashMaps. +/// +/// All tables are created (empty) on initialization. +#[derive(Clone)] +pub struct InMemoryBackend { + data: Arc>, +} + +impl Default for InMemoryBackend { + fn default() -> Self { + let mut data = StorageData::new(); + for table in ALL_TABLES { + data.insert(table, TableData::new()); + } + Self { + data: Arc::new(RwLock::new(data)), + } + } +} + +impl InMemoryBackend { + /// Create a new in-memory backend with all tables initialized empty. + pub fn new() -> Self { + Self::default() + } +} + +impl StorageBackend for InMemoryBackend { + fn begin_read(&self) -> Result, Error> { + let guard = self.data.read().map_err(|e| e.to_string())?; + Ok(Box::new(InMemoryReadView { guard })) + } + + fn begin_write(&self) -> Result, Error> { + Ok(Box::new(InMemoryWriteBatch { + data: Arc::clone(&self.data), + ops: HashMap::new(), + })) + } +} + +/// Read view holding a read lock on the storage data. +struct InMemoryReadView<'a> { + guard: std::sync::RwLockReadGuard<'a, StorageData>, +} + +impl StorageReadView for InMemoryReadView<'_> { + fn get(&self, table: Table, key: &[u8]) -> Result>, Error> { + Ok(self + .guard + .get(&table) + .expect("table exists") + .get(key) + .cloned()) + } + + fn prefix_iterator( + &self, + table: Table, + prefix: &[u8], + ) -> Result + '_>, Error> { + let table_data = self.guard.get(&table).expect("table exists"); + let prefix_owned = prefix.to_vec(); + + let iter = table_data + .iter() + .filter(move |(k, _)| k.starts_with(&prefix_owned)) + .map(|(k, v)| Ok((k.clone().into_boxed_slice(), v.clone().into_boxed_slice()))); + + Ok(Box::new(iter)) + } +} + +/// Write batch that accumulates changes before committing. +struct InMemoryWriteBatch { + data: Arc>, + ops: PendingOps, +} + +impl StorageWriteBatch for InMemoryWriteBatch { + fn put_batch(&mut self, table: Table, batch: Vec<(Vec, Vec)>) -> Result<(), Error> { + let table_ops = self.ops.entry(table).or_default(); + for (key, value) in batch { + table_ops.insert(key, PendingOp::Put(value)); + } + Ok(()) + } + + fn delete_batch(&mut self, table: Table, keys: Vec>) -> Result<(), Error> { + let table_ops = self.ops.entry(table).or_default(); + for key in keys { + table_ops.insert(key, PendingOp::Delete); + } + Ok(()) + } + + fn commit(self: Box) -> Result<(), Error> { + let mut guard = self.data.write().map_err(|e| e.to_string())?; + + for (table, ops) in self.ops { + let table_data = guard.get_mut(&table).expect("table exists"); + for (key, op) in ops { + match op { + PendingOp::Put(value) => { + table_data.insert(key, value); + } + PendingOp::Delete => { + table_data.remove(&key); + } + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_put_and_get() { + let backend = InMemoryBackend::new(); + + // Write data + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch(Table::Blocks, vec![(b"key1".to_vec(), b"value1".to_vec())]) + .unwrap(); + batch.commit().unwrap(); + } + + // Read data + { + let view = backend.begin_read().unwrap(); + let value = view.get(Table::Blocks, b"key1").unwrap(); + assert_eq!(value, Some(b"value1".to_vec())); + } + } + + #[test] + fn test_delete() { + let backend = InMemoryBackend::new(); + + // Write data + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch(Table::Blocks, vec![(b"key1".to_vec(), b"value1".to_vec())]) + .unwrap(); + batch.commit().unwrap(); + } + + // Delete data + { + let mut batch = backend.begin_write().unwrap(); + batch + .delete_batch(Table::Blocks, vec![b"key1".to_vec()]) + .unwrap(); + batch.commit().unwrap(); + } + + // Verify deleted + { + let view = backend.begin_read().unwrap(); + let value = view.get(Table::Blocks, b"key1").unwrap(); + assert_eq!(value, None); + } + } + + #[test] + fn test_prefix_iterator() { + let backend = InMemoryBackend::new(); + + // Write data with common prefix + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch( + Table::Metadata, + vec![ + (b"config:a".to_vec(), b"1".to_vec()), + (b"config:b".to_vec(), b"2".to_vec()), + (b"other:x".to_vec(), b"3".to_vec()), + ], + ) + .unwrap(); + batch.commit().unwrap(); + } + + // Query by prefix + { + let view = backend.begin_read().unwrap(); + let mut results: Vec<_> = view + .prefix_iterator(Table::Metadata, b"config:") + .unwrap() + .collect::, _>>() + .unwrap(); + + results.sort_by(|a, b| a.0.cmp(&b.0)); + assert_eq!(results.len(), 2); + assert_eq!(&*results[0].0, b"config:a"); + assert_eq!(&*results[1].0, b"config:b"); + } + } + + #[test] + fn test_nonexistent_key() { + let backend = InMemoryBackend::new(); + let view = backend.begin_read().unwrap(); + let value = view.get(Table::Blocks, b"nonexistent").unwrap(); + assert_eq!(value, None); + } + + #[test] + fn test_delete_then_put() { + let backend = InMemoryBackend::new(); + + // Initial value + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch(Table::Blocks, vec![(b"key".to_vec(), b"old".to_vec())]) + .unwrap(); + batch.commit().unwrap(); + } + + // Delete then put in same batch - put should win + { + let mut batch = backend.begin_write().unwrap(); + batch + .delete_batch(Table::Blocks, vec![b"key".to_vec()]) + .unwrap(); + batch + .put_batch(Table::Blocks, vec![(b"key".to_vec(), b"new".to_vec())]) + .unwrap(); + batch.commit().unwrap(); + } + + let view = backend.begin_read().unwrap(); + assert_eq!( + view.get(Table::Blocks, b"key").unwrap(), + Some(b"new".to_vec()) + ); + } + + #[test] + fn test_put_then_delete() { + let backend = InMemoryBackend::new(); + + // Put then delete in same batch - delete should win + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch(Table::Blocks, vec![(b"key".to_vec(), b"value".to_vec())]) + .unwrap(); + batch + .delete_batch(Table::Blocks, vec![b"key".to_vec()]) + .unwrap(); + batch.commit().unwrap(); + } + + let view = backend.begin_read().unwrap(); + assert_eq!(view.get(Table::Blocks, b"key").unwrap(), None); + } + + #[test] + fn test_multiple_tables() { + let backend = InMemoryBackend::new(); + + // Write to different tables + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch(Table::Blocks, vec![(b"key".to_vec(), b"block".to_vec())]) + .unwrap(); + batch + .put_batch(Table::States, vec![(b"key".to_vec(), b"state".to_vec())]) + .unwrap(); + batch.commit().unwrap(); + } + + // Verify isolation + { + let view = backend.begin_read().unwrap(); + assert_eq!( + view.get(Table::Blocks, b"key").unwrap(), + Some(b"block".to_vec()) + ); + assert_eq!( + view.get(Table::States, b"key").unwrap(), + Some(b"state".to_vec()) + ); + } + } +} diff --git a/crates/storage/src/backend/mod.rs b/crates/storage/src/backend/mod.rs new file mode 100644 index 0000000..f146446 --- /dev/null +++ b/crates/storage/src/backend/mod.rs @@ -0,0 +1,3 @@ +mod in_memory; + +pub use in_memory::InMemoryBackend; diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 8b13789..5442564 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1 +1,5 @@ +mod api; +mod backend; +mod store; +pub use store::{ForkCheckpoints, SignatureKey, Store}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs new file mode 100644 index 0000000..4ed2af4 --- /dev/null +++ b/crates/storage/src/store.rs @@ -0,0 +1,571 @@ +use std::sync::Arc; + +use crate::api::{StorageBackend, Table}; +use crate::backend::InMemoryBackend; + +use ethlambda_types::{ + attestation::AttestationData, + block::{AggregatedSignatureProof, Block, BlockBody}, + primitives::{Decode, Encode, H256, TreeHash}, + signature::ValidatorSignature, + state::{ChainConfig, Checkpoint, State}, +}; +use tracing::info; + +/// Key for looking up individual validator signatures. +/// Used to index signature caches by (validator, message) pairs. +/// +/// Values are (validator_index, attestation_data_root). +pub type SignatureKey = (u64, H256); + +/// Checkpoints to update in the forkchoice store. +/// +/// Used with `Store::update_checkpoints` to update head and optionally +/// update justified/finalized checkpoints (only if higher slot). +pub struct ForkCheckpoints { + head: H256, + justified: Option, + finalized: Option, +} + +impl ForkCheckpoints { + /// Create checkpoints update with only the head. + pub fn head_only(head: H256) -> Self { + Self { + head, + justified: None, + finalized: None, + } + } + + /// Create checkpoints update with optional justified and finalized. + /// + /// The head is passed through unchanged. + pub fn new(head: H256, justified: Option, finalized: Option) -> Self { + Self { + head, + justified, + finalized, + } + } +} + +// ============ Metadata Keys ============ + +const KEY_TIME: &[u8] = b"time"; +const KEY_CONFIG: &[u8] = b"config"; +const KEY_HEAD: &[u8] = b"head"; +const KEY_SAFE_TARGET: &[u8] = b"safe_target"; +const KEY_LATEST_JUSTIFIED: &[u8] = b"latest_justified"; +const KEY_LATEST_FINALIZED: &[u8] = b"latest_finalized"; + +// ============ Key Encoding Helpers ============ + +/// Encode a SignatureKey (validator_id, root) to bytes. +/// Layout: validator_id (8 bytes SSZ) || root (32 bytes SSZ) +fn encode_signature_key(key: &SignatureKey) -> Vec { + let mut result = key.0.as_ssz_bytes(); + result.extend(key.1.as_ssz_bytes()); + result +} + +/// Decode a SignatureKey from bytes. +fn decode_signature_key(bytes: &[u8]) -> SignatureKey { + let validator_id = u64::from_ssz_bytes(&bytes[..8]).expect("valid validator_id"); + let root = H256::from_ssz_bytes(&bytes[8..]).expect("valid root"); + (validator_id, root) +} + +/// Forkchoice store tracking chain state and validator attestations. +/// +/// This is the "local view" that a node uses to run LMD GHOST. It contains: +/// +/// - which blocks and states are known, +/// - which checkpoints are justified and finalized, +/// - which block is currently considered the head, +/// - and, for each validator, their latest attestation that should influence fork choice. +/// +/// The `Store` is updated whenever: +/// - a new block is processed, +/// - an attestation is received (via a block or gossip), +/// - an interval tick occurs (activating new attestations), +/// - or when the head is recomputed. +/// +/// All data is stored in the backend. Metadata fields (time, config, head, etc.) +/// are stored in the Metadata table with their field name as the key. +#[derive(Clone)] +pub struct Store { + /// Storage backend for all store data. + backend: Arc, +} + +impl Store { + /// Initialize a Store from a genesis state. + pub fn from_genesis(mut genesis_state: State) -> Self { + // Ensure the header state root is zero before computing the state root + genesis_state.latest_block_header.state_root = H256::ZERO; + + let genesis_state_root = genesis_state.tree_hash_root(); + let genesis_block = Block { + slot: 0, + proposer_index: 0, + parent_root: H256::ZERO, + state_root: genesis_state_root, + body: BlockBody::default(), + }; + Self::get_forkchoice_store(genesis_state, genesis_block) + } + + /// Initialize a Store from an anchor state and block. + pub fn get_forkchoice_store(anchor_state: State, anchor_block: Block) -> Self { + let anchor_state_root = anchor_state.tree_hash_root(); + let anchor_block_root = anchor_block.tree_hash_root(); + + let backend: Arc = Arc::new(InMemoryBackend::new()); + + let anchor_checkpoint = Checkpoint { + root: anchor_block_root, + slot: anchor_block.slot, + }; + + // Insert initial data + { + let mut batch = backend.begin_write().expect("write batch"); + + // Metadata + batch + .put_batch( + Table::Metadata, + vec![ + (KEY_TIME.to_vec(), 0u64.as_ssz_bytes()), + (KEY_CONFIG.to_vec(), anchor_state.config.as_ssz_bytes()), + (KEY_HEAD.to_vec(), anchor_block_root.as_ssz_bytes()), + (KEY_SAFE_TARGET.to_vec(), anchor_block_root.as_ssz_bytes()), + ( + KEY_LATEST_JUSTIFIED.to_vec(), + anchor_checkpoint.as_ssz_bytes(), + ), + ( + KEY_LATEST_FINALIZED.to_vec(), + anchor_checkpoint.as_ssz_bytes(), + ), + ], + ) + .expect("put metadata"); + + // Block and state + batch + .put_batch( + Table::Blocks, + vec![( + anchor_block_root.as_ssz_bytes(), + anchor_block.as_ssz_bytes(), + )], + ) + .expect("put block"); + batch + .put_batch( + Table::States, + vec![( + anchor_block_root.as_ssz_bytes(), + anchor_state.as_ssz_bytes(), + )], + ) + .expect("put state"); + + batch.commit().expect("commit"); + } + + info!(%anchor_state_root, %anchor_block_root, "Initialized store"); + + Self { backend } + } + + // ============ Metadata Helpers ============ + + fn get_metadata(&self, key: &[u8]) -> T { + let view = self.backend.begin_read().expect("read view"); + let bytes = view + .get(Table::Metadata, key) + .expect("get") + .expect("metadata key exists"); + T::from_ssz_bytes(&bytes).expect("valid encoding") + } + + fn set_metadata(&self, key: &[u8], value: &T) { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .put_batch(Table::Metadata, vec![(key.to_vec(), value.as_ssz_bytes())]) + .expect("put metadata"); + batch.commit().expect("commit"); + } + + // ============ Time ============ + + pub fn time(&self) -> u64 { + self.get_metadata(KEY_TIME) + } + + pub fn set_time(&mut self, time: u64) { + self.set_metadata(KEY_TIME, &time); + } + + // ============ Config ============ + + pub fn config(&self) -> ChainConfig { + self.get_metadata(KEY_CONFIG) + } + + // ============ Head ============ + + pub fn head(&self) -> H256 { + self.get_metadata(KEY_HEAD) + } + + // ============ Safe Target ============ + + pub fn safe_target(&self) -> H256 { + self.get_metadata(KEY_SAFE_TARGET) + } + + pub fn set_safe_target(&mut self, safe_target: H256) { + self.set_metadata(KEY_SAFE_TARGET, &safe_target); + } + + // ============ Latest Justified ============ + + pub fn latest_justified(&self) -> Checkpoint { + self.get_metadata(KEY_LATEST_JUSTIFIED) + } + + // ============ Latest Finalized ============ + + pub fn latest_finalized(&self) -> Checkpoint { + self.get_metadata(KEY_LATEST_FINALIZED) + } + + // ============ Checkpoint Updates ============ + + /// Updates head, justified, and finalized checkpoints. + /// + /// - Head is always updated to the new value. + /// - Justified is updated if provided. + /// - Finalized is updated if provided. + pub fn update_checkpoints(&mut self, checkpoints: ForkCheckpoints) { + let mut entries = vec![(KEY_HEAD.to_vec(), checkpoints.head.as_ssz_bytes())]; + + if let Some(justified) = checkpoints.justified { + entries.push((KEY_LATEST_JUSTIFIED.to_vec(), justified.as_ssz_bytes())); + } + + if let Some(finalized) = checkpoints.finalized { + entries.push((KEY_LATEST_FINALIZED.to_vec(), finalized.as_ssz_bytes())); + } + + let mut batch = self.backend.begin_write().expect("write batch"); + batch.put_batch(Table::Metadata, entries).expect("put"); + batch.commit().expect("commit"); + } + + // ============ Blocks ============ + + /// Iterate over all (root, block) pairs. + pub fn iter_blocks(&self) -> impl Iterator + '_ { + let view = self.backend.begin_read().expect("read view"); + let entries: Vec<_> = view + .prefix_iterator(Table::Blocks, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(k, v)| { + let root = H256::from_ssz_bytes(&k).expect("valid root"); + let block = Block::from_ssz_bytes(&v).expect("valid block"); + (root, block) + }) + .collect(); + entries.into_iter() + } + + pub fn get_block(&self, root: &H256) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::Blocks, &root.as_ssz_bytes()) + .expect("get") + .map(|bytes| Block::from_ssz_bytes(&bytes).expect("valid block")) + } + + pub fn contains_block(&self, root: &H256) -> bool { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::Blocks, &root.as_ssz_bytes()) + .expect("get") + .is_some() + } + + pub fn insert_block(&mut self, root: H256, block: Block) { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .put_batch( + Table::Blocks, + vec![(root.as_ssz_bytes(), block.as_ssz_bytes())], + ) + .expect("put block"); + batch.commit().expect("commit"); + } + + // ============ States ============ + + /// Iterate over all (root, state) pairs. + pub fn iter_states(&self) -> impl Iterator + '_ { + let view = self.backend.begin_read().expect("read view"); + let entries: Vec<_> = view + .prefix_iterator(Table::States, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(k, v)| { + let root = H256::from_ssz_bytes(&k).expect("valid root"); + let state = State::from_ssz_bytes(&v).expect("valid state"); + (root, state) + }) + .collect(); + entries.into_iter() + } + + pub fn get_state(&self, root: &H256) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::States, &root.as_ssz_bytes()) + .expect("get") + .map(|bytes| State::from_ssz_bytes(&bytes).expect("valid state")) + } + + pub fn insert_state(&mut self, root: H256, state: State) { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .put_batch( + Table::States, + vec![(root.as_ssz_bytes(), state.as_ssz_bytes())], + ) + .expect("put state"); + batch.commit().expect("commit"); + } + + // ============ Latest Known Attestations ============ + + /// Iterate over all (validator_id, attestation_data) pairs for known attestations. + pub fn iter_known_attestations(&self) -> impl Iterator + '_ { + let view = self.backend.begin_read().expect("read view"); + let entries: Vec<_> = view + .prefix_iterator(Table::LatestKnownAttestations, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(k, v)| { + let validator_id = u64::from_ssz_bytes(&k).expect("valid validator_id"); + let data = AttestationData::from_ssz_bytes(&v).expect("valid attestation data"); + (validator_id, data) + }) + .collect(); + entries.into_iter() + } + + pub fn get_known_attestation(&self, validator_id: &u64) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::LatestKnownAttestations, &validator_id.as_ssz_bytes()) + .expect("get") + .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) + } + + pub fn insert_known_attestation(&mut self, validator_id: u64, data: AttestationData) { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .put_batch( + Table::LatestKnownAttestations, + vec![(validator_id.as_ssz_bytes(), data.as_ssz_bytes())], + ) + .expect("put attestation"); + batch.commit().expect("commit"); + } + + // ============ Latest New Attestations ============ + + /// Iterate over all (validator_id, attestation_data) pairs for new attestations. + pub fn iter_new_attestations(&self) -> impl Iterator + '_ { + let view = self.backend.begin_read().expect("read view"); + let entries: Vec<_> = view + .prefix_iterator(Table::LatestNewAttestations, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(k, v)| { + let validator_id = u64::from_ssz_bytes(&k).expect("valid validator_id"); + let data = AttestationData::from_ssz_bytes(&v).expect("valid attestation data"); + (validator_id, data) + }) + .collect(); + entries.into_iter() + } + + pub fn get_new_attestation(&self, validator_id: &u64) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::LatestNewAttestations, &validator_id.as_ssz_bytes()) + .expect("get") + .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) + } + + pub fn insert_new_attestation(&mut self, validator_id: u64, data: AttestationData) { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .put_batch( + Table::LatestNewAttestations, + vec![(validator_id.as_ssz_bytes(), data.as_ssz_bytes())], + ) + .expect("put attestation"); + batch.commit().expect("commit"); + } + + pub fn remove_new_attestation(&mut self, validator_id: &u64) { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch( + Table::LatestNewAttestations, + vec![validator_id.as_ssz_bytes()], + ) + .expect("delete attestation"); + batch.commit().expect("commit"); + } + + /// Promotes all new attestations to known attestations. + /// + /// Takes all attestations from `latest_new_attestations` and moves them + /// to `latest_known_attestations`, making them count for fork choice. + pub fn promote_new_attestations(&mut self) { + // Read all new attestations + let view = self.backend.begin_read().expect("read view"); + let new_attestations: Vec<(Vec, Vec)> = view + .prefix_iterator(Table::LatestNewAttestations, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(k, v)| (k.to_vec(), v.to_vec())) + .collect(); + drop(view); + + if new_attestations.is_empty() { + return; + } + + // Delete from new and insert to known in a single batch + let mut batch = self.backend.begin_write().expect("write batch"); + let keys_to_delete: Vec<_> = new_attestations.iter().map(|(k, _)| k.clone()).collect(); + batch + .delete_batch(Table::LatestNewAttestations, keys_to_delete) + .expect("delete new attestations"); + batch + .put_batch(Table::LatestKnownAttestations, new_attestations) + .expect("put known attestations"); + batch.commit().expect("commit"); + } + + // ============ Gossip Signatures ============ + + /// Iterate over all (signature_key, signature) pairs. + pub fn iter_gossip_signatures( + &self, + ) -> impl Iterator + '_ { + let view = self.backend.begin_read().expect("read view"); + let entries: Vec<_> = view + .prefix_iterator(Table::GossipSignatures, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .filter_map(|(k, v)| { + let key = decode_signature_key(&k); + ValidatorSignature::from_bytes(&v) + .ok() + .map(|sig| (key, sig)) + }) + .collect(); + entries.into_iter() + } + + pub fn get_gossip_signature(&self, key: &SignatureKey) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::GossipSignatures, &encode_signature_key(key)) + .expect("get") + .and_then(|bytes| ValidatorSignature::from_bytes(&bytes).ok()) + } + + pub fn contains_gossip_signature(&self, key: &SignatureKey) -> bool { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::GossipSignatures, &encode_signature_key(key)) + .expect("get") + .is_some() + } + + pub fn insert_gossip_signature(&mut self, key: SignatureKey, signature: ValidatorSignature) { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .put_batch( + Table::GossipSignatures, + vec![(encode_signature_key(&key), signature.to_bytes())], + ) + .expect("put signature"); + batch.commit().expect("commit"); + } + + // ============ Aggregated Payloads ============ + + /// Iterate over all (signature_key, proofs) pairs. + pub fn iter_aggregated_payloads( + &self, + ) -> impl Iterator)> + '_ { + let view = self.backend.begin_read().expect("read view"); + let entries: Vec<_> = view + .prefix_iterator(Table::AggregatedPayloads, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(k, v)| { + let key = decode_signature_key(&k); + let proofs = + Vec::::from_ssz_bytes(&v).expect("valid proofs"); + (key, proofs) + }) + .collect(); + entries.into_iter() + } + + pub fn get_aggregated_payloads( + &self, + key: &SignatureKey, + ) -> Option> { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::AggregatedPayloads, &encode_signature_key(key)) + .expect("get") + .map(|bytes| { + Vec::::from_ssz_bytes(&bytes).expect("valid proofs") + }) + } + + pub fn push_aggregated_payload(&mut self, key: SignatureKey, proof: AggregatedSignatureProof) { + // Read existing, add new, write back + let mut proofs = self.get_aggregated_payloads(&key).unwrap_or_default(); + proofs.push(proof); + + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .put_batch( + Table::AggregatedPayloads, + vec![(encode_signature_key(&key), proofs.as_ssz_bytes())], + ) + .expect("put proofs"); + batch.commit().expect("commit"); + } + + // ============ Derived Accessors ============ + + /// Returns the slot of the current safe target block. + pub fn safe_target_slot(&self) -> u64 { + self.get_block(&self.safe_target()) + .expect("safe target exists") + .slot + } + + /// Returns a clone of the head state. + pub fn head_state(&self) -> State { + self.get_state(&self.head()) + .expect("head state is always available") + } +}