Skip to content

Commit

Permalink
feat(engine): emit events with executed blocks (#14341)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk authored Feb 9, 2025
1 parent 46d3b6a commit a215256
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/engine/primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ reth-payload-primitives.workspace = true
reth-payload-builder-primitives.workspace = true
reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-chain-state.workspace = true
reth-trie.workspace = true
reth-errors.workspace = true

Expand Down
17 changes: 11 additions & 6 deletions crates/engine/primitives/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
//! Events emitted by the beacon consensus engine.
use crate::ForkchoiceStatus;
use alloc::{boxed::Box, sync::Arc};
use alloc::boxed::Box;
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_rpc_types_engine::ForkchoiceState;
use core::{
fmt::{Display, Formatter, Result},
time::Duration,
};
use reth_primitives::{EthPrimitives, SealedBlock};
use reth_chain_state::ExecutedBlockWithTrieUpdates;
use reth_primitives::EthPrimitives;
use reth_primitives_traits::{NodePrimitives, SealedHeader};

/// Events emitted by the consensus engine.
Expand All @@ -18,9 +19,9 @@ pub enum BeaconConsensusEngineEvent<N: NodePrimitives = EthPrimitives> {
/// The fork choice state was updated, and the current fork choice status
ForkchoiceUpdated(ForkchoiceState, ForkchoiceStatus),
/// A block was added to the fork chain.
ForkBlockAdded(Arc<SealedBlock<N::Block>>, Duration),
ForkBlockAdded(ExecutedBlockWithTrieUpdates<N>, Duration),
/// A block was added to the canonical chain, and the elapsed time validating the block
CanonicalBlockAdded(Arc<SealedBlock<N::Block>>, Duration),
CanonicalBlockAdded(ExecutedBlockWithTrieUpdates<N>, Duration),
/// A canonical chain was committed, and the elapsed time committing the data
CanonicalChainCommitted(Box<SealedHeader<N::BlockHeader>>, Duration),
/// The consensus engine is involved in live sync, and has specific progress
Expand Down Expand Up @@ -48,10 +49,14 @@ where
write!(f, "ForkchoiceUpdated({state:?}, {status:?})")
}
Self::ForkBlockAdded(block, duration) => {
write!(f, "ForkBlockAdded({:?}, {duration:?})", block.num_hash())
write!(f, "ForkBlockAdded({:?}, {duration:?})", block.recovered_block.num_hash())
}
Self::CanonicalBlockAdded(block, duration) => {
write!(f, "CanonicalBlockAdded({:?}, {duration:?})", block.num_hash())
write!(
f,
"CanonicalBlockAdded({:?}, {duration:?})",
block.recovered_block.num_hash()
)
}
Self::CanonicalChainCommitted(block, duration) => {
write!(f, "CanonicalChainCommitted({:?}, {duration:?})", block.num_hash())
Expand Down
26 changes: 12 additions & 14 deletions crates/engine/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@

extern crate alloc;

use alloy_consensus::BlockHeader;
use alloy_eips::{eip7685::Requests, Decodable2718};
use alloy_primitives::B256;
use reth_payload_primitives::{BuiltPayload, PayloadAttributes};
mod error;

use alloy_rpc_types_engine::{ExecutionPayloadSidecar, PayloadError};
use core::fmt::{self, Debug};
use reth_payload_primitives::{
validate_execution_requests, BuiltPayload, EngineApiMessageVersion,
EngineObjectValidationError, InvalidPayloadAttributesError, PayloadAttributes,
PayloadOrAttributes, PayloadTypes,
};
use reth_primitives::{NodePrimitives, SealedBlock};
use reth_primitives_traits::Block;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use alloy_consensus::BlockHeader;
use alloy_rpc_types_engine::{ExecutionPayloadSidecar, PayloadError};
mod error;
pub use error::*;

mod forkchoice;
Expand All @@ -33,15 +40,6 @@ pub use event::*;
mod invalid_block_hook;
pub use invalid_block_hook::InvalidBlockHook;

use alloy_eips::{eip7685::Requests, Decodable2718};
use reth_payload_primitives::{
validate_execution_requests, EngineApiMessageVersion, EngineObjectValidationError,
InvalidPayloadAttributesError, PayloadOrAttributes, PayloadTypes,
};
use reth_primitives::{NodePrimitives, SealedBlock};
use reth_primitives_traits::Block;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

/// Struct aggregating [`alloy_rpc_types_engine::ExecutionPayload`] and [`ExecutionPayloadSidecar`]
/// and encapsulating complete payload supplied for execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
29 changes: 11 additions & 18 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,14 +1392,10 @@ where
self.canonical_in_memory_state.set_pending_block(block.clone());
}

let sealed_block = Arc::new(block.sealed_block().clone());
self.state.tree_state.insert_executed(block);
self.state.tree_state.insert_executed(block.clone());
self.metrics.engine.inserted_already_executed_blocks.increment(1);
self.emit_event(EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(
sealed_block,
now.elapsed(),
),
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
));
}
EngineApiRequest::Beacon(request) => {
Expand Down Expand Up @@ -2409,8 +2405,6 @@ where
return Err(e.into())
}

let sealed_block = Arc::new(block.clone_sealed_block());

// We only run the parallel state root if we are currently persisting blocks that are all
// ancestors of the one we are executing. If we're committing ancestor blocks, then: any
// trie updates being committed are a subset of the in-memory trie updates collected before
Expand Down Expand Up @@ -2558,7 +2552,7 @@ where
self.handle_state_root_result(
state_root_handle,
state_root_config,
sealed_block.as_ref(),
block.sealed_block(),
&hashed_state,
&state_provider,
root_time,
Expand Down Expand Up @@ -2617,8 +2611,7 @@ where
// prewarm tasks are still running at this point however
drop(prewarm_task_lock.write().unwrap());
// apply state updates to cache and save it (if saving was successful)
self.most_recent_cache =
state_provider.save_cache(sealed_block.hash(), &output.state).ok();
self.most_recent_cache = state_provider.save_cache(block.hash(), &output.state).ok();
let elapsed = save_cache_start.elapsed();

// record how long it took to save caches
Expand All @@ -2641,15 +2634,15 @@ where
self.canonical_in_memory_state.set_pending_block(executed.clone());
}

self.state.tree_state.insert_executed(executed);
self.state.tree_state.insert_executed(executed.clone());
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

// emit insert event
let elapsed = start.elapsed();
let engine_event = if self.is_fork(block_num_hash.hash)? {
BeaconConsensusEngineEvent::ForkBlockAdded(sealed_block, elapsed)
BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
} else {
BeaconConsensusEngineEvent::CanonicalBlockAdded(sealed_block, elapsed)
BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
};
self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

Expand Down Expand Up @@ -3540,9 +3533,9 @@ mod tests {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, _),
) => {
assert_eq!(block.hash(), expected_hash);
assert_eq!(executed.recovered_block.hash(), expected_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
Expand All @@ -3552,10 +3545,10 @@ mod tests {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
block,
executed,
_,
)) => {
assert_eq!(block.hash(), expected_hash);
assert_eq!(executed.recovered_block.hash(), expected_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
Expand Down
6 changes: 4 additions & 2 deletions crates/node/events/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ impl NodeState {
}
}
}
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) => {
let block = executed.sealed_block();
info!(
number=block.number(),
hash=?block.hash(),
Expand All @@ -272,7 +273,8 @@ impl NodeState {

info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
}
BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed) => {
BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed) => {
let block = executed.sealed_block();
info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
}
}
Expand Down

0 comments on commit a215256

Please sign in to comment.