From acb621b34c78bd1bfb7a40d6003e6c1c37ba661b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Thu, 12 Dec 2024 20:16:31 -0500 Subject: [PATCH 1/9] feat(iroh): add QUIC address discovery to iroh's net-report --- iroh/src/endpoint.rs | 10 ++++++---- iroh/src/magicsock.rs | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 781514cc0d..085c17c051 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -554,12 +554,14 @@ impl Endpoint { )?; trace!("created quinn endpoint"); debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created"); - Ok(Self { - msock, - endpoint, + let ep = Self { + msock: msock.clone(), + endpoint: endpoint.clone(), rtt_actor: Arc::new(rtt_actor::RttHandle::new()), static_config: Arc::new(static_config), - }) + }; + msock.set_quic_endpoint(Some(endpoint)); + Ok(ep) } /// Sets the list of accepted ALPN protocols. diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 66c1c0c4ff..eb259104df 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -38,6 +38,7 @@ use futures_util::{stream::BoxStream, task::AtomicWaker}; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey}; use iroh_metrics::{inc, inc_by}; use iroh_relay::{protos::stun, RelayMap}; +use net_report::QuicConfig; use netwatch::{interfaces, ip::LocalAddresses, netmon, UdpSocket}; use quinn::AsyncUdpSocket; use rand::{seq::SliceRandom, Rng, SeedableRng}; @@ -231,6 +232,10 @@ pub(crate) struct MagicSock { pconn6: Option, /// NetReport client net_reporter: net_report::Addr, + /// Handle to the underlying quinn::Endpoint. + /// + /// Used in netcheck for QUIC address discovery. + quic_endpoint: Arc>>, /// The state for an active DiscoKey. disco_secrets: DiscoSecrets, @@ -282,6 +287,16 @@ impl MagicSock { self.my_relay.set(my_relay).unwrap_or_else(|e| e) } + /// Sets the internal `quinn::Endpoint` that is used for QUIC address + /// discovery. + pub(crate) fn set_quic_endpoint(&self, endpoint: Option) { + let mut ep = self + .quic_endpoint + .write() + .expect("MagicSock::endpoint RwLock is poisoned"); + *ep = endpoint; + } + fn is_closing(&self) -> bool { self.closing.load(Ordering::Relaxed) } @@ -1589,6 +1604,7 @@ impl Handle { dns_resolver, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify, + quic_endpoint: Arc::new(RwLock::new(None)), }); let mut actor_tasks = JoinSet::default(); @@ -2428,10 +2444,31 @@ impl Actor { } let relay_map = self.msock.relay_map.clone(); - let opts = net_report::Options::default() + let mut opts = net_report::Options::default() .stun_v4(Some(self.pconn4.clone())) .stun_v6(self.pconn6.clone()); + let quic_endpoint = self.msock.quic_endpoint.read().expect("poisoned").clone(); + + let quic_config = match quic_endpoint { + Some(ep) => { + let root_store = rustls::RootCertStore::from_iter( + webpki_roots::TLS_SERVER_ROOTS.iter().cloned(), + ); + let client_config = rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + Some(QuicConfig { + ep, + client_config, + ipv4: true, + ipv6: true, + }) + } + None => None, + }; + opts = opts.quic_config(quic_config); + debug!("requesting net_report report"); match self.net_reporter.get_report_channel(relay_map, opts).await { Ok(rx) => { From 44bcb59e65c9d61a2f1b4e9948ab7c505537bdf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Fri, 13 Dec 2024 17:53:41 -0500 Subject: [PATCH 2/9] feat: treat quic address discovery packets special --- iroh-net-report/Cargo.toml | 1 + iroh-net-report/src/lib.rs | 2 + iroh-net-report/src/reportgen.rs | 26 ++++ iroh/Cargo.toml | 2 +- iroh/src/magicsock.rs | 157 +++++++++++++++++++--- iroh/src/magicsock/node_map.rs | 35 +++++ iroh/src/magicsock/node_map/node_state.rs | 1 + 7 files changed, 206 insertions(+), 18 deletions(-) diff --git a/iroh-net-report/Cargo.toml b/iroh-net-report/Cargo.toml index b2cabb734f..6788209cc8 100644 --- a/iroh-net-report/Cargo.toml +++ b/iroh-net-report/Cargo.toml @@ -49,6 +49,7 @@ tokio = { version = "1", default-features = false, features = ["test-util"] } default = ["metrics"] metrics = ["iroh-metrics/metrics", "portmapper/metrics"] stun-utils = [] +iroh = [] [package.metadata.docs.rs] all-features = true diff --git a/iroh-net-report/src/lib.rs b/iroh-net-report/src/lib.rs index 4e309aff34..0f678bb804 100644 --- a/iroh-net-report/src/lib.rs +++ b/iroh-net-report/src/lib.rs @@ -38,6 +38,7 @@ mod ping; mod reportgen; pub use metrics::Metrics; +pub use reportgen::MappedAddr; use reportgen::ProbeProto; pub use reportgen::QuicConfig; #[cfg(feature = "stun-utils")] @@ -644,6 +645,7 @@ impl Actor { quic_config, .. } = opts; + trace!("Attempting probes for protocols {protos:#?}"); if self.current_report_run.is_some() { response_tx .send(Err(anyhow!( diff --git a/iroh-net-report/src/reportgen.rs b/iroh-net-report/src/reportgen.rs index fb144cdc25..38b00c39f8 100644 --- a/iroh-net-report/src/reportgen.rs +++ b/iroh-net-report/src/reportgen.rs @@ -700,8 +700,25 @@ pub struct QuicConfig { pub ipv4: bool, /// Enable ipv6 QUIC address discovery probes pub ipv6: bool, + /// A map of RelayUrls to addresses + /// + /// Only makes sense in the context of iroh, which uses QuicMappedAddrs + /// to allow dialing by NodeId + #[cfg(feature = "iroh")] + pub mapped_addrs: MappedRelayAddrs, } +/// Holds a QuicMappedAddr +#[cfg(feature = "iroh")] +#[derive(Debug, Clone, Copy)] +pub struct MappedAddr(pub SocketAddr); + +/// A relationship between a socket address and it's QUIC mapped address +/// +/// Only relevant when using the net-report with iroh. +#[cfg(feature = "iroh")] +type MappedRelayAddrs = std::collections::HashMap; + /// Executes a particular [`Probe`], including using a delayed start if needed. /// /// If *stun_sock4* and *stun_sock6* are `None` the STUN probes are disabled. @@ -923,6 +940,14 @@ async fn run_quic_probe( )); } }; + + // if we are using net-report with iroh, we must dial using the mapped address + #[cfg(feature = "iroh")] + let relay_addr = quic_config + .mapped_addrs + .get(&relay_addr) + .map_or(relay_addr, |mapped_addr| mapped_addr.0.clone()); + let quic_client = iroh_relay::quic::QuicClient::new(quic_config.ep, quic_config.client_config) .map_err(|e| ProbeError::Error(e, probe.clone()))?; let (addr, latency) = quic_client @@ -1592,6 +1617,7 @@ mod tests { client_config, ipv4: true, ipv6: true, + mapped_addrs: std::collections::HashMap::new(), }; let url = relay.url.clone(); let port = server.quic_addr().unwrap().port(); diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index e8902e1761..bdff2a5209 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -95,7 +95,7 @@ webpki = { package = "rustls-webpki", version = "0.102" } webpki-roots = "0.26" x509-parser = "0.16" z32 = "1.0.3" -net-report = { package = "iroh-net-report", path = "../iroh-net-report", version = "0.30", default-features = false } +net-report = { package = "iroh-net-report", path = "../iroh-net-report", version = "0.30", default-features = false, features = ["iroh"] } # metrics iroh-metrics = { version = "0.30", default-features = false } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index eb259104df..cbf49ac66c 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -68,7 +68,7 @@ use crate::{ defaults::timeouts::NET_REPORT_TIMEOUT, disco::{self, CallMeMaybe, SendAddr}, discovery::{Discovery, DiscoveryItem}, - dns::DnsResolver, + dns::{DnsResolver, ResolverExt}, key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret}, watchable::{Watchable, Watcher}, }; @@ -595,7 +595,31 @@ impl MagicSock { } } None => { - error!(%dest, "no NodeState for mapped address, voiding transmit"); + // Check if this is a QUIC address discovery packet + if let Some(addr) = self.node_map.get_qad_addr(&dest) { + // send udp + // rewrite target address + transmit.destination = addr; + match self.try_send_udp(addr, &transmit) { + Ok(()) => { + trace!(dst = %addr, + "sent QAD transmit over UDP"); + } + Err(err) => { + // No need to print "WouldBlock" errors to the console + if err.kind() == io::ErrorKind::WouldBlock { + return Err(io::Error::new(io::ErrorKind::WouldBlock, "pending")); + } else { + warn!( + dst = %addr, + "failed to send QAD message over udp: {err:#}" + ); + } + } + } + } else { + error!(%dest, "no NodeState for mapped address, voiding transmit"); + } // Returning Ok here means we let QUIC timeout. Returning WouldBlock // triggers a hot loop. Returning an error would immediately fail a // connection. The philosophy of quinn-udp is that a UDP connection could @@ -834,15 +858,27 @@ impl MagicSock { // Update the NodeMap and remap RecvMeta to the QuicMappedAddr. match self.node_map.receive_udp(meta.addr) { None => { - warn!( - src = ?meta.addr, - count = %quic_datagram_count, - len = meta.len, - "UDP recv quic packets: no node state found, skipping", - ); - // If we have no node state for the from addr, set len to 0 to make - // quinn skip the buf completely. - meta.len = 0; + // Check if this is QUIC address discovery response + if let Some(quic_mapped_addr) = self.node_map.receive_qad(meta.addr) { + trace!( + src = ?meta.addr, + count = %quic_datagram_count, + len = meta.len, + "UDP recv QUIC address discovery packets", + ); + quic_packets_total += quic_datagram_count; + meta.addr = quic_mapped_addr.0; + } else { + warn!( + src = ?meta.addr, + count = %quic_datagram_count, + len = meta.len, + "UDP recv quic packets: no node state found, skipping", + ); + // If we have no node state for the from addr, set len to 0 to make + // quinn skip the buf completely. + meta.len = 0; + } } Some((node_id, quic_mapped_addr)) => { trace!( @@ -1714,17 +1750,13 @@ impl DiscoSecrets { where F: FnOnce(&mut SharedSecret) -> T, { - let mut inner = self.0.lock().expect("poisoned"); let x = inner.entry(node_id).or_insert_with(|| { let public_key = public_ed_box(&node_id.public()); - SharedSecret::new(secret, &public_key) }); - cb(x) - } + {}} - fn encode_and_seal( + fn , ResolverExt}encode_and_seal( &self, - this_secret_key: &crypto_box::SecretKey, this_node_id: NodeId, other_node_id: NodeId, msg: &disco::Message, @@ -2452,6 +2484,9 @@ impl Actor { let quic_config = match quic_endpoint { Some(ep) => { + // Need to add Quic Mapped Addrs for the relay nodes to use for + // QUIC Address Discovery + let mapped_addrs = self.resolve_qad_addrs(Duration::from_millis(500)).await; let root_store = rustls::RootCertStore::from_iter( webpki_roots::TLS_SERVER_ROOTS.iter().cloned(), ); @@ -2463,6 +2498,7 @@ impl Actor { client_config, ipv4: true, ipv6: true, + mapped_addrs, }) } None => None, @@ -2547,6 +2583,93 @@ impl Actor { self.update_direct_addresses(report); } + /// Does a DNS look up for the `RelayUrl` and returns the set of resolved + /// [`SocketAddr`]s + async fn resolve_relay_quic_endpoint( + dns_resolver: DnsResolver, + relay_node: Arc, + duration: Duration, + ) -> BTreeSet { + let mut addrs = BTreeSet::new(); + let port = if let Some(ref quic_config) = relay_node.quic { + quic_config.port + } else { + trace!( + ?relay_node, + "No quic config for the relay node: no need to resolve quic endpoint ip" + ); + return addrs; + }; + + match relay_node.url.host() { + Some(url::Host::Domain(hostname)) => { + error!(%hostname, "Performing DNS A lookup for relay addr"); + let mut set = JoinSet::new(); + let resolver = dns_resolver.clone(); + let ipv4_resolver = resolver.clone(); + let ipv4_hostname = hostname.to_owned(); + set.spawn(async move { + let res = ipv4_resolver.lookup_ipv4(ipv4_hostname, duration).await; + res.map(|addrs| addrs.collect::>()) + }); + let ipv6_hostname = hostname.to_owned(); + set.spawn(async move { + let res = resolver.lookup_ipv6(ipv6_hostname, duration).await; + res.map(|addrs| addrs.collect::>()) + }); + let responses = set.join_all().await; + for res in responses { + match res { + Err(_) => {} + Ok(resolved_addrs) => { + for addr in resolved_addrs { + addrs.insert(SocketAddr::new(addr.into(), port)); + } + } + } + } + if addrs.is_empty() { + error!(%hostname, "Unable to resolve ip addresses for relay node"); + } + } + Some(url::Host::Ipv4(addr)) => { + addrs.insert(SocketAddr::new(addr.into(), port)); + } + Some(url::Host::Ipv6(addr)) => { + addrs.insert(SocketAddr::new(addr.into(), port)); + } + None => { + error!(?relay_node.url, "No hostname for relay node, cannot resolve ip"); + } + } + return addrs; + } + + /// Resolve the relay addresses used for QUIC address discovery. + async fn resolve_qad_addrs( + &mut self, + duration: Duration, + ) -> HashMap { + let mut mapped_addrs = HashMap::new(); + let mut set = JoinSet::new(); + let resolver = self.msock.dns_resolver(); + for relay_node in self.msock.relay_map.nodes() { + set.spawn(Actor::resolve_relay_quic_endpoint( + resolver.clone(), + relay_node.clone(), + duration, + )); + } + let res = set.join_all().await; + for addrs in res { + addrs.iter().for_each(|addr| { + let mapped_addr = self.msock.node_map.add_qad_addr(*addr); + mapped_addrs.insert(*addr, net_report::MappedAddr(mapped_addr.0)); + }); + } + mapped_addrs + } + fn set_nearest_relay(&mut self, relay_url: Option) -> bool { let my_relay = self.msock.my_relay(); if relay_url == my_relay { diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index c466de7ba3..20c7f43512 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -69,6 +69,8 @@ pub(super) struct NodeMapInner { next_id: usize, #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection, + // special mapping for relay socket addresses so that we can do quic address discovery + qad_mapped_addrs: HashMap, } /// Identifier to look up a [`NodeState`] in the [`NodeMap`]. @@ -153,11 +155,44 @@ impl NodeMap { .add_node_addr(node_addr, source) } + /// Add a the SocketAddr used to preform QUIC Address Discovery to the nodemap + pub(super) fn add_qad_addr(&self, udp_addr: SocketAddr) -> QuicMappedAddr { + let quic_mapped_addr = QuicMappedAddr::generate(); + self.inner + .lock() + .qad_mapped_addrs + .insert(udp_addr, quic_mapped_addr); + quic_mapped_addr + } + + /// Get the socket address used to preform QUIC Address Discovery + pub(super) fn get_qad_addr(&self, addr: &QuicMappedAddr) -> Option { + self.inner + .lock() + .qad_mapped_addrs + .iter() + .find_map(|(udp_addr, quic_mapped_addr)| { + if addr == quic_mapped_addr { + Some(*udp_addr) + } else { + None + } + }) + } + /// Number of nodes currently listed. pub(super) fn node_count(&self) -> usize { self.inner.lock().expect("poisoned").node_count() } + pub(super) fn receive_qad(&self, udp_addr: SocketAddr) -> Option { + self.inner + .lock() + .qad_mapped_addrs + .get(&udp_addr) + .map(|addr| *addr) + } + pub(super) fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { self.inner.lock().expect("poisoned").receive_udp(udp_addr) } diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index d116be6695..ff259ddc85 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -1691,6 +1691,7 @@ mod tests { ]), next_id: 5, path_selection: PathSelection::default(), + qad_mapped_addrs: HashMap::new(), }); let mut got = node_map.list_remote_infos(later); got.sort_by_key(|p| p.node_id); From c1a744dbd96b180844afeb20518e79f30cab1807 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Fri, 13 Dec 2024 20:10:11 -0500 Subject: [PATCH 3/9] refactor: remove magicsock / quinn::Endpoint circular dependency `MagicSock::spawn` creates a `quinn::Endpoint`. `MagicSock` is now an `AsyncUdpSocket`, and can be passed into the `quinn::Endpoint`. `magicsock::Handle` now owns the `quinn::Endpoint`, and `iroh::Endpoint` interacts with the `quinn::Endpoint` through `Handle::endpoint()`. This allows us to pass the `quinn::Endpoint` to the `magicsock::Actor` for use in QAD, without any circular dependencies. --- iroh-net-report/src/lib.rs | 2 +- iroh/examples/listen.rs | 2 +- iroh/src/endpoint.rs | 43 ++---- iroh/src/magicsock.rs | 241 ++++++++++++++++++--------------- iroh/src/magicsock/node_map.rs | 3 + 5 files changed, 151 insertions(+), 140 deletions(-) diff --git a/iroh-net-report/src/lib.rs b/iroh-net-report/src/lib.rs index 0f678bb804..c4d6221fd5 100644 --- a/iroh-net-report/src/lib.rs +++ b/iroh-net-report/src/lib.rs @@ -645,7 +645,7 @@ impl Actor { quic_config, .. } = opts; - trace!("Attempting probes for protocols {protos:#?}"); + trace!("Attempting probes for protocols {protocols:#?}"); if self.current_report_run.is_some() { response_tx .send(Err(anyhow!( diff --git a/iroh/examples/listen.rs b/iroh/examples/listen.rs index 13413992dd..e8dd616d1f 100644 --- a/iroh/examples/listen.rs +++ b/iroh/examples/listen.rs @@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> { // Use `RelayMode::Custom` to pass in a `RelayMap` with custom relay urls. // Use `RelayMode::Disable` to disable holepunching and relaying over HTTPS // If you want to experiment with relaying using your own relay server, you must pass in the same custom relay url to both the `listen` code AND the `connect` code - .relay_mode(RelayMode::Default) + .relay_mode(RelayMode::Staging) // you can choose a port to bind to, but passing in `0` will bind the socket to a random available port .bind() .await?; diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 085c17c051..842ca786cd 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -163,6 +163,8 @@ impl Builder { 1 => Some(discovery.into_iter().next().expect("checked length")), _ => Some(Box::new(ConcurrentDiscovery::from_services(discovery))), }; + let server_config = static_config.create_server_config(self.alpn_protocols)?; + let msock_opts = magicsock::Options { addr_v4: self.addr_v4, addr_v6: self.addr_v6, @@ -172,12 +174,13 @@ impl Builder { discovery, proxy_url: self.proxy_url, dns_resolver, + server_config, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, #[cfg(any(test, feature = "test-utils"))] path_selection: self.path_selection, }; - Endpoint::bind(static_config, msock_opts, self.alpn_protocols).await + Endpoint::bind(static_config, msock_opts).await } // # The very common methods everyone basically needs. @@ -442,7 +445,6 @@ impl Builder { self } } - /// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime. #[derive(Debug)] struct StaticConfig { @@ -453,7 +455,7 @@ struct StaticConfig { impl StaticConfig { /// Create a [`quinn::ServerConfig`] with the specified ALPN protocols. - fn create_server_config(&self, alpn_protocols: Vec>) -> Result { + fn create_server_config(&self, alpn_protocols: Vec>) -> Result { let server_config = make_server_config( &self.secret_key, alpn_protocols, @@ -506,7 +508,6 @@ pub fn make_server_config( #[derive(Clone, Debug)] pub struct Endpoint { msock: Handle, - endpoint: quinn::Endpoint, rtt_actor: Arc, static_config: Arc, } @@ -528,39 +529,16 @@ impl Endpoint { /// This is for internal use, the public interface is the [`Builder`] obtained from /// [Self::builder]. See the methods on the builder for documentation of the parameters. #[instrument("ep", skip_all, fields(me = %static_config.secret_key.public().fmt_short()))] - async fn bind( - static_config: StaticConfig, - msock_opts: magicsock::Options, - initial_alpns: Vec>, - ) -> Result { + async fn bind(static_config: StaticConfig, msock_opts: magicsock::Options) -> Result { let msock = magicsock::MagicSock::spawn(msock_opts).await?; trace!("created magicsock"); - - let server_config = static_config.create_server_config(initial_alpns)?; - - let mut endpoint_config = quinn::EndpointConfig::default(); - // Setting this to false means that quinn will ignore packets that have the QUIC fixed bit - // set to 0. The fixed bit is the 3rd bit of the first byte of a packet. - // For performance reasons and to not rewrite buffers we pass non-QUIC UDP packets straight - // through to quinn. We set the first byte of the packet to zero, which makes quinn ignore - // the packet if grease_quic_bit is set to false. - endpoint_config.grease_quic_bit(false); - - let endpoint = quinn::Endpoint::new_with_abstract_socket( - endpoint_config, - Some(server_config), - Arc::new(msock.clone()), - Arc::new(quinn::TokioRuntime), - )?; trace!("created quinn endpoint"); debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created"); let ep = Self { msock: msock.clone(), - endpoint: endpoint.clone(), rtt_actor: Arc::new(rtt_actor::RttHandle::new()), static_config: Arc::new(static_config), }; - msock.set_quic_endpoint(Some(endpoint)); Ok(ep) } @@ -570,7 +548,7 @@ impl Endpoint { /// Note that this *overrides* the current list of ALPNs. pub fn set_alpns(&self, alpns: Vec>) -> Result<()> { let server_config = self.static_config.create_server_config(alpns)?; - self.endpoint.set_server_config(Some(server_config)); + self.msock.endpoint().set_server_config(Some(server_config)); Ok(()) } @@ -668,7 +646,8 @@ impl Endpoint { // TODO: We'd eventually want to replace "localhost" with something that makes more sense. let connect = self - .endpoint + .msock + .endpoint() .connect_with(client_config, addr.0, "localhost")?; let connection = connect @@ -698,7 +677,7 @@ impl Endpoint { /// [`Endpoint::close`]. pub fn accept(&self) -> Accept<'_> { Accept { - inner: self.endpoint.accept(), + inner: self.msock.endpoint().accept(), ep: self.clone(), } } @@ -1062,7 +1041,7 @@ impl Endpoint { } #[cfg(test)] pub(crate) fn endpoint(&self) -> &quinn::Endpoint { - &self.endpoint + self.msock.endpoint() } } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index cbf49ac66c..8869a8cb71 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -37,10 +37,10 @@ use futures_lite::{FutureExt, StreamExt}; use futures_util::{stream::BoxStream, task::AtomicWaker}; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey}; use iroh_metrics::{inc, inc_by}; -use iroh_relay::{protos::stun, RelayMap}; +use iroh_relay::{protos::stun, RelayMap, RelayNode}; use net_report::QuicConfig; use netwatch::{interfaces, ip::LocalAddresses, netmon, UdpSocket}; -use quinn::AsyncUdpSocket; +use quinn::{AsyncUdpSocket, ServerConfig}; use rand::{seq::SliceRandom, Rng, SeedableRng}; use relay_actor::RelaySendItem; use smallvec::{smallvec, SmallVec}; @@ -126,6 +126,9 @@ pub(crate) struct Options { /// Proxy configuration. pub(crate) proxy_url: Option, + /// ServerConfig for the internal QUIC endpoint + pub(crate) server_config: ServerConfig, + /// Skip verification of SSL certificates from relay servers /// /// May only be used in tests. @@ -139,6 +142,8 @@ pub(crate) struct Options { impl Default for Options { fn default() -> Self { + let secret_key = SecretKey::generate(); + let server_config = make_default_server_config(&secret_key); Options { addr_v4: None, addr_v6: None, @@ -148,6 +153,7 @@ impl Default for Options { discovery: None, proxy_url: None, dns_resolver: crate::dns::default_resolver().clone(), + server_config, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, #[cfg(any(test, feature = "test-utils"))] @@ -156,6 +162,16 @@ impl Default for Options { } } +/// Generate a server config with no ALPNS and a default +/// transport configuration +fn make_default_server_config(secret_key: &SecretKey) -> ServerConfig { + let quic_server_config = crate::tls::make_server_config(secret_key, vec![], false) + .expect("should generate valid config"); + let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config)); + server_config.transport_config(Arc::new(quinn::TransportConfig::default())); + server_config +} + /// Contents of a relay message. Use a SmallVec to avoid allocations for the very /// common case of a single packet. type RelayContents = SmallVec<[Bytes; 1]>; @@ -169,6 +185,8 @@ pub(crate) struct Handle { msock: Arc, // Empty when closed actor_tasks: Arc>>, + // quinn endpoint + endpoint: quinn::Endpoint, } /// Iroh connectivity layer. @@ -232,10 +250,6 @@ pub(crate) struct MagicSock { pconn6: Option, /// NetReport client net_reporter: net_report::Addr, - /// Handle to the underlying quinn::Endpoint. - /// - /// Used in netcheck for QUIC address discovery. - quic_endpoint: Arc>>, /// The state for an active DiscoKey. disco_secrets: DiscoSecrets, @@ -287,16 +301,6 @@ impl MagicSock { self.my_relay.set(my_relay).unwrap_or_else(|e| e) } - /// Sets the internal `quinn::Endpoint` that is used for QUIC address - /// discovery. - pub(crate) fn set_quic_endpoint(&self, endpoint: Option) { - let mut ep = self - .quic_endpoint - .write() - .expect("MagicSock::endpoint RwLock is poisoned"); - *ep = endpoint; - } - fn is_closing(&self) -> bool { self.closing.load(Ordering::Relaxed) } @@ -442,31 +446,6 @@ impl MagicSock { Ok(addr) } - fn create_io_poller(&self) -> Pin> { - // To do this properly the MagicSock would need a registry of pollers. For each - // node we would look up the poller or create one. Then on each try_send we can - // look up the correct poller and configure it to poll the paths it needs. - // - // Note however that the current quinn impl calls UdpPoller::poll_writable() - // **before** it calls try_send(), as opposed to how it is documented. That is a - // problem as we would not yet know the path that needs to be polled. To avoid such - // ambiguity the API could be changed to a .poll_send(&self, cx: &mut Context, - // io_poller: Pin<&mut dyn UdpPoller>, transmit: &Transmit) -> Poll> - // instead of the existing .try_send() because then we would have control over this. - // - // Right now however we have one single poller behaving the same for each - // connection. It checks all paths and returns Poll::Ready as soon as any path is - // ready. - let ipv4_poller = self.pconn4.create_io_poller(); - let ipv6_poller = self.pconn6.as_ref().map(|sock| sock.create_io_poller()); - let relay_sender = self.relay_datagram_send_channel.clone(); - Box::pin(IoPoller { - ipv4_poller, - ipv6_poller, - relay_sender, - }) - } - /// Implementation for AsyncUdpSocket::try_send #[instrument(skip_all)] fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> { @@ -1570,6 +1549,7 @@ impl Handle { discovery, dns_resolver, proxy_url, + server_config, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify, #[cfg(any(test, feature = "test-utils"))] @@ -1640,9 +1620,23 @@ impl Handle { dns_resolver, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify, - quic_endpoint: Arc::new(RwLock::new(None)), }); + let mut endpoint_config = quinn::EndpointConfig::default(); + // Setting this to false means that quinn will ignore packets that have the QUIC fixed bit + // set to 0. The fixed bit is the 3rd bit of the first byte of a packet. + // For performance reasons and to not rewrite buffers we pass non-QUIC UDP packets straight + // through to quinn. We set the first byte of the packet to zero, which makes quinn ignore + // the packet if grease_quic_bit is set to false. + endpoint_config.grease_quic_bit(false); + + let endpoint = quinn::Endpoint::new_with_abstract_socket( + endpoint_config, + Some(server_config), + inner.clone(), + Arc::new(quinn::TokioRuntime), + )?; + let mut actor_tasks = JoinSet::default(); let relay_actor = RelayActor::new(inner.clone(), relay_datagram_recv_queue); @@ -1667,6 +1661,7 @@ impl Handle { let inner2 = inner.clone(); let network_monitor = netmon::Monitor::new().await?; + let qad_endpoint = endpoint.clone(); actor_tasks.spawn( async move { let actor = Actor { @@ -1683,6 +1678,7 @@ impl Handle { no_v4_send: false, net_reporter, network_monitor, + qad_endpoint, }; if let Err(err) = actor.run().await { @@ -1695,11 +1691,17 @@ impl Handle { let c = Handle { msock: inner, actor_tasks: Arc::new(Mutex::new(actor_tasks)), + endpoint, }; Ok(c) } + /// The underlying [`quinn::Endpoint`] + pub fn endpoint(&self) -> &quinn::Endpoint { + &self.endpoint + } + /// Closes the connection. /// /// Only the first close does anything. Any later closes return nil. @@ -1707,6 +1709,10 @@ impl Handle { /// indefinitely after this call. #[instrument(skip_all, fields(me = %self.msock.me))] pub(crate) async fn close(&self) { + tracing::debug!("Closing connections"); + self.endpoint.close(0u16.into(), b""); + self.endpoint.wait_idle().await; + if self.msock.is_closed() { return; } @@ -1734,7 +1740,6 @@ impl Handle { .await; if shutdown_done.is_ok() { debug!("tasks shutdown complete"); - } else { // shutdown all tasks debug!("aborting remaining {}/3 tasks", tasks.len()); tasks.shutdown().await; @@ -1750,13 +1755,17 @@ impl DiscoSecrets { where F: FnOnce(&mut SharedSecret) -> T, { + let mut inner = self.0.lock().expect("poisoned"); let x = inner.entry(node_id).or_insert_with(|| { let public_key = public_ed_box(&node_id.public()); + SharedSecret::new(secret, &public_key) }); - {}} + cb(x) + } - fn , ResolverExt}encode_and_seal( + fn encode_and_seal( &self, + this_secret_key: &crypto_box::SecretKey, this_node_id: NodeId, other_node_id: NodeId, msg: &disco::Message, @@ -1767,7 +1776,6 @@ impl DiscoSecrets { }); disco::encode_message(&this_node_id, seal).into() } - fn unseal_and_decode( &self, secret: &crypto_box::SecretKey, @@ -1935,13 +1943,35 @@ impl RelayDatagramRecvQueue { } } -impl AsyncUdpSocket for Handle { +impl AsyncUdpSocket for MagicSock { fn create_io_poller(self: Arc) -> Pin> { - self.msock.create_io_poller() + // To do this properly the MagicSock would need a registry of pollers. For each + // node we would look up the poller or create one. Then on each try_send we can + // look up the correct poller and configure it to poll the paths it needs. + // + // Note however that the current quinn impl calls UdpPoller::poll_writable() + // **before** it calls try_send(), as opposed to how it is documented. That is a + // problem as we would not yet know the path that needs to be polled. To avoid such + // ambiguity the API could be changed to a .poll_send(&self, cx: &mut Context, + // io_poller: Pin<&mut dyn UdpPoller>, transmit: &Transmit) -> Poll> + // instead of the existing .try_send() because then we would have control over this. + // + // Right now however we have one single poller behaving the same for each + // connection. It checks all paths and returns Poll::Ready as soon as any path is + // ready. + let ipv4_poller = self.pconn4.create_io_poller(); + let ipv6_poller = self.pconn6.as_ref().map(|sock| sock.create_io_poller()); + let relay_sender = self.relay_actor_sender.clone(); + Box::pin(IoPoller { + ipv4_poller, + ipv6_poller, + relay_sender, + relay_send_waker: self.relay_send_waker.clone(), + }) } fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> { - self.msock.try_send(transmit) + self.try_send(transmit) } /// NOTE: Receiving on a [`Self::close`]d socket will return [`Poll::Pending`] indefinitely. @@ -1951,11 +1981,11 @@ impl AsyncUdpSocket for Handle { bufs: &mut [io::IoSliceMut<'_>], metas: &mut [quinn_udp::RecvMeta], ) -> Poll> { - self.msock.poll_recv(cx, bufs, metas) + self.poll_recv(cx, bufs, metas) } fn local_addr(&self) -> io::Result { - match &*self.msock.local_addrs.read().expect("not poisoned") { + match &*self.local_addrs.read().expect("not poisoned") { (ipv4, None) => { // Pretend to be IPv6, because our QuinnMappedAddrs // need to be IPv6. @@ -2068,6 +2098,11 @@ struct Actor { net_reporter: net_report::Client, network_monitor: netmon::Monitor, + + /// The internal quinn::Endpoint + /// + /// Needed for Quic Address Discovery + qad_endpoint: quinn::Endpoint, } impl Actor { @@ -2480,29 +2515,25 @@ impl Actor { .stun_v4(Some(self.pconn4.clone())) .stun_v6(self.pconn6.clone()); - let quic_endpoint = self.msock.quic_endpoint.read().expect("poisoned").clone(); - - let quic_config = match quic_endpoint { - Some(ep) => { - // Need to add Quic Mapped Addrs for the relay nodes to use for - // QUIC Address Discovery - let mapped_addrs = self.resolve_qad_addrs(Duration::from_millis(500)).await; - let root_store = rustls::RootCertStore::from_iter( - webpki_roots::TLS_SERVER_ROOTS.iter().cloned(), - ); - let client_config = rustls::ClientConfig::builder() - .with_root_certificates(root_store) - .with_no_client_auth(); - Some(QuicConfig { - ep, - client_config, - ipv4: true, - ipv6: true, - mapped_addrs, - }) - } - None => None, - }; + // Need to add Quic Mapped Addrs for the relay nodes to use for + // QUIC Address Discovery + let mapped_addrs = self + .resolve_qad_addrs(std::time::Duration::from_millis(500)) + .await; + // create a client config for the endpoint to + // use for QUIC address discovery + let root_store = + rustls::RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + let client_config = rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + let quic_config = Some(QuicConfig { + ep: self.qad_endpoint.clone(), + client_config, + ipv4: true, + ipv6: self.pconn6.is_some(), + mapped_addrs, + }); opts = opts.quic_config(quic_config); debug!("requesting net_report report"); @@ -3977,7 +4008,13 @@ mod tests { /// /// Use [`magicsock_connect`] to establish connections. #[instrument(name = "ep", skip_all, fields(me = secret_key.public().fmt_short()))] - async fn magicsock_ep(secret_key: SecretKey) -> anyhow::Result<(quinn::Endpoint, Handle)> { + async fn magicsock_ep(secret_key: SecretKey) -> anyhow::Result { + let server_config = crate::endpoint::make_server_config( + &secret_key, + vec![ALPN.to_vec()], + Arc::new(quinn::TransportConfig::default()), + true, + )?; let opts = Options { addr_v4: None, addr_v6: None, @@ -3987,25 +4024,12 @@ mod tests { discovery: None, dns_resolver: crate::dns::default_resolver().clone(), proxy_url: None, + server_config, insecure_skip_relay_cert_verify: true, path_selection: PathSelection::default(), }; let msock = MagicSock::spawn(opts).await?; - let server_config = crate::endpoint::make_server_config( - &secret_key, - vec![ALPN.to_vec()], - Arc::new(quinn::TransportConfig::default()), - true, - )?; - let mut endpoint_config = quinn::EndpointConfig::default(); - endpoint_config.grease_quic_bit(false); - let endpoint = quinn::Endpoint::new_with_abstract_socket( - endpoint_config, - Some(server_config), - Arc::new(msock.clone()), - Arc::new(quinn::TokioRuntime), - )?; - Ok((endpoint, msock)) + Ok(msock) } /// Connects from `ep` returned by [`magicsock_ep`] to the `node_id`. @@ -4022,7 +4046,7 @@ mod tests { let mut transport_config = quinn::TransportConfig::default(); transport_config.keep_alive_interval(Some(Duration::from_secs(1))); - magicsock_connet_with_transport_config( + magicsock_connect_with_transport_config( ep, ep_secret_key, addr, @@ -4038,7 +4062,7 @@ mod tests { /// /// Uses [`ALPN`], `node_id`, must match `addr`. #[instrument(name = "connect", skip_all, fields(me = ep_secret_key.public().fmt_short()))] - async fn magicsock_connet_with_transport_config( + async fn magicsock_connect_with_transport_config( ep: &quinn::Endpoint, ep_secret_key: SecretKey, addr: QuicMappedAddr, @@ -4067,7 +4091,7 @@ mod tests { let secret_key_missing_node = SecretKey::from_bytes(&[255u8; 32]); let node_id_missing_node = secret_key_missing_node.public(); - let (ep_1, msock_1) = magicsock_ep(secret_key_1.clone()).await.unwrap(); + let msock_1 = magicsock_ep(secret_key_1.clone()).await.unwrap(); // Generate an address not present in the NodeMap. let bad_addr = QuicMappedAddr::generate(); @@ -4078,14 +4102,19 @@ mod tests { // this speeds up the test. let res = tokio::time::timeout( Duration::from_millis(500), - magicsock_connect(&ep_1, secret_key_1.clone(), bad_addr, node_id_missing_node), + magicsock_connect( + msock_1.endpoint(), + secret_key_1.clone(), + bad_addr, + node_id_missing_node, + ), ) .await; assert!(res.is_err(), "expecting timeout"); // Now check we can still create another connection with this endpoint. - let (ep_2, msock_2) = magicsock_ep(secret_key_2.clone()).await.unwrap(); - + let msock_2 = magicsock_ep(secret_key_2.clone()).await.unwrap(); + let ep_2 = msock_2.endpoint().clone(); // This needs an accept task let accept_task = tokio::spawn({ async fn accept(ep: quinn::Endpoint) -> Result<()> { @@ -4097,7 +4126,6 @@ mod tests { info!("accept finished"); Ok(()) } - let ep_2 = ep_2.clone(); async move { if let Err(err) = accept(ep_2).await { error!("{err:#}"); @@ -4130,7 +4158,7 @@ mod tests { let addr = msock_1.get_mapping_addr(node_id_2).unwrap(); let res = tokio::time::timeout( Duration::from_secs(10), - magicsock_connect(&ep_1, secret_key_1.clone(), addr, node_id_2), + magicsock_connect(msock_1.endpoint(), secret_key_1.clone(), addr, node_id_2), ) .await .expect("timeout while connecting"); @@ -4152,8 +4180,9 @@ mod tests { let secret_key_2 = SecretKey::from_bytes(&[2u8; 32]); let node_id_2 = secret_key_2.public(); - let (ep_1, msock_1) = magicsock_ep(secret_key_1.clone()).await.unwrap(); - let (ep_2, msock_2) = magicsock_ep(secret_key_2.clone()).await.unwrap(); + let msock_1 = magicsock_ep(secret_key_1.clone()).await.unwrap(); + let msock_2 = magicsock_ep(secret_key_2.clone()).await.unwrap(); + let ep_2 = msock_2.endpoint().clone(); // We need a task to accept the connection. let accept_task = tokio::spawn({ @@ -4165,7 +4194,6 @@ mod tests { info!("accept finished"); Ok(()) } - let ep_2 = ep_2.clone(); async move { if let Err(err) = accept(ep_2).await { error!("{err:#}"); @@ -4200,8 +4228,8 @@ mod tests { // little slower though. let mut transport_config = quinn::TransportConfig::default(); transport_config.max_idle_timeout(Some(Duration::from_millis(200).try_into().unwrap())); - let res = magicsock_connet_with_transport_config( - &ep_1, + let res = magicsock_connect_with_transport_config( + msock_1.endpoint(), secret_key_1.clone(), addr_2, node_id_2, @@ -4233,9 +4261,10 @@ mod tests { // We can now connect tokio::time::timeout(Duration::from_secs(10), async move { info!("establishing new connection"); - let conn = magicsock_connect(&ep_1, secret_key_1.clone(), addr_2, node_id_2) - .await - .unwrap(); + let conn = + magicsock_connect(msock_1.endpoint(), secret_key_1.clone(), addr_2, node_id_2) + .await + .unwrap(); info!("have connection"); let mut stream = conn.open_uni().await.unwrap(); stream.write_all(b"hello").await.unwrap(); diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 20c7f43512..3b8aaa1e95 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -160,6 +160,7 @@ impl NodeMap { let quic_mapped_addr = QuicMappedAddr::generate(); self.inner .lock() + .expect("poisoned") .qad_mapped_addrs .insert(udp_addr, quic_mapped_addr); quic_mapped_addr @@ -169,6 +170,7 @@ impl NodeMap { pub(super) fn get_qad_addr(&self, addr: &QuicMappedAddr) -> Option { self.inner .lock() + .expect("poisoned") .qad_mapped_addrs .iter() .find_map(|(udp_addr, quic_mapped_addr)| { @@ -188,6 +190,7 @@ impl NodeMap { pub(super) fn receive_qad(&self, udp_addr: SocketAddr) -> Option { self.inner .lock() + .expect("poisoned") .qad_mapped_addrs .get(&udp_addr) .map(|addr| *addr) From 29d28c3e017a5a4944088bd52662ead2b73e08d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Fri, 13 Dec 2024 21:57:59 -0500 Subject: [PATCH 4/9] simplify QAD addresses in `NodeMap` --- iroh-net-report/Cargo.toml | 1 - iroh-net-report/src/lib.rs | 1 - iroh-net-report/src/reportgen.rs | 26 -------- iroh/Cargo.toml | 2 +- iroh/examples/listen.rs | 2 +- iroh/src/endpoint.rs | 3 +- iroh/src/magicsock.rs | 49 +++++++-------- iroh/src/magicsock/node_map.rs | 75 ++++++++++++++--------- iroh/src/magicsock/node_map/node_state.rs | 2 +- 9 files changed, 73 insertions(+), 88 deletions(-) diff --git a/iroh-net-report/Cargo.toml b/iroh-net-report/Cargo.toml index 6788209cc8..b2cabb734f 100644 --- a/iroh-net-report/Cargo.toml +++ b/iroh-net-report/Cargo.toml @@ -49,7 +49,6 @@ tokio = { version = "1", default-features = false, features = ["test-util"] } default = ["metrics"] metrics = ["iroh-metrics/metrics", "portmapper/metrics"] stun-utils = [] -iroh = [] [package.metadata.docs.rs] all-features = true diff --git a/iroh-net-report/src/lib.rs b/iroh-net-report/src/lib.rs index c4d6221fd5..28b404388c 100644 --- a/iroh-net-report/src/lib.rs +++ b/iroh-net-report/src/lib.rs @@ -38,7 +38,6 @@ mod ping; mod reportgen; pub use metrics::Metrics; -pub use reportgen::MappedAddr; use reportgen::ProbeProto; pub use reportgen::QuicConfig; #[cfg(feature = "stun-utils")] diff --git a/iroh-net-report/src/reportgen.rs b/iroh-net-report/src/reportgen.rs index 38b00c39f8..fb144cdc25 100644 --- a/iroh-net-report/src/reportgen.rs +++ b/iroh-net-report/src/reportgen.rs @@ -700,25 +700,8 @@ pub struct QuicConfig { pub ipv4: bool, /// Enable ipv6 QUIC address discovery probes pub ipv6: bool, - /// A map of RelayUrls to addresses - /// - /// Only makes sense in the context of iroh, which uses QuicMappedAddrs - /// to allow dialing by NodeId - #[cfg(feature = "iroh")] - pub mapped_addrs: MappedRelayAddrs, } -/// Holds a QuicMappedAddr -#[cfg(feature = "iroh")] -#[derive(Debug, Clone, Copy)] -pub struct MappedAddr(pub SocketAddr); - -/// A relationship between a socket address and it's QUIC mapped address -/// -/// Only relevant when using the net-report with iroh. -#[cfg(feature = "iroh")] -type MappedRelayAddrs = std::collections::HashMap; - /// Executes a particular [`Probe`], including using a delayed start if needed. /// /// If *stun_sock4* and *stun_sock6* are `None` the STUN probes are disabled. @@ -940,14 +923,6 @@ async fn run_quic_probe( )); } }; - - // if we are using net-report with iroh, we must dial using the mapped address - #[cfg(feature = "iroh")] - let relay_addr = quic_config - .mapped_addrs - .get(&relay_addr) - .map_or(relay_addr, |mapped_addr| mapped_addr.0.clone()); - let quic_client = iroh_relay::quic::QuicClient::new(quic_config.ep, quic_config.client_config) .map_err(|e| ProbeError::Error(e, probe.clone()))?; let (addr, latency) = quic_client @@ -1617,7 +1592,6 @@ mod tests { client_config, ipv4: true, ipv6: true, - mapped_addrs: std::collections::HashMap::new(), }; let url = relay.url.clone(); let port = server.quic_addr().unwrap().port(); diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index bdff2a5209..89554fe55a 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -95,7 +95,7 @@ webpki = { package = "rustls-webpki", version = "0.102" } webpki-roots = "0.26" x509-parser = "0.16" z32 = "1.0.3" -net-report = { package = "iroh-net-report", path = "../iroh-net-report", version = "0.30", default-features = false, features = ["iroh"] } +net-report = { package = "iroh-net-report", path = "../iroh-net-report", version = "0.29", default-features = false } # metrics iroh-metrics = { version = "0.30", default-features = false } diff --git a/iroh/examples/listen.rs b/iroh/examples/listen.rs index e8dd616d1f..13413992dd 100644 --- a/iroh/examples/listen.rs +++ b/iroh/examples/listen.rs @@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> { // Use `RelayMode::Custom` to pass in a `RelayMap` with custom relay urls. // Use `RelayMode::Disable` to disable holepunching and relaying over HTTPS // If you want to experiment with relaying using your own relay server, you must pass in the same custom relay url to both the `listen` code AND the `connect` code - .relay_mode(RelayMode::Staging) + .relay_mode(RelayMode::Default) // you can choose a port to bind to, but passing in `0` will bind the socket to a random available port .bind() .await?; diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 842ca786cd..c3d91ce772 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -445,6 +445,7 @@ impl Builder { self } } + /// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime. #[derive(Debug)] struct StaticConfig { @@ -455,7 +456,7 @@ struct StaticConfig { impl StaticConfig { /// Create a [`quinn::ServerConfig`] with the specified ALPN protocols. - fn create_server_config(&self, alpn_protocols: Vec>) -> Result { + fn create_server_config(&self, alpn_protocols: Vec>) -> Result { let server_config = make_server_config( &self.secret_key, alpn_protocols, diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 8869a8cb71..3ef69b04fa 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -162,8 +162,7 @@ impl Default for Options { } } -/// Generate a server config with no ALPNS and a default -/// transport configuration +/// Generate a server config with no ALPNS and a default transport configuration fn make_default_server_config(secret_key: &SecretKey) -> ServerConfig { let quic_server_config = crate::tls::make_server_config(secret_key, vec![], false) .expect("should generate valid config"); @@ -574,11 +573,11 @@ impl MagicSock { } } None => { - // Check if this is a QUIC address discovery packet - if let Some(addr) = self.node_map.get_qad_addr(&dest) { - // send udp + // Check if this is addr is used to perform QUIC Address Discovery + if let Some(addr) = self.node_map.qad_addr_for_send(&dest.0) { // rewrite target address transmit.destination = addr; + // send udp match self.try_send_udp(addr, &transmit) { Ok(()) => { trace!(dst = %addr, @@ -837,8 +836,8 @@ impl MagicSock { // Update the NodeMap and remap RecvMeta to the QuicMappedAddr. match self.node_map.receive_udp(meta.addr) { None => { - // Check if this is QUIC address discovery response - if let Some(quic_mapped_addr) = self.node_map.receive_qad(meta.addr) { + // Check if this address is used for QUIC address discovery + if let Some(addr) = self.node_map.qad_addr_for_recv(&meta.addr) { trace!( src = ?meta.addr, count = %quic_datagram_count, @@ -846,7 +845,7 @@ impl MagicSock { "UDP recv QUIC address discovery packets", ); quic_packets_total += quic_datagram_count; - meta.addr = quic_mapped_addr.0; + meta.addr = addr; } else { warn!( src = ?meta.addr, @@ -1974,7 +1973,7 @@ impl AsyncUdpSocket for MagicSock { self.try_send(transmit) } - /// NOTE: Receiving on a [`Self::close`]d socket will return [`Poll::Pending`] indefinitely. + /// NOTE: Receiving on a closed socket will return [`Poll::Pending`] indefinitely. fn poll_recv( &self, cx: &mut Context, @@ -2137,6 +2136,9 @@ impl Actor { } } + // kick off resolving the URLs for relay addresses + self.resolve_qad_addrs(Duration::from_millis(10)).await; + let mut receiver_closed = false; let mut portmap_watcher_closed = false; let mut link_change_closed = false; @@ -2515,13 +2517,11 @@ impl Actor { .stun_v4(Some(self.pconn4.clone())) .stun_v6(self.pconn6.clone()); - // Need to add Quic Mapped Addrs for the relay nodes to use for - // QUIC Address Discovery - let mapped_addrs = self - .resolve_qad_addrs(std::time::Duration::from_millis(500)) + // Need to get the SocketAddrs of the relay urls and add them to the node map + // so the socket knows to treat them special + self.resolve_qad_addrs(std::time::Duration::from_millis(300)) .await; - // create a client config for the endpoint to - // use for QUIC address discovery + // create a client config for the endpoint to use for QUIC address discovery let root_store = rustls::RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); let client_config = rustls::ClientConfig::builder() @@ -2532,7 +2532,6 @@ impl Actor { client_config, ipv4: true, ipv6: self.pconn6.is_some(), - mapped_addrs, }); opts = opts.quic_config(quic_config); @@ -2634,7 +2633,7 @@ impl Actor { match relay_node.url.host() { Some(url::Host::Domain(hostname)) => { - error!(%hostname, "Performing DNS A lookup for relay addr"); + debug!(%hostname, "Performing DNS A lookup for relay addr"); let mut set = JoinSet::new(); let resolver = dns_resolver.clone(); let ipv4_resolver = resolver.clone(); @@ -2654,13 +2653,13 @@ impl Actor { Err(_) => {} Ok(resolved_addrs) => { for addr in resolved_addrs { - addrs.insert(SocketAddr::new(addr.into(), port)); + addrs.insert(SocketAddr::new(addr, port)); } } } } if addrs.is_empty() { - error!(%hostname, "Unable to resolve ip addresses for relay node"); + debug!(%hostname, "Unable to resolve ip addresses for relay node"); } } Some(url::Host::Ipv4(addr)) => { @@ -2673,15 +2672,11 @@ impl Actor { error!(?relay_node.url, "No hostname for relay node, cannot resolve ip"); } } - return addrs; + addrs } /// Resolve the relay addresses used for QUIC address discovery. - async fn resolve_qad_addrs( - &mut self, - duration: Duration, - ) -> HashMap { - let mut mapped_addrs = HashMap::new(); + async fn resolve_qad_addrs(&mut self, duration: Duration) { let mut set = JoinSet::new(); let resolver = self.msock.dns_resolver(); for relay_node in self.msock.relay_map.nodes() { @@ -2694,11 +2689,9 @@ impl Actor { let res = set.join_all().await; for addrs in res { addrs.iter().for_each(|addr| { - let mapped_addr = self.msock.node_map.add_qad_addr(*addr); - mapped_addrs.insert(*addr, net_report::MappedAddr(mapped_addr.0)); + self.msock.node_map.add_qad_addr(*addr); }); } - mapped_addrs } fn set_nearest_relay(&mut self, relay_url: Option) -> bool { diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 3b8aaa1e95..f45e2026db 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -69,8 +69,8 @@ pub(super) struct NodeMapInner { next_id: usize, #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection, - // special mapping for relay socket addresses so that we can do quic address discovery - qad_mapped_addrs: HashMap, + // special set of relay socket addresses that we use to do quic address discovery + qad_addrs: BTreeSet, } /// Identifier to look up a [`NodeState`] in the [`NodeMap`]. @@ -155,31 +155,59 @@ impl NodeMap { .add_node_addr(node_addr, source) } - /// Add a the SocketAddr used to preform QUIC Address Discovery to the nodemap - pub(super) fn add_qad_addr(&self, udp_addr: SocketAddr) -> QuicMappedAddr { - let quic_mapped_addr = QuicMappedAddr::generate(); + /// Add a the SocketAddr used to perform QUIC Address Discovery to the nodemap + pub(super) fn add_qad_addr(&self, udp_addr: SocketAddr) { self.inner .lock() .expect("poisoned") - .qad_mapped_addrs - .insert(udp_addr, quic_mapped_addr); - quic_mapped_addr + .qad_addrs + .insert(udp_addr); } - /// Get the socket address used to preform QUIC Address Discovery - pub(super) fn get_qad_addr(&self, addr: &QuicMappedAddr) -> Option { - self.inner + /// Return a correctly canonicalized SocketAddr if this address is one + /// used to perform QUIC Address Discovery + pub(super) fn qad_addr_for_send(&self, addr: &SocketAddr) -> Option { + // all addresses given to the endpoint are Ipv6 addresses, so we need to + // canonicalize before we check for the actual addr we are trying to send to + let canonicalized_addr = SocketAddr::new(addr.ip().to_canonical(), addr.port()); + if self + .inner + .lock() + .expect("poisoned") + .qad_addrs + .contains(&canonicalized_addr) + { + Some(canonicalized_addr) + } else { + None + } + } + + /// Return a correctly formed SocketAddr if this address is one used to + /// perform QUIC Address Discovery + pub(super) fn qad_addr_for_recv(&self, addr: &SocketAddr) -> Option { + if self + .inner .lock() .expect("poisoned") - .qad_mapped_addrs - .iter() - .find_map(|(udp_addr, quic_mapped_addr)| { - if addr == quic_mapped_addr { - Some(*udp_addr) - } else { - None + .qad_addrs + .contains(addr) + { + match addr.ip() { + IpAddr::V4(ipv4_addr) => { + // if this is an ipv4 addr, we need to map it back to + // an ipv6 addr, since all addresses we use to dial on + // the underlying quinn endpoint are mapped ipv6 addrs + Some(SocketAddr::new( + ipv4_addr.to_ipv6_mapped().into(), + addr.port(), + )) } - }) + IpAddr::V6(_) => Some(*addr), + } + } else { + None + } } /// Number of nodes currently listed. @@ -187,15 +215,6 @@ impl NodeMap { self.inner.lock().expect("poisoned").node_count() } - pub(super) fn receive_qad(&self, udp_addr: SocketAddr) -> Option { - self.inner - .lock() - .expect("poisoned") - .qad_mapped_addrs - .get(&udp_addr) - .map(|addr| *addr) - } - pub(super) fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { self.inner.lock().expect("poisoned").receive_udp(udp_addr) } diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index ff259ddc85..a2f97b46b3 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -1691,7 +1691,7 @@ mod tests { ]), next_id: 5, path_selection: PathSelection::default(), - qad_mapped_addrs: HashMap::new(), + qad_addrs: BTreeSet::new(), }); let mut got = node_map.list_remote_infos(later); got.sort_by_key(|p| p.node_id); From 301a815fd5116dcac053dd98f5d49347c633daa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Fri, 3 Jan 2025 16:49:06 -0500 Subject: [PATCH 5/9] rename QuicMappedAddrs to NodeIdMappedAddr and add IpMappedAddr also fix weird rebase errors --- iroh/Cargo.toml | 2 +- iroh/src/endpoint.rs | 7 +- iroh/src/magicsock.rs | 451 +++++++++++++++------- iroh/src/magicsock/node_map.rs | 86 +---- iroh/src/magicsock/node_map/node_state.rs | 17 +- 5 files changed, 337 insertions(+), 226 deletions(-) diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 89554fe55a..e8902e1761 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -95,7 +95,7 @@ webpki = { package = "rustls-webpki", version = "0.102" } webpki-roots = "0.26" x509-parser = "0.16" z32 = "1.0.3" -net-report = { package = "iroh-net-report", path = "../iroh-net-report", version = "0.29", default-features = false } +net-report = { package = "iroh-net-report", path = "../iroh-net-report", version = "0.30", default-features = false } # metrics iroh-metrics = { version = "0.30", default-features = false } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index c3d91ce772..7e131a8b77 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -34,7 +34,7 @@ use crate::{ dns::DnsDiscovery, pkarr::PkarrPublisher, ConcurrentDiscovery, Discovery, DiscoveryTask, }, dns::{default_resolver, DnsResolver}, - magicsock::{self, Handle, QuicMappedAddr}, + magicsock::{self, Handle, NodeIdMappedAddr}, tls, watchable::Watcher, }; @@ -533,7 +533,6 @@ impl Endpoint { async fn bind(static_config: StaticConfig, msock_opts: magicsock::Options) -> Result { let msock = magicsock::MagicSock::spawn(msock_opts).await?; trace!("created magicsock"); - trace!("created quinn endpoint"); debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created"); let ep = Self { msock: msock.clone(), @@ -627,7 +626,7 @@ impl Endpoint { &self, node_id: NodeId, alpn: &[u8], - addr: QuicMappedAddr, + addr: NodeIdMappedAddr, ) -> Result { debug!("Attempting connection..."); let client_config = { @@ -990,7 +989,7 @@ impl Endpoint { async fn get_mapping_addr_and_maybe_start_discovery( &self, node_addr: NodeAddr, - ) -> Result<(QuicMappedAddr, Option)> { + ) -> Result<(NodeIdMappedAddr, Option)> { let node_id = node_addr.node_id; // Only return a mapped addr if we have some way of dialing this node, in other diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 3ef69b04fa..d65b56a416 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -142,12 +142,12 @@ pub(crate) struct Options { impl Default for Options { fn default() -> Self { - let secret_key = SecretKey::generate(); + let secret_key = SecretKey::generate(rand::rngs::OsRng); let server_config = make_default_server_config(&secret_key); Options { addr_v4: None, addr_v6: None, - secret_key: SecretKey::generate(rand::rngs::OsRng), + secret_key, relay_map: RelayMap::empty(), node_map: None, discovery: None, @@ -243,6 +243,8 @@ pub(crate) struct MagicSock { my_relay: Watchable>, /// Tracks the networkmap node entity for each node discovery key. node_map: NodeMap, + /// Tracks the mapped IP addresses + ip_mapped_addrs: IpMappedAddrs, /// UDP IPv4 socket pconn4: UdpConn, /// UDP IPv6 socket @@ -373,7 +375,7 @@ impl MagicSock { } /// Returns the socket address which can be used by the QUIC layer to dial this node. - pub(crate) fn get_mapping_addr(&self, node_id: NodeId) -> Option { + pub(crate) fn get_mapping_addr(&self, node_id: NodeId) -> Option { self.node_map.get_quic_mapped_addr_for_node_key(node_id) } @@ -461,151 +463,180 @@ impl MagicSock { "connection closed", )); } - - let dest = QuicMappedAddr(transmit.destination); trace!( - dst = %dest, + dst = %transmit.destination, src = ?transmit.src_ip, len = %transmit.contents.len(), "sending", ); - let mut transmit = transmit.clone(); - match self - .node_map - .get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed)) - { - Some((node_id, udp_addr, relay_url, msgs)) => { - let mut pings_sent = false; - // If we have pings to send, we *have* to send them out first. - if !msgs.is_empty() { - if let Err(err) = self.try_send_ping_actions(msgs) { - warn!( - node = %node_id.fmt_short(), - "failed to handle ping actions: {err:#}", - ); + + // Check if the destination address is a NodeIdMappedAddr. If so, + // get the node's relay address and best direct address, as well + // as any pings that need to be sent for hole-punching purposes. + if let Ok(dest) = NodeIdMappedAddr::try_from(transmit.destination) { + let mut transmit = transmit.clone(); + match self + .node_map + .get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed)) + { + Some((node_id, udp_addr, relay_url, msgs)) => { + let mut pings_sent = false; + // If we have pings to send, we *have* to send them out first. + if !msgs.is_empty() { + if let Err(err) = self.try_send_ping_actions(msgs) { + warn!( + node = %node_id.fmt_short(), + "failed to handle ping actions: {err:#}", + ); + } + pings_sent = true; } - pings_sent = true; - } - let mut udp_sent = false; - let mut udp_error = None; - let mut relay_sent = false; - let mut relay_error = None; + let mut udp_sent = false; + let mut udp_error = None; + let mut relay_sent = false; + let mut relay_error = None; - // send udp - if let Some(addr) = udp_addr { - // rewrite target address - transmit.destination = addr; - match self.try_send_udp(addr, &transmit) { - Ok(()) => { - trace!(node = %node_id.fmt_short(), dst = %addr, + // send udp + if let Some(addr) = udp_addr { + // rewrite target address + transmit.destination = addr; + match self.try_send_udp(addr, &transmit) { + Ok(()) => { + trace!(node = %node_id.fmt_short(), dst = %addr, "sent transmit over UDP"); - udp_sent = true; - } - Err(err) => { - // No need to print "WouldBlock" errors to the console - if err.kind() != io::ErrorKind::WouldBlock { - warn!( - node = %node_id.fmt_short(), - dst = %addr, - "failed to send udp: {err:#}" - ); + udp_sent = true; + } + Err(err) => { + // No need to print "WouldBlock" errors to the console + if err.kind() != io::ErrorKind::WouldBlock { + warn!( + node = %node_id.fmt_short(), + dst = %addr, + "failed to send udp: {err:#}" + ); + } + udp_error = Some(err); } - udp_error = Some(err); } } - } - // send relay - if let Some(ref relay_url) = relay_url { - match self.try_send_relay(relay_url, node_id, split_packets(&transmit)) { - Ok(()) => { - relay_sent = true; - } - Err(err) => { - relay_error = Some(err); + // send relay + if let Some(ref relay_url) = relay_url { + match self.try_send_relay(relay_url, node_id, split_packets(&transmit)) { + Ok(()) => { + relay_sent = true; + } + Err(err) => { + relay_error = Some(err); + } } } - } - let udp_pending = udp_error - .as_ref() - .map(|err| err.kind() == io::ErrorKind::WouldBlock) - .unwrap_or_default(); - let relay_pending = relay_error - .as_ref() - .map(|err| err.kind() == io::ErrorKind::WouldBlock) - .unwrap_or_default(); - if udp_pending && relay_pending { - // Handle backpressure. - Err(io::Error::new(io::ErrorKind::WouldBlock, "pending")) - } else { - if relay_sent || udp_sent { - trace!( - node = %node_id.fmt_short(), - send_udp = ?udp_addr, - send_relay = ?relay_url, - "sent transmit", - ); - } else if !pings_sent { - // Returning Ok here means we let QUIC handle a timeout for a lost - // packet, same would happen if we returned any errors. The - // philosophy of quinn-udp is that a UDP connection could come back - // at any time so these errors should be treated as transient and - // are just timeouts. Hence we opt for returning Ok. See - // test_try_send_no_udp_addr_or_relay_url to explore this further. - debug!( - node = %node_id.fmt_short(), - "no UDP or relay paths available for node, voiding transmit", - ); - // We log this as debug instead of error, because this is a - // situation that comes up under normal operation. If this were an - // error log, it would unnecessarily pollute logs. - // This situation happens essentially when `pings_sent` is false, - // `relay_url` is `None`, so `relay_sent` is false, and the UDP - // path is blocking, so `udp_sent` is false and `udp_pending` is - // true. - // Alternatively returning a WouldBlock error here would - // potentially needlessly block sending on the relay path for the - // next datagram. + let udp_pending = udp_error + .as_ref() + .map(|err| err.kind() == io::ErrorKind::WouldBlock) + .unwrap_or_default(); + let relay_pending = relay_error + .as_ref() + .map(|err| err.kind() == io::ErrorKind::WouldBlock) + .unwrap_or_default(); + if udp_pending && relay_pending { + // Handle backpressure. + return Err(io::Error::new(io::ErrorKind::WouldBlock, "pending")); + } else { + if relay_sent || udp_sent { + trace!( + node = %node_id.fmt_short(), + send_udp = ?udp_addr, + send_relay = ?relay_url, + "sent transmit", + ); + } else if !pings_sent { + // Returning Ok here means we let QUIC handle a timeout for a lost + // packet, same would happen if we returned any errors. The + // philosophy of quinn-udp is that a UDP connection could come back + // at any time so these errors should be treated as transient and + // are just timeouts. Hence we opt for returning Ok. See + // test_try_send_no_udp_addr_or_relay_url to explore this further. + debug!( + node = %node_id.fmt_short(), + "no UDP or relay paths available for node, voiding transmit", + ); + // We log this as debug instead of error, because this is a + // situation that comes up under normal operation. If this were an + // error log, it would unnecessarily pollute logs. + // This situation happens essentially when `pings_sent` is false, + // `relay_url` is `None`, so `relay_sent` is false, and the UDP + // path is blocking, so `udp_sent` is false and `udp_pending` is + // true. + // Alternatively returning a WouldBlock error here would + // potentially needlessly block sending on the relay path for the + // next datagram. + } + return Ok(()); } - Ok(()) + } + None => { + error!(%dest, "no NodeState for mapped address, voiding transmit"); + // Returning Ok here means we let QUIC timeout. Returning WouldBlock + // triggers a hot loop. Returning an error would immediately fail a + // connection. The philosophy of quinn-udp is that a UDP connection could + // come back at any time or missing should be transient so chooses to let + // these kind of errors time out. See test_try_send_no_send_addr to try + // this out. + return Ok(()); } } - None => { - // Check if this is addr is used to perform QUIC Address Discovery - if let Some(addr) = self.node_map.qad_addr_for_send(&dest.0) { - // rewrite target address - transmit.destination = addr; - // send udp - match self.try_send_udp(addr, &transmit) { - Ok(()) => { - trace!(dst = %addr, - "sent QAD transmit over UDP"); - } - Err(err) => { - // No need to print "WouldBlock" errors to the console - if err.kind() == io::ErrorKind::WouldBlock { - return Err(io::Error::new(io::ErrorKind::WouldBlock, "pending")); - } else { - warn!( - dst = %addr, - "failed to send QAD message over udp: {err:#}" - ); - } + } + // Check if the destination address is an IpMappedAddr, which we use for + // quic address discovery, whose endpoints don't have `NodeId`s. + // If so, send the packet over udp. + else if let Ok(dest) = IpMappedAddr::try_from(transmit.destination) { + let mut transmit = transmit.clone(); + + // Get the socket addr + if let Some(addr) = self.ip_mapped_addrs.get_ip_addr(&dest) { + // rewrite target address + transmit.destination = addr; + // send udp + match self.try_send_udp(addr, &transmit) { + Ok(()) => { + trace!(dst = %addr, + "sent IpMapped transmit over UDP"); + } + Err(err) => { + // No need to print "WouldBlock" errors to the console + if err.kind() == io::ErrorKind::WouldBlock { + return Err(io::Error::new(io::ErrorKind::WouldBlock, "pending")); + } else { + warn!( + dst = %addr, + "failed to send IpMapped message over udp: {err:#}" + ); } } - } else { - error!(%dest, "no NodeState for mapped address, voiding transmit"); } - // Returning Ok here means we let QUIC timeout. Returning WouldBlock - // triggers a hot loop. Returning an error would immediately fail a - // connection. The philosophy of quinn-udp is that a UDP connection could + return Ok(()); + } else { + error!(%dest, "no socketaddr for mapped address, voiding transmit"); + // Returning Ok here means we let QUIC timeout. + // Returning an error would immediately fail a connection. + // The philosophy of quinn-udp is that a UDP connection could // come back at any time or missing should be transient so chooses to let // these kind of errors time out. See test_try_send_no_send_addr to try // this out. - Ok(()) + return Ok(()); } + } else { + error!(%transmit.destination, "unrecognized address, voiding transmit"); + // Returning Ok here means we let QUIC timeout. + // Returning an error would immediately fail a connection. + // The philosophy of quinn-udp is that a UDP connection could + // come back at any time or missing should be transient so chooses to let + // these kind of errors time out. See test_try_send_no_send_addr to try + // this out. + return Ok(()); } } @@ -754,7 +785,7 @@ impl MagicSock { /// /// All the `bufs` and `metas` should have initialized packets in them. /// - /// This fixes up the datagrams to use the correct [`QuicMappedAddr`] and extracts DISCO + /// This fixes up the datagrams to use the correct [`NodeIdMappedAddr`] and extracts DISCO /// packets, processing them inside the magic socket. fn process_udp_datagrams( &self, @@ -766,7 +797,7 @@ impl MagicSock { // Adding the IP address we received something on results in Quinn using this // address on the send path to send from. However we let Quinn use a - // QuicMappedAddress, not a real address. So we used to substitute our bind address + // NodeIdMappedAddress, not a real address. So we used to substitute our bind address // here so that Quinn would send on the right address. But that would sometimes // result in the wrong address family and Windows trips up on that. // @@ -833,11 +864,13 @@ impl MagicSock { } if buf_contains_quic_datagrams { - // Update the NodeMap and remap RecvMeta to the QuicMappedAddr. + // Update the NodeMap and remap RecvMeta to the NodeIdMappedAddr. match self.node_map.receive_udp(meta.addr) { None => { - // Check if this address is used for QUIC address discovery - if let Some(addr) = self.node_map.qad_addr_for_recv(&meta.addr) { + // Check if this address is mapped to an IpMappedAddr + if let Some(ip_mapped_addr) = + self.ip_mapped_addrs.get_mapped_addr(&meta.addr) + { trace!( src = ?meta.addr, count = %quic_datagram_count, @@ -845,7 +878,7 @@ impl MagicSock { "UDP recv QUIC address discovery packets", ); quic_packets_total += quic_datagram_count; - meta.addr = addr; + meta.addr = ip_mapped_addr.0; } else { warn!( src = ?meta.addr, @@ -1611,6 +1644,7 @@ impl Handle { pconn6, disco_secrets: DiscoSecrets::default(), node_map, + ip_mapped_addrs: IpMappedAddrs::new(), udp_disco_sender, discovery, direct_addrs: Default::default(), @@ -1960,12 +1994,11 @@ impl AsyncUdpSocket for MagicSock { // ready. let ipv4_poller = self.pconn4.create_io_poller(); let ipv6_poller = self.pconn6.as_ref().map(|sock| sock.create_io_poller()); - let relay_sender = self.relay_actor_sender.clone(); + let relay_sender = self.relay_datagram_send_channel.clone(); Box::pin(IoPoller { ipv4_poller, ipv6_poller, relay_sender, - relay_send_waker: self.relay_send_waker.clone(), }) } @@ -2689,7 +2722,7 @@ impl Actor { let res = set.join_all().await; for addrs in res { addrs.iter().for_each(|addr| { - self.msock.node_map.add_qad_addr(*addr); + self.msock.ip_mapped_addrs.add(*addr); }); } } @@ -2930,12 +2963,14 @@ fn split_packets(transmit: &quinn_udp::Transmit) -> RelayContents { /// comes in as the inner [`SocketAddr`], in those interfaces we have to be careful to do /// the conversion to this type. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub(crate) struct QuicMappedAddr(pub(crate) SocketAddr); +pub(crate) struct NodeIdMappedAddr(pub(crate) SocketAddr); -/// Counter to always generate unique addresses for [`QuicMappedAddr`]. -static ADDR_COUNTER: AtomicU64 = AtomicU64::new(1); +/// Counter to always generate unique addresses for [`NodeIdMappedAddr`]. +static NODE_ID_ADDR_COUNTER: AtomicU64 = AtomicU64::new(1); -impl QuicMappedAddr { +const MAPPED_ADDR_PORT: u16 = 12345; + +impl NodeIdMappedAddr { /// The Prefix/L of our Unique Local Addresses. const ADDR_PREFIXL: u8 = 0xfd; /// The Global ID used in our Unique Local Addresses. @@ -2952,18 +2987,150 @@ impl QuicMappedAddr { addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID); addr[6..8].copy_from_slice(&Self::ADDR_SUBNET); - let counter = ADDR_COUNTER.fetch_add(1, Ordering::Relaxed); + let counter = NODE_ID_ADDR_COUNTER.fetch_add(1, Ordering::Relaxed); addr[8..16].copy_from_slice(&counter.to_be_bytes()); - Self(SocketAddr::new(IpAddr::V6(Ipv6Addr::from(addr)), 12345)) + Self(SocketAddr::new( + IpAddr::V6(Ipv6Addr::from(addr)), + MAPPED_ADDR_PORT, + )) } } -impl std::fmt::Display for QuicMappedAddr { +impl TryFrom for NodeIdMappedAddr { + type Error = anyhow::Error; + + fn try_from(value: SocketAddr) -> std::result::Result { + match value { + SocketAddr::V4(_) => anyhow::bail!("NodeIdMappedAddrs are all Ipv6, addr {value:?}"), + SocketAddr::V6(addr) => { + if addr.port() != MAPPED_ADDR_PORT { + anyhow::bail!("not a mapped addr"); + } + let octets = addr.ip().octets(); + if octets[6..8] != NodeIdMappedAddr::ADDR_SUBNET { + anyhow::bail!("not a NodeIdMappedAddr"); + } + Ok(NodeIdMappedAddr(value)) + } + } + } +} +impl std::fmt::Display for NodeIdMappedAddr { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "QuicMappedAddr({})", self.0) + write!(f, "NodeIdMappedAddr({})", self.0) + } +} + +/// A mirror for the `NodeIdMappedAddr`, mapping a fake Ipv6 address with an actual IP address. +/// +/// You can consider this as nothing more than a lookup key for an IP the [`MagicSock`] knows +/// about. +/// +/// And in our QUIC-facing socket APIs like [`AsyncUdpSocket`] it +/// comes in as the inner [`SocketAddr`], in those interfaces we have to be careful to do +/// the conversion to this type. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)] +pub(crate) struct IpMappedAddr(pub(crate) SocketAddr); + +/// Counter to always generate unique addresses for [`NodeIdMappedAddr`]. +static IP_ADDR_COUNTER: AtomicU64 = AtomicU64::new(1); + +impl IpMappedAddr { + /// The Prefix/L of our Unique Local Addresses. + const ADDR_PREFIXL: u8 = 0xfd; + /// The Global ID used in our Unique Local Addresses. + const ADDR_GLOBAL_ID: [u8; 5] = [21, 7, 10, 81, 11]; + /// The Subnet ID used in our Unique Local Addresses. + const ADDR_SUBNET: [u8; 2] = [0, 1]; + + /// Generates a globally unique fake UDP address. + /// + /// This generates and IPv6 Unique Local Address according to RFC 4193. + pub(crate) fn generate() -> Self { + let mut addr = [0u8; 16]; + addr[0] = Self::ADDR_PREFIXL; + addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID); + addr[6..8].copy_from_slice(&Self::ADDR_SUBNET); + + let counter = IP_ADDR_COUNTER.fetch_add(1, Ordering::Relaxed); + addr[8..16].copy_from_slice(&counter.to_be_bytes()); + + Self(SocketAddr::new( + IpAddr::V6(Ipv6Addr::from(addr)), + MAPPED_ADDR_PORT, + )) } } + +impl TryFrom for IpMappedAddr { + type Error = anyhow::Error; + + fn try_from(value: SocketAddr) -> std::result::Result { + match value { + SocketAddr::V4(_) => anyhow::bail!("IpMappedAddrs are all Ipv6, addr {value:?}"), + SocketAddr::V6(addr) => { + if addr.port() != MAPPED_ADDR_PORT { + anyhow::bail!("not a mapped addr"); + } + let octets = addr.ip().octets(); + if octets[6..8] != IpMappedAddr::ADDR_SUBNET { + anyhow::bail!("not an IpMappedAddr"); + } + Ok(IpMappedAddr(value)) + } + } + } +} + +impl std::fmt::Display for IpMappedAddr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "IpMappedAddr({})", self.0) + } +} + +#[derive(Debug, Clone)] +/// A Map of [`IpMappedAddrs`] to [`SocketAddrs`] +pub(crate) struct IpMappedAddrs(Arc>>); + +impl IpMappedAddrs { + pub fn new() -> Self { + Self(Arc::new(std::sync::Mutex::new(BTreeMap::new()))) + } + + /// Add a [`SocketAddr`] to the map and the generated [`IpMappedAddr`] it is now associated with back. + /// + /// If this [`SocketAddr`] already exists in the map, it returns its associated [`IpMappedAddr`]. + pub fn add(&self, ip_addr: SocketAddr) -> IpMappedAddr { + let mut map = self.0.lock().expect("poisoned"); + for (mapped_addr, ip) in map.iter() { + if ip == &ip_addr { + return *mapped_addr; + } + } + let ip_mapped_addr = IpMappedAddr::generate(); + map.insert(ip_mapped_addr, ip_addr); + ip_mapped_addr + } + + /// Get the [`IpMappedAddr`] for the given [`SocketAddr`]. + pub fn get_mapped_addr(&self, ip_addr: &SocketAddr) -> Option { + let map = self.0.lock().expect("poisoned"); + for (mapped_addr, ip) in map.iter() { + if ip == ip_addr { + return Some(*mapped_addr); + } + } + None + } + + /// Get the [`SocketAddr`] for the given [`IpMappedAddr`]. + pub fn get_ip_addr(&self, mapped_addr: &IpMappedAddr) -> Option { + let map = self.0.lock().expect("poisoned"); + map.get(mapped_addr).copied() + } +} + fn disco_message_sent(msg: &disco::Message) { match msg { disco::Message::Ping(_) => { @@ -4032,7 +4199,7 @@ mod tests { async fn magicsock_connect( ep: &quinn::Endpoint, ep_secret_key: SecretKey, - addr: QuicMappedAddr, + addr: NodeIdMappedAddr, node_id: NodeId, ) -> Result { // Endpoint::connect sets this, do the same to have similar behaviour. @@ -4058,7 +4225,7 @@ mod tests { async fn magicsock_connect_with_transport_config( ep: &quinn::Endpoint, ep_secret_key: SecretKey, - addr: QuicMappedAddr, + addr: NodeIdMappedAddr, node_id: NodeId, transport_config: Arc, ) -> Result { @@ -4087,7 +4254,7 @@ mod tests { let msock_1 = magicsock_ep(secret_key_1.clone()).await.unwrap(); // Generate an address not present in the NodeMap. - let bad_addr = QuicMappedAddr::generate(); + let bad_addr = NodeIdMappedAddr::generate(); // 500ms is rather fast here. Running this locally it should always be the correct // timeout. If this is too slow however the test will not become flaky as we are diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index f45e2026db..edabf0826f 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -17,7 +17,7 @@ use self::{ node_state::{NodeState, Options, PingHandled}, }; use super::{ - metrics::Metrics as MagicsockMetrics, ActorMessage, DiscoMessageSource, QuicMappedAddr, + metrics::Metrics as MagicsockMetrics, ActorMessage, DiscoMessageSource, NodeIdMappedAddr, }; #[cfg(any(test, feature = "test-utils"))] use crate::endpoint::PathSelection; @@ -45,7 +45,7 @@ const MAX_INACTIVE_NODES: usize = 30; /// - The node's ID in this map, only useful if you know the ID from an insert or lookup. /// This is static and never changes. /// -/// - The [`QuicMappedAddr`] which internally identifies the node to the QUIC stack. This +/// - The [`NodeIdMappedAddr`] which internally identifies the node to the QUIC stack. This /// is static and never changes. /// /// - The nodes's public key, aka `PublicKey` or "node_key". This is static and never changes, @@ -54,7 +54,7 @@ const MAX_INACTIVE_NODES: usize = 30; /// - A public socket address on which they are reachable on the internet, known as ip-port. /// These come and go as the node moves around on the internet /// -/// An index of nodeInfos by node key, QuicMappedAddr, and discovered ip:port endpoints. +/// An index of nodeInfos by node key, NodeIdMappedAddr, and discovered ip:port endpoints. #[derive(Default, Debug)] pub(super) struct NodeMap { inner: Mutex, @@ -64,13 +64,11 @@ pub(super) struct NodeMap { pub(super) struct NodeMapInner { by_node_key: HashMap, by_ip_port: HashMap, - by_quic_mapped_addr: HashMap, + by_quic_mapped_addr: HashMap, by_id: HashMap, next_id: usize, #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection, - // special set of relay socket addresses that we use to do quic address discovery - qad_addrs: BTreeSet, } /// Identifier to look up a [`NodeState`] in the [`NodeMap`]. @@ -81,7 +79,7 @@ pub(super) struct NodeMapInner { enum NodeStateKey { Idx(usize), NodeId(NodeId), - QuicMappedAddr(QuicMappedAddr), + NodeIdMappedAddr(NodeIdMappedAddr), IpPort(IpPort), } @@ -155,71 +153,19 @@ impl NodeMap { .add_node_addr(node_addr, source) } - /// Add a the SocketAddr used to perform QUIC Address Discovery to the nodemap - pub(super) fn add_qad_addr(&self, udp_addr: SocketAddr) { - self.inner - .lock() - .expect("poisoned") - .qad_addrs - .insert(udp_addr); - } - - /// Return a correctly canonicalized SocketAddr if this address is one - /// used to perform QUIC Address Discovery - pub(super) fn qad_addr_for_send(&self, addr: &SocketAddr) -> Option { - // all addresses given to the endpoint are Ipv6 addresses, so we need to - // canonicalize before we check for the actual addr we are trying to send to - let canonicalized_addr = SocketAddr::new(addr.ip().to_canonical(), addr.port()); - if self - .inner - .lock() - .expect("poisoned") - .qad_addrs - .contains(&canonicalized_addr) - { - Some(canonicalized_addr) - } else { - None - } - } - - /// Return a correctly formed SocketAddr if this address is one used to - /// perform QUIC Address Discovery - pub(super) fn qad_addr_for_recv(&self, addr: &SocketAddr) -> Option { - if self - .inner - .lock() - .expect("poisoned") - .qad_addrs - .contains(addr) - { - match addr.ip() { - IpAddr::V4(ipv4_addr) => { - // if this is an ipv4 addr, we need to map it back to - // an ipv6 addr, since all addresses we use to dial on - // the underlying quinn endpoint are mapped ipv6 addrs - Some(SocketAddr::new( - ipv4_addr.to_ipv6_mapped().into(), - addr.port(), - )) - } - IpAddr::V6(_) => Some(*addr), - } - } else { - None - } - } - /// Number of nodes currently listed. pub(super) fn node_count(&self) -> usize { self.inner.lock().expect("poisoned").node_count() } - pub(super) fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { + pub(super) fn receive_udp( + &self, + udp_addr: SocketAddr, + ) -> Option<(PublicKey, NodeIdMappedAddr)> { self.inner.lock().expect("poisoned").receive_udp(udp_addr) } - pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr { + pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: NodeId) -> NodeIdMappedAddr { self.inner .lock() .expect("poisoned") @@ -258,7 +204,7 @@ impl NodeMap { pub(super) fn get_quic_mapped_addr_for_node_key( &self, node_key: NodeId, - ) -> Option { + ) -> Option { self.inner .lock() .expect("poisoned") @@ -302,7 +248,7 @@ impl NodeMap { #[allow(clippy::type_complexity)] pub(super) fn get_send_addrs( &self, - addr: QuicMappedAddr, + addr: NodeIdMappedAddr, have_ipv6: bool, ) -> Option<( PublicKey, @@ -311,7 +257,7 @@ impl NodeMap { Vec, )> { let mut inner = self.inner.lock().expect("poisoned"); - let ep = inner.get_mut(NodeStateKey::QuicMappedAddr(addr))?; + let ep = inner.get_mut(NodeStateKey::NodeIdMappedAddr(addr))?; let public_key = *ep.public_key(); trace!(dest = %addr, node_id = %public_key.fmt_short(), "dst mapped to NodeId"); let (udp_addr, relay_url, msgs) = ep.get_send_addrs(have_ipv6); @@ -461,7 +407,7 @@ impl NodeMapInner { match id { NodeStateKey::Idx(id) => Some(id), NodeStateKey::NodeId(node_key) => self.by_node_key.get(&node_key).copied(), - NodeStateKey::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(&addr).copied(), + NodeStateKey::NodeIdMappedAddr(addr) => self.by_quic_mapped_addr.get(&addr).copied(), NodeStateKey::IpPort(ipp) => self.by_ip_port.get(&ipp).copied(), } } @@ -492,7 +438,7 @@ impl NodeMapInner { } /// Marks the node we believe to be at `ipp` as recently used. - fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, QuicMappedAddr)> { + fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, NodeIdMappedAddr)> { let ip_port: IpPort = udp_addr.into(); let Some(node_state) = self.get_mut(NodeStateKey::IpPort(ip_port)) else { info!(src=%udp_addr, "receive_udp: no node_state found for addr, ignore"); @@ -503,7 +449,7 @@ impl NodeMapInner { } #[instrument(skip_all, fields(src = %src.fmt_short()))] - fn receive_relay(&mut self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr { + fn receive_relay(&mut self, relay_url: &RelayUrl, src: NodeId) -> NodeIdMappedAddr { #[cfg(any(test, feature = "test-utils"))] let path_selection = self.path_selection; let node_state = self.get_or_insert_with(NodeStateKey::NodeId(src), || { diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index a2f97b46b3..cf5ba41259 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -24,7 +24,7 @@ use super::{ use crate::endpoint::PathSelection; use crate::{ disco::{self, SendAddr}, - magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, Timer, HEARTBEAT_INTERVAL}, + magicsock::{ActorMessage, MagicsockMetrics, NodeIdMappedAddr, Timer, HEARTBEAT_INTERVAL}, watchable::{Watchable, Watcher}, }; @@ -103,7 +103,7 @@ pub(super) struct NodeState { /// [`NodeMap`]: super::NodeMap id: usize, /// The UDP address used on the QUIC-layer to address this node. - quic_mapped_addr: QuicMappedAddr, + quic_mapped_addr: NodeIdMappedAddr, /// The global identifier for this endpoint. node_id: NodeId, /// The last time we pinged all endpoints. @@ -156,7 +156,7 @@ pub(super) struct Options { impl NodeState { pub(super) fn new(id: usize, options: Options) -> Self { - let quic_mapped_addr = QuicMappedAddr::generate(); + let quic_mapped_addr = NodeIdMappedAddr::generate(); if options.relay_url.is_some() { // we potentially have a relay connection to the node @@ -191,7 +191,7 @@ impl NodeState { &self.node_id } - pub(super) fn quic_mapped_addr(&self) -> &QuicMappedAddr { + pub(super) fn quic_mapped_addr(&self) -> &NodeIdMappedAddr { &self.quic_mapped_addr } @@ -1487,7 +1487,7 @@ mod tests { ( NodeState { id: 0, - quic_mapped_addr: QuicMappedAddr::generate(), + quic_mapped_addr: NodeIdMappedAddr::generate(), node_id: key.public(), last_full_ping: None, relay_url: None, @@ -1517,7 +1517,7 @@ mod tests { let key = SecretKey::generate(rand::thread_rng()); NodeState { id: 1, - quic_mapped_addr: QuicMappedAddr::generate(), + quic_mapped_addr: NodeIdMappedAddr::generate(), node_id: key.public(), last_full_ping: None, relay_url: relay_and_state(key.public(), send_addr.clone()), @@ -1538,7 +1538,7 @@ mod tests { let key = SecretKey::generate(rand::thread_rng()); NodeState { id: 2, - quic_mapped_addr: QuicMappedAddr::generate(), + quic_mapped_addr: NodeIdMappedAddr::generate(), node_id: key.public(), last_full_ping: None, relay_url: Some(( @@ -1582,7 +1582,7 @@ mod tests { ( NodeState { id: 3, - quic_mapped_addr: QuicMappedAddr::generate(), + quic_mapped_addr: NodeIdMappedAddr::generate(), node_id: key.public(), last_full_ping: None, relay_url: relay_and_state(key.public(), send_addr.clone()), @@ -1691,7 +1691,6 @@ mod tests { ]), next_id: 5, path_selection: PathSelection::default(), - qad_addrs: BTreeSet::new(), }); let mut got = node_map.list_remote_infos(later); got.sort_by_key(|p| p.node_id); From 222c2766a202ed85ff1a59b15026e17b73dbb3a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Wed, 8 Jan 2025 17:07:00 -0500 Subject: [PATCH 6/9] fix rebase mistake --- iroh/src/endpoint.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 7e131a8b77..02673aca50 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -958,10 +958,6 @@ impl Endpoint { return; } - tracing::debug!("Closing connections"); - self.endpoint.close(0u16.into(), b""); - self.endpoint.wait_idle().await; - tracing::debug!("Connections closed"); self.msock.close().await; } From c23f1eb2d827ef620eaad6fc53c211e755474fc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Thu, 9 Jan 2025 19:12:56 -0500 Subject: [PATCH 7/9] path of least resistance refactoring to see if the slowness was due to mis-handling dns resolutions --- iroh-base/src/ip_mapped_addrs.rs | 139 +++++++++++++++++++ iroh-base/src/lib.rs | 4 + iroh-net-report/src/lib.rs | 29 ++-- iroh-net-report/src/reportgen.rs | 40 +++++- iroh/src/magicsock.rs | 220 ++----------------------------- 5 files changed, 214 insertions(+), 218 deletions(-) create mode 100644 iroh-base/src/ip_mapped_addrs.rs diff --git a/iroh-base/src/ip_mapped_addrs.rs b/iroh-base/src/ip_mapped_addrs.rs new file mode 100644 index 0000000000..356a3e96d8 --- /dev/null +++ b/iroh-base/src/ip_mapped_addrs.rs @@ -0,0 +1,139 @@ +use std::{ + collections::BTreeMap, + net::{IpAddr, Ipv6Addr, SocketAddr}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; + +/// The dummy port used for all mapped addresses +pub const MAPPED_ADDR_PORT: u16 = 12345; + +/// Can occur when converting a [`SocketAddr`] to an [`IpMappedAddr`] +#[derive(Debug, thiserror::Error)] +#[error("Failed to convert: {0}")] +pub struct IpMappedAddrError(String); + +/// A mirror for the `NodeIdMappedAddr`, mapping a fake Ipv6 address with an actual IP address. +/// +/// You can consider this as nothing more than a lookup key for an IP the [`MagicSock`] knows +/// about. +/// +/// And in our QUIC-facing socket APIs like [`AsyncUdpSocket`] it +/// comes in as the inner [`SocketAddr`], in those interfaces we have to be careful to do +/// the conversion to this type. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)] +pub struct IpMappedAddr(pub(crate) SocketAddr); + +/// Counter to always generate unique addresses for [`NodeIdMappedAddr`]. +static IP_ADDR_COUNTER: AtomicU64 = AtomicU64::new(1); + +impl IpMappedAddr { + /// The Prefix/L of our Unique Local Addresses. + const ADDR_PREFIXL: u8 = 0xfd; + /// The Global ID used in our Unique Local Addresses. + const ADDR_GLOBAL_ID: [u8; 5] = [21, 7, 10, 81, 11]; + /// The Subnet ID used in our Unique Local Addresses. + const ADDR_SUBNET: [u8; 2] = [0, 1]; + + /// Generates a globally unique fake UDP address. + /// + /// This generates and IPv6 Unique Local Address according to RFC 4193. + pub fn generate() -> Self { + let mut addr = [0u8; 16]; + addr[0] = Self::ADDR_PREFIXL; + addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID); + addr[6..8].copy_from_slice(&Self::ADDR_SUBNET); + + let counter = IP_ADDR_COUNTER.fetch_add(1, Ordering::Relaxed); + addr[8..16].copy_from_slice(&counter.to_be_bytes()); + + Self(SocketAddr::new( + IpAddr::V6(Ipv6Addr::from(addr)), + MAPPED_ADDR_PORT, + )) + } + + /// Return the underlying [`SocketAddr`]. + pub fn addr(&self) -> SocketAddr { + self.0 + } +} + +impl TryFrom for IpMappedAddr { + type Error = IpMappedAddrError; + + fn try_from(value: SocketAddr) -> std::result::Result { + match value { + SocketAddr::V4(_) => Err(IpMappedAddrError(String::from( + "IpMappedAddrs are all Ipv6, found Ipv4 address", + ))), + SocketAddr::V6(addr) => { + if addr.port() != MAPPED_ADDR_PORT { + return Err(IpMappedAddrError(String::from("not mapped addr"))); + } + let octets = addr.ip().octets(); + if octets[6..8] != IpMappedAddr::ADDR_SUBNET { + return Err(IpMappedAddrError(String::from("not an IpMappedAddr"))); + } + Ok(IpMappedAddr(value)) + } + } + } +} + +impl std::fmt::Display for IpMappedAddr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "IpMappedAddr({})", self.0) + } +} + +#[derive(Debug, Clone)] +/// A Map of [`IpMappedAddrs`] to [`SocketAddrs`] +pub struct IpMappedAddrs(Arc>>); + +impl IpMappedAddrs { + /// Create an empty [`IpMappedAddrs`] + pub fn new() -> Self { + Self(Arc::new(std::sync::Mutex::new(BTreeMap::new()))) + } + + /// Add a [`SocketAddr`] to the map and the generated [`IpMappedAddr`] it is now associated with back. + /// + /// If this [`SocketAddr`] already exists in the map, it returns its associated [`IpMappedAddr`]. + pub fn add(&self, ip_addr: SocketAddr) -> IpMappedAddr { + let mut map = self.0.lock().expect("poisoned"); + for (mapped_addr, ip) in map.iter() { + if ip == &ip_addr { + return *mapped_addr; + } + } + let ip_mapped_addr = IpMappedAddr::generate(); + map.insert(ip_mapped_addr, ip_addr); + ip_mapped_addr + } + + /// Get the [`IpMappedAddr`] for the given [`SocketAddr`]. + pub fn get_mapped_addr(&self, ip_addr: &SocketAddr) -> Option { + let map = self.0.lock().expect("poisoned"); + for (mapped_addr, ip) in map.iter() { + if ip == ip_addr { + return Some(*mapped_addr); + } + } + None + } + + /// Get the [`SocketAddr`] for the given [`IpMappedAddr`]. + pub fn get_ip_addr(&self, mapped_addr: &IpMappedAddr) -> Option { + let map = self.0.lock().expect("poisoned"); + map.get(mapped_addr).copied() + } +} + +impl Default for IpMappedAddrs { + fn default() -> Self { + IpMappedAddrs::new() + } +} diff --git a/iroh-base/src/lib.rs b/iroh-base/src/lib.rs index 2b32e2719b..ac66391ed8 100644 --- a/iroh-base/src/lib.rs +++ b/iroh-base/src/lib.rs @@ -7,6 +7,8 @@ #[cfg(feature = "ticket")] pub mod ticket; +#[cfg(feature = "relay")] +mod ip_mapped_addrs; #[cfg(feature = "key")] mod key; #[cfg(feature = "key")] @@ -14,6 +16,8 @@ mod node_addr; #[cfg(feature = "relay")] mod relay_url; +#[cfg(feature = "relay")] +pub use self::ip_mapped_addrs::{IpMappedAddr, IpMappedAddrs, MAPPED_ADDR_PORT}; #[cfg(feature = "key")] pub use self::key::{KeyParsingError, NodeId, PublicKey, SecretKey, Signature}; #[cfg(feature = "key")] diff --git a/iroh-net-report/src/lib.rs b/iroh-net-report/src/lib.rs index 28b404388c..ea877a5c11 100644 --- a/iroh-net-report/src/lib.rs +++ b/iroh-net-report/src/lib.rs @@ -19,7 +19,7 @@ use std::{ use anyhow::{anyhow, Result}; use bytes::Bytes; use hickory_resolver::TokioResolver as DnsResolver; -use iroh_base::RelayUrl; +use iroh_base::{IpMappedAddrs, RelayUrl}; #[cfg(feature = "metrics")] use iroh_metrics::inc; use iroh_relay::{protos::stun, RelayMap}; @@ -348,8 +348,12 @@ impl Client { /// /// This starts a connected actor in the background. Once the client is dropped it will /// stop running. - pub fn new(port_mapper: Option, dns_resolver: DnsResolver) -> Result { - let mut actor = Actor::new(port_mapper, dns_resolver)?; + pub fn new( + port_mapper: Option, + dns_resolver: DnsResolver, + ip_mapped_addrs: Option, + ) -> Result { + let mut actor = Actor::new(port_mapper, dns_resolver, ip_mapped_addrs)?; let addr = actor.addr(); let task = tokio::spawn( async move { actor.run().await }.instrument(info_span!("net_report.actor")), @@ -566,6 +570,9 @@ struct Actor { /// The DNS resolver to use for probes that need to perform DNS lookups dns_resolver: DnsResolver, + + /// The [`IpMappedAddrs`] that allows you to do QAD in iroh + ip_mapped_addrs: Option, } impl Actor { @@ -573,7 +580,11 @@ impl Actor { /// /// This does not start the actor, see [`Actor::run`] for this. You should not /// normally create this directly but rather create a [`Client`]. - fn new(port_mapper: Option, dns_resolver: DnsResolver) -> Result { + fn new( + port_mapper: Option, + dns_resolver: DnsResolver, + ip_mapped_addrs: Option, + ) -> Result { // TODO: consider an instrumented flume channel so we have metrics. let (sender, receiver) = mpsc::channel(32); Ok(Self { @@ -584,6 +595,7 @@ impl Actor { in_flight_stun_requests: Default::default(), current_report_run: None, dns_resolver, + ip_mapped_addrs, }) } @@ -687,6 +699,7 @@ impl Actor { quic_config, self.dns_resolver.clone(), protocols, + self.ip_mapped_addrs.clone(), ); self.current_report_run = Some(ReportRun { @@ -1135,7 +1148,7 @@ mod tests { stun_utils::serve("127.0.0.1".parse().unwrap()).await?; let resolver = crate::dns::tests::resolver(); - let mut client = Client::new(None, resolver.clone())?; + let mut client = Client::new(None, resolver.clone(), None)?; let dm = stun_utils::relay_map_of([stun_addr].into_iter()); // Note that the ProbePlan will change with each iteration. @@ -1183,7 +1196,7 @@ mod tests { // Now create a client and generate a report. let resolver = crate::dns::tests::resolver(); - let mut client = Client::new(None, resolver.clone())?; + let mut client = Client::new(None, resolver.clone(), None)?; let r = client.get_report(dm, None, None, None).await?; let mut r: Report = (*r).clone(); @@ -1386,7 +1399,7 @@ mod tests { let resolver = crate::dns::tests::resolver(); for mut tt in tests { println!("test: {}", tt.name); - let mut actor = Actor::new(None, resolver.clone()).unwrap(); + let mut actor = Actor::new(None, resolver.clone(), None).unwrap(); for s in &mut tt.steps { // trigger the timer time::advance(Duration::from_secs(s.after)).await; @@ -1421,7 +1434,7 @@ mod tests { dbg!(&dm); let resolver = crate::dns::tests::resolver().clone(); - let mut client = Client::new(None, resolver)?; + let mut client = Client::new(None, resolver, None)?; // Set up an external socket to send STUN requests from, this will be discovered as // our public socket address by STUN. We send back any packets received on this diff --git a/iroh-net-report/src/reportgen.rs b/iroh-net-report/src/reportgen.rs index fb144cdc25..b0d3090b26 100644 --- a/iroh-net-report/src/reportgen.rs +++ b/iroh-net-report/src/reportgen.rs @@ -28,7 +28,7 @@ use std::{ use anyhow::{anyhow, bail, Context as _, Result}; use hickory_resolver::TokioResolver as DnsResolver; -use iroh_base::RelayUrl; +use iroh_base::{IpMappedAddrs, RelayUrl}; #[cfg(feature = "metrics")] use iroh_metrics::inc; use iroh_relay::{ @@ -94,6 +94,7 @@ impl Client { quic_config: Option, dns_resolver: DnsResolver, protocols: BTreeSet, + ip_mapped_addrs: Option, ) -> Self { let (msg_tx, msg_rx) = mpsc::channel(32); let addr = Addr { @@ -114,6 +115,7 @@ impl Client { outstanding_tasks: OutstandingTasks::default(), dns_resolver, protocols, + ip_mapped_addrs, }; let task = tokio::spawn( async move { actor.run().await }.instrument(info_span!("reportgen.actor")), @@ -200,6 +202,8 @@ struct Actor { /// Protocols we should attempt to create probes for, if we have the correct /// configuration for that protocol. protocols: BTreeSet, + /// Optional [`IpMappedAddrs`] used to enable QAD in iroh + ip_mapped_addrs: Option, } impl Actor { @@ -569,6 +573,7 @@ impl Actor { let net_report = self.net_report.clone(); let pinger = pinger.clone(); let dns_resolver = self.dns_resolver.clone(); + let ip_mapped_addrs = self.ip_mapped_addrs.clone(); set.spawn( run_probe( @@ -581,6 +586,7 @@ impl Actor { net_report, pinger, dns_resolver, + ip_mapped_addrs, ) .instrument(debug_span!("run_probe", %probe)), ); @@ -716,6 +722,7 @@ async fn run_probe( net_report: net_report::Addr, pinger: Pinger, dns_resolver: DnsResolver, + ip_mapped_addrs: Option, ) -> Result { if !probe.delay().is_zero() { trace!("delaying probe"); @@ -749,7 +756,7 @@ async fn run_probe( )); } - let relay_addr = get_relay_addr(&dns_resolver, &relay_node, probe.proto()) + let relay_addr = get_relay_addr(&dns_resolver, &relay_node, probe.proto(), ip_mapped_addrs) .await .context("no relay node addr") .map_err(|e| ProbeError::AbortSet(e, probe.clone()))?; @@ -1058,6 +1065,7 @@ async fn get_relay_addr( dns_resolver: &DnsResolver, relay_node: &RelayNode, proto: ProbeProto, + ip_mapped_addrs: Option, ) -> Result { if relay_node.stun_only && !matches!(proto, ProbeProto::StunIpv4 | ProbeProto::StunIpv6) { bail!("Relay node not suitable for non-STUN probes"); @@ -1074,11 +1082,16 @@ async fn get_relay_addr( .next() .map(|ip| ip.to_canonical()) .map(|addr| SocketAddr::new(addr, port)) + .map(|addr| maybe_to_mapped_addr(ip_mapped_addrs, proto, addr)) .ok_or(anyhow!("No suitable relay addr found")), Err(err) => Err(err.context("No suitable relay addr found")), } } - Some(url::Host::Ipv4(addr)) => Ok(SocketAddr::new(addr.into(), port)), + Some(url::Host::Ipv4(addr)) => Ok(maybe_to_mapped_addr( + ip_mapped_addrs, + proto, + SocketAddr::new(addr.into(), port), + )), Some(url::Host::Ipv6(_addr)) => Err(anyhow!("No suitable relay addr found")), None => Err(anyhow!("No valid hostname in RelayUrl")), } @@ -1093,12 +1106,17 @@ async fn get_relay_addr( .next() .map(|ip| ip.to_canonical()) .map(|addr| SocketAddr::new(addr, port)) + .map(|addr| maybe_to_mapped_addr(ip_mapped_addrs, proto, addr)) .ok_or(anyhow!("No suitable relay addr found")), Err(err) => Err(err.context("No suitable relay addr found")), } } Some(url::Host::Ipv4(_addr)) => Err(anyhow!("No suitable relay addr found")), - Some(url::Host::Ipv6(addr)) => Ok(SocketAddr::new(addr.into(), port)), + Some(url::Host::Ipv6(addr)) => Ok(maybe_to_mapped_addr( + ip_mapped_addrs, + proto, + SocketAddr::new(addr.into(), port), + )), None => Err(anyhow!("No valid hostname in RelayUrl")), } } @@ -1107,6 +1125,20 @@ async fn get_relay_addr( } } +fn maybe_to_mapped_addr( + ip_mapped_addrs: Option, + proto: ProbeProto, + addr: SocketAddr, +) -> SocketAddr { + if !matches!(proto, ProbeProto::QuicIpv4 | ProbeProto::QuicIpv6) { + return addr; + } + if let Some(ip_mapped_addrs) = ip_mapped_addrs.as_ref() { + return ip_mapped_addrs.add(addr).addr(); + } + addr +} + /// Runs an ICMP IPv4 or IPv6 probe. /// /// The `pinger` is passed in so the ping sockets are only bound once diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index d65b56a416..1e9284cff1 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -35,9 +35,11 @@ use concurrent_queue::ConcurrentQueue; use data_encoding::HEXLOWER; use futures_lite::{FutureExt, StreamExt}; use futures_util::{stream::BoxStream, task::AtomicWaker}; -use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey}; +use iroh_base::{ + IpMappedAddr, IpMappedAddrs, NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey, MAPPED_ADDR_PORT, +}; use iroh_metrics::{inc, inc_by}; -use iroh_relay::{protos::stun, RelayMap, RelayNode}; +use iroh_relay::{protos::stun, RelayMap}; use net_report::QuicConfig; use netwatch::{interfaces, ip::LocalAddresses, netmon, UdpSocket}; use quinn::{AsyncUdpSocket, ServerConfig}; @@ -68,7 +70,7 @@ use crate::{ defaults::timeouts::NET_REPORT_TIMEOUT, disco::{self, CallMeMaybe, SendAddr}, discovery::{Discovery, DiscoveryItem}, - dns::{DnsResolver, ResolverExt}, + dns::DnsResolver, key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret}, watchable::{Watchable, Watcher}, }; @@ -878,7 +880,7 @@ impl MagicSock { "UDP recv QUIC address discovery packets", ); quic_packets_total += quic_datagram_count; - meta.addr = ip_mapped_addr.0; + meta.addr = ip_mapped_addr.addr(); } else { warn!( src = ?meta.addr, @@ -1603,8 +1605,13 @@ impl Handle { let ipv4_addr = pconn4.local_addr()?; let ipv6_addr = pconn6.as_ref().and_then(|c| c.local_addr().ok()); - let net_reporter = - net_report::Client::new(Some(port_mapper.clone()), dns_resolver.clone())?; + let ip_mapped_addrs = IpMappedAddrs::default(); + + let net_reporter = net_report::Client::new( + Some(port_mapper.clone()), + dns_resolver.clone(), + Some(ip_mapped_addrs.clone()), + )?; let pconn4_sock = pconn4.as_socket(); let pconn6_sock = pconn6.as_ref().map(|p| p.as_socket()); @@ -1644,7 +1651,7 @@ impl Handle { pconn6, disco_secrets: DiscoSecrets::default(), node_map, - ip_mapped_addrs: IpMappedAddrs::new(), + ip_mapped_addrs, udp_disco_sender, discovery, direct_addrs: Default::default(), @@ -2169,9 +2176,6 @@ impl Actor { } } - // kick off resolving the URLs for relay addresses - self.resolve_qad_addrs(Duration::from_millis(10)).await; - let mut receiver_closed = false; let mut portmap_watcher_closed = false; let mut link_change_closed = false; @@ -2550,10 +2554,6 @@ impl Actor { .stun_v4(Some(self.pconn4.clone())) .stun_v6(self.pconn6.clone()); - // Need to get the SocketAddrs of the relay urls and add them to the node map - // so the socket knows to treat them special - self.resolve_qad_addrs(std::time::Duration::from_millis(300)) - .await; // create a client config for the endpoint to use for QUIC address discovery let root_store = rustls::RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); @@ -2646,87 +2646,6 @@ impl Actor { self.update_direct_addresses(report); } - /// Does a DNS look up for the `RelayUrl` and returns the set of resolved - /// [`SocketAddr`]s - async fn resolve_relay_quic_endpoint( - dns_resolver: DnsResolver, - relay_node: Arc, - duration: Duration, - ) -> BTreeSet { - let mut addrs = BTreeSet::new(); - let port = if let Some(ref quic_config) = relay_node.quic { - quic_config.port - } else { - trace!( - ?relay_node, - "No quic config for the relay node: no need to resolve quic endpoint ip" - ); - return addrs; - }; - - match relay_node.url.host() { - Some(url::Host::Domain(hostname)) => { - debug!(%hostname, "Performing DNS A lookup for relay addr"); - let mut set = JoinSet::new(); - let resolver = dns_resolver.clone(); - let ipv4_resolver = resolver.clone(); - let ipv4_hostname = hostname.to_owned(); - set.spawn(async move { - let res = ipv4_resolver.lookup_ipv4(ipv4_hostname, duration).await; - res.map(|addrs| addrs.collect::>()) - }); - let ipv6_hostname = hostname.to_owned(); - set.spawn(async move { - let res = resolver.lookup_ipv6(ipv6_hostname, duration).await; - res.map(|addrs| addrs.collect::>()) - }); - let responses = set.join_all().await; - for res in responses { - match res { - Err(_) => {} - Ok(resolved_addrs) => { - for addr in resolved_addrs { - addrs.insert(SocketAddr::new(addr, port)); - } - } - } - } - if addrs.is_empty() { - debug!(%hostname, "Unable to resolve ip addresses for relay node"); - } - } - Some(url::Host::Ipv4(addr)) => { - addrs.insert(SocketAddr::new(addr.into(), port)); - } - Some(url::Host::Ipv6(addr)) => { - addrs.insert(SocketAddr::new(addr.into(), port)); - } - None => { - error!(?relay_node.url, "No hostname for relay node, cannot resolve ip"); - } - } - addrs - } - - /// Resolve the relay addresses used for QUIC address discovery. - async fn resolve_qad_addrs(&mut self, duration: Duration) { - let mut set = JoinSet::new(); - let resolver = self.msock.dns_resolver(); - for relay_node in self.msock.relay_map.nodes() { - set.spawn(Actor::resolve_relay_quic_endpoint( - resolver.clone(), - relay_node.clone(), - duration, - )); - } - let res = set.join_all().await; - for addrs in res { - addrs.iter().for_each(|addr| { - self.msock.ip_mapped_addrs.add(*addr); - }); - } - } - fn set_nearest_relay(&mut self, relay_url: Option) -> bool { let my_relay = self.msock.my_relay(); if relay_url == my_relay { @@ -2968,8 +2887,6 @@ pub(crate) struct NodeIdMappedAddr(pub(crate) SocketAddr); /// Counter to always generate unique addresses for [`NodeIdMappedAddr`]. static NODE_ID_ADDR_COUNTER: AtomicU64 = AtomicU64::new(1); -const MAPPED_ADDR_PORT: u16 = 12345; - impl NodeIdMappedAddr { /// The Prefix/L of our Unique Local Addresses. const ADDR_PREFIXL: u8 = 0xfd; @@ -3022,115 +2939,6 @@ impl std::fmt::Display for NodeIdMappedAddr { } } -/// A mirror for the `NodeIdMappedAddr`, mapping a fake Ipv6 address with an actual IP address. -/// -/// You can consider this as nothing more than a lookup key for an IP the [`MagicSock`] knows -/// about. -/// -/// And in our QUIC-facing socket APIs like [`AsyncUdpSocket`] it -/// comes in as the inner [`SocketAddr`], in those interfaces we have to be careful to do -/// the conversion to this type. -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)] -pub(crate) struct IpMappedAddr(pub(crate) SocketAddr); - -/// Counter to always generate unique addresses for [`NodeIdMappedAddr`]. -static IP_ADDR_COUNTER: AtomicU64 = AtomicU64::new(1); - -impl IpMappedAddr { - /// The Prefix/L of our Unique Local Addresses. - const ADDR_PREFIXL: u8 = 0xfd; - /// The Global ID used in our Unique Local Addresses. - const ADDR_GLOBAL_ID: [u8; 5] = [21, 7, 10, 81, 11]; - /// The Subnet ID used in our Unique Local Addresses. - const ADDR_SUBNET: [u8; 2] = [0, 1]; - - /// Generates a globally unique fake UDP address. - /// - /// This generates and IPv6 Unique Local Address according to RFC 4193. - pub(crate) fn generate() -> Self { - let mut addr = [0u8; 16]; - addr[0] = Self::ADDR_PREFIXL; - addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID); - addr[6..8].copy_from_slice(&Self::ADDR_SUBNET); - - let counter = IP_ADDR_COUNTER.fetch_add(1, Ordering::Relaxed); - addr[8..16].copy_from_slice(&counter.to_be_bytes()); - - Self(SocketAddr::new( - IpAddr::V6(Ipv6Addr::from(addr)), - MAPPED_ADDR_PORT, - )) - } -} - -impl TryFrom for IpMappedAddr { - type Error = anyhow::Error; - - fn try_from(value: SocketAddr) -> std::result::Result { - match value { - SocketAddr::V4(_) => anyhow::bail!("IpMappedAddrs are all Ipv6, addr {value:?}"), - SocketAddr::V6(addr) => { - if addr.port() != MAPPED_ADDR_PORT { - anyhow::bail!("not a mapped addr"); - } - let octets = addr.ip().octets(); - if octets[6..8] != IpMappedAddr::ADDR_SUBNET { - anyhow::bail!("not an IpMappedAddr"); - } - Ok(IpMappedAddr(value)) - } - } - } -} - -impl std::fmt::Display for IpMappedAddr { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "IpMappedAddr({})", self.0) - } -} - -#[derive(Debug, Clone)] -/// A Map of [`IpMappedAddrs`] to [`SocketAddrs`] -pub(crate) struct IpMappedAddrs(Arc>>); - -impl IpMappedAddrs { - pub fn new() -> Self { - Self(Arc::new(std::sync::Mutex::new(BTreeMap::new()))) - } - - /// Add a [`SocketAddr`] to the map and the generated [`IpMappedAddr`] it is now associated with back. - /// - /// If this [`SocketAddr`] already exists in the map, it returns its associated [`IpMappedAddr`]. - pub fn add(&self, ip_addr: SocketAddr) -> IpMappedAddr { - let mut map = self.0.lock().expect("poisoned"); - for (mapped_addr, ip) in map.iter() { - if ip == &ip_addr { - return *mapped_addr; - } - } - let ip_mapped_addr = IpMappedAddr::generate(); - map.insert(ip_mapped_addr, ip_addr); - ip_mapped_addr - } - - /// Get the [`IpMappedAddr`] for the given [`SocketAddr`]. - pub fn get_mapped_addr(&self, ip_addr: &SocketAddr) -> Option { - let map = self.0.lock().expect("poisoned"); - for (mapped_addr, ip) in map.iter() { - if ip == ip_addr { - return Some(*mapped_addr); - } - } - None - } - - /// Get the [`SocketAddr`] for the given [`IpMappedAddr`]. - pub fn get_ip_addr(&self, mapped_addr: &IpMappedAddr) -> Option { - let map = self.0.lock().expect("poisoned"); - map.get(mapped_addr).copied() - } -} - fn disco_message_sent(msg: &disco::Message) { match msg { disco::Message::Ping(_) => { From 96aa2a50f71e0e0041e69717f60dd5db8a299f1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Fri, 10 Jan 2025 15:16:55 -0500 Subject: [PATCH 8/9] clean up rustdocs --- iroh-base/src/ip_mapped_addrs.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iroh-base/src/ip_mapped_addrs.rs b/iroh-base/src/ip_mapped_addrs.rs index 356a3e96d8..9b2be37342 100644 --- a/iroh-base/src/ip_mapped_addrs.rs +++ b/iroh-base/src/ip_mapped_addrs.rs @@ -17,16 +17,16 @@ pub struct IpMappedAddrError(String); /// A mirror for the `NodeIdMappedAddr`, mapping a fake Ipv6 address with an actual IP address. /// -/// You can consider this as nothing more than a lookup key for an IP the [`MagicSock`] knows +/// You can consider this as nothing more than a lookup key for an IP that iroh's magicsocket knows /// about. /// -/// And in our QUIC-facing socket APIs like [`AsyncUdpSocket`] it +/// And in our QUIC-facing socket APIs like iroh's `AsyncUdpSocket` it /// comes in as the inner [`SocketAddr`], in those interfaces we have to be careful to do /// the conversion to this type. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)] pub struct IpMappedAddr(pub(crate) SocketAddr); -/// Counter to always generate unique addresses for [`NodeIdMappedAddr`]. +/// Counter to always generate unique addresses for `NodeIdMappedAddr`. static IP_ADDR_COUNTER: AtomicU64 = AtomicU64::new(1); impl IpMappedAddr { @@ -90,7 +90,7 @@ impl std::fmt::Display for IpMappedAddr { } #[derive(Debug, Clone)] -/// A Map of [`IpMappedAddrs`] to [`SocketAddrs`] +/// A Map of [`IpMappedAddrs`] to [`SocketAddr`] pub struct IpMappedAddrs(Arc>>); impl IpMappedAddrs { From 39b2cca2cab71cf85c28a59d6bad2ae0d20e4015 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Fri, 10 Jan 2025 16:27:11 -0500 Subject: [PATCH 9/9] add `relay_lookup_ipvX_staggered` helper functions --- iroh-net-report/src/reportgen.rs | 111 ++++++++++++++++++------------- 1 file changed, 63 insertions(+), 48 deletions(-) diff --git a/iroh-net-report/src/reportgen.rs b/iroh-net-report/src/reportgen.rs index b0d3090b26..bdd8dbd7f0 100644 --- a/iroh-net-report/src/reportgen.rs +++ b/iroh-net-report/src/reportgen.rs @@ -1074,65 +1074,80 @@ async fn get_relay_addr( match proto { ProbeProto::StunIpv4 | ProbeProto::IcmpV4 | ProbeProto::QuicIpv4 => { - match relay_node.url.host() { - Some(url::Host::Domain(hostname)) => { - debug!(?proto, %hostname, "Performing DNS A lookup for relay addr"); - match dns_resolver.lookup_ipv4_staggered(hostname).await { - Ok(mut addrs) => addrs - .next() - .map(|ip| ip.to_canonical()) - .map(|addr| SocketAddr::new(addr, port)) - .map(|addr| maybe_to_mapped_addr(ip_mapped_addrs, proto, addr)) - .ok_or(anyhow!("No suitable relay addr found")), - Err(err) => Err(err.context("No suitable relay addr found")), - } - } - Some(url::Host::Ipv4(addr)) => Ok(maybe_to_mapped_addr( - ip_mapped_addrs, - proto, - SocketAddr::new(addr.into(), port), - )), - Some(url::Host::Ipv6(_addr)) => Err(anyhow!("No suitable relay addr found")), - None => Err(anyhow!("No valid hostname in RelayUrl")), - } + relay_lookup_ipv4_staggered(dns_resolver, ip_mapped_addrs, relay_node, port).await } ProbeProto::StunIpv6 | ProbeProto::IcmpV6 | ProbeProto::QuicIpv6 => { - match relay_node.url.host() { - Some(url::Host::Domain(hostname)) => { - debug!(?proto, %hostname, "Performing DNS AAAA lookup for relay addr"); - match dns_resolver.lookup_ipv6_staggered(hostname).await { - Ok(mut addrs) => addrs - .next() - .map(|ip| ip.to_canonical()) - .map(|addr| SocketAddr::new(addr, port)) - .map(|addr| maybe_to_mapped_addr(ip_mapped_addrs, proto, addr)) - .ok_or(anyhow!("No suitable relay addr found")), - Err(err) => Err(err.context("No suitable relay addr found")), - } - } - Some(url::Host::Ipv4(_addr)) => Err(anyhow!("No suitable relay addr found")), - Some(url::Host::Ipv6(addr)) => Ok(maybe_to_mapped_addr( - ip_mapped_addrs, - proto, - SocketAddr::new(addr.into(), port), - )), - None => Err(anyhow!("No valid hostname in RelayUrl")), - } + relay_lookup_ipv6_staggered(dns_resolver, ip_mapped_addrs, relay_node, port).await } ProbeProto::Https => Err(anyhow!("Not implemented")), } } -fn maybe_to_mapped_addr( +/// Do a staggared ipv4 DNS lookup based on [`RelayNode`] +/// +/// `port` is combined with the resolved [`Ipv4Addr`] to return a [`SocketAddr`] +async fn relay_lookup_ipv4_staggered( + dns_resolver: &DnsResolver, ip_mapped_addrs: Option, - proto: ProbeProto, - addr: SocketAddr, -) -> SocketAddr { - if !matches!(proto, ProbeProto::QuicIpv4 | ProbeProto::QuicIpv6) { - return addr; + relay: &RelayNode, + port: u16, +) -> Result { + match relay.url.host() { + Some(url::Host::Domain(hostname)) => { + debug!(%hostname, "Performing DNS A lookup for relay addr"); + match dns_resolver.lookup_ipv4_staggered(hostname).await { + Ok(mut addrs) => addrs + .next() + .map(|ip| ip.to_canonical()) + .map(|addr| SocketAddr::new(addr, port)) + .map(|addr| maybe_to_mapped_addr(ip_mapped_addrs, addr)) + .ok_or(anyhow!("No suitable relay addr found")), + Err(err) => Err(err.context("No suitable relay addr found")), + } + } + Some(url::Host::Ipv4(addr)) => Ok(maybe_to_mapped_addr( + ip_mapped_addrs, + SocketAddr::new(addr.into(), port), + )), + Some(url::Host::Ipv6(_addr)) => Err(anyhow!("No suitable relay addr found")), + None => Err(anyhow!("No valid hostname in RelayUrl")), } +} + +/// Do a staggared ipv6 DNS lookup based on [`RelayNode`] +/// +/// `port` is combined with the resolved [`Ipv6Addr`] to return a [`SocketAddr`] +async fn relay_lookup_ipv6_staggered( + dns_resolver: &DnsResolver, + ip_mapped_addrs: Option, + relay: &RelayNode, + port: u16, +) -> Result { + match relay.url.host() { + Some(url::Host::Domain(hostname)) => { + debug!(%hostname, "Performing DNS AAAA lookup for relay addr"); + match dns_resolver.lookup_ipv6_staggered(hostname).await { + Ok(mut addrs) => addrs + .next() + .map(|ip| ip.to_canonical()) + .map(|addr| SocketAddr::new(addr, port)) + .map(|addr| maybe_to_mapped_addr(ip_mapped_addrs, addr)) + .ok_or(anyhow!("No suitable relay addr found")), + Err(err) => Err(err.context("No suitable relay addr found")), + } + } + Some(url::Host::Ipv4(_addr)) => Err(anyhow!("No suitable relay addr found")), + Some(url::Host::Ipv6(addr)) => Ok(maybe_to_mapped_addr( + ip_mapped_addrs, + SocketAddr::new(addr.into(), port), + )), + None => Err(anyhow!("No valid hostname in RelayUrl")), + } +} + +fn maybe_to_mapped_addr(ip_mapped_addrs: Option, addr: SocketAddr) -> SocketAddr { if let Some(ip_mapped_addrs) = ip_mapped_addrs.as_ref() { return ip_mapped_addrs.add(addr).addr(); }