Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async fn main() {
p2p_socket,
blockchain,
p2p_rx,
store.clone(),
));

ethlambda_rpc::start_rpc_server(metrics_socket, store)
Expand Down
1 change: 1 addition & 0 deletions crates/net/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ version.workspace = true

[dependencies]
ethlambda-blockchain.workspace = true
ethlambda-storage.workspace = true
ethlambda-types.workspace = true

async-trait = "0.1"
Expand Down
43 changes: 37 additions & 6 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::{
};

use ethlambda_blockchain::{BlockChain, OutboundGossip};
use ethlambda_storage::Store;
use ethlambda_types::state::Checkpoint;
use ethrex_common::H264;
use ethrex_p2p::types::NodeRecord;
use ethrex_rlp::decode::RLPDecode;
Expand Down Expand Up @@ -41,6 +43,7 @@ pub async fn start_p2p(
listening_socket: SocketAddr,
blockchain: BlockChain,
p2p_rx: mpsc::UnboundedReceiver<OutboundGossip>,
store: Store,
) {
let config = libp2p::gossipsub::ConfigBuilder::default()
// d
Expand Down Expand Up @@ -142,7 +145,15 @@ pub async fn start_p2p(

info!("P2P node started on {listening_socket}");

event_loop(swarm, blockchain, p2p_rx, attestation_topic, block_topic).await;
event_loop(
swarm,
blockchain,
p2p_rx,
attestation_topic,
block_topic,
store,
)
.await;
}

/// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours
Expand All @@ -160,6 +171,7 @@ async fn event_loop(
mut p2p_rx: mpsc::UnboundedReceiver<OutboundGossip>,
attestation_topic: libp2p::gossipsub::IdentTopic,
block_topic: libp2p::gossipsub::IdentTopic,
store: Store,
) {
loop {
tokio::select! {
Expand All @@ -179,7 +191,7 @@ async fn event_loop(
SwarmEvent::Behaviour(BehaviourEvent::ReqResp(
message @ request_response::Event::Message { .. },
)) => {
handle_req_resp_message(&mut swarm, message).await;
handle_req_resp_message(&mut swarm, message, &store).await;
}
SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(
message @ libp2p::gossipsub::Event::Message { .. },
Expand All @@ -195,8 +207,13 @@ async fn event_loop(
let direction = connection_direction(&endpoint);
if num_established.get() == 1 {
metrics::notify_peer_connected(&Some(peer_id), direction, "success");
// Send status request on first connection to this peer
let our_status = build_status(&store);
info!(%peer_id, %direction, finalized_slot=%our_status.finalized.slot, head_slot=%our_status.head.slot, "Added connection to new peer, sending status request");
swarm.behaviour_mut().req_resp.send_request(&peer_id, our_status);
} else {
info!(%peer_id, %direction, "Added peer connection");
}
info!(%peer_id, %direction, "Peer connected");
}
SwarmEvent::ConnectionClosed {
peer_id,
Expand Down Expand Up @@ -296,6 +313,7 @@ async fn handle_outgoing_gossip(
async fn handle_req_resp_message(
swarm: &mut libp2p::Swarm<Behaviour>,
event: request_response::Event<Status, Status>,
store: &Store,
) {
let request_response::Event::Message {
peer,
Expand All @@ -312,13 +330,12 @@ async fn handle_req_resp_message(
channel,
} => {
info!(finalized_slot=%request.finalized.slot, head_slot=%request.head.slot, "Received status request from peer {peer}");
// TODO: send real status
let our_status = build_status(store);
swarm
.behaviour_mut()
.req_resp
.send_response(channel, request.clone())
.send_response(channel, our_status)
.unwrap();
swarm.behaviour_mut().req_resp.send_request(&peer, request);
}
request_response::Message::Response {
request_id: _,
Expand Down Expand Up @@ -377,6 +394,20 @@ fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str
}
}

/// Build a Status message from the current Store state.
fn build_status(store: &Store) -> Status {
let finalized = store.latest_finalized();
let head_root = store.head();
let head_slot = store.get_block(&head_root).expect("head block exists").slot;
Status {
finalized,
head: Checkpoint {
root: head_root,
slot: head_slot,
},
}
}

fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId {
const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00];
const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00];
Expand Down