From 32dc1ca62025e417ff020bb56bd065f40dbce22f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 29 Jan 2026 18:04:28 -0300 Subject: [PATCH 1/5] feat: dial again to disconnected peers --- crates/net/p2p/src/lib.rs | 69 +++++++++++++++++++++++-- crates/net/p2p/src/req_resp/handlers.rs | 7 ++- 2 files changed, 69 insertions(+), 7 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 06f77ad..d4276d5 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -41,6 +41,12 @@ pub use metrics::populate_name_registry; const MAX_FETCH_RETRIES: u32 = 5; const INITIAL_BACKOFF_MS: u64 = 10; const BACKOFF_MULTIPLIER: u64 = 4; +const PEER_REDIAL_INTERVAL_SECS: u64 = 30; + +enum RetryMessage { + BlockFetch(H256), + PeerRedial(PeerId), +} pub(crate) struct PendingRequest { pub(crate) attempts: u32, @@ -122,6 +128,7 @@ pub async fn start_p2p( }) .build(); let local_peer_id = *swarm.local_peer_id(); + let mut bootnode_addrs = HashMap::new(); for bootnode in bootnodes { let peer_id = PeerId::from_public_key(&bootnode.public_key); if peer_id == local_peer_id { @@ -133,6 +140,7 @@ pub async fn start_p2p( .with(Protocol::QuicV1) .with_p2p(peer_id) .expect("failed to add peer ID to multiaddr"); + bootnode_addrs.insert(peer_id, addr.clone()); swarm.dial(addr).unwrap(); } let addr = Multiaddr::empty() @@ -159,7 +167,7 @@ pub async fn start_p2p( "/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy" )); - info!("P2P node started on {listening_socket}"); + info!(socket=%listening_socket, "P2P node started"); let (retry_tx, retry_rx) = mpsc::unbounded_channel(); @@ -173,6 +181,7 @@ pub async fn start_p2p( connected_peers: HashSet::new(), pending_requests: HashMap::new(), request_id_map: HashMap::new(), + bootnode_addrs, retry_tx, retry_rx, }; @@ -197,8 +206,11 @@ pub(crate) struct P2PServer { pub(crate) connected_peers: HashSet, pub(crate) pending_requests: HashMap, pub(crate) request_id_map: HashMap, - retry_tx: mpsc::UnboundedSender, - retry_rx: mpsc::UnboundedReceiver, + /// Bootnode addresses for redialing when disconnected + bootnode_addrs: HashMap, + /// Channel for scheduling retries (block fetches and peer redials) + pub(crate) retry_tx: mpsc::UnboundedSender, + retry_rx: mpsc::UnboundedReceiver, } /// Event loop for the P2P crate. @@ -220,8 +232,11 @@ async fn event_loop(mut server: P2PServer) { }; handle_swarm_event(&mut server, event).await; } - Some(root) = server.retry_rx.recv() => { - handle_retry(&mut server, root).await; + Some(msg) = server.retry_rx.recv() => { + match msg { + RetryMessage::BlockFetch(root) => handle_retry(&mut server, root).await, + RetryMessage::PeerRedial(peer_id) => handle_peer_redial(&mut server, peer_id).await, + } } } } @@ -291,6 +306,16 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent { metrics::notify_peer_connected(&peer_id, "inbound", "error"); @@ -350,6 +388,27 @@ async fn handle_retry(server: &mut P2PServer, root: H256) { } } +async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) { + // Skip if already reconnected + if server.connected_peers.contains(&peer_id) { + trace!(%peer_id, "Bootnode reconnected during redial delay, skipping"); + return; + } + + if let Some(addr) = server.bootnode_addrs.get(&peer_id) { + info!(%peer_id, "Redialing disconnected bootnode"); + if let Err(e) = server.swarm.dial(addr.clone()) { + warn!(%peer_id, %e, "Failed to redial bootnode, will retry"); + // Schedule another redial attempt + let retry_tx = server.retry_tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await; + let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id)); + }); + } + } +} + pub struct Bootnode { ip: IpAddr, quic_port: u16, diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index d8e670c..026ae9d 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -11,7 +11,10 @@ use super::{ BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, ResponseResult, Status, }; -use crate::{BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest}; +use crate::{ + BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, + RetryMessage, +}; pub async fn handle_req_resp_message( server: &mut P2PServer, @@ -226,6 +229,6 @@ async fn handle_fetch_failure( let retry_tx = server.retry_tx.clone(); tokio::spawn(async move { tokio::time::sleep(backoff).await; - let _ = retry_tx.send(root); + let _ = retry_tx.send(RetryMessage::BlockFetch(root)); }); } From 8af9634fcadebacd389ed4d62b725b7b9953ff26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 29 Jan 2026 18:21:11 -0300 Subject: [PATCH 2/5] chore: reduce time between redials to 12s --- crates/net/p2p/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index d4276d5..84a524a 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -41,7 +41,7 @@ pub use metrics::populate_name_registry; const MAX_FETCH_RETRIES: u32 = 5; const INITIAL_BACKOFF_MS: u64 = 10; const BACKOFF_MULTIPLIER: u64 = 4; -const PEER_REDIAL_INTERVAL_SECS: u64 = 30; +const PEER_REDIAL_INTERVAL_SECS: u64 = 12; enum RetryMessage { BlockFetch(H256), From ec50c504f8dc88728ff882470255e2b66afcb1ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 29 Jan 2026 18:25:41 -0300 Subject: [PATCH 3/5] docs: add comment --- crates/net/p2p/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 84a524a..7e6e9ee 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -397,6 +397,8 @@ async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) { if let Some(addr) = server.bootnode_addrs.get(&peer_id) { info!(%peer_id, "Redialing disconnected bootnode"); + // NOTE: this dial does some checks and adds a pending outbound connection attempt. + // It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event. if let Err(e) = server.swarm.dial(addr.clone()) { warn!(%peer_id, %e, "Failed to redial bootnode, will retry"); // Schedule another redial attempt From ce230839c2b31cc4c2b3ac00ceff147a876b147c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 29 Jan 2026 18:52:10 -0300 Subject: [PATCH 4/5] refactor: move repeated code to a helper --- crates/net/p2p/src/lib.rs | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 7e6e9ee..e21aa42 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -309,11 +309,7 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent, peer_id: PeerId) { + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await; + let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id)); + }); +} + pub struct Bootnode { ip: IpAddr, quic_port: u16, From c0e0d371d9978b9577eba466f690acfea9a83ca1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 29 Jan 2026 19:36:24 -0300 Subject: [PATCH 5/5] chore: fmt --- crates/net/p2p/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 7a53338..20d64a3 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -317,7 +317,7 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent