diff --git a/Cargo.lock b/Cargo.lock index a9eb1a7..a22bcd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1960,6 +1960,7 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "ethlambda-blockchain", + "ethlambda-storage", "ethlambda-types", "ethrex-common", "ethrex-p2p", diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 4587651..c9da076 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -102,6 +102,7 @@ async fn main() { p2p_socket, blockchain, p2p_rx, + store.clone(), )); ethlambda_rpc::start_rpc_server(metrics_socket, store) diff --git a/crates/net/p2p/Cargo.toml b/crates/net/p2p/Cargo.toml index 488275f..29af2f7 100644 --- a/crates/net/p2p/Cargo.toml +++ b/crates/net/p2p/Cargo.toml @@ -11,6 +11,7 @@ version.workspace = true [dependencies] ethlambda-blockchain.workspace = true +ethlambda-storage.workspace = true ethlambda-types.workspace = true async-trait = "0.1" diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 25b9fc2..ecaab66 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -4,6 +4,8 @@ use std::{ }; use ethlambda_blockchain::{BlockChain, OutboundGossip}; +use ethlambda_storage::Store; +use ethlambda_types::state::Checkpoint; use ethrex_common::H264; use ethrex_p2p::types::NodeRecord; use ethrex_rlp::decode::RLPDecode; @@ -41,6 +43,7 @@ pub async fn start_p2p( listening_socket: SocketAddr, blockchain: BlockChain, p2p_rx: mpsc::UnboundedReceiver, + store: Store, ) { let config = libp2p::gossipsub::ConfigBuilder::default() // d @@ -142,7 +145,15 @@ pub async fn start_p2p( info!("P2P node started on {listening_socket}"); - event_loop(swarm, blockchain, p2p_rx, attestation_topic, block_topic).await; + event_loop( + swarm, + blockchain, + p2p_rx, + attestation_topic, + block_topic, + store, + ) + .await; } /// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours @@ -160,6 +171,7 @@ async fn event_loop( mut p2p_rx: mpsc::UnboundedReceiver, attestation_topic: libp2p::gossipsub::IdentTopic, block_topic: libp2p::gossipsub::IdentTopic, + store: Store, ) { loop { tokio::select! { @@ -179,7 +191,7 @@ async fn event_loop( SwarmEvent::Behaviour(BehaviourEvent::ReqResp( message @ request_response::Event::Message { .. }, )) => { - handle_req_resp_message(&mut swarm, message).await; + handle_req_resp_message(&mut swarm, message, &store).await; } SwarmEvent::Behaviour(BehaviourEvent::Gossipsub( message @ libp2p::gossipsub::Event::Message { .. }, @@ -195,8 +207,13 @@ async fn event_loop( let direction = connection_direction(&endpoint); if num_established.get() == 1 { metrics::notify_peer_connected(&Some(peer_id), direction, "success"); + // Send status request on first connection to this peer + let our_status = build_status(&store); + info!(%peer_id, %direction, finalized_slot=%our_status.finalized.slot, head_slot=%our_status.head.slot, "Added connection to new peer, sending status request"); + swarm.behaviour_mut().req_resp.send_request(&peer_id, our_status); + } else { + info!(%peer_id, %direction, "Added peer connection"); } - info!(%peer_id, %direction, "Peer connected"); } SwarmEvent::ConnectionClosed { peer_id, @@ -296,6 +313,7 @@ async fn handle_outgoing_gossip( async fn handle_req_resp_message( swarm: &mut libp2p::Swarm, event: request_response::Event, + store: &Store, ) { let request_response::Event::Message { peer, @@ -312,13 +330,12 @@ async fn handle_req_resp_message( channel, } => { info!(finalized_slot=%request.finalized.slot, head_slot=%request.head.slot, "Received status request from peer {peer}"); - // TODO: send real status + let our_status = build_status(store); swarm .behaviour_mut() .req_resp - .send_response(channel, request.clone()) + .send_response(channel, our_status) .unwrap(); - swarm.behaviour_mut().req_resp.send_request(&peer, request); } request_response::Message::Response { request_id: _, @@ -377,6 +394,20 @@ fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str } } +/// Build a Status message from the current Store state. +fn build_status(store: &Store) -> Status { + let finalized = store.latest_finalized(); + let head_root = store.head(); + let head_slot = store.get_block(&head_root).expect("head block exists").slot; + Status { + finalized, + head: Checkpoint { + root: head_root, + slot: head_slot, + }, + } +} + fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId { const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00]; const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00];