diff --git a/iroh-relay/src/server/client.rs b/iroh-relay/src/server/client.rs index d4361dd8bd..90fccc469f 100644 --- a/iroh-relay/src/server/client.rs +++ b/iroh-relay/src/server/client.rs @@ -9,6 +9,7 @@ use futures_sink::Sink; use futures_util::{SinkExt, Stream, StreamExt}; use iroh_base::NodeId; use iroh_metrics::{inc, inc_by}; +use rand::Rng; use tokio::sync::mpsc::{self, error::TrySendError}; use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; use tracing::{debug, error, instrument, trace, warn, Instrument}; @@ -220,11 +221,23 @@ impl Actor { } async fn run_inner(&mut self, done: CancellationToken) -> Result<()> { - let jitter = Duration::from_secs(5); - let mut keep_alive = tokio::time::interval(KEEP_ALIVE + jitter); + // Add some jitter to ping pong interactions, to avoid all pings being sent at the same time + let next_interval = || { + let random_secs = rand::rngs::OsRng.gen_range(1..=5); + Duration::from_secs(random_secs) + KEEP_ALIVE + }; + + let mut keep_alive = tokio::time::interval(next_interval()); // ticks immediately keep_alive.tick().await; + const PING_TIMEOUT: Duration = Duration::from_secs(10); + let mut ping_timeout = tokio::time::interval(PING_TIMEOUT); + // ticks immediately + ping_timeout.tick().await; + let mut last_ping_data = [0u8; 8]; + let mut waiting_for_ping = false; + loop { tokio::select! { biased; @@ -236,7 +249,22 @@ impl Actor { break; } maybe_frame = self.stream.next() => { - self.handle_frame(maybe_frame).await.context("handle read")?; + trace!(?maybe_frame, "handle incoming frame"); + let frame = match maybe_frame { + Some(frame) => frame?, + None => anyhow::bail!("stream terminated"), + }; + match frame { + Frame::Pong { data } => { + if waiting_for_ping && data == last_ping_data { + // clear outstanding ping + waiting_for_ping = false; + } else { + warn!("unexpected pong data: {:?}", data); + } + }, + _ => self.handle_frame(frame).await.context("handle read")?, + } } // First priority, disco packets packet = self.disco_send_queue.recv() => { @@ -254,11 +282,22 @@ impl Actor { trace!("node_id gone: {:?}", node_id); self.write_frame(Frame::NodeGone { node_id }).await?; } + _ = ping_timeout.tick(), if waiting_for_ping => { + trace!("pong timed out"); + break; + } _ = keep_alive.tick() => { - trace!("keep alive"); - self.write_frame(Frame::KeepAlive).await?; + trace!("keep alive ping"); + // new interval + keep_alive.reset_after(next_interval()); + rand::rngs::OsRng.fill(&mut last_ping_data); + self.write_frame(Frame::Ping { data: last_ping_data }).await?; + + ping_timeout.reset(); + waiting_for_ping = true; } } + self.stream.flush().await.context("tick flush")?; } Ok(()) @@ -315,12 +354,7 @@ impl Actor { } /// Handles frame read results. - async fn handle_frame(&mut self, maybe_frame: Option>) -> Result<()> { - trace!(?maybe_frame, "handle incoming frame"); - let frame = match maybe_frame { - Some(frame) => frame?, - None => anyhow::bail!("stream terminated"), - }; + async fn handle_frame(&mut self, frame: Frame) -> Result<()> { match frame { Frame::SendPacket { dst_key, packet } => { let packet_len = packet.len();