From 04376c2c515857a0d3d4c0e73afc539c8b65e2e2 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 2 Aug 2024 17:50:51 +0200 Subject: [PATCH 01/12] fix(iroh-net): Make a single direct address in NodeAddr instant --- iroh-net/src/magicsock.rs | 5 +- iroh-net/src/magicsock/node_map.rs | 4 +- iroh-net/src/magicsock/node_map/node_state.rs | 188 ++++++++++-------- iroh-net/src/magicsock/node_map/udp_paths.rs | 148 ++++++++++++++ 4 files changed, 261 insertions(+), 84 deletions(-) create mode 100644 iroh-net/src/magicsock/node_map/udp_paths.rs diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 0547953dc1..2f54e459ce 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -499,7 +499,10 @@ impl MagicSock { let dest = QuicMappedAddr(dest); let mut transmits_sent = 0; - match self.node_map.get_send_addrs(dest) { + match self + .node_map + .get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed)) + { Some((public_key, udp_addr, relay_url, mut msgs)) => { let mut pings_sent = false; // If we have pings to send, we *have* to send them out first. diff --git a/iroh-net/src/magicsock/node_map.rs b/iroh-net/src/magicsock/node_map.rs index 7632dce0c5..3550f34bfb 100644 --- a/iroh-net/src/magicsock/node_map.rs +++ b/iroh-net/src/magicsock/node_map.rs @@ -30,6 +30,7 @@ use crate::{ mod best_addr; mod node_state; +mod udp_paths; pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, NodeInfo}; pub(super) use node_state::{DiscoPingPurpose, PingAction, PingRole, SendPing}; @@ -186,6 +187,7 @@ impl NodeMap { pub(super) fn get_send_addrs( &self, addr: QuicMappedAddr, + have_ipv6: bool, ) -> Option<( PublicKey, Option, @@ -195,7 +197,7 @@ impl NodeMap { let mut inner = self.inner.lock(); let ep = inner.get_mut(NodeStateKey::QuicMappedAddr(addr))?; let public_key = *ep.public_key(); - let (udp_addr, relay_url, msgs) = ep.get_send_addrs(); + let (udp_addr, relay_url, msgs) = ep.get_send_addrs(have_ipv6); Some((public_key, udp_addr, relay_url, msgs)) } diff --git a/iroh-net/src/magicsock/node_map/node_state.rs b/iroh-net/src/magicsock/node_map/node_state.rs index df3e88a82d..d944ba5e20 100644 --- a/iroh-net/src/magicsock/node_map/node_state.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -25,7 +25,8 @@ use crate::{ use crate::magicsock::{metrics::Metrics as MagicsockMetrics, ActorMessage, QuicMappedAddr}; -use super::best_addr::{self, BestAddr, ClearReason, Source}; +use super::best_addr::{self, ClearReason, Source}; +use super::udp_paths::NodeUdpPaths; use super::IpPort; /// Number of addresses that are not active that we keep around per node. @@ -116,10 +117,7 @@ pub(super) struct NodeState { /// /// The fallback/bootstrap path, if non-zero (non-zero for well-behaved clients). relay_url: Option<(RelayUrl, PathState)>, - /// Best non-relay path, i.e. a UDP address. - best_addr: BestAddr, - /// State for each of this node's direct paths. - direct_addr_state: BTreeMap, + udp_paths: NodeUdpPaths, sent_pings: HashMap, /// Last time this node was used. /// @@ -169,9 +167,8 @@ impl NodeState { PathState::new(options.node_id, SendAddr::Relay(url)), ) }), - best_addr: Default::default(), + udp_paths: NodeUdpPaths::new(), sent_pings: HashMap::new(), - direct_addr_state: BTreeMap::new(), last_used: options.active.then(Instant::now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::None), @@ -203,7 +200,8 @@ impl NodeState { let conn_type = self.conn_type.get(); let latency = match conn_type { ConnectionType::Direct(addr) => self - .direct_addr_state + .udp_paths + .paths .get(&addr.into()) .and_then(|state| state.latency()), ConnectionType::Relay(ref url) => self @@ -213,7 +211,8 @@ impl NodeState { .and_then(|(_, state)| state.latency()), ConnectionType::Mixed(addr, ref url) => { let addr_latency = self - .direct_addr_state + .udp_paths + .paths .get(&addr.into()) .and_then(|state| state.latency()); let relay_latency = self @@ -226,7 +225,8 @@ impl NodeState { ConnectionType::None => None, }; let addrs = self - .direct_addr_state + .udp_paths + .paths .iter() .map(|(addr, endpoint_state)| DirectAddrInfo { addr: SocketAddr::from(*addr), @@ -261,7 +261,11 @@ impl NodeState { /// Returns the address(es) that should be used for sending the next packet. /// /// This may return to send on one, both or no paths. - fn addr_for_send(&mut self, now: &Instant) -> (Option, Option) { + fn addr_for_send( + &mut self, + now: &Instant, + have_ipv6: bool, + ) -> (Option, Option) { if relay_only_mode() { debug!("in `DEV_relay_ONLY` mode, giving the relay address as the only viable address for this endpoint"); return (None, self.relay_url()); @@ -269,23 +273,25 @@ impl NodeState { // Update our best addr from candidate addresses (only if it is empty and if we have // recent pongs). self.assign_best_addr_from_candidates_if_empty(); - let (best_addr, relay_url) = match self.best_addr.state(*now) { - best_addr::State::Valid(best_addr) => { + let (best_addr, relay_url) = match self.udp_paths.send_addr(*now, have_ipv6) { + super::udp_paths::UdpSendAddr::Valid(addr) => { // If we have a valid address we use it. - trace!(addr = %best_addr.addr, latency = ?best_addr.latency, - "best_addr is set and valid, use best_addr only"); - (Some(best_addr.addr), None) + trace!(%addr, "UdpSendAddr is valid, use it"); + (Some(addr), None) } - best_addr::State::Outdated(best_addr) => { + super::udp_paths::UdpSendAddr::Outdated(addr) => { // If the address is outdated we use it, but send via relay at the same time. // We also send disco pings so that it will become valid again if it still // works (i.e. we don't need to holepunch again). - trace!(addr = %best_addr.addr, latency = ?best_addr.latency, - "best_addr is set but outdated, use best_addr and relay"); - (Some(best_addr.addr), self.relay_url()) + trace!(%addr, "UdpSendAddr is outdated, use it together with relay"); + (Some(addr), self.relay_url()) } - best_addr::State::Empty => { - trace!("best_addr is unset, use relay"); + super::udp_paths::UdpSendAddr::Unconfirmed(addr) => { + trace!(%addr, "UdpSendAddr is unconfirmed, use it together with relay"); + (Some(addr), self.relay_url()) + } + super::udp_paths::UdpSendAddr::None => { + trace!("No UdpSendAddr, use relay"); (None, self.relay_url()) } }; @@ -356,7 +362,7 @@ impl NodeState { /// /// If this is also the best address, it will be cleared as well. pub(super) fn remove_direct_addr(&mut self, ip_port: &IpPort, reason: ClearReason) { - let Some(state) = self.direct_addr_state.remove(ip_port) else { + let Some(state) = self.udp_paths.paths.remove(ip_port) else { return; }; @@ -365,8 +371,11 @@ impl NodeState { None => debug!(%ip_port, last_seen=%"never", ?reason, "pruning address"), } - self.best_addr - .clear_if_equals((*ip_port).into(), reason, self.relay_url.is_some()); + self.udp_paths.best_addr.clear_if_equals( + (*ip_port).into(), + reason, + self.relay_url.is_some(), + ); } /// Fixup best_adrr from candidates. @@ -375,7 +384,7 @@ impl NodeState { /// valid candidates, this will chose a candidate and set best_addr again. Most likely /// this is a bug elsewhere though. fn assign_best_addr_from_candidates_if_empty(&mut self) { - if !self.best_addr.is_empty() { + if !self.udp_paths.best_addr.is_empty() { return; } @@ -383,7 +392,8 @@ impl NodeState { // then this the path will be ignored. const MAX_LATENCY: Duration = Duration::from_secs(60 * 60); let best_pong = self - .direct_addr_state + .udp_paths + .paths .iter() .fold(None, |best_pong, (ipp, state)| { let best_latency = best_pong @@ -406,7 +416,7 @@ impl NodeState { if let Some(pong) = best_pong { if let SendAddr::Udp(addr) = pong.from { warn!(%addr, "No best_addr was set, choose candidate with lowest latency"); - self.best_addr.insert_if_better_or_reconfirm( + self.udp_paths.best_addr.insert_if_better_or_reconfirm( addr, pong.latency, best_addr::Source::BestCandidate, @@ -430,7 +440,7 @@ impl NodeState { debug!("no previous full ping: need full ping"); return true; }; - match self.best_addr.state(*now) { + match self.udp_paths.best_addr.state(*now) { best_addr::State::Empty => { debug!("best addr not set: need full ping"); true @@ -461,7 +471,7 @@ impl NodeState { debug!(tx = %hex::encode(txid), addr = %sp.to, "pong not received in timeout"); match sp.to { SendAddr::Udp(addr) => { - if let Some(path_state) = self.direct_addr_state.get_mut(&addr.into()) { + if let Some(path_state) = self.udp_paths.paths.get_mut(&addr.into()) { path_state.last_ping = None; // only clear the best address if there was no sign of life from this path // within the time the pong should have arrived @@ -470,7 +480,7 @@ impl NodeState { .map(|last_alive| last_alive.elapsed() <= PING_TIMEOUT_DURATION) .unwrap_or(false); if !consider_alive { - self.best_addr.clear_if_equals( + self.udp_paths.best_addr.clear_if_equals( addr, ClearReason::PongTimeout, self.relay_url().is_some(), @@ -479,7 +489,7 @@ impl NodeState { } else { // If we have no state for the best addr it should have been cleared // anyway. - self.best_addr.clear_if_equals( + self.udp_paths.best_addr.clear_if_equals( addr, ClearReason::PongTimeout, self.relay_url.is_some(), @@ -539,7 +549,7 @@ impl NodeState { let mut path_found = false; match to { SendAddr::Udp(addr) => { - if let Some(st) = self.direct_addr_state.get_mut(&addr.into()) { + if let Some(st) = self.udp_paths.paths.get_mut(&addr.into()) { st.last_ping.replace(now); path_found = true } @@ -633,7 +643,7 @@ impl NodeState { #[must_use = "actions must be handled"] fn send_pings(&mut self, now: Instant) -> Vec { // We allocate +1 in case the caller wants to add a call-me-maybe message. - let mut ping_msgs = Vec::with_capacity(self.direct_addr_state.len() + 1); + let mut ping_msgs = Vec::with_capacity(self.udp_paths.paths.len() + 1); if let Some((url, state)) = self.relay_url.as_ref() { if state.needs_ping(&now) { @@ -653,7 +663,8 @@ impl NodeState { } self.prune_direct_addresses(); let mut ping_dsts = String::from("["); - self.direct_addr_state + self.udp_paths + .paths .iter() .filter_map(|(ipp, state)| state.needs_ping(&now).then_some(*ipp)) .filter_map(|ipp| { @@ -668,7 +679,7 @@ impl NodeState { debug!( %ping_dsts, dst = %self.node_id.fmt_short(), - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "sending pings to node", ); self.last_full_ping.replace(now); @@ -676,7 +687,7 @@ impl NodeState { } pub(super) fn update_from_node_addr(&mut self, n: &AddrInfo) { - if self.best_addr.is_empty() { + if self.udp_paths.best_addr.is_empty() { // we do not have a direct connection, so changing the relay information may // have an effect on our connection status if self.relay_url.is_none() && n.relay_url.is_some() { @@ -702,11 +713,12 @@ impl NodeState { } for &addr in n.direct_addresses.iter() { - self.direct_addr_state + self.udp_paths + .paths .entry(addr.into()) .or_insert_with(|| PathState::new(self.node_id, SendAddr::from(addr))); } - let paths = summarize_node_paths(&self.direct_addr_state); + let paths = summarize_node_paths(&self.udp_paths.paths); debug!(new = ?n.direct_addresses , %paths, "added new direct paths for endpoint"); } @@ -714,10 +726,11 @@ impl NodeState { #[instrument(skip_all, fields(node = %self.node_id.fmt_short()))] pub(super) fn reset(&mut self) { self.last_full_ping = None; - self.best_addr + self.udp_paths + .best_addr .clear(ClearReason::Reset, self.relay_url.is_some()); - for es in self.direct_addr_state.values_mut() { + for es in self.udp_paths.paths.values_mut() { es.last_ping = None; } } @@ -739,7 +752,7 @@ impl NodeState { let now = Instant::now(); let role = match path { - SendAddr::Udp(addr) => match self.direct_addr_state.entry(addr.into()) { + SendAddr::Udp(addr) => match self.udp_paths.paths.entry(addr.into()) { Entry::Occupied(mut occupied) => occupied.get_mut().handle_ping(tx_id, now), Entry::Vacant(vacant) => { info!(%addr, "new direct addr for node"); @@ -787,7 +800,7 @@ impl NodeState { // if the endpoint does not yet have a best_addrr let needs_ping_back = if matches!(path, SendAddr::Udp(_)) && matches!( - self.best_addr.state(now), + self.udp_paths.best_addr.state(now), best_addr::State::Empty | best_addr::State::Outdated(_) ) { // We also need to send a ping to make this path available to us as well. This @@ -803,7 +816,7 @@ impl NodeState { debug!( ?role, needs_ping_back = ?needs_ping_back.is_some(), - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "endpoint handled ping", ); PingHandled { @@ -819,7 +832,8 @@ impl NodeState { pub(super) fn prune_direct_addresses(&mut self) { // prune candidates are addresses that are not active let mut prune_candidates: Vec<_> = self - .direct_addr_state + .udp_paths + .paths .iter() .filter(|(_ip_port, state)| !state.is_active()) .map(|(ip_port, state)| (*ip_port, state.last_alive())) @@ -834,7 +848,7 @@ impl NodeState { if prune_count == 0 { // nothing to do, within limits debug!( - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "prune addresses: {prune_count} pruned", ); return; @@ -848,7 +862,7 @@ impl NodeState { self.remove_direct_addr(&ip_port, ClearReason::Inactive) } debug!( - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "prune addresses: {prune_count} pruned", ); } @@ -857,8 +871,8 @@ impl NodeState { /// assumptions about which paths work. #[instrument("disco", skip_all, fields(node = %self.node_id.fmt_short()))] pub(super) fn note_connectivity_change(&mut self) { - self.best_addr.clear_trust("connectivity changed"); - for es in self.direct_addr_state.values_mut() { + self.udp_paths.best_addr.clear_trust("connectivity changed"); + for es in self.udp_paths.paths.values_mut() { es.clear(); } } @@ -906,7 +920,7 @@ impl NodeState { match src { SendAddr::Udp(addr) => { - match self.direct_addr_state.get_mut(&addr.into()) { + match self.udp_paths.paths.get_mut(&addr.into()) { None => { warn!("ignoring pong: no state for src addr"); // This is no longer an endpoint we care about. @@ -923,7 +937,7 @@ impl NodeState { } } debug!( - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "handled pong", ); } @@ -954,7 +968,7 @@ impl NodeState { // TODO(bradfitz): decide how latency vs. preference order affects decision if let SendAddr::Udp(to) = sp.to { debug_assert!(!is_relay, "mismatching relay & udp"); - self.best_addr.insert_if_better_or_reconfirm( + self.udp_paths.best_addr.insert_if_better_or_reconfirm( to, latency, best_addr::Source::ReceivedPong, @@ -991,7 +1005,8 @@ impl NodeState { } let ipp = IpPort::from(*peer_sockaddr); call_me_maybe_ipps.insert(ipp); - self.direct_addr_state + self.udp_paths + .paths .entry(ipp) .or_insert_with(|| PathState::new(self.node_id, SendAddr::from(*peer_sockaddr))) .call_me_maybe_time @@ -1001,7 +1016,7 @@ impl NodeState { // Zero out all the last_ping times to force send_pings to send new ones, even if // it's been less than 5 seconds ago. Also clear pongs for direct addresses not // included in the updated set. - for (ipp, st) in self.direct_addr_state.iter_mut() { + for (ipp, st) in self.udp_paths.paths.iter_mut() { st.last_ping = None; if !call_me_maybe_ipps.contains(ipp) { // TODO: This seems like a weird way to signal that the endpoint no longer @@ -1014,16 +1029,17 @@ impl NodeState { } // Clear trust on our best_addr if it is not included in the updated set. Also // clear the last call-me-maybe send time so we will send one again. - if let Some(addr) = self.best_addr.addr() { + if let Some(addr) = self.udp_paths.best_addr.addr() { let ipp: IpPort = addr.into(); if !call_me_maybe_ipps.contains(&ipp) { - self.best_addr + self.udp_paths + .best_addr .clear_trust("best_addr not in new call-me-maybe"); self.last_call_me_maybe = None; } } debug!( - paths = %summarize_node_paths(&self.direct_addr_state), + paths = %summarize_node_paths(&self.udp_paths.paths), "updated endpoint paths from call-me-maybe", ); self.send_pings(now) @@ -1031,13 +1047,14 @@ impl NodeState { /// Marks this endpoint as having received a UDP payload message. pub(super) fn receive_udp(&mut self, addr: IpPort, now: Instant) { - let Some(state) = self.direct_addr_state.get_mut(&addr) else { + let Some(state) = self.udp_paths.paths.get_mut(&addr) else { debug_assert!(false, "node map inconsistency by_ip_port <-> direct addr"); return; }; state.last_payload_msg = Some(now); self.last_used = Some(now); - self.best_addr + self.udp_paths + .best_addr .reconfirm_if_used(addr.into(), Source::Udp, now); } @@ -1063,7 +1080,8 @@ impl NodeState { pub(super) fn last_ping(&self, addr: &SendAddr) -> Option { match addr { SendAddr::Udp(addr) => self - .direct_addr_state + .udp_paths + .paths .get(&(*addr).into()) .and_then(|ep| ep.last_ping), SendAddr::Relay(url) => self @@ -1100,7 +1118,7 @@ impl NodeState { } // Send heartbeat ping to keep the current addr going as long as we need it. - if let Some(udp_addr) = self.best_addr.addr() { + if let Some(udp_addr) = self.udp_paths.best_addr.addr() { let elapsed = self.last_ping(&SendAddr::Udp(udp_addr)).map(|l| now - l); // Send a ping if the last ping is older than 2 seconds. let needs_ping = match elapsed { @@ -1131,10 +1149,11 @@ impl NodeState { #[instrument("get_send_addrs", skip_all, fields(node = %self.node_id.fmt_short()))] pub(crate) fn get_send_addrs( &mut self, + have_ipv6: bool, ) -> (Option, Option, Vec) { let now = Instant::now(); self.last_used.replace(now); - let (udp_addr, relay_url) = self.addr_for_send(&now); + let (udp_addr, relay_url) = self.addr_for_send(&now, have_ipv6); let mut ping_msgs = Vec::new(); if self.want_call_me_maybe(&now) { @@ -1153,12 +1172,12 @@ impl NodeState { /// Get the direct addresses for this endpoint. pub(super) fn direct_addresses(&self) -> impl Iterator + '_ { - self.direct_addr_state.keys().copied() + self.udp_paths.paths.keys().copied() } #[cfg(test)] pub(super) fn direct_address_states(&self) -> impl Iterator + '_ { - self.direct_addr_state.iter() + self.udp_paths.paths.iter() } pub(super) fn last_used(&self) -> Option { @@ -1223,6 +1242,13 @@ impl PathState { } } + pub(super) fn udp_addr(&self) -> Option { + match self.path { + SendAddr::Udp(addr) => Some(addr), + SendAddr::Relay(_) => None, + } + } + pub(super) fn with_last_payload(node_id: NodeId, path: SendAddr, now: Instant) -> Self { PathState { node_id, @@ -1337,7 +1363,7 @@ impl PathState { } /// Returns the most recent pong if available. - fn recent_pong(&self) -> Option<&PongReply> { + pub(super) fn recent_pong(&self) -> Option<&PongReply> { self.recent_pong.as_ref() } @@ -1604,6 +1630,8 @@ pub enum ConnectionType { mod tests { use std::net::Ipv4Addr; + use best_addr::BestAddr; + use super::{ super::{NodeMap, NodeMapInner}, *, @@ -1659,13 +1687,15 @@ mod tests { node_id: key.public(), last_full_ping: None, relay_url: None, - best_addr: BestAddr::from_parts( - ip_port.into(), - latency, - now, - now + Duration::from_secs(100), + udp_paths: NodeUdpPaths::from_parts( + endpoint_state, + BestAddr::from_parts( + ip_port.into(), + latency, + now, + now + Duration::from_secs(100), + ), ), - direct_addr_state: endpoint_state, sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, @@ -1684,8 +1714,7 @@ mod tests { node_id: key.public(), last_full_ping: None, relay_url: relay_and_state(key.public(), send_addr.clone()), - best_addr: BestAddr::default(), - direct_addr_state: BTreeMap::default(), + udp_paths: NodeUdpPaths::new(), sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, @@ -1696,7 +1725,6 @@ mod tests { // endpoint w/ no best addr but a relay w/ no latency let c_endpoint = { // let socket_addr = "0.0.0.0:8".parse().unwrap(); - let endpoint_state = BTreeMap::new(); let key = SecretKey::generate(); NodeState { id: 2, @@ -1707,8 +1735,7 @@ mod tests { send_addr.clone(), PathState::new(key.public(), SendAddr::from(send_addr.clone())), )), - best_addr: BestAddr::default(), - direct_addr_state: endpoint_state, + udp_paths: NodeUdpPaths::new(), sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, @@ -1741,13 +1768,10 @@ mod tests { node_id: key.public(), last_full_ping: None, relay_url: relay_and_state(key.public(), send_addr.clone()), - best_addr: BestAddr::from_parts( - socket_addr, - Duration::from_millis(80), - now, - expired, + udp_paths: NodeUdpPaths::from_parts( + endpoint_state, + BestAddr::from_parts(socket_addr, Duration::from_millis(80), now, expired), ), - direct_addr_state: endpoint_state, sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs new file mode 100644 index 0000000000..725a6ea7af --- /dev/null +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -0,0 +1,148 @@ +//! Path state for UDP addresses of a single peer node. +//! +//! This started as simply moving the [`NodeState`]'s `direct_addresses` and `best_addr` +//! into one place together. The aim is for external places to not directly interact with +//! the inside and instead only notifies this struct of state changes to each path. + +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use rand::seq::IteratorRandom; +use tracing::warn; + +use crate::disco::SendAddr; + +use super::best_addr::{self, BestAddr}; +use super::node_state::{PathState, PongReply}; +use super::IpPort; + +#[derive(Debug)] +pub(super) enum UdpSendAddr { + Valid(SocketAddr), + Outdated(SocketAddr), + Unconfirmed(SocketAddr), + None, +} + +/// The UDP paths for a single node. +/// +/// Paths are identified by the [`IpPort`] of their UDP address. +/// +/// Initially this collects two structs directly from the [`NodeState`] into one place, +/// leaving the APIs and astractions the same. The goal is that this slowly migrates +/// directly interacting with this data into only receiving [`PathState`] updates. This +/// will consolidate the logic of direct path selection and make this simpler to reason +/// about. However doing that all at once is too large a refactor. +/// +/// [`NodeState`]: super::node_state::NodeState +#[derive(Debug, Default)] +pub(super) struct NodeUdpPaths { + /// The state for each of this node's direct paths. + pub(super) paths: BTreeMap, + /// Best UDP path currently selected. + pub(super) best_addr: BestAddr, + /// If we had to choose a path because we had no `best_addr` it is stored here. + chosen_candidate: Option, +} + +impl NodeUdpPaths { + pub(super) fn new() -> Self { + Default::default() + } + + #[cfg(test)] + pub(super) fn from_parts(paths: BTreeMap, best_addr: BestAddr) -> Self { + Self { + paths, + best_addr, + chosen_candidate: None, + } + } + + /// Returns the current UDP address to send on. + /// + /// TODO: The goal here is for this to simply return the already known send address, so + /// it should be `&self` and not `&mut self`. This is only possible once the state from + /// [`NodeUdpPaths`] is no longer modified from outside. + pub(super) fn send_addr(&mut self, now: Instant, have_ipv6: bool) -> UdpSendAddr { + self.assign_best_addr_from_candidates_if_empty(); + match self.best_addr.state(now) { + best_addr::State::Valid(addr) => UdpSendAddr::Valid(addr.addr), + best_addr::State::Outdated(addr) => UdpSendAddr::Outdated(addr.addr), + best_addr::State::Empty => { + // No direct connection has been used before. If we know of any possible + // candidate addresses, randomlly try to use one. This path is most + // effective when folks use a NodeAddr with exactly one direct address which + // they know to work, effectively like using a traditional socket or quic + // endpoint. + let addr = self + .chosen_candidate + .and_then(|ipp| self.paths.get(&ipp)) + .and_then(|path| path.udp_addr()) + .filter(|addr| addr.is_ipv4() || have_ipv6) + .or_else(|| { + // Look for a new candidate in all the known paths. This may look + // like a RNG use on the hot-path but this is normally invoked at + // most most once at startup. + let addr = self + .paths + .values() + .filter_map(|path| path.udp_addr()) + .filter(|addr| addr.is_ipv4() || have_ipv6) + .choose(&mut rand::thread_rng()); + self.chosen_candidate = addr.as_ref().map(|addr| IpPort::from(*addr)); + addr + }); + match addr { + Some(addr) => UdpSendAddr::Unconfirmed(addr), + None => UdpSendAddr::None, + } + } + } + } + + /// Fixup best_adrr from candidates. + /// + /// If somehow we end up in a state where we failed to set a best_addr, while we do have + /// valid candidates, this will chose a candidate and set best_addr again. Most likely + /// this is a bug elsewhere though. + fn assign_best_addr_from_candidates_if_empty(&mut self) { + if !self.best_addr.is_empty() { + return; + } + + // The highest acceptable latency for an endpoint path. If the latency is higher + // then this the path will be ignored. + const MAX_LATENCY: Duration = Duration::from_secs(60 * 60); + let best_pong = self.paths.iter().fold(None, |best_pong, (ipp, state)| { + let best_latency = best_pong + .map(|p: &PongReply| p.latency) + .unwrap_or(MAX_LATENCY); + match state.recent_pong() { + // This pong is better if it has a lower latency, or if it has the same + // latency but on an IPv6 path. + Some(pong) + if pong.latency < best_latency + || (pong.latency == best_latency && ipp.ip().is_ipv6()) => + { + Some(pong) + } + _ => best_pong, + } + }); + + // If we found a candidate, set to best addr + if let Some(pong) = best_pong { + if let SendAddr::Udp(addr) = pong.from { + warn!(%addr, "No best_addr was set, choose candidate with lowest latency"); + self.best_addr.insert_if_better_or_reconfirm( + addr, + pong.latency, + best_addr::Source::BestCandidate, + pong.pong_at, + ) + } + } + } +} From abba21de9ae4c8fc2635a7c609edbd2a197707e5 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 2 Aug 2024 18:16:49 +0200 Subject: [PATCH 02/12] Remove, this was moved to NodeUdpPaths --- iroh-net/src/magicsock/node_map/node_state.rs | 51 ------------------- 1 file changed, 51 deletions(-) diff --git a/iroh-net/src/magicsock/node_map/node_state.rs b/iroh-net/src/magicsock/node_map/node_state.rs index d944ba5e20..f49f1ec10b 100644 --- a/iroh-net/src/magicsock/node_map/node_state.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -270,9 +270,6 @@ impl NodeState { debug!("in `DEV_relay_ONLY` mode, giving the relay address as the only viable address for this endpoint"); return (None, self.relay_url()); } - // Update our best addr from candidate addresses (only if it is empty and if we have - // recent pongs). - self.assign_best_addr_from_candidates_if_empty(); let (best_addr, relay_url) = match self.udp_paths.send_addr(*now, have_ipv6) { super::udp_paths::UdpSendAddr::Valid(addr) => { // If we have a valid address we use it. @@ -378,54 +375,6 @@ impl NodeState { ); } - /// Fixup best_adrr from candidates. - /// - /// If somehow we end up in a state where we failed to set a best_addr, while we do have - /// valid candidates, this will chose a candidate and set best_addr again. Most likely - /// this is a bug elsewhere though. - fn assign_best_addr_from_candidates_if_empty(&mut self) { - if !self.udp_paths.best_addr.is_empty() { - return; - } - - // The highest acceptable latency for an endpoint path. If the latency is higher - // then this the path will be ignored. - const MAX_LATENCY: Duration = Duration::from_secs(60 * 60); - let best_pong = self - .udp_paths - .paths - .iter() - .fold(None, |best_pong, (ipp, state)| { - let best_latency = best_pong - .map(|p: &PongReply| p.latency) - .unwrap_or(MAX_LATENCY); - match state.recent_pong() { - // This pong is better if it has a lower latency, or if it has the same - // latency but on an IPv6 path. - Some(pong) - if pong.latency < best_latency - || (pong.latency == best_latency && ipp.ip().is_ipv6()) => - { - Some(pong) - } - _ => best_pong, - } - }); - - // If we found a candidate, set to best addr - if let Some(pong) = best_pong { - if let SendAddr::Udp(addr) = pong.from { - warn!(%addr, "No best_addr was set, choose candidate with lowest latency"); - self.udp_paths.best_addr.insert_if_better_or_reconfirm( - addr, - pong.latency, - best_addr::Source::BestCandidate, - pong.pong_at, - ) - } - } - } - /// Whether we need to send another call-me-maybe to the endpoint. /// /// Basically we need to send a call-me-maybe if we need to find a better path. Maybe From e819984d55f04d04fd1a6b1050b8bee7f468c54a Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 5 Aug 2024 10:13:49 +0200 Subject: [PATCH 03/12] fixup docs --- iroh-net/src/magicsock/node_map/udp_paths.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs index 725a6ea7af..91a9062dec 100644 --- a/iroh-net/src/magicsock/node_map/udp_paths.rs +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -3,7 +3,8 @@ //! This started as simply moving the [`NodeState`]'s `direct_addresses` and `best_addr` //! into one place together. The aim is for external places to not directly interact with //! the inside and instead only notifies this struct of state changes to each path. - +//! +//! [`NodeState`]: super::node_state::NodeState use std::collections::BTreeMap; use std::net::SocketAddr; use std::time::{Duration, Instant}; From 5a54a2a3d953ea4dfe58799235ba7ca5e0dd6473 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 5 Aug 2024 10:19:21 +0200 Subject: [PATCH 04/12] sort out imports --- iroh-net/src/magicsock/node_map/node_state.rs | 41 ++++++++----------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/iroh-net/src/magicsock/node_map/node_state.rs b/iroh-net/src/magicsock/node_map/node_state.rs index f49f1ec10b..7a69b57117 100644 --- a/iroh-net/src/magicsock/node_map/node_state.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -1,9 +1,7 @@ -use std::{ - collections::{btree_map::Entry, BTreeMap, BTreeSet, HashMap}, - hash::Hash, - net::{IpAddr, SocketAddr}, - time::{Duration, Instant}, -}; +use std::collections::{btree_map::Entry, BTreeMap, BTreeSet, HashMap}; +use std::hash::Hash; +use std::net::{IpAddr, SocketAddr}; +use std::time::{Duration, Instant}; use iroh_metrics::inc; use serde::{Deserialize, Serialize}; @@ -11,22 +9,17 @@ use tokio::sync::mpsc; use tracing::{debug, event, info, instrument, trace, warn, Level}; use watchable::{Watchable, WatcherStream}; -use crate::{ - disco::{self, SendAddr}, - endpoint::AddrInfo, - key::PublicKey, - magicsock::{Timer, HEARTBEAT_INTERVAL}, - net::ip::is_unicast_link_local, - relay::RelayUrl, - stun, - util::relay_only_mode, - NodeAddr, NodeId, -}; - -use crate::magicsock::{metrics::Metrics as MagicsockMetrics, ActorMessage, QuicMappedAddr}; +use crate::disco::{self, SendAddr}; +use crate::endpoint::AddrInfo; +use crate::key::PublicKey; +use crate::magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, Timer, HEARTBEAT_INTERVAL}; +use crate::net::ip::is_unicast_link_local; +use crate::relay::RelayUrl; +use crate::util::relay_only_mode; +use crate::{stun, NodeAddr, NodeId}; use super::best_addr::{self, ClearReason, Source}; -use super::udp_paths::NodeUdpPaths; +use super::udp_paths::{NodeUdpPaths, UdpSendAddr}; use super::IpPort; /// Number of addresses that are not active that we keep around per node. @@ -271,23 +264,23 @@ impl NodeState { return (None, self.relay_url()); } let (best_addr, relay_url) = match self.udp_paths.send_addr(*now, have_ipv6) { - super::udp_paths::UdpSendAddr::Valid(addr) => { + UdpSendAddr::Valid(addr) => { // If we have a valid address we use it. trace!(%addr, "UdpSendAddr is valid, use it"); (Some(addr), None) } - super::udp_paths::UdpSendAddr::Outdated(addr) => { + UdpSendAddr::Outdated(addr) => { // If the address is outdated we use it, but send via relay at the same time. // We also send disco pings so that it will become valid again if it still // works (i.e. we don't need to holepunch again). trace!(%addr, "UdpSendAddr is outdated, use it together with relay"); (Some(addr), self.relay_url()) } - super::udp_paths::UdpSendAddr::Unconfirmed(addr) => { + UdpSendAddr::Unconfirmed(addr) => { trace!(%addr, "UdpSendAddr is unconfirmed, use it together with relay"); (Some(addr), self.relay_url()) } - super::udp_paths::UdpSendAddr::None => { + UdpSendAddr::None => { trace!("No UdpSendAddr, use relay"); (None, self.relay_url()) } From cb44539dc2734e645e6a751f9fad630378780c1d Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 5 Aug 2024 10:55:10 +0200 Subject: [PATCH 05/12] document UdpSendAddr --- iroh-net/src/magicsock/node_map/udp_paths.rs | 30 ++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs index 91a9062dec..da520537af 100644 --- a/iroh-net/src/magicsock/node_map/udp_paths.rs +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -18,11 +18,41 @@ use super::best_addr::{self, BestAddr}; use super::node_state::{PathState, PongReply}; use super::IpPort; +/// The address on which to send datagrams over UDP. +/// +/// The [`MagicSock`] sends packets to zero or one UDP address, depending on the known paths +/// to the remote node. This conveys the UDP address to send on from the [`NodeUdpPaths`] +/// to the [`NodeState`]. +/// +/// [`NodeUdpPaths`] contains all the UDP path states, while [`NodeState`] has to decide the +/// bigger picture including the relay server. +/// +/// See [`NodeUdpPaths::send_addr`]. +/// +/// [`MagicSock`]: crate::magicsocket::MagicSock +/// [`NodeState`]: super::node_state::NodeState #[derive(Debug)] pub(super) enum UdpSendAddr { + /// The UDP address can be relied on to deliver data to the remote node. + /// + /// This means this path is usable with a reasonable latency and can be fully trusted to + /// transport payload data to the remote node. Valid(SocketAddr), + /// The UDP address is highly likely to work, but has not been used for a while. + /// + /// The path should be usable but has not carried DISCO or payload data for a little too + /// long. It is best to also use a backup, i.e. relay, path if possible. Outdated(SocketAddr), + /// The UDP address is not known to work, but it might. + /// + /// We know this UDP address belongs to the remote node, but we do not know if the path + /// already works or may need holepunching before it will start to work. It migt even + /// never work. It is still useful to send to this together with backup path, + /// i.e. relay, in case the path works: if the path does not need holepunching it might + /// be much faster. And if there is no relay path at all it might be the only way to + /// establish a connection. Unconfirmed(SocketAddr), + /// No known UDP path exists to the remote node. None, } From 5f8f85166a05d4470b873066c723081b2d9f3133 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 5 Aug 2024 10:56:14 +0200 Subject: [PATCH 06/12] typo --- iroh-net/src/magicsock/node_map/udp_paths.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs index da520537af..0d0485f018 100644 --- a/iroh-net/src/magicsock/node_map/udp_paths.rs +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -103,9 +103,9 @@ impl NodeUdpPaths { best_addr::State::Outdated(addr) => UdpSendAddr::Outdated(addr.addr), best_addr::State::Empty => { // No direct connection has been used before. If we know of any possible - // candidate addresses, randomlly try to use one. This path is most + // candidate addresses, randomly try to use one. This path is most // effective when folks use a NodeAddr with exactly one direct address which - // they know to work, effectively like using a traditional socket or quic + // they know to work, effectively like using a traditional socket or QUIC // endpoint. let addr = self .chosen_candidate From 078c3b532d81178cc8372d8ea557d53ae96fc0df Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 5 Aug 2024 10:57:51 +0200 Subject: [PATCH 07/12] easier syntax --- iroh-net/src/magicsock/node_map/udp_paths.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs index 0d0485f018..910073758d 100644 --- a/iroh-net/src/magicsock/node_map/udp_paths.rs +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -122,7 +122,7 @@ impl NodeUdpPaths { .filter_map(|path| path.udp_addr()) .filter(|addr| addr.is_ipv4() || have_ipv6) .choose(&mut rand::thread_rng()); - self.chosen_candidate = addr.as_ref().map(|addr| IpPort::from(*addr)); + self.chosen_candidate = addr.clone().map(IpPort::from); addr }); match addr { From 6424d65395349240a75162badec4b416fc895241 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 5 Aug 2024 10:58:38 +0200 Subject: [PATCH 08/12] typo --- iroh-net/src/magicsock/node_map/udp_paths.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs index 910073758d..e61444dc86 100644 --- a/iroh-net/src/magicsock/node_map/udp_paths.rs +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -133,7 +133,7 @@ impl NodeUdpPaths { } } - /// Fixup best_adrr from candidates. + /// Fixup best_addr from candidates. /// /// If somehow we end up in a state where we failed to set a best_addr, while we do have /// valid candidates, this will chose a candidate and set best_addr again. Most likely From 70deec3c03b829ada9c747601e1e5158c05483c9 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 5 Aug 2024 11:06:49 +0200 Subject: [PATCH 09/12] clippy --- iroh-net/src/magicsock/node_map/udp_paths.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs index e61444dc86..a16c6d91a2 100644 --- a/iroh-net/src/magicsock/node_map/udp_paths.rs +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -122,7 +122,7 @@ impl NodeUdpPaths { .filter_map(|path| path.udp_addr()) .filter(|addr| addr.is_ipv4() || have_ipv6) .choose(&mut rand::thread_rng()); - self.chosen_candidate = addr.clone().map(IpPort::from); + self.chosen_candidate = addr.map(IpPort::from); addr }); match addr { From 093586cb6c11c788d47bba89d6ecd34187a7fa58 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 5 Aug 2024 11:07:26 +0200 Subject: [PATCH 10/12] typo --- iroh-net/src/magicsock/node_map/udp_paths.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs index a16c6d91a2..ba5b5beb0d 100644 --- a/iroh-net/src/magicsock/node_map/udp_paths.rs +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -46,7 +46,7 @@ pub(super) enum UdpSendAddr { /// The UDP address is not known to work, but it might. /// /// We know this UDP address belongs to the remote node, but we do not know if the path - /// already works or may need holepunching before it will start to work. It migt even + /// already works or may need holepunching before it will start to work. It might even /// never work. It is still useful to send to this together with backup path, /// i.e. relay, in case the path works: if the path does not need holepunching it might /// be much faster. And if there is no relay path at all it might be the only way to From cb3b9eb62c6d3ea8f6c0edccb5f0ed6a759b288e Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 5 Aug 2024 11:08:37 +0200 Subject: [PATCH 11/12] fix module name in docs --- iroh-net/src/magicsock/node_map/udp_paths.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs index ba5b5beb0d..1154bc19c1 100644 --- a/iroh-net/src/magicsock/node_map/udp_paths.rs +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -29,7 +29,7 @@ use super::IpPort; /// /// See [`NodeUdpPaths::send_addr`]. /// -/// [`MagicSock`]: crate::magicsocket::MagicSock +/// [`MagicSock`]: crate::magicsock::MagicSock /// [`NodeState`]: super::node_state::NodeState #[derive(Debug)] pub(super) enum UdpSendAddr { From b812344eb4a10564f4dee2ee739afb6a67232e9d Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 5 Aug 2024 11:54:18 +0200 Subject: [PATCH 12/12] ref(iroh-net): Move PathState to its own module There are no functional changes, this only moves code and fixes up imports. --- iroh-net/src/magicsock/node_map.rs | 1 + iroh-net/src/magicsock/node_map/node_state.rs | 272 +---------------- iroh-net/src/magicsock/node_map/path_state.rs | 284 ++++++++++++++++++ iroh-net/src/magicsock/node_map/udp_paths.rs | 3 +- 4 files changed, 291 insertions(+), 269 deletions(-) create mode 100644 iroh-net/src/magicsock/node_map/path_state.rs diff --git a/iroh-net/src/magicsock/node_map.rs b/iroh-net/src/magicsock/node_map.rs index 3550f34bfb..a91510674c 100644 --- a/iroh-net/src/magicsock/node_map.rs +++ b/iroh-net/src/magicsock/node_map.rs @@ -30,6 +30,7 @@ use crate::{ mod best_addr; mod node_state; +mod path_state; mod udp_paths; pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, NodeInfo}; diff --git a/iroh-net/src/magicsock/node_map/node_state.rs b/iroh-net/src/magicsock/node_map/node_state.rs index 7a69b57117..d438f37e6c 100644 --- a/iroh-net/src/magicsock/node_map/node_state.rs +++ b/iroh-net/src/magicsock/node_map/node_state.rs @@ -1,4 +1,4 @@ -use std::collections::{btree_map::Entry, BTreeMap, BTreeSet, HashMap}; +use std::collections::{btree_map::Entry, BTreeSet, HashMap}; use std::hash::Hash; use std::net::{IpAddr, SocketAddr}; use std::time::{Duration, Instant}; @@ -19,6 +19,7 @@ use crate::util::relay_only_mode; use crate::{stun, NodeAddr, NodeId}; use super::best_addr::{self, ClearReason, Source}; +use super::path_state::{summarize_node_paths, PathState}; use super::udp_paths::{NodeUdpPaths, UdpSendAddr}; use super::IpPort; @@ -33,16 +34,12 @@ const LAST_ALIVE_PRUNE_DURATION: Duration = Duration::from_secs(120); /// How long we wait for a pong reply before assuming it's never coming. const PING_TIMEOUT_DURATION: Duration = Duration::from_secs(5); -/// The minimum time between pings to an endpoint. (Except in the case of CallMeMaybe frames -/// resetting the counter, as the first pings likely didn't through the firewall) -const DISCO_PING_INTERVAL: Duration = Duration::from_secs(5); - /// The latency at or under which we don't try to upgrade to a better path. const GOOD_ENOUGH_LATENCY: Duration = Duration::from_millis(5); /// How long since the last activity we try to keep an established endpoint peering alive. /// It's also the idle time at which we stop doing STUN queries to keep NAT mappings alive. -const SESSION_ACTIVE_TIMEOUT: Duration = Duration::from_secs(45); +pub(super) const SESSION_ACTIVE_TIMEOUT: Duration = Duration::from_secs(45); /// How often we try to upgrade to a better patheven if we have some non-relay route that works. const UPGRADE_INTERVAL: Duration = Duration::from_secs(60); @@ -1145,267 +1142,6 @@ impl From for NodeAddr { } } -/// State about a particular path to another [`NodeState`]. -/// -/// This state is used for both the relay path and any direct UDP paths. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub(super) struct PathState { - /// The node for which this path exists. - node_id: NodeId, - /// The path this applies for. - path: SendAddr, - /// The last (outgoing) ping time. - last_ping: Option, - - /// If non-zero, means that this was an endpoint that we learned about at runtime (from an - /// incoming ping). If so, we keep the time updated and use it to discard old candidates. - // NOTE: tx_id Originally added in tailscale due to . - last_got_ping: Option<(Instant, stun::TransactionId)>, - - /// If non-zero, is the time this endpoint was advertised last via a call-me-maybe disco message. - call_me_maybe_time: Option, - - /// Last [`PongReply`] received. - pub(super) recent_pong: Option, - /// When was this endpoint last used to transmit payload data (removing ping, pong, etc). - pub(super) last_payload_msg: Option, -} - -impl PathState { - fn new(node_id: NodeId, path: SendAddr) -> Self { - Self { - node_id, - path, - last_ping: None, - last_got_ping: None, - call_me_maybe_time: None, - recent_pong: None, - last_payload_msg: None, - } - } - - pub(super) fn udp_addr(&self) -> Option { - match self.path { - SendAddr::Udp(addr) => Some(addr), - SendAddr::Relay(_) => None, - } - } - - pub(super) fn with_last_payload(node_id: NodeId, path: SendAddr, now: Instant) -> Self { - PathState { - node_id, - path, - last_ping: None, - last_got_ping: None, - call_me_maybe_time: None, - recent_pong: None, - last_payload_msg: Some(now), - } - } - - pub(super) fn with_ping( - node_id: NodeId, - path: SendAddr, - tx_id: stun::TransactionId, - now: Instant, - ) -> Self { - let mut new = PathState::new(node_id, path); - new.handle_ping(tx_id, now); - new - } - - pub(super) fn add_pong_reply(&mut self, r: PongReply) { - if let SendAddr::Udp(ref path) = self.path { - if self.recent_pong.is_none() { - event!( - target: "events.net.holepunched", - Level::DEBUG, - node = %self.node_id.fmt_short(), - path = ?path, - direction = "outgoing", - ); - } - } - self.recent_pong = Some(r); - } - - #[cfg(test)] - pub(super) fn with_pong_reply(node_id: NodeId, r: PongReply) -> Self { - PathState { - node_id, - path: r.from.clone(), - last_ping: None, - last_got_ping: None, - call_me_maybe_time: None, - recent_pong: Some(r), - last_payload_msg: None, - } - } - - /// Check whether this path is considered active. - /// - /// Active means the path has received payload messages within the last - /// [`SESSION_ACTIVE_TIMEOUT`]. - /// - /// Note that a path might be alive but not active if it's contactable but not in - /// use. - pub(super) fn is_active(&self) -> bool { - self.last_payload_msg - .as_ref() - .map(|instant| instant.elapsed() <= SESSION_ACTIVE_TIMEOUT) - .unwrap_or(false) - } - - /// Returns the instant the last incoming ping was received. - pub(super) fn last_incoming_ping(&self) -> Option<&Instant> { - self.last_got_ping.as_ref().map(|(time, _tx_id)| time) - } - - /// Reports the last instant this path was considered alive. - /// - /// Alive means the path is considered in use by the remote endpoint. Either because we - /// received a payload message, a DISCO message (ping, pong) or it was advertised in a - /// call-me-maybe message. - /// - /// This is the most recent instant between: - /// - when last pong was received. - /// - when this path was last advertised in a received CallMeMaybe message. - /// - When the last payload transmission occurred. - /// - when the last ping from them was received. - pub(super) fn last_alive(&self) -> Option { - self.recent_pong() - .map(|pong| &pong.pong_at) - .into_iter() - .chain(self.last_payload_msg.as_ref()) - .chain(self.call_me_maybe_time.as_ref()) - .chain(self.last_incoming_ping()) - .max() - .copied() - } - - pub(super) fn last_control_msg(&self, now: Instant) -> Option<(Duration, ControlMsg)> { - // get every control message and assign it its kind - let last_pong = self - .recent_pong() - .map(|pong| (pong.pong_at, ControlMsg::Pong)); - let last_call_me_maybe = self - .call_me_maybe_time - .as_ref() - .map(|call_me| (*call_me, ControlMsg::CallMeMaybe)); - let last_ping = self - .last_incoming_ping() - .map(|ping| (*ping, ControlMsg::Ping)); - - last_pong - .into_iter() - .chain(last_call_me_maybe) - .chain(last_ping) - .max_by_key(|(instant, _kind)| *instant) - .map(|(instant, kind)| (now.duration_since(instant), kind)) - } - - /// Returns the most recent pong if available. - pub(super) fn recent_pong(&self) -> Option<&PongReply> { - self.recent_pong.as_ref() - } - - /// Returns the latency from the most recent pong, if available. - fn latency(&self) -> Option { - self.recent_pong.as_ref().map(|p| p.latency) - } - - fn needs_ping(&self, now: &Instant) -> bool { - match self.last_ping { - None => true, - Some(last_ping) => { - let elapsed = now.duration_since(last_ping); - - // TODO: remove! - // This logs "ping is too new" for each send whenever the endpoint does *not* need - // a ping. Pretty sure this is not a useful log, but maybe there was a reason? - // if !needs_ping { - // debug!("ping is too new: {}ms", elapsed.as_millis()); - // } - elapsed > DISCO_PING_INTERVAL - } - } - } - - fn handle_ping(&mut self, tx_id: stun::TransactionId, now: Instant) -> PingRole { - if Some(&tx_id) == self.last_got_ping.as_ref().map(|(_t, tx_id)| tx_id) { - PingRole::Duplicate - } else { - let prev = self.last_got_ping.replace((now, tx_id)); - let heartbeat_deadline = HEARTBEAT_INTERVAL + (HEARTBEAT_INTERVAL / 2); - match prev { - Some((prev_time, _tx)) if now.duration_since(prev_time) <= heartbeat_deadline => { - PingRole::LikelyHeartbeat - } - Some((prev_time, _tx)) => { - debug!( - elapsed = ?now.duration_since(prev_time), - "heartbeat missed, reactivating", - ); - PingRole::Activate - } - None => { - if let SendAddr::Udp(ref addr) = self.path { - event!( - target: "events.net.holepunched", - Level::DEBUG, - node = %self.node_id.fmt_short(), - path = ?addr, - direction = "incoming", - ); - } - PingRole::Activate - } - } - } - } - - fn clear(&mut self) { - self.last_ping = None; - self.last_got_ping = None; - self.call_me_maybe_time = None; - self.recent_pong = None; - } - - fn summary(&self, mut w: impl std::fmt::Write) -> std::fmt::Result { - write!(w, "{{ ")?; - if self.is_active() { - write!(w, "active ")?; - } - if let Some(ref pong) = self.recent_pong { - write!(w, "pong-received({:?} ago) ", pong.pong_at.elapsed())?; - } - if let Some(when) = self.last_incoming_ping() { - write!(w, "ping-received({:?} ago) ", when.elapsed())?; - } - if let Some(ref when) = self.last_ping { - write!(w, "ping-sent({:?} ago) ", when.elapsed())?; - } - write!(w, "}}") - } -} - -// TODO: Make an `EndpointPaths` struct and do things nicely. -fn summarize_node_paths(paths: &BTreeMap) -> String { - use std::fmt::Write; - - let mut w = String::new(); - write!(&mut w, "[").ok(); - for (i, (ipp, state)) in paths.iter().enumerate() { - if i > 0 { - write!(&mut w, ", ").ok(); - } - write!(&mut w, "{ipp}").ok(); - state.summary(&mut w).ok(); - } - write!(&mut w, "]").ok(); - w -} - /// Whether to send a call-me-maybe message after sending pings to all known paths. /// /// `IfNoRecent` will only send a call-me-maybe if no previous one was sent in the last @@ -1570,7 +1306,7 @@ pub enum ConnectionType { #[cfg(test)] mod tests { - use std::net::Ipv4Addr; + use std::{collections::BTreeMap, net::Ipv4Addr}; use best_addr::BestAddr; diff --git a/iroh-net/src/magicsock/node_map/path_state.rs b/iroh-net/src/magicsock/node_map/path_state.rs new file mode 100644 index 0000000000..6121d8242d --- /dev/null +++ b/iroh-net/src/magicsock/node_map/path_state.rs @@ -0,0 +1,284 @@ +//! The state kept for each network path to a remote node. + +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use iroh_base::key::NodeId; +use tracing::{debug, event, Level}; + +use crate::disco::SendAddr; +use crate::magicsock::HEARTBEAT_INTERVAL; +use crate::stun; + +use super::node_state::{PongReply, SESSION_ACTIVE_TIMEOUT}; +use super::{ControlMsg, IpPort, PingRole}; + +/// The minimum time between pings to an endpoint. +/// +/// Except in the case of CallMeMaybe frames resetting the counter, as the first pings +/// likely didn't through the firewall. +const DISCO_PING_INTERVAL: Duration = Duration::from_secs(5); + +/// State about a particular path to another [`NodeState`]. +/// +/// This state is used for both the relay path and any direct UDP paths. +/// +/// [`NodeState`]: super::node_state::NodeState +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(super) struct PathState { + /// The node for which this path exists. + node_id: NodeId, + /// The path this applies for. + path: SendAddr, + /// The last (outgoing) ping time. + pub(super) last_ping: Option, + + /// If non-zero, means that this was an endpoint that we learned about at runtime (from an + /// incoming ping). If so, we keep the time updated and use it to discard old candidates. + // NOTE: tx_id Originally added in tailscale due to . + last_got_ping: Option<(Instant, stun::TransactionId)>, + + /// If non-zero, is the time this endpoint was advertised last via a call-me-maybe disco message. + pub(super) call_me_maybe_time: Option, + + /// Last [`PongReply`] received. + pub(super) recent_pong: Option, + /// When was this endpoint last used to transmit payload data (removing ping, pong, etc). + pub(super) last_payload_msg: Option, +} + +impl PathState { + pub(super) fn new(node_id: NodeId, path: SendAddr) -> Self { + Self { + node_id, + path, + last_ping: None, + last_got_ping: None, + call_me_maybe_time: None, + recent_pong: None, + last_payload_msg: None, + } + } + + pub(super) fn udp_addr(&self) -> Option { + match self.path { + SendAddr::Udp(addr) => Some(addr), + SendAddr::Relay(_) => None, + } + } + + pub(super) fn with_last_payload(node_id: NodeId, path: SendAddr, now: Instant) -> Self { + PathState { + node_id, + path, + last_ping: None, + last_got_ping: None, + call_me_maybe_time: None, + recent_pong: None, + last_payload_msg: Some(now), + } + } + + pub(super) fn with_ping( + node_id: NodeId, + path: SendAddr, + tx_id: stun::TransactionId, + now: Instant, + ) -> Self { + let mut new = PathState::new(node_id, path); + new.handle_ping(tx_id, now); + new + } + + pub(super) fn add_pong_reply(&mut self, r: PongReply) { + if let SendAddr::Udp(ref path) = self.path { + if self.recent_pong.is_none() { + event!( + target: "events.net.holepunched", + Level::DEBUG, + node = %self.node_id.fmt_short(), + path = ?path, + direction = "outgoing", + ); + } + } + self.recent_pong = Some(r); + } + + #[cfg(test)] + pub(super) fn with_pong_reply(node_id: NodeId, r: PongReply) -> Self { + PathState { + node_id, + path: r.from.clone(), + last_ping: None, + last_got_ping: None, + call_me_maybe_time: None, + recent_pong: Some(r), + last_payload_msg: None, + } + } + + /// Check whether this path is considered active. + /// + /// Active means the path has received payload messages within the last + /// [`SESSION_ACTIVE_TIMEOUT`]. + /// + /// Note that a path might be alive but not active if it's contactable but not in + /// use. + pub(super) fn is_active(&self) -> bool { + self.last_payload_msg + .as_ref() + .map(|instant| instant.elapsed() <= SESSION_ACTIVE_TIMEOUT) + .unwrap_or(false) + } + + /// Returns the instant the last incoming ping was received. + pub(super) fn last_incoming_ping(&self) -> Option<&Instant> { + self.last_got_ping.as_ref().map(|(time, _tx_id)| time) + } + + /// Reports the last instant this path was considered alive. + /// + /// Alive means the path is considered in use by the remote endpoint. Either because we + /// received a payload message, a DISCO message (ping, pong) or it was advertised in a + /// call-me-maybe message. + /// + /// This is the most recent instant between: + /// - when last pong was received. + /// - when this path was last advertised in a received CallMeMaybe message. + /// - When the last payload transmission occurred. + /// - when the last ping from them was received. + pub(super) fn last_alive(&self) -> Option { + self.recent_pong() + .map(|pong| &pong.pong_at) + .into_iter() + .chain(self.last_payload_msg.as_ref()) + .chain(self.call_me_maybe_time.as_ref()) + .chain(self.last_incoming_ping()) + .max() + .copied() + } + + pub(super) fn last_control_msg(&self, now: Instant) -> Option<(Duration, ControlMsg)> { + // get every control message and assign it its kind + let last_pong = self + .recent_pong() + .map(|pong| (pong.pong_at, ControlMsg::Pong)); + let last_call_me_maybe = self + .call_me_maybe_time + .as_ref() + .map(|call_me| (*call_me, ControlMsg::CallMeMaybe)); + let last_ping = self + .last_incoming_ping() + .map(|ping| (*ping, ControlMsg::Ping)); + + last_pong + .into_iter() + .chain(last_call_me_maybe) + .chain(last_ping) + .max_by_key(|(instant, _kind)| *instant) + .map(|(instant, kind)| (now.duration_since(instant), kind)) + } + + /// Returns the most recent pong if available. + pub(super) fn recent_pong(&self) -> Option<&PongReply> { + self.recent_pong.as_ref() + } + + /// Returns the latency from the most recent pong, if available. + pub(super) fn latency(&self) -> Option { + self.recent_pong.as_ref().map(|p| p.latency) + } + + pub(super) fn needs_ping(&self, now: &Instant) -> bool { + match self.last_ping { + None => true, + Some(last_ping) => { + let elapsed = now.duration_since(last_ping); + + // TODO: remove! + // This logs "ping is too new" for each send whenever the endpoint does *not* need + // a ping. Pretty sure this is not a useful log, but maybe there was a reason? + // if !needs_ping { + // debug!("ping is too new: {}ms", elapsed.as_millis()); + // } + elapsed > DISCO_PING_INTERVAL + } + } + } + + pub(super) fn handle_ping(&mut self, tx_id: stun::TransactionId, now: Instant) -> PingRole { + if Some(&tx_id) == self.last_got_ping.as_ref().map(|(_t, tx_id)| tx_id) { + PingRole::Duplicate + } else { + let prev = self.last_got_ping.replace((now, tx_id)); + let heartbeat_deadline = HEARTBEAT_INTERVAL + (HEARTBEAT_INTERVAL / 2); + match prev { + Some((prev_time, _tx)) if now.duration_since(prev_time) <= heartbeat_deadline => { + PingRole::LikelyHeartbeat + } + Some((prev_time, _tx)) => { + debug!( + elapsed = ?now.duration_since(prev_time), + "heartbeat missed, reactivating", + ); + PingRole::Activate + } + None => { + if let SendAddr::Udp(ref addr) = self.path { + event!( + target: "events.net.holepunched", + Level::DEBUG, + node = %self.node_id.fmt_short(), + path = ?addr, + direction = "incoming", + ); + } + PingRole::Activate + } + } + } + } + + pub(super) fn clear(&mut self) { + self.last_ping = None; + self.last_got_ping = None; + self.call_me_maybe_time = None; + self.recent_pong = None; + } + + fn summary(&self, mut w: impl std::fmt::Write) -> std::fmt::Result { + write!(w, "{{ ")?; + if self.is_active() { + write!(w, "active ")?; + } + if let Some(ref pong) = self.recent_pong { + write!(w, "pong-received({:?} ago) ", pong.pong_at.elapsed())?; + } + if let Some(when) = self.last_incoming_ping() { + write!(w, "ping-received({:?} ago) ", when.elapsed())?; + } + if let Some(ref when) = self.last_ping { + write!(w, "ping-sent({:?} ago) ", when.elapsed())?; + } + write!(w, "}}") + } +} + +// TODO: Make an `EndpointPaths` struct and do things nicely. +pub(super) fn summarize_node_paths(paths: &BTreeMap) -> String { + use std::fmt::Write; + + let mut w = String::new(); + write!(&mut w, "[").ok(); + for (i, (ipp, state)) in paths.iter().enumerate() { + if i > 0 { + write!(&mut w, ", ").ok(); + } + write!(&mut w, "{ipp}").ok(); + state.summary(&mut w).ok(); + } + write!(&mut w, "]").ok(); + w +} diff --git a/iroh-net/src/magicsock/node_map/udp_paths.rs b/iroh-net/src/magicsock/node_map/udp_paths.rs index 1154bc19c1..345fff9daf 100644 --- a/iroh-net/src/magicsock/node_map/udp_paths.rs +++ b/iroh-net/src/magicsock/node_map/udp_paths.rs @@ -15,7 +15,8 @@ use tracing::warn; use crate::disco::SendAddr; use super::best_addr::{self, BestAddr}; -use super::node_state::{PathState, PongReply}; +use super::node_state::PongReply; +use super::path_state::PathState; use super::IpPort; /// The address on which to send datagrams over UDP.