Skip to content

Commit 32dc1ca

Browse files
committed
feat: dial again to disconnected peers
1 parent 5adc4ef commit 32dc1ca

File tree

2 files changed

+69
-7
lines changed

2 files changed

+69
-7
lines changed

crates/net/p2p/src/lib.rs

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ pub use metrics::populate_name_registry;
4141
const MAX_FETCH_RETRIES: u32 = 5;
4242
const INITIAL_BACKOFF_MS: u64 = 10;
4343
const BACKOFF_MULTIPLIER: u64 = 4;
44+
/// Delay, in seconds, that a scheduled bootnode redial sleeps before its
/// `RetryMessage::PeerRedial` is sent on the retry channel.
const PEER_REDIAL_INTERVAL_SECS: u64 = 30;
45+
46+
/// Message sent over the server's retry channel (`retry_tx` / `retry_rx`).
///
/// Senders are spawned tasks that sleep for a delay and then send one of these
/// variants; the event loop receives it and dispatches on the variant.
enum RetryMessage {
47+
/// Retry the block fetch for the given root; dispatched to `handle_retry`.
BlockFetch(H256),
48+
/// Redial the given peer; dispatched to `handle_peer_redial`.
PeerRedial(PeerId),
49+
}
4450

4551
pub(crate) struct PendingRequest {
4652
pub(crate) attempts: u32,
@@ -122,6 +128,7 @@ pub async fn start_p2p(
122128
})
123129
.build();
124130
let local_peer_id = *swarm.local_peer_id();
131+
let mut bootnode_addrs = HashMap::new();
125132
for bootnode in bootnodes {
126133
let peer_id = PeerId::from_public_key(&bootnode.public_key);
127134
if peer_id == local_peer_id {
@@ -133,6 +140,7 @@ pub async fn start_p2p(
133140
.with(Protocol::QuicV1)
134141
.with_p2p(peer_id)
135142
.expect("failed to add peer ID to multiaddr");
143+
bootnode_addrs.insert(peer_id, addr.clone());
136144
swarm.dial(addr).unwrap();
137145
}
138146
let addr = Multiaddr::empty()
@@ -159,7 +167,7 @@ pub async fn start_p2p(
159167
"/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"
160168
));
161169

162-
info!("P2P node started on {listening_socket}");
170+
info!(socket=%listening_socket, "P2P node started");
163171

164172
let (retry_tx, retry_rx) = mpsc::unbounded_channel();
165173

@@ -173,6 +181,7 @@ pub async fn start_p2p(
173181
connected_peers: HashSet::new(),
174182
pending_requests: HashMap::new(),
175183
request_id_map: HashMap::new(),
184+
bootnode_addrs,
176185
retry_tx,
177186
retry_rx,
178187
};
@@ -197,8 +206,11 @@ pub(crate) struct P2PServer {
197206
pub(crate) connected_peers: HashSet<PeerId>,
198207
pub(crate) pending_requests: HashMap<ethlambda_types::primitives::H256, PendingRequest>,
199208
pub(crate) request_id_map: HashMap<OutboundRequestId, ethlambda_types::primitives::H256>,
200-
retry_tx: mpsc::UnboundedSender<ethlambda_types::primitives::H256>,
201-
retry_rx: mpsc::UnboundedReceiver<ethlambda_types::primitives::H256>,
209+
/// Bootnode addresses for redialing when disconnected
210+
bootnode_addrs: HashMap<PeerId, Multiaddr>,
211+
/// Channel for scheduling retries (block fetches and peer redials)
212+
pub(crate) retry_tx: mpsc::UnboundedSender<RetryMessage>,
213+
retry_rx: mpsc::UnboundedReceiver<RetryMessage>,
202214
}
203215

204216
/// Event loop for the P2P crate.
@@ -220,8 +232,11 @@ async fn event_loop(mut server: P2PServer) {
220232
};
221233
handle_swarm_event(&mut server, event).await;
222234
}
223-
Some(root) = server.retry_rx.recv() => {
224-
handle_retry(&mut server, root).await;
235+
Some(msg) = server.retry_rx.recv() => {
236+
match msg {
237+
RetryMessage::BlockFetch(root) => handle_retry(&mut server, root).await,
238+
RetryMessage::PeerRedial(peer_id) => handle_peer_redial(&mut server, peer_id).await,
239+
}
225240
}
226241
}
227242
}
@@ -291,6 +306,16 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent<BehaviourE
291306
if num_established == 0 {
292307
server.connected_peers.remove(&peer_id);
293308
metrics::notify_peer_disconnected(&Some(peer_id), direction, reason);
309+
310+
// Schedule redial if this is a bootnode
311+
if server.bootnode_addrs.contains_key(&peer_id) {
312+
let retry_tx = server.retry_tx.clone();
313+
tokio::spawn(async move {
314+
tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await;
315+
let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id));
316+
});
317+
info!(%peer_id, "Scheduled bootnode redial in {}s", PEER_REDIAL_INTERVAL_SECS);
318+
}
294319
}
295320
info!(%peer_id, %direction, %reason, "Peer disconnected");
296321
}
@@ -302,6 +327,19 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent<BehaviourE
302327
};
303328
metrics::notify_peer_connected(&peer_id, "outbound", result);
304329
warn!(?peer_id, %error, "Outgoing connection error");
330+
331+
// Schedule redial if this was a bootnode
332+
if let Some(pid) = peer_id
333+
&& server.bootnode_addrs.contains_key(&pid)
334+
&& !server.connected_peers.contains(&pid)
335+
{
336+
let retry_tx = server.retry_tx.clone();
337+
tokio::spawn(async move {
338+
tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await;
339+
let _ = retry_tx.send(RetryMessage::PeerRedial(pid));
340+
});
341+
info!(%pid, "Scheduled bootnode redial after connection error");
342+
}
305343
}
306344
SwarmEvent::IncomingConnectionError { peer_id, error, .. } => {
307345
metrics::notify_peer_connected(&peer_id, "inbound", "error");
@@ -350,6 +388,27 @@ async fn handle_retry(server: &mut P2PServer, root: H256) {
350388
}
351389
}
352390

391+
/// Handles a `RetryMessage::PeerRedial` for `peer_id`.
///
/// Does nothing if the peer is already in `server.connected_peers`, or if the
/// peer has no entry in `server.bootnode_addrs`. Otherwise dials the stored
/// bootnode address. If `dial` itself returns an error, another redial is
/// scheduled after `PEER_REDIAL_INTERVAL_SECS` seconds.
///
/// A dial that is accepted here but fails later is not handled in this
/// function; the `SwarmEvent::OutgoingConnectionError` branch of
/// `handle_swarm_event` schedules its own redial for bootnodes.
async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) {
392+
// Skip if already reconnected
393+
if server.connected_peers.contains(&peer_id) {
394+
trace!(%peer_id, "Bootnode reconnected during redial delay, skipping");
395+
return;
396+
}
397+
398+
// Only peers recorded as bootnodes have a stored address to dial.
if let Some(addr) = server.bootnode_addrs.get(&peer_id) {
399+
info!(%peer_id, "Redialing disconnected bootnode");
400+
if let Err(e) = server.swarm.dial(addr.clone()) {
401+
warn!(%peer_id, %e, "Failed to redial bootnode, will retry");
402+
// Schedule another redial attempt
403+
let retry_tx = server.retry_tx.clone();
404+
// Sleep in a detached task so the event loop is not blocked during the delay.
tokio::spawn(async move {
405+
tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await;
406+
// The send result is deliberately discarded.
let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id));
407+
});
408+
}
409+
}
410+
}
411+
353412
pub struct Bootnode {
354413
ip: IpAddr,
355414
quic_port: u16,

crates/net/p2p/src/req_resp/handlers.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use super::{
1111
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload,
1212
ResponseResult, Status,
1313
};
14-
use crate::{BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest};
14+
use crate::{
15+
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest,
16+
RetryMessage,
17+
};
1518

1619
pub async fn handle_req_resp_message(
1720
server: &mut P2PServer,
@@ -226,6 +229,6 @@ async fn handle_fetch_failure(
226229
let retry_tx = server.retry_tx.clone();
227230
tokio::spawn(async move {
228231
tokio::time::sleep(backoff).await;
229-
let _ = retry_tx.send(root);
232+
let _ = retry_tx.send(RetryMessage::BlockFetch(root));
230233
});
231234
}

0 commit comments

Comments
 (0)