Skip to content

Commit

Permalink
feat(iroh): Ping the relay server regularly
Browse files Browse the repository at this point in the history
TCP does not guarantee that the connection is healty, so introduce
protocol-level pinging to force us to reconnect if needed.
  • Loading branch information
flub authored and dignifiedquire committed Jan 9, 2025
1 parent 9d73ed8 commit 9d1917b
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions iroh/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ const MAX_PAYLOAD_SIZE: usize = MAX_PACKET_SIZE - PublicKey::LENGTH;
/// Maximum time for a relay server to respond to a relay protocol ping.
const PING_TIMEOUT: Duration = Duration::from_secs(5);

/// Interval in which we ping the relay server to ensure the connection is alive.
///
/// The default QUIC max_idle_timeout is 30s, so setting that to half this time gives some
/// chance of recovering.
const PING_INTERVAL: Duration = Duration::from_secs(15);

/// Number of datagrams which can be sent to the relay server in one batch.
///
/// This means while this batch is sending to the server no other relay protocol frames can
Expand Down Expand Up @@ -441,8 +447,15 @@ impl ActiveRelayActor {
#[cfg(test)]
test_pong: None,
};

// A buffer to pass through multiple datagrams at once as an optimisation.
let mut send_datagrams_buf = Vec::with_capacity(SEND_DATAGRAM_BATCH_SIZE);

// Regularly send pings so we know the connection is healthy.
let mut ping_interval = tokio::time::interval(PING_INTERVAL);
ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
ping_interval.reset(); // skip the ping at current time.

let res = loop {
if let Some(data) = state.pong_pending.take() {
let fut = client_sink.send(SendMessage::Pong(data));
Expand Down Expand Up @@ -470,6 +483,11 @@ impl ActiveRelayActor {
_ = state.ping_tracker.timeout() => {
break Err(anyhow!("Ping timeout"));
}
_ = ping_interval.tick() => {
let data = state.ping_tracker.new_ping();
let fut = client_sink.send(SendMessage::Ping(data));
self.run_sending(fut, &mut state, &mut client_stream).await?;
}
msg = self.inbox.recv() => {
let Some(msg) = msg else {
warn!("Inbox closed, shutdown.");
Expand Down Expand Up @@ -623,6 +641,8 @@ impl ActiveRelayActor {
state: &mut ConnectedRelayState,
client_stream: &mut iroh_relay::client::ClientStream,
) -> Result<()> {
// TODO
// let mut timeout = pin!(tokio::time::sleep())
let mut sending_fut = pin!(sending_fut);
loop {
tokio::select! {
Expand Down

0 comments on commit 9d1917b

Please sign in to comment.