2 changes: 1 addition & 1 deletion bin/ethlambda/src/main.rs
@@ -101,7 +101,7 @@ async fn main() {

let genesis_state = State::from_genesis(genesis_config.genesis_time, validators);
let backend = Arc::new(RocksDBBackend::open("./data").expect("Failed to open RocksDB"));
let store = Store::from_genesis(backend, genesis_state);
let store = Store::from_anchor_state(backend, genesis_state);

let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel();
let blockchain = BlockChain::spawn(store.clone(), p2p_tx, validator_keys);
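The constructor rename from `Store::from_genesis` to `Store::from_anchor_state` suggests the store is now seeded from an arbitrary trusted anchor state rather than genesis specifically. A rough call-site sketch of what that naming allows; the `checkpoint_state` option is a hypothetical illustration (e.g. checkpoint sync), not something this PR adds:

```rust
// Hypothetical sketch: any trusted state can serve as the anchor.
// `checkpoint_state` is an assumed Option<State>, not part of this PR.
let anchor_state = match checkpoint_state {
    Some(state) => state, // e.g. a state obtained out of band
    None => State::from_genesis(genesis_config.genesis_time, validators),
};
let store = Store::from_anchor_state(backend, anchor_state);
```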
4 changes: 2 additions & 2 deletions crates/blockchain/src/lib.rs
@@ -279,8 +279,8 @@ impl BlockChainServer {
let parent_root = signed_block.message.block.parent_root;
let proposer = signed_block.message.block.proposer_index;

// Check if parent block exists before attempting to process
if !self.store.contains_block(&parent_root) {
// Check if parent state exists before attempting to process
if !self.store.has_state(&parent_root) {
info!(%slot, %parent_root, %block_root, "Block parent missing, storing as pending");

// Store block for later processing
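The parent check now asks whether the parent's post-state is available (`has_state`) rather than whether the parent block is known, since processing a child requires executing on top of that state. A rough sketch of the pending-block flow implied by the log message; the `pending_blocks` map and the `process_block` re-entry point are assumptions for illustration, not code from this PR:

```rust
// Sketch only: buffer the block until its parent state shows up.
if !self.store.has_state(&parent_root) {
    self.pending_blocks
        .entry(parent_root)          // hypothetical HashMap<H256, Vec<_>>
        .or_default()
        .push(signed_block);
    return Ok(());
}

// ...and after a block imports successfully, retry any children waiting on it:
if let Some(children) = self.pending_blocks.remove(&block_root) {
    for child in children {
        self.process_block(child)?;  // hypothetical re-entry point
    }
}
```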
80 changes: 43 additions & 37 deletions crates/blockchain/src/store.rs
@@ -46,8 +46,14 @@ fn update_head(store: &mut Store) {
store.update_checkpoints(ForkCheckpoints::head_only(new_head));

if old_head != new_head {
let old_slot = store.get_block(&old_head).map(|b| b.slot).unwrap_or(0);
let new_slot = store.get_block(&new_head).map(|b| b.slot).unwrap_or(0);
let old_slot = store
.get_block_header(&old_head)
.map(|h| h.slot)
.unwrap_or(0);
let new_slot = store
.get_block_header(&new_head)
.map(|h| h.slot)
.unwrap_or(0);
let justified_slot = store.latest_justified().slot;
let finalized_slot = store.latest_finalized().slot;
info!(
@@ -91,33 +97,33 @@ fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(),
let data = &attestation.data;

// Availability Check - We cannot count a vote if we haven't seen the blocks involved.
let source_block = store
.get_block(&data.source.root)
let source_header = store
.get_block_header(&data.source.root)
.ok_or(StoreError::UnknownSourceBlock(data.source.root))?;
let target_block = store
.get_block(&data.target.root)
let target_header = store
.get_block_header(&data.target.root)
.ok_or(StoreError::UnknownTargetBlock(data.target.root))?;

if !store.contains_block(&data.head.root) {
return Err(StoreError::UnknownHeadBlock(data.head.root));
}
let _ = store
.get_block_header(&data.head.root)
.ok_or(StoreError::UnknownHeadBlock(data.head.root))?;

// Topology Check - Source must be older than Target.
if data.source.slot > data.target.slot {
return Err(StoreError::SourceExceedsTarget);
}

// Consistency Check - Validate checkpoint slots match block slots.
if source_block.slot != data.source.slot {
if source_header.slot != data.source.slot {
return Err(StoreError::SourceSlotMismatch {
checkpoint_slot: data.source.slot,
block_slot: source_block.slot,
block_slot: source_header.slot,
});
}
if target_block.slot != data.target.slot {
if target_header.slot != data.target.slot {
return Err(StoreError::TargetSlotMismatch {
checkpoint_slot: data.target.slot,
block_slot: target_block.slot,
block_slot: target_header.slot,
});
}
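All of these call sites now go through `get_block_header` instead of loading full blocks. Its implementation is not part of this diff; a plausible shape, assuming the read-view API visible in the RocksDB test further down and the derived SSZ `Decode` impl on `BlockHeader`, would be something like:

```rust
// Assumed sketch of the accessor, not the actual Store code.
impl Store {
    pub fn get_block_header(&self, root: &H256) -> Option<BlockHeader> {
        // `self.backend` and the error handling are assumptions.
        let view = self.backend.begin_read().ok()?;
        let bytes = view.get(Table::BlockHeaders, root.as_bytes()).ok()??;
        BlockHeader::from_ssz_bytes(&bytes).ok()
    }
}
```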

@@ -325,7 +331,7 @@ pub fn on_block(
let slot = block.slot;

// Skip duplicate blocks (idempotent operation)
if store.contains_block(&block_root) {
if store.has_state(&block_root) {
return Ok(());
}

@@ -441,12 +447,12 @@ pub fn on_block(
pub fn get_attestation_target(store: &Store) -> Checkpoint {
// Start from current head
let mut target_block_root = store.head();
let mut target_block = store
.get_block(&target_block_root)
let mut target_header = store
.get_block_header(&target_block_root)
.expect("head block exists");

let safe_target_block_slot = store
.get_block(&store.safe_target())
.get_block_header(&store.safe_target())
.expect("safe target exists")
.slot;

@@ -455,10 +461,10 @@ pub fn get_attestation_target(store: &Store) -> Checkpoint {
// This ensures the target doesn't advance too far ahead of safe target,
// providing a balance between liveness and safety.
for _ in 0..JUSTIFICATION_LOOKBACK_SLOTS {
if target_block.slot > safe_target_block_slot {
target_block_root = target_block.parent_root;
target_block = store
.get_block(&target_block_root)
if target_header.slot > safe_target_block_slot {
target_block_root = target_header.parent_root;
target_header = store
.get_block_header(&target_block_root)
.expect("parent block exists");
} else {
break;
@@ -471,12 +477,12 @@ pub fn get_attestation_target(store: &Store) -> Checkpoint {
//
// Walk back until we find a slot that satisfies justifiability rules
// relative to the latest finalized checkpoint.
while target_block.slot > finalized_slot
&& !slot_is_justifiable_after(target_block.slot, finalized_slot)
while target_header.slot > finalized_slot
&& !slot_is_justifiable_after(target_header.slot, finalized_slot)
{
target_block_root = target_block.parent_root;
target_block = store
.get_block(&target_block_root)
target_block_root = target_header.parent_root;
target_header = store
.get_block_header(&target_block_root)
.expect("parent block exists");
}
// Ensure target is at or after the source (latest_justified) to maintain
@@ -486,9 +492,9 @@ pub fn get_attestation_target(store: &Store) -> Checkpoint {
//
// See https://github.com/blockblaz/zeam/blob/697c293879e922942965cdb1da3c6044187ae00e/pkgs/node/src/forkchoice.zig#L654-L659
let latest_justified = store.latest_justified();
if target_block.slot < latest_justified.slot {
if target_header.slot < latest_justified.slot {
warn!(
target_slot = target_block.slot,
target_slot = target_header.slot,
justified_slot = latest_justified.slot,
"Attestation target walked behind justified source, clamping to justified"
);
Expand All @@ -497,7 +503,7 @@ pub fn get_attestation_target(store: &Store) -> Checkpoint {

Checkpoint {
root: target_block_root,
slot: target_block.slot,
slot: target_header.slot,
}
}

Expand All @@ -507,7 +513,7 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData {
let head_checkpoint = Checkpoint {
root: store.head(),
slot: store
.get_block(&store.head())
.get_block_header(&store.head())
.expect("head block exists")
.slot,
};
@@ -1075,16 +1081,16 @@ fn is_reorg(old_head: H256, new_head: H256, store: &Store) -> bool {
return false;
}

let Some(old_head_block) = store.get_block(&old_head) else {
let Some(old_head_header) = store.get_block_header(&old_head) else {
return false;
};

let Some(new_head_block) = store.get_block(&new_head) else {
let Some(new_head_header) = store.get_block_header(&new_head) else {
return false;
};

let old_slot = old_head_block.slot;
let new_slot = new_head_block.slot;
let old_slot = old_head_header.slot;
let new_slot = new_head_header.slot;

// Determine which head has the higher slot and walk back from it
let (mut current_root, target_slot, target_root) = if new_slot >= old_slot {
@@ -1094,12 +1100,12 @@ fn is_reorg(old_head: H256, new_head: H256, store: &Store) -> bool {
};

// Walk back through the chain until we reach the target slot
while let Some(current_block) = store.get_block(&current_root) {
if current_block.slot <= target_slot {
while let Some(current_header) = store.get_block_header(&current_root) {
if current_header.slot <= target_slot {
// We've reached the target slot - check if we're at the target block
return current_root != target_root;
}
current_root = current_block.parent_root;
current_root = current_header.parent_root;
}

// Couldn't walk back far enough (missing blocks in chain)
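The reorg check walks back from whichever head sits at the higher slot until it reaches the other head's slot, then compares roots: landing on the same root means the new head simply extends the old one. A self-contained toy model of that walk, using plain integer IDs instead of the crate's `H256` roots and header types:

```rust
use std::collections::HashMap;

// root -> (slot, parent_root); a simplified stand-in for stored headers.
type Headers = HashMap<u64, (u64, u64)>;

fn is_reorg_toy(old_head: u64, new_head: u64, headers: &Headers) -> bool {
    if old_head == new_head {
        return false;
    }
    let Some(&(old_slot, _)) = headers.get(&old_head) else { return false; };
    let Some(&(new_slot, _)) = headers.get(&new_head) else { return false; };

    // Walk back from whichever head is higher until we reach the other head's slot.
    let (mut current, target_slot, target_root) = if new_slot >= old_slot {
        (new_head, old_slot, old_head)
    } else {
        (old_head, new_slot, new_head)
    };
    while let Some(&(slot, parent)) = headers.get(&current) {
        if slot <= target_slot {
            return current != target_root; // same root => extension, not a reorg
        }
        current = parent;
    }
    false // missing ancestors: cannot prove a reorg
}

fn main() {
    // Chain: 1 <- 2 <- 3, plus a sibling fork 2 <- 4 at the same slot as 3.
    let mut h = Headers::new();
    h.insert(1, (1, 0));
    h.insert(2, (2, 1));
    h.insert(3, (3, 2));
    h.insert(4, (3, 2));
    assert!(!is_reorg_toy(2, 3, &h)); // 3 extends 2: not a reorg
    assert!(is_reorg_toy(3, 4, &h));  // 4 replaces its sibling 3: reorg
}
```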
8 changes: 4 additions & 4 deletions crates/blockchain/tests/forkchoice_spectests.rs
@@ -187,13 +187,13 @@ fn validate_checks(
// Validate headSlot
if let Some(expected_slot) = checks.head_slot {
let head_root = st.head();
let head_block = st
.get_block(&head_root)
let head_header = st
.get_block_header(&head_root)
.ok_or_else(|| format!("Step {}: head block not found", step_idx))?;
if head_block.slot != expected_slot {
if head_header.slot != expected_slot {
return Err(format!(
"Step {}: headSlot mismatch: expected {}, got {}",
step_idx, expected_slot, head_block.slot
step_idx, expected_slot, head_header.slot
)
.into());
}
34 changes: 33 additions & 1 deletion crates/common/types/src/block.rs
@@ -176,7 +176,7 @@ impl BlockSignaturesWithAttestation {
///
/// Headers are smaller than full blocks. They're useful for tracking the chain
/// without storing everything.
#[derive(Debug, Clone, Serialize, Encode, Decode, TreeHash)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Encode, Decode, TreeHash)]
pub struct BlockHeader {
/// The slot in which the block was proposed
pub slot: u64,
@@ -205,6 +205,38 @@ pub struct Block {
pub body: BlockBody,
}

impl Block {
/// Extract the block header, computing the body root.
pub fn header(&self) -> BlockHeader {
BlockHeader {
slot: self.slot,
proposer_index: self.proposer_index,
parent_root: self.parent_root,
state_root: self.state_root,
body_root: self.body.tree_hash_root(),
}
}

/// Reconstruct a block from header and body.
///
/// The caller should ensure that `header.body_root` matches `body.tree_hash_root()`.
/// This is verified with a debug assertion but not in release builds.
pub fn from_header_and_body(header: BlockHeader, body: BlockBody) -> Self {
debug_assert_eq!(
header.body_root,
body.tree_hash_root(),
"body root mismatch"
);
Self {
slot: header.slot,
proposer_index: header.proposer_index,
parent_root: header.parent_root,
state_root: header.state_root,
body,
}
}
}

/// The body of a block, containing payload data.
///
/// Currently, the main operation is voting. Validators submit attestations which are
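The new `header()` / `from_header_and_body()` pair makes the header/body split lossless as long as the body root matches. A test-style sketch of that round trip; how the block is constructed here (`Block::default()`) is an assumption for illustration only:

```rust
#[test]
fn header_body_round_trip() {
    let block = Block::default(); // hypothetical way to obtain a block
    let slot = block.slot;
    let state_root = block.state_root;

    // The header commits to the body via its tree hash root...
    let header = block.header();
    assert_eq!(header.body_root, block.body.tree_hash_root());

    // ...so header + body reassemble into an equivalent block.
    let rebuilt = Block::from_header_and_body(header, block.body);
    assert_eq!(rebuilt.slot, slot);
    assert_eq!(rebuilt.state_root, state_root);
}
```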
5 changes: 4 additions & 1 deletion crates/net/p2p/src/req_resp/handlers.rs
@@ -169,7 +169,10 @@ async fn handle_blocks_by_root_response(
pub fn build_status(store: &Store) -> Status {
let finalized = store.latest_finalized();
let head_root = store.head();
let head_slot = store.get_block(&head_root).expect("head block exists").slot;
let head_slot = store
.get_block_header(&head_root)
.expect("head block exists")
.slot;
Status {
finalized,
head: ethlambda_types::state::Checkpoint {
4 changes: 2 additions & 2 deletions crates/net/rpc/src/lib.rs
@@ -118,7 +118,7 @@ mod tests {
async fn test_get_latest_justified_checkpoint() {
let state = create_test_state();
let backend = Arc::new(InMemoryBackend::new());
let store = Store::from_genesis(backend, state);
let store = Store::from_anchor_state(backend, state);

let app = build_api_router(store.clone());

@@ -154,7 +154,7 @@

let state = create_test_state();
let backend = Arc::new(InMemoryBackend::new());
let store = Store::from_genesis(backend, state);
let store = Store::from_anchor_state(backend, state);

// Get the expected state from the store
let finalized = store.latest_finalized();
11 changes: 7 additions & 4 deletions crates/storage/src/api/tables.rs
@@ -1,8 +1,10 @@
/// Tables in the storage layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Table {
/// Block storage: H256 -> Block
Blocks,
/// Block header storage: H256 -> BlockHeader
BlockHeaders,
/// Block body storage: H256 -> BlockBody
BlockBodies,
/// Block signatures storage: H256 -> BlockSignaturesWithAttestation
///
/// Stored separately from blocks because the genesis block has no signatures.
@@ -29,8 +31,9 @@ pub enum Table {
}

/// All table variants.
pub const ALL_TABLES: [Table; 9] = [
Table::Blocks,
pub const ALL_TABLES: [Table; 10] = [
Table::BlockHeaders,
Table::BlockBodies,
Table::BlockSignatures,
Table::States,
Table::LatestKnownAttestations,
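With `Blocks` split into `BlockHeaders` and `BlockBodies`, persisting a block presumably becomes two writes keyed by the same block root. A minimal sketch using the batch API shown in the RocksDB test below (with the RocksDB backend chosen only for concreteness); the `as_ssz_bytes()` calls assume the derived `Encode` impls, and the function itself is not code from this PR:

```rust
// Assumed write path, mirroring the begin_write/put_batch/commit calls below.
fn put_block(backend: &RocksDBBackend, block_root: H256, block: &Block) {
    let header = block.header();
    let mut batch = backend.begin_write().unwrap();
    batch
        .put_batch(
            Table::BlockHeaders,
            vec![(block_root.as_bytes().to_vec(), header.as_ssz_bytes())],
        )
        .unwrap();
    batch
        .put_batch(
            Table::BlockBodies,
            vec![(block_root.as_bytes().to_vec(), block.body.as_ssz_bytes())],
        )
        .unwrap();
    batch.commit().unwrap();
}
```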
10 changes: 7 additions & 3 deletions crates/storage/src/backend/rocksdb.rs
@@ -12,7 +12,8 @@ use std::sync::Arc;
/// Returns the column family name for a table.
fn cf_name(table: Table) -> &'static str {
match table {
Table::Blocks => "blocks",
Table::BlockHeaders => "block_headers",
Table::BlockBodies => "block_bodies",
Table::BlockSignatures => "block_signatures",
Table::States => "states",
Table::LatestKnownAttestations => "latest_known_attestations",
@@ -166,7 +167,10 @@ mod tests {
let backend = RocksDBBackend::open(dir.path()).unwrap();
let mut batch = backend.begin_write().unwrap();
batch
.put_batch(Table::Blocks, vec![(b"key1".to_vec(), b"value1".to_vec())])
.put_batch(
Table::BlockHeaders,
vec![(b"key1".to_vec(), b"value1".to_vec())],
)
.unwrap();
batch.commit().unwrap();
}
@@ -175,7 +179,7 @@
{
let backend = RocksDBBackend::open(dir.path()).unwrap();
let view = backend.begin_read().unwrap();
let value = view.get(Table::Blocks, b"key1").unwrap();
let value = view.get(Table::BlockHeaders, b"key1").unwrap();
assert_eq!(value, Some(b"value1".to_vec()));
}
}
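Reading a full block back out of the split layout then means fetching both column families and reassembling via `Block::from_header_and_body`. A sketch mirroring the read view used in the test above; the SSZ decode calls and the simplified error handling are assumptions, not the actual store code:

```rust
// Assumed read path: fetch header and body by root, then reassemble.
fn get_block(backend: &RocksDBBackend, block_root: H256) -> Option<Block> {
    let view = backend.begin_read().ok()?;
    let header_bytes = view.get(Table::BlockHeaders, block_root.as_bytes()).ok()??;
    let body_bytes = view.get(Table::BlockBodies, block_root.as_bytes()).ok()??;
    let header = BlockHeader::from_ssz_bytes(&header_bytes).ok()?;
    let body = BlockBody::from_ssz_bytes(&body_bytes).ok()?;
    Some(Block::from_header_and_body(header, body))
}
```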