Skip to content

Commit

Permalink
feat(iroh-relay): send regular pings to check the connection
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Jan 9, 2025
1 parent 1ae820d commit 9d73ed8
Showing 1 changed file with 45 additions and 11 deletions.
56 changes: 45 additions & 11 deletions iroh-relay/src/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures_sink::Sink;
use futures_util::{SinkExt, Stream, StreamExt};
use iroh_base::NodeId;
use iroh_metrics::{inc, inc_by};
use rand::Rng;
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle};
use tracing::{debug, error, instrument, trace, warn, Instrument};
Expand Down Expand Up @@ -220,11 +221,23 @@ impl Actor {
}

async fn run_inner(&mut self, done: CancellationToken) -> Result<()> {
let jitter = Duration::from_secs(5);
let mut keep_alive = tokio::time::interval(KEEP_ALIVE + jitter);
// Add some jitter to ping pong interactions, to avoid all pings being sent at the same time
let next_interval = || {
let random_secs = rand::rngs::OsRng.gen_range(1..=5);
Duration::from_secs(random_secs) + KEEP_ALIVE
};

let mut keep_alive = tokio::time::interval(next_interval());
// ticks immediately
keep_alive.tick().await;

const PING_TIMEOUT: Duration = Duration::from_secs(10);
let mut ping_timeout = tokio::time::interval(PING_TIMEOUT);
// ticks immediately
ping_timeout.tick().await;
let mut last_ping_data = [0u8; 8];
let mut waiting_for_ping = false;

loop {
tokio::select! {
biased;
Expand All @@ -236,7 +249,22 @@ impl Actor {
break;
}
maybe_frame = self.stream.next() => {
self.handle_frame(maybe_frame).await.context("handle read")?;
trace!(?maybe_frame, "handle incoming frame");
let frame = match maybe_frame {
Some(frame) => frame?,
None => anyhow::bail!("stream terminated"),
};
match frame {
Frame::Pong { data } => {
if waiting_for_ping && data == last_ping_data {
// clear outstanding ping
waiting_for_ping = false;
} else {
warn!("unexpected pong data: {:?}", data);
}
},
_ => self.handle_frame(frame).await.context("handle read")?,
}
}
// First priority, disco packets
packet = self.disco_send_queue.recv() => {
Expand All @@ -254,11 +282,22 @@ impl Actor {
trace!("node_id gone: {:?}", node_id);
self.write_frame(Frame::NodeGone { node_id }).await?;
}
_ = ping_timeout.tick(), if waiting_for_ping => {
trace!("pong timed out");
break;
}
_ = keep_alive.tick() => {
trace!("keep alive");
self.write_frame(Frame::KeepAlive).await?;
trace!("keep alive ping");
// new interval
keep_alive.reset_after(next_interval());
rand::rngs::OsRng.fill(&mut last_ping_data);
self.write_frame(Frame::Ping { data: last_ping_data }).await?;

ping_timeout.reset();
waiting_for_ping = true;
}
}

self.stream.flush().await.context("tick flush")?;
}
Ok(())
Expand Down Expand Up @@ -315,12 +354,7 @@ impl Actor {
}

/// Handles frame read results.
async fn handle_frame(&mut self, maybe_frame: Option<Result<Frame>>) -> Result<()> {
trace!(?maybe_frame, "handle incoming frame");
let frame = match maybe_frame {
Some(frame) => frame?,
None => anyhow::bail!("stream terminated"),
};
async fn handle_frame(&mut self, frame: Frame) -> Result<()> {
match frame {
Frame::SendPacket { dst_key, packet } => {
let packet_len = packet.len();
Expand Down

0 comments on commit 9d73ed8

Please sign in to comment.