diff --git a/iroh-base/src/ip_mapped_addrs.rs b/iroh-base/src/ip_mapped_addrs.rs new file mode 100644 index 0000000000..9b2be37342 --- /dev/null +++ b/iroh-base/src/ip_mapped_addrs.rs @@ -0,0 +1,139 @@ +use std::{ + collections::BTreeMap, + net::{IpAddr, Ipv6Addr, SocketAddr}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; + +/// The dummy port used for all mapped addresses +pub const MAPPED_ADDR_PORT: u16 = 12345; + +/// Can occur when converting a [`SocketAddr`] to an [`IpMappedAddr`] +#[derive(Debug, thiserror::Error)] +#[error("Failed to convert: {0}")] +pub struct IpMappedAddrError(String); + +/// A mirror for the `NodeIdMappedAddr`, mapping a fake Ipv6 address with an actual IP address. +/// +/// You can consider this as nothing more than a lookup key for an IP that iroh's magicsocket knows +/// about. +/// +/// And in our QUIC-facing socket APIs like iroh's `AsyncUdpSocket` it +/// comes in as the inner [`SocketAddr`], in those interfaces we have to be careful to do +/// the conversion to this type. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)] +pub struct IpMappedAddr(pub(crate) SocketAddr); + +/// Counter to always generate unique addresses for `NodeIdMappedAddr`. +static IP_ADDR_COUNTER: AtomicU64 = AtomicU64::new(1); + +impl IpMappedAddr { + /// The Prefix/L of our Unique Local Addresses. + const ADDR_PREFIXL: u8 = 0xfd; + /// The Global ID used in our Unique Local Addresses. + const ADDR_GLOBAL_ID: [u8; 5] = [21, 7, 10, 81, 11]; + /// The Subnet ID used in our Unique Local Addresses. + const ADDR_SUBNET: [u8; 2] = [0, 1]; + + /// Generates a globally unique fake UDP address. + /// + /// This generates and IPv6 Unique Local Address according to RFC 4193. + pub fn generate() -> Self { + let mut addr = [0u8; 16]; + addr[0] = Self::ADDR_PREFIXL; + addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID); + addr[6..8].copy_from_slice(&Self::ADDR_SUBNET); + + let counter = IP_ADDR_COUNTER.fetch_add(1, Ordering::Relaxed); + addr[8..16].copy_from_slice(&counter.to_be_bytes()); + + Self(SocketAddr::new( + IpAddr::V6(Ipv6Addr::from(addr)), + MAPPED_ADDR_PORT, + )) + } + + /// Return the underlying [`SocketAddr`]. + pub fn addr(&self) -> SocketAddr { + self.0 + } +} + +impl TryFrom for IpMappedAddr { + type Error = IpMappedAddrError; + + fn try_from(value: SocketAddr) -> std::result::Result { + match value { + SocketAddr::V4(_) => Err(IpMappedAddrError(String::from( + "IpMappedAddrs are all Ipv6, found Ipv4 address", + ))), + SocketAddr::V6(addr) => { + if addr.port() != MAPPED_ADDR_PORT { + return Err(IpMappedAddrError(String::from("not mapped addr"))); + } + let octets = addr.ip().octets(); + if octets[6..8] != IpMappedAddr::ADDR_SUBNET { + return Err(IpMappedAddrError(String::from("not an IpMappedAddr"))); + } + Ok(IpMappedAddr(value)) + } + } + } +} + +impl std::fmt::Display for IpMappedAddr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "IpMappedAddr({})", self.0) + } +} + +#[derive(Debug, Clone)] +/// A Map of [`IpMappedAddrs`] to [`SocketAddr`] +pub struct IpMappedAddrs(Arc>>); + +impl IpMappedAddrs { + /// Create an empty [`IpMappedAddrs`] + pub fn new() -> Self { + Self(Arc::new(std::sync::Mutex::new(BTreeMap::new()))) + } + + /// Add a [`SocketAddr`] to the map and the generated [`IpMappedAddr`] it is now associated with back. + /// + /// If this [`SocketAddr`] already exists in the map, it returns its associated [`IpMappedAddr`]. + pub fn add(&self, ip_addr: SocketAddr) -> IpMappedAddr { + let mut map = self.0.lock().expect("poisoned"); + for (mapped_addr, ip) in map.iter() { + if ip == &ip_addr { + return *mapped_addr; + } + } + let ip_mapped_addr = IpMappedAddr::generate(); + map.insert(ip_mapped_addr, ip_addr); + ip_mapped_addr + } + + /// Get the [`IpMappedAddr`] for the given [`SocketAddr`]. + pub fn get_mapped_addr(&self, ip_addr: &SocketAddr) -> Option { + let map = self.0.lock().expect("poisoned"); + for (mapped_addr, ip) in map.iter() { + if ip == ip_addr { + return Some(*mapped_addr); + } + } + None + } + + /// Get the [`SocketAddr`] for the given [`IpMappedAddr`]. + pub fn get_ip_addr(&self, mapped_addr: &IpMappedAddr) -> Option { + let map = self.0.lock().expect("poisoned"); + map.get(mapped_addr).copied() + } +} + +impl Default for IpMappedAddrs { + fn default() -> Self { + IpMappedAddrs::new() + } +} diff --git a/iroh-base/src/lib.rs b/iroh-base/src/lib.rs index 2b32e2719b..ac66391ed8 100644 --- a/iroh-base/src/lib.rs +++ b/iroh-base/src/lib.rs @@ -7,6 +7,8 @@ #[cfg(feature = "ticket")] pub mod ticket; +#[cfg(feature = "relay")] +mod ip_mapped_addrs; #[cfg(feature = "key")] mod key; #[cfg(feature = "key")] @@ -14,6 +16,8 @@ mod node_addr; #[cfg(feature = "relay")] mod relay_url; +#[cfg(feature = "relay")] +pub use self::ip_mapped_addrs::{IpMappedAddr, IpMappedAddrs, MAPPED_ADDR_PORT}; #[cfg(feature = "key")] pub use self::key::{KeyParsingError, NodeId, PublicKey, SecretKey, Signature}; #[cfg(feature = "key")] diff --git a/iroh-net-report/src/lib.rs b/iroh-net-report/src/lib.rs index 4e309aff34..ea877a5c11 100644 --- a/iroh-net-report/src/lib.rs +++ b/iroh-net-report/src/lib.rs @@ -19,7 +19,7 @@ use std::{ use anyhow::{anyhow, Result}; use bytes::Bytes; use hickory_resolver::TokioResolver as DnsResolver; -use iroh_base::RelayUrl; +use iroh_base::{IpMappedAddrs, RelayUrl}; #[cfg(feature = "metrics")] use iroh_metrics::inc; use iroh_relay::{protos::stun, RelayMap}; @@ -348,8 +348,12 @@ impl Client { /// /// This starts a connected actor in the background. Once the client is dropped it will /// stop running. - pub fn new(port_mapper: Option, dns_resolver: DnsResolver) -> Result { - let mut actor = Actor::new(port_mapper, dns_resolver)?; + pub fn new( + port_mapper: Option, + dns_resolver: DnsResolver, + ip_mapped_addrs: Option, + ) -> Result { + let mut actor = Actor::new(port_mapper, dns_resolver, ip_mapped_addrs)?; let addr = actor.addr(); let task = tokio::spawn( async move { actor.run().await }.instrument(info_span!("net_report.actor")), @@ -566,6 +570,9 @@ struct Actor { /// The DNS resolver to use for probes that need to perform DNS lookups dns_resolver: DnsResolver, + + /// The [`IpMappedAddrs`] that allows you to do QAD in iroh + ip_mapped_addrs: Option, } impl Actor { @@ -573,7 +580,11 @@ impl Actor { /// /// This does not start the actor, see [`Actor::run`] for this. You should not /// normally create this directly but rather create a [`Client`]. - fn new(port_mapper: Option, dns_resolver: DnsResolver) -> Result { + fn new( + port_mapper: Option, + dns_resolver: DnsResolver, + ip_mapped_addrs: Option, + ) -> Result { // TODO: consider an instrumented flume channel so we have metrics. let (sender, receiver) = mpsc::channel(32); Ok(Self { @@ -584,6 +595,7 @@ impl Actor { in_flight_stun_requests: Default::default(), current_report_run: None, dns_resolver, + ip_mapped_addrs, }) } @@ -644,6 +656,7 @@ impl Actor { quic_config, .. } = opts; + trace!("Attempting probes for protocols {protocols:#?}"); if self.current_report_run.is_some() { response_tx .send(Err(anyhow!( @@ -686,6 +699,7 @@ impl Actor { quic_config, self.dns_resolver.clone(), protocols, + self.ip_mapped_addrs.clone(), ); self.current_report_run = Some(ReportRun { @@ -1134,7 +1148,7 @@ mod tests { stun_utils::serve("127.0.0.1".parse().unwrap()).await?; let resolver = crate::dns::tests::resolver(); - let mut client = Client::new(None, resolver.clone())?; + let mut client = Client::new(None, resolver.clone(), None)?; let dm = stun_utils::relay_map_of([stun_addr].into_iter()); // Note that the ProbePlan will change with each iteration. @@ -1182,7 +1196,7 @@ mod tests { // Now create a client and generate a report. let resolver = crate::dns::tests::resolver(); - let mut client = Client::new(None, resolver.clone())?; + let mut client = Client::new(None, resolver.clone(), None)?; let r = client.get_report(dm, None, None, None).await?; let mut r: Report = (*r).clone(); @@ -1385,7 +1399,7 @@ mod tests { let resolver = crate::dns::tests::resolver(); for mut tt in tests { println!("test: {}", tt.name); - let mut actor = Actor::new(None, resolver.clone()).unwrap(); + let mut actor = Actor::new(None, resolver.clone(), None).unwrap(); for s in &mut tt.steps { // trigger the timer time::advance(Duration::from_secs(s.after)).await; @@ -1420,7 +1434,7 @@ mod tests { dbg!(&dm); let resolver = crate::dns::tests::resolver().clone(); - let mut client = Client::new(None, resolver)?; + let mut client = Client::new(None, resolver, None)?; // Set up an external socket to send STUN requests from, this will be discovered as // our public socket address by STUN. We send back any packets received on this diff --git a/iroh-net-report/src/reportgen.rs b/iroh-net-report/src/reportgen.rs index fb144cdc25..bdd8dbd7f0 100644 --- a/iroh-net-report/src/reportgen.rs +++ b/iroh-net-report/src/reportgen.rs @@ -28,7 +28,7 @@ use std::{ use anyhow::{anyhow, bail, Context as _, Result}; use hickory_resolver::TokioResolver as DnsResolver; -use iroh_base::RelayUrl; +use iroh_base::{IpMappedAddrs, RelayUrl}; #[cfg(feature = "metrics")] use iroh_metrics::inc; use iroh_relay::{ @@ -94,6 +94,7 @@ impl Client { quic_config: Option, dns_resolver: DnsResolver, protocols: BTreeSet, + ip_mapped_addrs: Option, ) -> Self { let (msg_tx, msg_rx) = mpsc::channel(32); let addr = Addr { @@ -114,6 +115,7 @@ impl Client { outstanding_tasks: OutstandingTasks::default(), dns_resolver, protocols, + ip_mapped_addrs, }; let task = tokio::spawn( async move { actor.run().await }.instrument(info_span!("reportgen.actor")), @@ -200,6 +202,8 @@ struct Actor { /// Protocols we should attempt to create probes for, if we have the correct /// configuration for that protocol. protocols: BTreeSet, + /// Optional [`IpMappedAddrs`] used to enable QAD in iroh + ip_mapped_addrs: Option, } impl Actor { @@ -569,6 +573,7 @@ impl Actor { let net_report = self.net_report.clone(); let pinger = pinger.clone(); let dns_resolver = self.dns_resolver.clone(); + let ip_mapped_addrs = self.ip_mapped_addrs.clone(); set.spawn( run_probe( @@ -581,6 +586,7 @@ impl Actor { net_report, pinger, dns_resolver, + ip_mapped_addrs, ) .instrument(debug_span!("run_probe", %probe)), ); @@ -716,6 +722,7 @@ async fn run_probe( net_report: net_report::Addr, pinger: Pinger, dns_resolver: DnsResolver, + ip_mapped_addrs: Option, ) -> Result { if !probe.delay().is_zero() { trace!("delaying probe"); @@ -749,7 +756,7 @@ async fn run_probe( )); } - let relay_addr = get_relay_addr(&dns_resolver, &relay_node, probe.proto()) + let relay_addr = get_relay_addr(&dns_resolver, &relay_node, probe.proto(), ip_mapped_addrs) .await .context("no relay node addr") .map_err(|e| ProbeError::AbortSet(e, probe.clone()))?; @@ -1058,6 +1065,7 @@ async fn get_relay_addr( dns_resolver: &DnsResolver, relay_node: &RelayNode, proto: ProbeProto, + ip_mapped_addrs: Option, ) -> Result { if relay_node.stun_only && !matches!(proto, ProbeProto::StunIpv4 | ProbeProto::StunIpv6) { bail!("Relay node not suitable for non-STUN probes"); @@ -1066,47 +1074,86 @@ async fn get_relay_addr( match proto { ProbeProto::StunIpv4 | ProbeProto::IcmpV4 | ProbeProto::QuicIpv4 => { - match relay_node.url.host() { - Some(url::Host::Domain(hostname)) => { - debug!(?proto, %hostname, "Performing DNS A lookup for relay addr"); - match dns_resolver.lookup_ipv4_staggered(hostname).await { - Ok(mut addrs) => addrs - .next() - .map(|ip| ip.to_canonical()) - .map(|addr| SocketAddr::new(addr, port)) - .ok_or(anyhow!("No suitable relay addr found")), - Err(err) => Err(err.context("No suitable relay addr found")), - } - } - Some(url::Host::Ipv4(addr)) => Ok(SocketAddr::new(addr.into(), port)), - Some(url::Host::Ipv6(_addr)) => Err(anyhow!("No suitable relay addr found")), - None => Err(anyhow!("No valid hostname in RelayUrl")), - } + relay_lookup_ipv4_staggered(dns_resolver, ip_mapped_addrs, relay_node, port).await } ProbeProto::StunIpv6 | ProbeProto::IcmpV6 | ProbeProto::QuicIpv6 => { - match relay_node.url.host() { - Some(url::Host::Domain(hostname)) => { - debug!(?proto, %hostname, "Performing DNS AAAA lookup for relay addr"); - match dns_resolver.lookup_ipv6_staggered(hostname).await { - Ok(mut addrs) => addrs - .next() - .map(|ip| ip.to_canonical()) - .map(|addr| SocketAddr::new(addr, port)) - .ok_or(anyhow!("No suitable relay addr found")), - Err(err) => Err(err.context("No suitable relay addr found")), - } - } - Some(url::Host::Ipv4(_addr)) => Err(anyhow!("No suitable relay addr found")), - Some(url::Host::Ipv6(addr)) => Ok(SocketAddr::new(addr.into(), port)), - None => Err(anyhow!("No valid hostname in RelayUrl")), - } + relay_lookup_ipv6_staggered(dns_resolver, ip_mapped_addrs, relay_node, port).await } ProbeProto::Https => Err(anyhow!("Not implemented")), } } +/// Do a staggared ipv4 DNS lookup based on [`RelayNode`] +/// +/// `port` is combined with the resolved [`Ipv4Addr`] to return a [`SocketAddr`] +async fn relay_lookup_ipv4_staggered( + dns_resolver: &DnsResolver, + ip_mapped_addrs: Option, + relay: &RelayNode, + port: u16, +) -> Result { + match relay.url.host() { + Some(url::Host::Domain(hostname)) => { + debug!(%hostname, "Performing DNS A lookup for relay addr"); + match dns_resolver.lookup_ipv4_staggered(hostname).await { + Ok(mut addrs) => addrs + .next() + .map(|ip| ip.to_canonical()) + .map(|addr| SocketAddr::new(addr, port)) + .map(|addr| maybe_to_mapped_addr(ip_mapped_addrs, addr)) + .ok_or(anyhow!("No suitable relay addr found")), + Err(err) => Err(err.context("No suitable relay addr found")), + } + } + Some(url::Host::Ipv4(addr)) => Ok(maybe_to_mapped_addr( + ip_mapped_addrs, + SocketAddr::new(addr.into(), port), + )), + Some(url::Host::Ipv6(_addr)) => Err(anyhow!("No suitable relay addr found")), + None => Err(anyhow!("No valid hostname in RelayUrl")), + } +} + +/// Do a staggared ipv6 DNS lookup based on [`RelayNode`] +/// +/// `port` is combined with the resolved [`Ipv6Addr`] to return a [`SocketAddr`] +async fn relay_lookup_ipv6_staggered( + dns_resolver: &DnsResolver, + ip_mapped_addrs: Option, + relay: &RelayNode, + port: u16, +) -> Result { + match relay.url.host() { + Some(url::Host::Domain(hostname)) => { + debug!(%hostname, "Performing DNS AAAA lookup for relay addr"); + match dns_resolver.lookup_ipv6_staggered(hostname).await { + Ok(mut addrs) => addrs + .next() + .map(|ip| ip.to_canonical()) + .map(|addr| SocketAddr::new(addr, port)) + .map(|addr| maybe_to_mapped_addr(ip_mapped_addrs, addr)) + .ok_or(anyhow!("No suitable relay addr found")), + Err(err) => Err(err.context("No suitable relay addr found")), + } + } + Some(url::Host::Ipv4(_addr)) => Err(anyhow!("No suitable relay addr found")), + Some(url::Host::Ipv6(addr)) => Ok(maybe_to_mapped_addr( + ip_mapped_addrs, + SocketAddr::new(addr.into(), port), + )), + None => Err(anyhow!("No valid hostname in RelayUrl")), + } +} + +fn maybe_to_mapped_addr(ip_mapped_addrs: Option, addr: SocketAddr) -> SocketAddr { + if let Some(ip_mapped_addrs) = ip_mapped_addrs.as_ref() { + return ip_mapped_addrs.add(addr).addr(); + } + addr +} + /// Runs an ICMP IPv4 or IPv6 probe. /// /// The `pinger` is passed in so the ping sockets are only bound once diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 781514cc0d..02673aca50 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -34,7 +34,7 @@ use crate::{ dns::DnsDiscovery, pkarr::PkarrPublisher, ConcurrentDiscovery, Discovery, DiscoveryTask, }, dns::{default_resolver, DnsResolver}, - magicsock::{self, Handle, QuicMappedAddr}, + magicsock::{self, Handle, NodeIdMappedAddr}, tls, watchable::Watcher, }; @@ -163,6 +163,8 @@ impl Builder { 1 => Some(discovery.into_iter().next().expect("checked length")), _ => Some(Box::new(ConcurrentDiscovery::from_services(discovery))), }; + let server_config = static_config.create_server_config(self.alpn_protocols)?; + let msock_opts = magicsock::Options { addr_v4: self.addr_v4, addr_v6: self.addr_v6, @@ -172,12 +174,13 @@ impl Builder { discovery, proxy_url: self.proxy_url, dns_resolver, + server_config, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, #[cfg(any(test, feature = "test-utils"))] path_selection: self.path_selection, }; - Endpoint::bind(static_config, msock_opts, self.alpn_protocols).await + Endpoint::bind(static_config, msock_opts).await } // # The very common methods everyone basically needs. @@ -506,7 +509,6 @@ pub fn make_server_config( #[derive(Clone, Debug)] pub struct Endpoint { msock: Handle, - endpoint: quinn::Endpoint, rtt_actor: Arc, static_config: Arc, } @@ -528,38 +530,16 @@ impl Endpoint { /// This is for internal use, the public interface is the [`Builder`] obtained from /// [Self::builder]. See the methods on the builder for documentation of the parameters. #[instrument("ep", skip_all, fields(me = %static_config.secret_key.public().fmt_short()))] - async fn bind( - static_config: StaticConfig, - msock_opts: magicsock::Options, - initial_alpns: Vec>, - ) -> Result { + async fn bind(static_config: StaticConfig, msock_opts: magicsock::Options) -> Result { let msock = magicsock::MagicSock::spawn(msock_opts).await?; trace!("created magicsock"); - - let server_config = static_config.create_server_config(initial_alpns)?; - - let mut endpoint_config = quinn::EndpointConfig::default(); - // Setting this to false means that quinn will ignore packets that have the QUIC fixed bit - // set to 0. The fixed bit is the 3rd bit of the first byte of a packet. - // For performance reasons and to not rewrite buffers we pass non-QUIC UDP packets straight - // through to quinn. We set the first byte of the packet to zero, which makes quinn ignore - // the packet if grease_quic_bit is set to false. - endpoint_config.grease_quic_bit(false); - - let endpoint = quinn::Endpoint::new_with_abstract_socket( - endpoint_config, - Some(server_config), - Arc::new(msock.clone()), - Arc::new(quinn::TokioRuntime), - )?; - trace!("created quinn endpoint"); debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created"); - Ok(Self { - msock, - endpoint, + let ep = Self { + msock: msock.clone(), rtt_actor: Arc::new(rtt_actor::RttHandle::new()), static_config: Arc::new(static_config), - }) + }; + Ok(ep) } /// Sets the list of accepted ALPN protocols. @@ -568,7 +548,7 @@ impl Endpoint { /// Note that this *overrides* the current list of ALPNs. pub fn set_alpns(&self, alpns: Vec>) -> Result<()> { let server_config = self.static_config.create_server_config(alpns)?; - self.endpoint.set_server_config(Some(server_config)); + self.msock.endpoint().set_server_config(Some(server_config)); Ok(()) } @@ -646,7 +626,7 @@ impl Endpoint { &self, node_id: NodeId, alpn: &[u8], - addr: QuicMappedAddr, + addr: NodeIdMappedAddr, ) -> Result { debug!("Attempting connection..."); let client_config = { @@ -666,7 +646,8 @@ impl Endpoint { // TODO: We'd eventually want to replace "localhost" with something that makes more sense. let connect = self - .endpoint + .msock + .endpoint() .connect_with(client_config, addr.0, "localhost")?; let connection = connect @@ -696,7 +677,7 @@ impl Endpoint { /// [`Endpoint::close`]. pub fn accept(&self) -> Accept<'_> { Accept { - inner: self.endpoint.accept(), + inner: self.msock.endpoint().accept(), ep: self.clone(), } } @@ -977,10 +958,6 @@ impl Endpoint { return; } - tracing::debug!("Closing connections"); - self.endpoint.close(0u16.into(), b""); - self.endpoint.wait_idle().await; - tracing::debug!("Connections closed"); self.msock.close().await; } @@ -1008,7 +985,7 @@ impl Endpoint { async fn get_mapping_addr_and_maybe_start_discovery( &self, node_addr: NodeAddr, - ) -> Result<(QuicMappedAddr, Option)> { + ) -> Result<(NodeIdMappedAddr, Option)> { let node_id = node_addr.node_id; // Only return a mapped addr if we have some way of dialing this node, in other @@ -1060,7 +1037,7 @@ impl Endpoint { } #[cfg(test)] pub(crate) fn endpoint(&self) -> &quinn::Endpoint { - &self.endpoint + self.msock.endpoint() } } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 66c1c0c4ff..1e9284cff1 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -35,11 +35,14 @@ use concurrent_queue::ConcurrentQueue; use data_encoding::HEXLOWER; use futures_lite::{FutureExt, StreamExt}; use futures_util::{stream::BoxStream, task::AtomicWaker}; -use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey}; +use iroh_base::{ + IpMappedAddr, IpMappedAddrs, NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey, MAPPED_ADDR_PORT, +}; use iroh_metrics::{inc, inc_by}; use iroh_relay::{protos::stun, RelayMap}; +use net_report::QuicConfig; use netwatch::{interfaces, ip::LocalAddresses, netmon, UdpSocket}; -use quinn::AsyncUdpSocket; +use quinn::{AsyncUdpSocket, ServerConfig}; use rand::{seq::SliceRandom, Rng, SeedableRng}; use relay_actor::RelaySendItem; use smallvec::{smallvec, SmallVec}; @@ -125,6 +128,9 @@ pub(crate) struct Options { /// Proxy configuration. pub(crate) proxy_url: Option, + /// ServerConfig for the internal QUIC endpoint + pub(crate) server_config: ServerConfig, + /// Skip verification of SSL certificates from relay servers /// /// May only be used in tests. @@ -138,15 +144,18 @@ pub(crate) struct Options { impl Default for Options { fn default() -> Self { + let secret_key = SecretKey::generate(rand::rngs::OsRng); + let server_config = make_default_server_config(&secret_key); Options { addr_v4: None, addr_v6: None, - secret_key: SecretKey::generate(rand::rngs::OsRng), + secret_key, relay_map: RelayMap::empty(), node_map: None, discovery: None, proxy_url: None, dns_resolver: crate::dns::default_resolver().clone(), + server_config, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, #[cfg(any(test, feature = "test-utils"))] @@ -155,6 +164,15 @@ impl Default for Options { } } +/// Generate a server config with no ALPNS and a default transport configuration +fn make_default_server_config(secret_key: &SecretKey) -> ServerConfig { + let quic_server_config = crate::tls::make_server_config(secret_key, vec![], false) + .expect("should generate valid config"); + let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config)); + server_config.transport_config(Arc::new(quinn::TransportConfig::default())); + server_config +} + /// Contents of a relay message. Use a SmallVec to avoid allocations for the very /// common case of a single packet. type RelayContents = SmallVec<[Bytes; 1]>; @@ -168,6 +186,8 @@ pub(crate) struct Handle { msock: Arc, // Empty when closed actor_tasks: Arc>>, + // quinn endpoint + endpoint: quinn::Endpoint, } /// Iroh connectivity layer. @@ -225,6 +245,8 @@ pub(crate) struct MagicSock { my_relay: Watchable>, /// Tracks the networkmap node entity for each node discovery key. node_map: NodeMap, + /// Tracks the mapped IP addresses + ip_mapped_addrs: IpMappedAddrs, /// UDP IPv4 socket pconn4: UdpConn, /// UDP IPv6 socket @@ -355,7 +377,7 @@ impl MagicSock { } /// Returns the socket address which can be used by the QUIC layer to dial this node. - pub(crate) fn get_mapping_addr(&self, node_id: NodeId) -> Option { + pub(crate) fn get_mapping_addr(&self, node_id: NodeId) -> Option { self.node_map.get_quic_mapped_addr_for_node_key(node_id) } @@ -427,31 +449,6 @@ impl MagicSock { Ok(addr) } - fn create_io_poller(&self) -> Pin> { - // To do this properly the MagicSock would need a registry of pollers. For each - // node we would look up the poller or create one. Then on each try_send we can - // look up the correct poller and configure it to poll the paths it needs. - // - // Note however that the current quinn impl calls UdpPoller::poll_writable() - // **before** it calls try_send(), as opposed to how it is documented. That is a - // problem as we would not yet know the path that needs to be polled. To avoid such - // ambiguity the API could be changed to a .poll_send(&self, cx: &mut Context, - // io_poller: Pin<&mut dyn UdpPoller>, transmit: &Transmit) -> Poll> - // instead of the existing .try_send() because then we would have control over this. - // - // Right now however we have one single poller behaving the same for each - // connection. It checks all paths and returns Poll::Ready as soon as any path is - // ready. - let ipv4_poller = self.pconn4.create_io_poller(); - let ipv6_poller = self.pconn6.as_ref().map(|sock| sock.create_io_poller()); - let relay_sender = self.relay_datagram_send_channel.clone(); - Box::pin(IoPoller { - ipv4_poller, - ipv6_poller, - relay_sender, - }) - } - /// Implementation for AsyncUdpSocket::try_send #[instrument(skip_all)] fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> { @@ -468,127 +465,180 @@ impl MagicSock { "connection closed", )); } - - let dest = QuicMappedAddr(transmit.destination); trace!( - dst = %dest, + dst = %transmit.destination, src = ?transmit.src_ip, len = %transmit.contents.len(), "sending", ); - let mut transmit = transmit.clone(); - match self - .node_map - .get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed)) - { - Some((node_id, udp_addr, relay_url, msgs)) => { - let mut pings_sent = false; - // If we have pings to send, we *have* to send them out first. - if !msgs.is_empty() { - if let Err(err) = self.try_send_ping_actions(msgs) { - warn!( - node = %node_id.fmt_short(), - "failed to handle ping actions: {err:#}", - ); - } - pings_sent = true; - } - let mut udp_sent = false; - let mut udp_error = None; - let mut relay_sent = false; - let mut relay_error = None; + // Check if the destination address is a NodeIdMappedAddr. If so, + // get the node's relay address and best direct address, as well + // as any pings that need to be sent for hole-punching purposes. + if let Ok(dest) = NodeIdMappedAddr::try_from(transmit.destination) { + let mut transmit = transmit.clone(); + match self + .node_map + .get_send_addrs(dest, self.ipv6_reported.load(Ordering::Relaxed)) + { + Some((node_id, udp_addr, relay_url, msgs)) => { + let mut pings_sent = false; + // If we have pings to send, we *have* to send them out first. + if !msgs.is_empty() { + if let Err(err) = self.try_send_ping_actions(msgs) { + warn!( + node = %node_id.fmt_short(), + "failed to handle ping actions: {err:#}", + ); + } + pings_sent = true; + } - // send udp - if let Some(addr) = udp_addr { - // rewrite target address - transmit.destination = addr; - match self.try_send_udp(addr, &transmit) { - Ok(()) => { - trace!(node = %node_id.fmt_short(), dst = %addr, + let mut udp_sent = false; + let mut udp_error = None; + let mut relay_sent = false; + let mut relay_error = None; + + // send udp + if let Some(addr) = udp_addr { + // rewrite target address + transmit.destination = addr; + match self.try_send_udp(addr, &transmit) { + Ok(()) => { + trace!(node = %node_id.fmt_short(), dst = %addr, "sent transmit over UDP"); - udp_sent = true; - } - Err(err) => { - // No need to print "WouldBlock" errors to the console - if err.kind() != io::ErrorKind::WouldBlock { - warn!( - node = %node_id.fmt_short(), - dst = %addr, - "failed to send udp: {err:#}" - ); + udp_sent = true; + } + Err(err) => { + // No need to print "WouldBlock" errors to the console + if err.kind() != io::ErrorKind::WouldBlock { + warn!( + node = %node_id.fmt_short(), + dst = %addr, + "failed to send udp: {err:#}" + ); + } + udp_error = Some(err); } - udp_error = Some(err); } } - } - // send relay - if let Some(ref relay_url) = relay_url { - match self.try_send_relay(relay_url, node_id, split_packets(&transmit)) { - Ok(()) => { - relay_sent = true; - } - Err(err) => { - relay_error = Some(err); + // send relay + if let Some(ref relay_url) = relay_url { + match self.try_send_relay(relay_url, node_id, split_packets(&transmit)) { + Ok(()) => { + relay_sent = true; + } + Err(err) => { + relay_error = Some(err); + } } } - } - let udp_pending = udp_error - .as_ref() - .map(|err| err.kind() == io::ErrorKind::WouldBlock) - .unwrap_or_default(); - let relay_pending = relay_error - .as_ref() - .map(|err| err.kind() == io::ErrorKind::WouldBlock) - .unwrap_or_default(); - if udp_pending && relay_pending { - // Handle backpressure. - Err(io::Error::new(io::ErrorKind::WouldBlock, "pending")) - } else { - if relay_sent || udp_sent { - trace!( - node = %node_id.fmt_short(), - send_udp = ?udp_addr, - send_relay = ?relay_url, - "sent transmit", - ); - } else if !pings_sent { - // Returning Ok here means we let QUIC handle a timeout for a lost - // packet, same would happen if we returned any errors. The - // philosophy of quinn-udp is that a UDP connection could come back - // at any time so these errors should be treated as transient and - // are just timeouts. Hence we opt for returning Ok. See - // test_try_send_no_udp_addr_or_relay_url to explore this further. - debug!( - node = %node_id.fmt_short(), - "no UDP or relay paths available for node, voiding transmit", - ); - // We log this as debug instead of error, because this is a - // situation that comes up under normal operation. If this were an - // error log, it would unnecessarily pollute logs. - // This situation happens essentially when `pings_sent` is false, - // `relay_url` is `None`, so `relay_sent` is false, and the UDP - // path is blocking, so `udp_sent` is false and `udp_pending` is - // true. - // Alternatively returning a WouldBlock error here would - // potentially needlessly block sending on the relay path for the - // next datagram. + let udp_pending = udp_error + .as_ref() + .map(|err| err.kind() == io::ErrorKind::WouldBlock) + .unwrap_or_default(); + let relay_pending = relay_error + .as_ref() + .map(|err| err.kind() == io::ErrorKind::WouldBlock) + .unwrap_or_default(); + if udp_pending && relay_pending { + // Handle backpressure. + return Err(io::Error::new(io::ErrorKind::WouldBlock, "pending")); + } else { + if relay_sent || udp_sent { + trace!( + node = %node_id.fmt_short(), + send_udp = ?udp_addr, + send_relay = ?relay_url, + "sent transmit", + ); + } else if !pings_sent { + // Returning Ok here means we let QUIC handle a timeout for a lost + // packet, same would happen if we returned any errors. The + // philosophy of quinn-udp is that a UDP connection could come back + // at any time so these errors should be treated as transient and + // are just timeouts. Hence we opt for returning Ok. See + // test_try_send_no_udp_addr_or_relay_url to explore this further. + debug!( + node = %node_id.fmt_short(), + "no UDP or relay paths available for node, voiding transmit", + ); + // We log this as debug instead of error, because this is a + // situation that comes up under normal operation. If this were an + // error log, it would unnecessarily pollute logs. + // This situation happens essentially when `pings_sent` is false, + // `relay_url` is `None`, so `relay_sent` is false, and the UDP + // path is blocking, so `udp_sent` is false and `udp_pending` is + // true. + // Alternatively returning a WouldBlock error here would + // potentially needlessly block sending on the relay path for the + // next datagram. + } + return Ok(()); } - Ok(()) + } + None => { + error!(%dest, "no NodeState for mapped address, voiding transmit"); + // Returning Ok here means we let QUIC timeout. Returning WouldBlock + // triggers a hot loop. Returning an error would immediately fail a + // connection. The philosophy of quinn-udp is that a UDP connection could + // come back at any time or missing should be transient so chooses to let + // these kind of errors time out. See test_try_send_no_send_addr to try + // this out. + return Ok(()); } } - None => { - error!(%dest, "no NodeState for mapped address, voiding transmit"); - // Returning Ok here means we let QUIC timeout. Returning WouldBlock - // triggers a hot loop. Returning an error would immediately fail a - // connection. The philosophy of quinn-udp is that a UDP connection could + } + // Check if the destination address is an IpMappedAddr, which we use for + // quic address discovery, whose endpoints don't have `NodeId`s. + // If so, send the packet over udp. + else if let Ok(dest) = IpMappedAddr::try_from(transmit.destination) { + let mut transmit = transmit.clone(); + + // Get the socket addr + if let Some(addr) = self.ip_mapped_addrs.get_ip_addr(&dest) { + // rewrite target address + transmit.destination = addr; + // send udp + match self.try_send_udp(addr, &transmit) { + Ok(()) => { + trace!(dst = %addr, + "sent IpMapped transmit over UDP"); + } + Err(err) => { + // No need to print "WouldBlock" errors to the console + if err.kind() == io::ErrorKind::WouldBlock { + return Err(io::Error::new(io::ErrorKind::WouldBlock, "pending")); + } else { + warn!( + dst = %addr, + "failed to send IpMapped message over udp: {err:#}" + ); + } + } + } + return Ok(()); + } else { + error!(%dest, "no socketaddr for mapped address, voiding transmit"); + // Returning Ok here means we let QUIC timeout. + // Returning an error would immediately fail a connection. + // The philosophy of quinn-udp is that a UDP connection could // come back at any time or missing should be transient so chooses to let // these kind of errors time out. See test_try_send_no_send_addr to try // this out. - Ok(()) + return Ok(()); } + } else { + error!(%transmit.destination, "unrecognized address, voiding transmit"); + // Returning Ok here means we let QUIC timeout. + // Returning an error would immediately fail a connection. + // The philosophy of quinn-udp is that a UDP connection could + // come back at any time or missing should be transient so chooses to let + // these kind of errors time out. See test_try_send_no_send_addr to try + // this out. + return Ok(()); } } @@ -737,7 +787,7 @@ impl MagicSock { /// /// All the `bufs` and `metas` should have initialized packets in them. /// - /// This fixes up the datagrams to use the correct [`QuicMappedAddr`] and extracts DISCO + /// This fixes up the datagrams to use the correct [`NodeIdMappedAddr`] and extracts DISCO /// packets, processing them inside the magic socket. fn process_udp_datagrams( &self, @@ -749,7 +799,7 @@ impl MagicSock { // Adding the IP address we received something on results in Quinn using this // address on the send path to send from. However we let Quinn use a - // QuicMappedAddress, not a real address. So we used to substitute our bind address + // NodeIdMappedAddress, not a real address. So we used to substitute our bind address // here so that Quinn would send on the right address. But that would sometimes // result in the wrong address family and Windows trips up on that. // @@ -816,18 +866,32 @@ impl MagicSock { } if buf_contains_quic_datagrams { - // Update the NodeMap and remap RecvMeta to the QuicMappedAddr. + // Update the NodeMap and remap RecvMeta to the NodeIdMappedAddr. match self.node_map.receive_udp(meta.addr) { None => { - warn!( - src = ?meta.addr, - count = %quic_datagram_count, - len = meta.len, - "UDP recv quic packets: no node state found, skipping", - ); - // If we have no node state for the from addr, set len to 0 to make - // quinn skip the buf completely. - meta.len = 0; + // Check if this address is mapped to an IpMappedAddr + if let Some(ip_mapped_addr) = + self.ip_mapped_addrs.get_mapped_addr(&meta.addr) + { + trace!( + src = ?meta.addr, + count = %quic_datagram_count, + len = meta.len, + "UDP recv QUIC address discovery packets", + ); + quic_packets_total += quic_datagram_count; + meta.addr = ip_mapped_addr.addr(); + } else { + warn!( + src = ?meta.addr, + count = %quic_datagram_count, + len = meta.len, + "UDP recv quic packets: no node state found, skipping", + ); + // If we have no node state for the from addr, set len to 0 to make + // quinn skip the buf completely. + meta.len = 0; + } } Some((node_id, quic_mapped_addr)) => { trace!( @@ -1519,6 +1583,7 @@ impl Handle { discovery, dns_resolver, proxy_url, + server_config, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify, #[cfg(any(test, feature = "test-utils"))] @@ -1540,8 +1605,13 @@ impl Handle { let ipv4_addr = pconn4.local_addr()?; let ipv6_addr = pconn6.as_ref().and_then(|c| c.local_addr().ok()); - let net_reporter = - net_report::Client::new(Some(port_mapper.clone()), dns_resolver.clone())?; + let ip_mapped_addrs = IpMappedAddrs::default(); + + let net_reporter = net_report::Client::new( + Some(port_mapper.clone()), + dns_resolver.clone(), + Some(ip_mapped_addrs.clone()), + )?; let pconn4_sock = pconn4.as_socket(); let pconn6_sock = pconn6.as_ref().map(|p| p.as_socket()); @@ -1581,6 +1651,7 @@ impl Handle { pconn6, disco_secrets: DiscoSecrets::default(), node_map, + ip_mapped_addrs, udp_disco_sender, discovery, direct_addrs: Default::default(), @@ -1591,6 +1662,21 @@ impl Handle { insecure_skip_relay_cert_verify, }); + let mut endpoint_config = quinn::EndpointConfig::default(); + // Setting this to false means that quinn will ignore packets that have the QUIC fixed bit + // set to 0. The fixed bit is the 3rd bit of the first byte of a packet. + // For performance reasons and to not rewrite buffers we pass non-QUIC UDP packets straight + // through to quinn. We set the first byte of the packet to zero, which makes quinn ignore + // the packet if grease_quic_bit is set to false. + endpoint_config.grease_quic_bit(false); + + let endpoint = quinn::Endpoint::new_with_abstract_socket( + endpoint_config, + Some(server_config), + inner.clone(), + Arc::new(quinn::TokioRuntime), + )?; + let mut actor_tasks = JoinSet::default(); let relay_actor = RelayActor::new(inner.clone(), relay_datagram_recv_queue); @@ -1615,6 +1701,7 @@ impl Handle { let inner2 = inner.clone(); let network_monitor = netmon::Monitor::new().await?; + let qad_endpoint = endpoint.clone(); actor_tasks.spawn( async move { let actor = Actor { @@ -1631,6 +1718,7 @@ impl Handle { no_v4_send: false, net_reporter, network_monitor, + qad_endpoint, }; if let Err(err) = actor.run().await { @@ -1643,11 +1731,17 @@ impl Handle { let c = Handle { msock: inner, actor_tasks: Arc::new(Mutex::new(actor_tasks)), + endpoint, }; Ok(c) } + /// The underlying [`quinn::Endpoint`] + pub fn endpoint(&self) -> &quinn::Endpoint { + &self.endpoint + } + /// Closes the connection. /// /// Only the first close does anything. Any later closes return nil. @@ -1655,6 +1749,10 @@ impl Handle { /// indefinitely after this call. #[instrument(skip_all, fields(me = %self.msock.me))] pub(crate) async fn close(&self) { + tracing::debug!("Closing connections"); + self.endpoint.close(0u16.into(), b""); + self.endpoint.wait_idle().await; + if self.msock.is_closed() { return; } @@ -1682,7 +1780,6 @@ impl Handle { .await; if shutdown_done.is_ok() { debug!("tasks shutdown complete"); - } else { // shutdown all tasks debug!("aborting remaining {}/3 tasks", tasks.len()); tasks.shutdown().await; @@ -1719,7 +1816,6 @@ impl DiscoSecrets { }); disco::encode_message(&this_node_id, seal).into() } - fn unseal_and_decode( &self, secret: &crypto_box::SecretKey, @@ -1887,27 +1983,48 @@ impl RelayDatagramRecvQueue { } } -impl AsyncUdpSocket for Handle { +impl AsyncUdpSocket for MagicSock { fn create_io_poller(self: Arc) -> Pin> { - self.msock.create_io_poller() + // To do this properly the MagicSock would need a registry of pollers. For each + // node we would look up the poller or create one. Then on each try_send we can + // look up the correct poller and configure it to poll the paths it needs. + // + // Note however that the current quinn impl calls UdpPoller::poll_writable() + // **before** it calls try_send(), as opposed to how it is documented. That is a + // problem as we would not yet know the path that needs to be polled. To avoid such + // ambiguity the API could be changed to a .poll_send(&self, cx: &mut Context, + // io_poller: Pin<&mut dyn UdpPoller>, transmit: &Transmit) -> Poll> + // instead of the existing .try_send() because then we would have control over this. + // + // Right now however we have one single poller behaving the same for each + // connection. It checks all paths and returns Poll::Ready as soon as any path is + // ready. + let ipv4_poller = self.pconn4.create_io_poller(); + let ipv6_poller = self.pconn6.as_ref().map(|sock| sock.create_io_poller()); + let relay_sender = self.relay_datagram_send_channel.clone(); + Box::pin(IoPoller { + ipv4_poller, + ipv6_poller, + relay_sender, + }) } fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> { - self.msock.try_send(transmit) + self.try_send(transmit) } - /// NOTE: Receiving on a [`Self::close`]d socket will return [`Poll::Pending`] indefinitely. + /// NOTE: Receiving on a closed socket will return [`Poll::Pending`] indefinitely. fn poll_recv( &self, cx: &mut Context, bufs: &mut [io::IoSliceMut<'_>], metas: &mut [quinn_udp::RecvMeta], ) -> Poll> { - self.msock.poll_recv(cx, bufs, metas) + self.poll_recv(cx, bufs, metas) } fn local_addr(&self) -> io::Result { - match &*self.msock.local_addrs.read().expect("not poisoned") { + match &*self.local_addrs.read().expect("not poisoned") { (ipv4, None) => { // Pretend to be IPv6, because our QuinnMappedAddrs // need to be IPv6. @@ -2020,6 +2137,11 @@ struct Actor { net_reporter: net_report::Client, network_monitor: netmon::Monitor, + + /// The internal quinn::Endpoint + /// + /// Needed for Quic Address Discovery + qad_endpoint: quinn::Endpoint, } impl Actor { @@ -2428,10 +2550,24 @@ impl Actor { } let relay_map = self.msock.relay_map.clone(); - let opts = net_report::Options::default() + let mut opts = net_report::Options::default() .stun_v4(Some(self.pconn4.clone())) .stun_v6(self.pconn6.clone()); + // create a client config for the endpoint to use for QUIC address discovery + let root_store = + rustls::RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + let client_config = rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + let quic_config = Some(QuicConfig { + ep: self.qad_endpoint.clone(), + client_config, + ipv4: true, + ipv6: self.pconn6.is_some(), + }); + opts = opts.quic_config(quic_config); + debug!("requesting net_report report"); match self.net_reporter.get_report_channel(relay_map, opts).await { Ok(rx) => { @@ -2746,12 +2882,12 @@ fn split_packets(transmit: &quinn_udp::Transmit) -> RelayContents { /// comes in as the inner [`SocketAddr`], in those interfaces we have to be careful to do /// the conversion to this type. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub(crate) struct QuicMappedAddr(pub(crate) SocketAddr); +pub(crate) struct NodeIdMappedAddr(pub(crate) SocketAddr); -/// Counter to always generate unique addresses for [`QuicMappedAddr`]. -static ADDR_COUNTER: AtomicU64 = AtomicU64::new(1); +/// Counter to always generate unique addresses for [`NodeIdMappedAddr`]. +static NODE_ID_ADDR_COUNTER: AtomicU64 = AtomicU64::new(1); -impl QuicMappedAddr { +impl NodeIdMappedAddr { /// The Prefix/L of our Unique Local Addresses. const ADDR_PREFIXL: u8 = 0xfd; /// The Global ID used in our Unique Local Addresses. @@ -2768,18 +2904,41 @@ impl QuicMappedAddr { addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID); addr[6..8].copy_from_slice(&Self::ADDR_SUBNET); - let counter = ADDR_COUNTER.fetch_add(1, Ordering::Relaxed); + let counter = NODE_ID_ADDR_COUNTER.fetch_add(1, Ordering::Relaxed); addr[8..16].copy_from_slice(&counter.to_be_bytes()); - Self(SocketAddr::new(IpAddr::V6(Ipv6Addr::from(addr)), 12345)) + Self(SocketAddr::new( + IpAddr::V6(Ipv6Addr::from(addr)), + MAPPED_ADDR_PORT, + )) } } -impl std::fmt::Display for QuicMappedAddr { +impl TryFrom for NodeIdMappedAddr { + type Error = anyhow::Error; + + fn try_from(value: SocketAddr) -> std::result::Result { + match value { + SocketAddr::V4(_) => anyhow::bail!("NodeIdMappedAddrs are all Ipv6, addr {value:?}"), + SocketAddr::V6(addr) => { + if addr.port() != MAPPED_ADDR_PORT { + anyhow::bail!("not a mapped addr"); + } + let octets = addr.ip().octets(); + if octets[6..8] != NodeIdMappedAddr::ADDR_SUBNET { + anyhow::bail!("not a NodeIdMappedAddr"); + } + Ok(NodeIdMappedAddr(value)) + } + } + } +} +impl std::fmt::Display for NodeIdMappedAddr { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "QuicMappedAddr({})", self.0) + write!(f, "NodeIdMappedAddr({})", self.0) } } + fn disco_message_sent(msg: &disco::Message) { match msg { disco::Message::Ping(_) => { @@ -3817,7 +3976,13 @@ mod tests { /// /// Use [`magicsock_connect`] to establish connections. #[instrument(name = "ep", skip_all, fields(me = secret_key.public().fmt_short()))] - async fn magicsock_ep(secret_key: SecretKey) -> anyhow::Result<(quinn::Endpoint, Handle)> { + async fn magicsock_ep(secret_key: SecretKey) -> anyhow::Result { + let server_config = crate::endpoint::make_server_config( + &secret_key, + vec![ALPN.to_vec()], + Arc::new(quinn::TransportConfig::default()), + true, + )?; let opts = Options { addr_v4: None, addr_v6: None, @@ -3827,25 +3992,12 @@ mod tests { discovery: None, dns_resolver: crate::dns::default_resolver().clone(), proxy_url: None, + server_config, insecure_skip_relay_cert_verify: true, path_selection: PathSelection::default(), }; let msock = MagicSock::spawn(opts).await?; - let server_config = crate::endpoint::make_server_config( - &secret_key, - vec![ALPN.to_vec()], - Arc::new(quinn::TransportConfig::default()), - true, - )?; - let mut endpoint_config = quinn::EndpointConfig::default(); - endpoint_config.grease_quic_bit(false); - let endpoint = quinn::Endpoint::new_with_abstract_socket( - endpoint_config, - Some(server_config), - Arc::new(msock.clone()), - Arc::new(quinn::TokioRuntime), - )?; - Ok((endpoint, msock)) + Ok(msock) } /// Connects from `ep` returned by [`magicsock_ep`] to the `node_id`. @@ -3855,14 +4007,14 @@ mod tests { async fn magicsock_connect( ep: &quinn::Endpoint, ep_secret_key: SecretKey, - addr: QuicMappedAddr, + addr: NodeIdMappedAddr, node_id: NodeId, ) -> Result { // Endpoint::connect sets this, do the same to have similar behaviour. let mut transport_config = quinn::TransportConfig::default(); transport_config.keep_alive_interval(Some(Duration::from_secs(1))); - magicsock_connet_with_transport_config( + magicsock_connect_with_transport_config( ep, ep_secret_key, addr, @@ -3878,10 +4030,10 @@ mod tests { /// /// Uses [`ALPN`], `node_id`, must match `addr`. #[instrument(name = "connect", skip_all, fields(me = ep_secret_key.public().fmt_short()))] - async fn magicsock_connet_with_transport_config( + async fn magicsock_connect_with_transport_config( ep: &quinn::Endpoint, ep_secret_key: SecretKey, - addr: QuicMappedAddr, + addr: NodeIdMappedAddr, node_id: NodeId, transport_config: Arc, ) -> Result { @@ -3907,10 +4059,10 @@ mod tests { let secret_key_missing_node = SecretKey::from_bytes(&[255u8; 32]); let node_id_missing_node = secret_key_missing_node.public(); - let (ep_1, msock_1) = magicsock_ep(secret_key_1.clone()).await.unwrap(); + let msock_1 = magicsock_ep(secret_key_1.clone()).await.unwrap(); // Generate an address not present in the NodeMap. - let bad_addr = QuicMappedAddr::generate(); + let bad_addr = NodeIdMappedAddr::generate(); // 500ms is rather fast here. Running this locally it should always be the correct // timeout. If this is too slow however the test will not become flaky as we are @@ -3918,14 +4070,19 @@ mod tests { // this speeds up the test. let res = tokio::time::timeout( Duration::from_millis(500), - magicsock_connect(&ep_1, secret_key_1.clone(), bad_addr, node_id_missing_node), + magicsock_connect( + msock_1.endpoint(), + secret_key_1.clone(), + bad_addr, + node_id_missing_node, + ), ) .await; assert!(res.is_err(), "expecting timeout"); // Now check we can still create another connection with this endpoint. - let (ep_2, msock_2) = magicsock_ep(secret_key_2.clone()).await.unwrap(); - + let msock_2 = magicsock_ep(secret_key_2.clone()).await.unwrap(); + let ep_2 = msock_2.endpoint().clone(); // This needs an accept task let accept_task = tokio::spawn({ async fn accept(ep: quinn::Endpoint) -> Result<()> { @@ -3937,7 +4094,6 @@ mod tests { info!("accept finished"); Ok(()) } - let ep_2 = ep_2.clone(); async move { if let Err(err) = accept(ep_2).await { error!("{err:#}"); @@ -3970,7 +4126,7 @@ mod tests { let addr = msock_1.get_mapping_addr(node_id_2).unwrap(); let res = tokio::time::timeout( Duration::from_secs(10), - magicsock_connect(&ep_1, secret_key_1.clone(), addr, node_id_2), + magicsock_connect(msock_1.endpoint(), secret_key_1.clone(), addr, node_id_2), ) .await .expect("timeout while connecting"); @@ -3992,8 +4148,9 @@ mod tests { let secret_key_2 = SecretKey::from_bytes(&[2u8; 32]); let node_id_2 = secret_key_2.public(); - let (ep_1, msock_1) = magicsock_ep(secret_key_1.clone()).await.unwrap(); - let (ep_2, msock_2) = magicsock_ep(secret_key_2.clone()).await.unwrap(); + let msock_1 = magicsock_ep(secret_key_1.clone()).await.unwrap(); + let msock_2 = magicsock_ep(secret_key_2.clone()).await.unwrap(); + let ep_2 = msock_2.endpoint().clone(); // We need a task to accept the connection. let accept_task = tokio::spawn({ @@ -4005,7 +4162,6 @@ mod tests { info!("accept finished"); Ok(()) } - let ep_2 = ep_2.clone(); async move { if let Err(err) = accept(ep_2).await { error!("{err:#}"); @@ -4040,8 +4196,8 @@ mod tests { // little slower though. let mut transport_config = quinn::TransportConfig::default(); transport_config.max_idle_timeout(Some(Duration::from_millis(200).try_into().unwrap())); - let res = magicsock_connet_with_transport_config( - &ep_1, + let res = magicsock_connect_with_transport_config( + msock_1.endpoint(), secret_key_1.clone(), addr_2, node_id_2, @@ -4073,9 +4229,10 @@ mod tests { // We can now connect tokio::time::timeout(Duration::from_secs(10), async move { info!("establishing new connection"); - let conn = magicsock_connect(&ep_1, secret_key_1.clone(), addr_2, node_id_2) - .await - .unwrap(); + let conn = + magicsock_connect(msock_1.endpoint(), secret_key_1.clone(), addr_2, node_id_2) + .await + .unwrap(); info!("have connection"); let mut stream = conn.open_uni().await.unwrap(); stream.write_all(b"hello").await.unwrap(); diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index c466de7ba3..edabf0826f 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -17,7 +17,7 @@ use self::{ node_state::{NodeState, Options, PingHandled}, }; use super::{ - metrics::Metrics as MagicsockMetrics, ActorMessage, DiscoMessageSource, QuicMappedAddr, + metrics::Metrics as MagicsockMetrics, ActorMessage, DiscoMessageSource, NodeIdMappedAddr, }; #[cfg(any(test, feature = "test-utils"))] use crate::endpoint::PathSelection; @@ -45,7 +45,7 @@ const MAX_INACTIVE_NODES: usize = 30; /// - The node's ID in this map, only useful if you know the ID from an insert or lookup. /// This is static and never changes. /// -/// - The [`QuicMappedAddr`] which internally identifies the node to the QUIC stack. This +/// - The [`NodeIdMappedAddr`] which internally identifies the node to the QUIC stack. This /// is static and never changes. /// /// - The nodes's public key, aka `PublicKey` or "node_key". This is static and never changes, @@ -54,7 +54,7 @@ const MAX_INACTIVE_NODES: usize = 30; /// - A public socket address on which they are reachable on the internet, known as ip-port. /// These come and go as the node moves around on the internet /// -/// An index of nodeInfos by node key, QuicMappedAddr, and discovered ip:port endpoints. +/// An index of nodeInfos by node key, NodeIdMappedAddr, and discovered ip:port endpoints. #[derive(Default, Debug)] pub(super) struct NodeMap { inner: Mutex, @@ -64,7 +64,7 @@ pub(super) struct NodeMap { pub(super) struct NodeMapInner { by_node_key: HashMap, by_ip_port: HashMap, - by_quic_mapped_addr: HashMap, + by_quic_mapped_addr: HashMap, by_id: HashMap, next_id: usize, #[cfg(any(test, feature = "test-utils"))] @@ -79,7 +79,7 @@ pub(super) struct NodeMapInner { enum NodeStateKey { Idx(usize), NodeId(NodeId), - QuicMappedAddr(QuicMappedAddr), + NodeIdMappedAddr(NodeIdMappedAddr), IpPort(IpPort), } @@ -158,11 +158,14 @@ impl NodeMap { self.inner.lock().expect("poisoned").node_count() } - pub(super) fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { + pub(super) fn receive_udp( + &self, + udp_addr: SocketAddr, + ) -> Option<(PublicKey, NodeIdMappedAddr)> { self.inner.lock().expect("poisoned").receive_udp(udp_addr) } - pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr { + pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: NodeId) -> NodeIdMappedAddr { self.inner .lock() .expect("poisoned") @@ -201,7 +204,7 @@ impl NodeMap { pub(super) fn get_quic_mapped_addr_for_node_key( &self, node_key: NodeId, - ) -> Option { + ) -> Option { self.inner .lock() .expect("poisoned") @@ -245,7 +248,7 @@ impl NodeMap { #[allow(clippy::type_complexity)] pub(super) fn get_send_addrs( &self, - addr: QuicMappedAddr, + addr: NodeIdMappedAddr, have_ipv6: bool, ) -> Option<( PublicKey, @@ -254,7 +257,7 @@ impl NodeMap { Vec, )> { let mut inner = self.inner.lock().expect("poisoned"); - let ep = inner.get_mut(NodeStateKey::QuicMappedAddr(addr))?; + let ep = inner.get_mut(NodeStateKey::NodeIdMappedAddr(addr))?; let public_key = *ep.public_key(); trace!(dest = %addr, node_id = %public_key.fmt_short(), "dst mapped to NodeId"); let (udp_addr, relay_url, msgs) = ep.get_send_addrs(have_ipv6); @@ -404,7 +407,7 @@ impl NodeMapInner { match id { NodeStateKey::Idx(id) => Some(id), NodeStateKey::NodeId(node_key) => self.by_node_key.get(&node_key).copied(), - NodeStateKey::QuicMappedAddr(addr) => self.by_quic_mapped_addr.get(&addr).copied(), + NodeStateKey::NodeIdMappedAddr(addr) => self.by_quic_mapped_addr.get(&addr).copied(), NodeStateKey::IpPort(ipp) => self.by_ip_port.get(&ipp).copied(), } } @@ -435,7 +438,7 @@ impl NodeMapInner { } /// Marks the node we believe to be at `ipp` as recently used. - fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, QuicMappedAddr)> { + fn receive_udp(&mut self, udp_addr: SocketAddr) -> Option<(NodeId, NodeIdMappedAddr)> { let ip_port: IpPort = udp_addr.into(); let Some(node_state) = self.get_mut(NodeStateKey::IpPort(ip_port)) else { info!(src=%udp_addr, "receive_udp: no node_state found for addr, ignore"); @@ -446,7 +449,7 @@ impl NodeMapInner { } #[instrument(skip_all, fields(src = %src.fmt_short()))] - fn receive_relay(&mut self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr { + fn receive_relay(&mut self, relay_url: &RelayUrl, src: NodeId) -> NodeIdMappedAddr { #[cfg(any(test, feature = "test-utils"))] let path_selection = self.path_selection; let node_state = self.get_or_insert_with(NodeStateKey::NodeId(src), || { diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index d116be6695..cf5ba41259 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -24,7 +24,7 @@ use super::{ use crate::endpoint::PathSelection; use crate::{ disco::{self, SendAddr}, - magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, Timer, HEARTBEAT_INTERVAL}, + magicsock::{ActorMessage, MagicsockMetrics, NodeIdMappedAddr, Timer, HEARTBEAT_INTERVAL}, watchable::{Watchable, Watcher}, }; @@ -103,7 +103,7 @@ pub(super) struct NodeState { /// [`NodeMap`]: super::NodeMap id: usize, /// The UDP address used on the QUIC-layer to address this node. - quic_mapped_addr: QuicMappedAddr, + quic_mapped_addr: NodeIdMappedAddr, /// The global identifier for this endpoint. node_id: NodeId, /// The last time we pinged all endpoints. @@ -156,7 +156,7 @@ pub(super) struct Options { impl NodeState { pub(super) fn new(id: usize, options: Options) -> Self { - let quic_mapped_addr = QuicMappedAddr::generate(); + let quic_mapped_addr = NodeIdMappedAddr::generate(); if options.relay_url.is_some() { // we potentially have a relay connection to the node @@ -191,7 +191,7 @@ impl NodeState { &self.node_id } - pub(super) fn quic_mapped_addr(&self) -> &QuicMappedAddr { + pub(super) fn quic_mapped_addr(&self) -> &NodeIdMappedAddr { &self.quic_mapped_addr } @@ -1487,7 +1487,7 @@ mod tests { ( NodeState { id: 0, - quic_mapped_addr: QuicMappedAddr::generate(), + quic_mapped_addr: NodeIdMappedAddr::generate(), node_id: key.public(), last_full_ping: None, relay_url: None, @@ -1517,7 +1517,7 @@ mod tests { let key = SecretKey::generate(rand::thread_rng()); NodeState { id: 1, - quic_mapped_addr: QuicMappedAddr::generate(), + quic_mapped_addr: NodeIdMappedAddr::generate(), node_id: key.public(), last_full_ping: None, relay_url: relay_and_state(key.public(), send_addr.clone()), @@ -1538,7 +1538,7 @@ mod tests { let key = SecretKey::generate(rand::thread_rng()); NodeState { id: 2, - quic_mapped_addr: QuicMappedAddr::generate(), + quic_mapped_addr: NodeIdMappedAddr::generate(), node_id: key.public(), last_full_ping: None, relay_url: Some(( @@ -1582,7 +1582,7 @@ mod tests { ( NodeState { id: 3, - quic_mapped_addr: QuicMappedAddr::generate(), + quic_mapped_addr: NodeIdMappedAddr::generate(), node_id: key.public(), last_full_ping: None, relay_url: relay_and_state(key.public(), send_addr.clone()),