Skip to content

Commit

Permalink
extract pingtracker and use it on the relay server
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Jan 9, 2025
1 parent 9d1917b commit 8cf7dc5
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 91 deletions.
7 changes: 6 additions & 1 deletion iroh-relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub mod quic;
#[cfg(feature = "server")]
pub mod server;

mod ping_tracker;

mod key_cache;
mod relay_map;
pub(crate) use key_cache::KeyCache;
Expand All @@ -47,4 +49,7 @@ mod dns;

pub use protos::relay::MAX_PACKET_SIZE;

pub use self::relay_map::{RelayMap, RelayNode, RelayQuicConfig};
pub use self::{
ping_tracker::PingTracker,
relay_map::{RelayMap, RelayNode, RelayQuicConfig},
};
64 changes: 64 additions & 0 deletions iroh-relay/src/ping_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::time::{Duration, Instant};

use tracing::debug;

/// Tracks pings on a single relay connection.
///
/// Only the last ping needs is useful, any previously sent ping is forgotten and ignored.
#[derive(Debug)]
pub struct PingTracker {
inner: Option<PingInner>,
default_timeout: Duration,
}

#[derive(Debug)]
struct PingInner {
data: [u8; 8],
deadline: Instant,
}

impl PingTracker {
/// Creates a new ping tracker, setting the ping timeout for pings.
pub fn new(default_timeout: Duration) -> Self {
Self {
inner: None,
default_timeout,
}
}

/// Starts a new ping.
pub fn new_ping(&mut self) -> [u8; 8] {
let ping_data = rand::random();
debug!(data = ?ping_data, "Sending ping to relay server.");
self.inner = Some(PingInner {
data: ping_data,
deadline: Instant::now() + self.default_timeout,
});
ping_data
}

/// Updates the ping tracker with a received pong.
///
/// Only the pong of the most recent ping will do anything. There is no harm feeding
/// any pong however.
pub fn pong_received(&mut self, data: [u8; 8]) {
if self.inner.as_ref().map(|inner| inner.data) == Some(data) {
debug!(?data, "Pong received from relay server");
self.inner = None;
}
}

/// Cancel-safe waiting for a ping timeout.
///
/// Unless the most recent sent ping times out, this will never return.
pub async fn timeout(&mut self) {
match self.inner {
Some(PingInner { deadline, data }) => {
tokio::time::sleep_until(deadline.into()).await;
debug!(?data, "Ping timeout.");
self.inner = None;
}
None => std::future::pending().await,
}
}
}
48 changes: 18 additions & 30 deletions iroh-relay/src/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
relay::{write_frame, Frame, KEEP_ALIVE},
},
server::{clients::Clients, metrics::Metrics, streams::RelayedStream, ClientRateLimit},
PingTracker,
};

/// A request to write a dataframe to a Client
Expand Down Expand Up @@ -103,6 +104,7 @@ impl Client {
node_id,
connection_id,
clients: clients.clone(),
ping_tracker: PingTracker::new(Duration::from_secs(5)),
};

// start io loop
Expand Down Expand Up @@ -202,6 +204,7 @@ struct Actor {
connection_id: u64,
/// Reference to the other connected clients.
clients: Clients,
ping_tracker: PingTracker,
}

impl Actor {
Expand Down Expand Up @@ -231,13 +234,6 @@ impl Actor {
// ticks immediately
keep_alive.tick().await;

const PING_TIMEOUT: Duration = Duration::from_secs(10);
let mut ping_timeout = tokio::time::interval(PING_TIMEOUT);
// ticks immediately
ping_timeout.tick().await;
let mut last_ping_data = [0u8; 8];
let mut waiting_for_ping = false;

loop {
tokio::select! {
biased;
Expand All @@ -249,22 +245,7 @@ impl Actor {
break;
}
maybe_frame = self.stream.next() => {
trace!(?maybe_frame, "handle incoming frame");
let frame = match maybe_frame {
Some(frame) => frame?,
None => anyhow::bail!("stream terminated"),
};
match frame {
Frame::Pong { data } => {
if waiting_for_ping && data == last_ping_data {
// clear outstanding ping
waiting_for_ping = false;
} else {
warn!("unexpected pong data: {:?}", data);
}
},
_ => self.handle_frame(frame).await.context("handle read")?,
}
self.handle_frame(maybe_frame).await.context("handle read")?;
}
// First priority, disco packets
packet = self.disco_send_queue.recv() => {
Expand All @@ -282,19 +263,16 @@ impl Actor {
trace!("node_id gone: {:?}", node_id);
self.write_frame(Frame::NodeGone { node_id }).await?;
}
_ = ping_timeout.tick(), if waiting_for_ping => {
_ = self.ping_tracker.timeout() => {
trace!("pong timed out");
break;
}
_ = keep_alive.tick() => {
trace!("keep alive ping");
// new interval
keep_alive.reset_after(next_interval());
rand::rngs::OsRng.fill(&mut last_ping_data);
self.write_frame(Frame::Ping { data: last_ping_data }).await?;

ping_timeout.reset();
waiting_for_ping = true;
let data = self.ping_tracker.new_ping();
self.write_frame(Frame::Ping { data }).await?;
}
}

Expand Down Expand Up @@ -354,7 +332,13 @@ impl Actor {
}

/// Handles frame read results.
async fn handle_frame(&mut self, frame: Frame) -> Result<()> {
async fn handle_frame(&mut self, maybe_frame: Option<Result<Frame>>) -> Result<()> {
trace!(?maybe_frame, "handle incoming frame");
let frame = match maybe_frame {
Some(frame) => frame?,
None => anyhow::bail!("stream terminated"),
};

match frame {
Frame::SendPacket { dst_key, packet } => {
let packet_len = packet.len();
Expand All @@ -367,6 +351,9 @@ impl Actor {
self.write_frame(Frame::Pong { data }).await?;
inc!(Metrics, sent_pong);
}
Frame::Pong { data } => {
self.ping_tracker.pong_received(data);
}
Frame::Health { problem } => {
bail!("server issue: {:?}", problem);
}
Expand Down Expand Up @@ -601,6 +588,7 @@ mod tests {
connection_id: 0,
node_id,
clients: clients.clone(),
ping_tracker: PingTracker::new(Duration::from_secs(5)),
};

let done = CancellationToken::new();
Expand Down
64 changes: 4 additions & 60 deletions iroh/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ use backoff::exponential::{ExponentialBackoff, ExponentialBackoffBuilder};
use bytes::{Bytes, BytesMut};
use futures_buffered::FuturesUnorderedBounded;
use futures_lite::StreamExt;
use futures_util::{future, SinkExt};
use futures_util::SinkExt;
use iroh_base::{NodeId, PublicKey, RelayUrl, SecretKey};
use iroh_metrics::{inc, inc_by};
use iroh_relay::{
self as relay,
client::{Client, ReceivedMessage, SendMessage},
MAX_PACKET_SIZE,
PingTracker, MAX_PACKET_SIZE,
};
use tokio::{
sync::{mpsc, oneshot},
Expand Down Expand Up @@ -440,7 +440,7 @@ impl ActiveRelayActor {
let (mut client_stream, mut client_sink) = client.split();

let mut state = ConnectedRelayState {
ping_tracker: PingTracker::new(),
ping_tracker: PingTracker::new(PING_TIMEOUT),
nodes_present: BTreeSet::new(),
last_packet_src: None,
pong_pending: None,
Expand Down Expand Up @@ -1185,62 +1185,6 @@ impl Iterator for PacketSplitIter {
}
}

/// Tracks pings on a single relay connection.
///
/// Only the last ping needs is useful, any previously sent ping is forgotten and ignored.
#[derive(Debug)]
struct PingTracker {
inner: Option<PingInner>,
}

#[derive(Debug)]
struct PingInner {
data: [u8; 8],
deadline: Instant,
}

impl PingTracker {
fn new() -> Self {
Self { inner: None }
}

/// Starts a new ping.
fn new_ping(&mut self) -> [u8; 8] {
let ping_data = rand::random();
debug!(data = ?ping_data, "Sending ping to relay server.");
self.inner = Some(PingInner {
data: ping_data,
deadline: Instant::now() + PING_TIMEOUT,
});
ping_data
}

/// Updates the ping tracker with a received pong.
///
/// Only the pong of the most recent ping will do anything. There is no harm feeding
/// any pong however.
fn pong_received(&mut self, data: [u8; 8]) {
if self.inner.as_ref().map(|inner| inner.data) == Some(data) {
debug!(?data, "Pong received from relay server");
self.inner = None;
}
}

/// Cancel-safe waiting for a ping timeout.
///
/// Unless the most recent sent ping times out, this will never return.
async fn timeout(&mut self) {
match self.inner {
Some(PingInner { deadline, data }) => {
tokio::time::sleep_until(deadline).await;
debug!(?data, "Ping timeout.");
self.inner = None;
}
None => future::pending().await,
}
}
}

#[cfg(test)]
mod tests {
use anyhow::Context;
Expand Down Expand Up @@ -1568,7 +1512,7 @@ mod tests {
#[tokio::test]
async fn test_ping_tracker() {
tokio::time::pause();
let mut tracker = PingTracker::new();
let mut tracker = PingTracker::new(PING_TIMEOUT);

let ping0 = tracker.new_ping();

Expand Down

0 comments on commit 8cf7dc5

Please sign in to comment.