diff --git a/iroh-base/src/node_addr.rs b/iroh-base/src/node_addr.rs index d98e121313..346e3f525a 100644 --- a/iroh-base/src/node_addr.rs +++ b/iroh-base/src/node_addr.rs @@ -74,7 +74,7 @@ impl NodeAddr { pub fn from_parts( node_id: PublicKey, relay_url: Option, - direct_addresses: Vec, + direct_addresses: impl IntoIterator, ) -> Self { Self { node_id, diff --git a/iroh-base/src/ticket/blob.rs b/iroh-base/src/ticket/blob.rs index de51798270..ea0ac3982e 100644 --- a/iroh-base/src/ticket/blob.rs +++ b/iroh-base/src/ticket/blob.rs @@ -132,7 +132,7 @@ mod tests { let relay_url = None; BlobTicket { hash, - node: NodeAddr::from_parts(peer, relay_url, vec![addr]), + node: NodeAddr::from_parts(peer, relay_url, [addr]), format: BlobFormat::HashSeq, } } @@ -163,7 +163,7 @@ mod tests { .unwrap(); let ticket = BlobTicket { - node: NodeAddr::from_parts(node_id, None, vec![]), + node: NodeAddr::from_parts(node_id, None, []), format: BlobFormat::Raw, hash, }; diff --git a/iroh-base/src/ticket/node.rs b/iroh-base/src/ticket/node.rs index eba67b7e7e..a6715892af 100644 --- a/iroh-base/src/ticket/node.rs +++ b/iroh-base/src/ticket/node.rs @@ -128,7 +128,7 @@ mod tests { let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 1234)); let relay_url = None; NodeTicket { - node: NodeAddr::from_parts(peer, relay_url, vec![addr]), + node: NodeAddr::from_parts(peer, relay_url, [addr]), } } @@ -158,7 +158,7 @@ mod tests { node: NodeAddr::from_parts( node_id, Some("http://derp.me./".parse().unwrap()), - vec!["127.0.0.1:1024".parse().unwrap()], + ["127.0.0.1:1024".parse().unwrap()], ), }; let base32 = base32::parse_vec(ticket.to_string().strip_prefix("node").unwrap()).unwrap(); diff --git a/iroh-cli/src/commands/blobs.rs b/iroh-cli/src/commands/blobs.rs index b32623db69..0be5657a87 100644 --- a/iroh-cli/src/commands/blobs.rs +++ b/iroh-cli/src/commands/blobs.rs @@ -211,7 +211,7 @@ impl BlobCommands { address } else { // use both the cli supplied ones and the ticket ones - address.extend(info.direct_addresses.into_iter()); + address.extend(info.direct_addresses); address }; diff --git a/iroh-docs/src/ticket.rs b/iroh-docs/src/ticket.rs index f7c763783c..0af620e49c 100644 --- a/iroh-docs/src/ticket.rs +++ b/iroh-docs/src/ticket.rs @@ -87,7 +87,7 @@ mod tests { let ticket = DocTicket { capability: Capability::Read(namespace_id), - nodes: vec![NodeAddr::from_parts(node_id, None, vec![])], + nodes: vec![NodeAddr::from_parts(node_id, None, [])], }; let base32 = base32::parse_vec(ticket.to_string().strip_prefix("doc").unwrap()).unwrap(); let expected = parse_hexdump(" diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index 23add64dba..02799609c6 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -805,11 +805,11 @@ impl Stream for TopicCommandStream { } } -fn our_peer_data(endpoint: &Endpoint, direct_addresses: &[DirectAddr]) -> Result { +fn our_peer_data(endpoint: &Endpoint, direct_addresses: &BTreeSet) -> Result { let addr = NodeAddr::from_parts( endpoint.node_id(), endpoint.home_relay(), - direct_addresses.iter().map(|x| x.addr).collect(), + direct_addresses.iter().map(|x| x.addr), ); encode_peer_data(&addr.info) } diff --git a/iroh-net/src/endpoint.rs b/iroh-net/src/endpoint.rs index c121358931..54630f94f2 100644 --- a/iroh-net/src/endpoint.rs +++ b/iroh-net/src/endpoint.rs @@ -781,8 +781,11 @@ impl Endpoint { .await .ok_or(anyhow!("No IP endpoints found"))?; let relay = self.home_relay(); - let addrs = addrs.into_iter().map(|x| x.addr).collect(); - Ok(NodeAddr::from_parts(self.node_id(), relay, addrs)) + Ok(NodeAddr::from_parts( + self.node_id(), + relay, + addrs.into_iter().map(|x| x.addr), + )) } /// Returns the [`RelayUrl`] of the Relay server used as home relay. diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index bd74fc6420..1a9341e18f 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -16,14 +16,14 @@ //! however, read any packets that come off the UDP sockets. use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, fmt::Display, io, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, pin::Pin, sync::{ atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering}, - Arc, + Arc, RwLock, }, task::{Context, Poll, Waker}, time::{Duration, Instant}, @@ -229,7 +229,7 @@ pub(crate) struct MagicSock { discovery: Option>, /// Our discovered direct addresses. - direct_addrs: Watchable, + direct_addrs: DiscoveredDirectAddrs, /// List of CallMeMaybe disco messages that should be sent out after the next endpoint update /// completes @@ -320,10 +320,7 @@ impl MagicSock { /// To get the current direct addresses, drop the stream after the first item was /// received. pub(crate) fn direct_addresses(&self) -> DirectAddrsStream { - DirectAddrsStream { - initial: Some(self.direct_addrs.get()), - inner: self.direct_addrs.watch().into_stream(), - } + self.direct_addrs.updates_stream() } /// Watch for changes to the home relay. @@ -365,11 +362,10 @@ impl MagicSock { /// Add addresses for a node to the magic socket's addresbook. #[instrument(skip_all, fields(me = %self.me))] pub fn add_node_addr(&self, mut addr: NodeAddr, source: node_map::Source) -> Result<()> { - let my_addresses = self.direct_addrs.get().addrs; let mut pruned = 0; - for my_addr in my_addresses.into_iter().map(|ep| ep.addr) { + for my_addr in self.direct_addrs.sockaddrs() { if addr.info.direct_addresses.remove(&my_addr) { - warn!(node_id=addr.node_id.fmt_short(), %my_addr, %source, "not adding our addr for node"); + warn!( node_id=addr.node_id.fmt_short(), %my_addr, %source, "not adding our addr for node"); pruned += 1; } } @@ -385,19 +381,15 @@ impl MagicSock { } } - /// Updates our direct addresses. + /// Stores a new set of direct addresses. /// - /// On a successful update, our address is published to discovery. - pub(super) fn update_direct_addresses(&self, eps: Vec) { - let updated = self - .direct_addrs - .update(DiscoveredDirectAddrs::new(eps)) - .is_ok(); + /// If the direct addresses have changed from the previous set, they are published to + /// discovery. + pub(super) fn store_direct_addresses(&self, addrs: BTreeSet) { + let updated = self.direct_addrs.update(addrs); if updated { - let direct_addrs = self.direct_addrs.read(); - direct_addrs.log_direct_addrs_change(); self.node_map - .on_direct_addr_discovered(direct_addrs.iter().map(|addr| addr.addr)); + .on_direct_addr_discovered(self.direct_addrs.sockaddrs()); self.publish_my_addr(); } } @@ -1214,7 +1206,7 @@ impl MagicSock { } fn send_queued_call_me_maybes(&self) { - let msg = self.direct_addrs.read().to_call_me_maybe_message(); + let msg = self.direct_addrs.to_call_me_maybe_message(); let msg = disco::Message::CallMeMaybe(msg); for (public_key, url) in self.pending_call_me_maybes.lock().drain() { if !self.send_disco_message_relay(&url, public_key, msg.clone()) { @@ -1223,26 +1215,33 @@ impl MagicSock { } } + /// Sends the call-me-maybe DISCO message, queuing if addresses are too stale. + /// + /// To send the call-me-maybe message, we need to know our current direct addresses. If + /// this information is too stale, the call-me-maybe is queued while a netcheck run is + /// scheduled. Once this run finishes, the call-me-maybe will be sent. fn send_or_queue_call_me_maybe(&self, url: &RelayUrl, dst_node: NodeId) { - let direct_addrs = self.direct_addrs.read(); - if direct_addrs.fresh_enough() { - let msg = direct_addrs.to_call_me_maybe_message(); - let msg = disco::Message::CallMeMaybe(msg); - if !self.send_disco_message_relay(url, dst_node, msg) { - warn!(dstkey = %dst_node.fmt_short(), relayurl = ?url, + match self.direct_addrs.fresh_enough() { + Ok(()) => { + let msg = self.direct_addrs.to_call_me_maybe_message(); + let msg = disco::Message::CallMeMaybe(msg); + if !self.send_disco_message_relay(url, dst_node, msg) { + warn!(dstkey = %dst_node.fmt_short(), relayurl = %url, "relay channel full, dropping call-me-maybe"); - } else { - debug!(dstkey = %dst_node.fmt_short(), relayurl = ?url, "call-me-maybe sent"); + } else { + debug!(dstkey = %dst_node.fmt_short(), relayurl = %url, "call-me-maybe sent"); + } + } + Err(last_refresh_ago) => { + self.pending_call_me_maybes + .lock() + .insert(dst_node, url.clone()); + debug!( + ?last_refresh_ago, + "want call-me-maybe but direct addrs stale; queuing after restun", + ); + self.re_stun("refresh-for-peering"); } - } else { - self.pending_call_me_maybes - .lock() - .insert(dst_node, url.clone()); - debug!( - last_refresh_ago = ?direct_addrs.updated_at.map(|x| x.elapsed()), - "want call-me-maybe but direct addrs stale; queuing after restun", - ); - self.re_stun("refresh-for-peering"); } } @@ -1259,12 +1258,9 @@ impl MagicSock { /// Called whenever our addresses or home relay node changes. fn publish_my_addr(&self) { if let Some(ref discovery) = self.discovery { - let addrs = self.direct_addrs.read(); - let relay_url = self.my_relay(); - let direct_addresses = addrs.iter().map(|da| da.addr).collect(); let info = AddrInfo { - relay_url, - direct_addresses, + relay_url: self.my_relay(), + direct_addresses: self.direct_addrs.sockaddrs(), }; discovery.publish(&info); } @@ -1443,7 +1439,7 @@ impl Handle { relay_actor_sender: relay_actor_sender.clone(), udp_disco_sender, discovery, - direct_addrs: Watchable::new(Default::default()), + direct_addrs: Default::default(), pending_call_me_maybes: Default::default(), direct_addr_update_state: DirectAddrUpdateState::new(), dns_resolver, @@ -1520,7 +1516,7 @@ impl Handle { self.msock.closing.store(true, Ordering::Relaxed); self.msock.actor_sender.send(ActorMessage::Shutdown).await?; self.msock.closed.store(true, Ordering::SeqCst); - self.msock.direct_addrs.shutdown(); + self.msock.direct_addrs.addrs.shutdown(); let mut tasks = self.actor_tasks.lock().await; @@ -1546,46 +1542,6 @@ impl Handle { } } -/// Stream returning local endpoints as they change. -#[derive(Debug)] -pub struct DirectAddrsStream { - initial: Option, - inner: watchable::WatcherStream, -} - -impl Stream for DirectAddrsStream { - type Item = Vec; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = &mut *self; - if let Some(initial_endpoints) = this.initial.take() { - if !initial_endpoints.is_empty() { - return Poll::Ready(Some(initial_endpoints.into_iter().collect())); - } - } - loop { - match Pin::new(&mut this.inner).poll_next(cx) { - Poll::Pending => break Poll::Pending, - Poll::Ready(Some(discovered)) => { - if discovered.is_empty() { - // When we start up we might initially have empty local endpoints as - // the magic socket has not yet figured this out. Later on this set - // should never be empty. However even if it was the magicsock - // would be in a state not very usable so skipping those events is - // probably fine. - // To make sure we install the right waker we loop rather than - // returning Poll::Pending immediately here. - continue; - } else { - break Poll::Ready(Some(discovered.into_iter().collect())); - } - } - Poll::Ready(None) => break Poll::Ready(None), - } - } - } -} - #[derive(Debug, Default)] struct DiscoSecrets(parking_lot::Mutex>); @@ -1636,34 +1592,6 @@ enum DiscoBoxError { type RelayRecvResult = Result<(PublicKey, quinn_udp::RecvMeta, Bytes), io::Error>; -/// Reports whether x and y represent the same set of endpoints. The order doesn't matter. -fn endpoint_sets_equal(xs: &[DirectAddr], ys: &[DirectAddr]) -> bool { - if xs.is_empty() && ys.is_empty() { - return true; - } - if xs.len() == ys.len() { - let mut order_matches = true; - for (i, x) in xs.iter().enumerate() { - if x != &ys[i] { - order_matches = false; - break; - } - } - if order_matches { - return true; - } - } - let mut m: HashMap<&DirectAddr, usize> = HashMap::new(); - for x in xs { - *m.entry(x).or_default() |= 1; - } - for y in ys { - *m.entry(y).or_default() |= 2; - } - - m.values().all(|v| *v == 3) -} - impl AsyncUdpSocket for Handle { fn create_io_poller(self: Arc) -> Pin> { self.msock.create_io_poller() @@ -1879,7 +1807,7 @@ impl Actor { trace!("tick: direct addr update receiver {:?}", reason); inc!(Metrics, actor_tick_direct_addr_update_receiver); if let Some(reason) = reason { - self.update_direct_addrs(reason).await; + self.refresh_direct_addrs(reason).await; } } Some(is_major) = link_change_r.recv() => { @@ -2049,7 +1977,7 @@ impl Actor { /// never be invoked directly. Some day this will be refactored to not allow this easy /// mistake to be made. #[instrument(level = "debug", skip_all)] - async fn update_direct_addrs(&mut self, why: &'static str) { + async fn refresh_direct_addrs(&mut self, why: &'static str) { inc!(MagicsockMetrics, update_direct_addrs); debug!("starting direct addr update ({})", why); @@ -2057,54 +1985,62 @@ impl Actor { self.update_net_info(why).await; } - /// Stores the results of a successful direct addr update. - async fn store_direct_addr_update(&mut self, nr: Option>) { + /// Updates the direct addresses of this magic socket. + /// + /// Updates the [`DiscoveredDirectAddrs`] of this [`MagicSock`] with the current set of + /// direct addresses from: + /// + /// - The portmapper. + /// - A netcheck report. + /// - The local interfaces IP addresses. + fn update_direct_addresses(&mut self, netcheck_report: Option>) { let portmap_watcher = self.port_mapper.watch_external_address(); - // direct_addr -> how it was found - let mut already = HashMap::new(); - // unique direct addrs - let mut addrs = Vec::new(); - - macro_rules! add_addr { - ($already:expr, $addrs:expr, $ipp:expr, $typ:expr) => { - #[allow(clippy::map_entry)] - if !$already.contains_key(&$ipp) { - $already.insert($ipp, $typ); - $addrs.push(DirectAddr { - addr: $ipp, - typ: $typ, - }); - } - }; - } + // We only want to have one DirectAddr for each SocketAddr we have. So we store + // this as a map of SocketAddr -> DirectAddrType. At the end we will construct a + // DirectAddr from each entry. + let mut addrs: BTreeMap = BTreeMap::new(); + // First add PortMapper provided addresses. let maybe_port_mapped = *portmap_watcher.borrow(); - if let Some(portmap_ext) = maybe_port_mapped.map(SocketAddr::V4) { - add_addr!(already, addrs, portmap_ext, DirectAddrType::Portmapped); - self.set_net_info_have_port_map().await; + addrs + .entry(portmap_ext) + .or_insert(DirectAddrType::Portmapped); + self.set_net_info_have_port_map(); } - if let Some(nr) = nr { - if let Some(global_v4) = nr.global_v4 { - add_addr!(already, addrs, global_v4.into(), DirectAddrType::Stun); + // Next add STUN addresses from the netcheck report. + if let Some(netcheck_report) = netcheck_report { + if let Some(global_v4) = netcheck_report.global_v4 { + addrs + .entry(global_v4.into()) + .or_insert(DirectAddrType::Stun); // If they're behind a hard NAT and are using a fixed // port locally, assume they might've added a static // port mapping on their router to the same explicit // port that we are running with. Worst case it's an invalid candidate mapping. let port = self.msock.port.load(Ordering::Relaxed); - if nr.mapping_varies_by_dest_ip.unwrap_or_default() && port != 0 { + if netcheck_report + .mapping_varies_by_dest_ip + .unwrap_or_default() + && port != 0 + { let mut addr = global_v4; addr.set_port(port); - add_addr!(already, addrs, addr.into(), DirectAddrType::Stun4LocalPort); + addrs + .entry(addr.into()) + .or_insert(DirectAddrType::Stun4LocalPort); } } - if let Some(global_v6) = nr.global_v6 { - add_addr!(already, addrs, global_v6.into(), DirectAddrType::Stun); + if let Some(global_v6) = netcheck_report.global_v6 { + addrs + .entry(global_v6.into()) + .or_insert(DirectAddrType::Stun); } } + let local_addr_v4 = self.pconn4.local_addr().ok(); let local_addr_v6 = self.pconn6.as_ref().and_then(|c| c.local_addr().ok()); @@ -2117,98 +2053,67 @@ impl Actor { let msock = self.msock.clone(); + // The following code can be slow, we do not want to block the caller since it would + // block the actor loop. tokio::spawn( async move { - // Depending on the OS and network interfaces attached and their state enumerating - // the local interfaces can take a long time. Especially Windows is very slow. - let LocalAddresses { - regular: mut ips, - loopback, - } = tokio::task::spawn_blocking(LocalAddresses::new) - .await - .unwrap(); - + // If a socket is bound to the unspecified address, create SocketAddrs for + // each local IP address by pairing it with the port the socket is bound on. if is_unspecified_v4 || is_unspecified_v6 { + // Depending on the OS and network interfaces attached and their state + // enumerating the local interfaces can take a long time. Especially + // Windows is very slow. + let LocalAddresses { + regular: mut ips, + loopback, + } = tokio::task::spawn_blocking(LocalAddresses::new) + .await + .unwrap(); if ips.is_empty() && addrs.is_empty() { - // Only include loopback addresses if we have no - // interfaces at all to use as direct addrs and don't - // have a public IPv4 or IPv6 address. This allows - // for localhost testing when you're on a plane and - // offline, for example. + // Include loopback addresses only if there are no other interfaces + // or public addresses, this allows testing offline. ips = loopback; } - let v4_port = local_addr_v4.and_then(|addr| { - if addr.ip().is_unspecified() { - Some(addr.port()) - } else { - None - } - }); - - let v6_port = local_addr_v6.and_then(|addr| { - if addr.ip().is_unspecified() { - Some(addr.port()) - } else { - None - } - }); - for ip in ips { - match ip { - IpAddr::V4(_) => { - if let Some(port) = v4_port { - add_addr!( - already, - addrs, - SocketAddr::new(ip, port), - DirectAddrType::Local - ); - } + let port_if_unspecified = match ip { + IpAddr::V4(_) if is_unspecified_v4 => { + local_addr_v4.map(|addr| addr.port()) } - IpAddr::V6(_) => { - if let Some(port) = v6_port { - add_addr!( - already, - addrs, - SocketAddr::new(ip, port), - DirectAddrType::Local - ); - } + IpAddr::V6(_) if is_unspecified_v6 => { + local_addr_v6.map(|addr| addr.port()) } + _ => None, + }; + if let Some(port) = port_if_unspecified { + let addr = SocketAddr::new(ip, port); + addrs.entry(addr).or_insert(DirectAddrType::Local); } } } + // If a socket is bound to a specific address, add it. if !is_unspecified_v4 { if let Some(addr) = local_addr_v4 { - // Our local socket is bound to a particular address. - // Do not offer addresses on other local interfaces. - add_addr!(already, addrs, addr, DirectAddrType::Local); + addrs.entry(addr).or_insert(DirectAddrType::Local); } } - if !is_unspecified_v6 { if let Some(addr) = local_addr_v6 { - // Our local socket is bound to a particular address. - // Do not offer addresses on other local interfaces. - add_addr!(already, addrs, addr, DirectAddrType::Local); + addrs.entry(addr).or_insert(DirectAddrType::Local); } } - // Note: the direct addrs are intentionally returned in priority order, - // from "farthest but most reliable" to "closest but least - // reliable." Addresses returned from STUN should be globally - // addressable, but might go farther on the network than necessary. - // Local interface addresses might have lower latency, but not be - // globally addressable. - // - // The STUN address(es) are always first. - // Despite this sorting, clients are not relying on this sorting for decisions; - - msock.update_direct_addresses(addrs); - - // Regardless of whether our direct addrs changed, we now want to send any + // Finally create and store store all these direct addresses and send any // queued call-me-maybe messages. + msock.store_direct_addresses( + addrs + .iter() + .map(|(addr, typ)| DirectAddr { + addr: *addr, + typ: *typ, + }) + .collect(), + ); msock.send_queued_call_me_maybes(); } .instrument(Span::current()), @@ -2232,7 +2137,7 @@ impl Actor { /// Updates `NetInfo.HavePortMap` to true. #[instrument(level = "debug", skip_all)] - async fn set_net_info_have_port_map(&mut self) { + fn set_net_info_have_port_map(&mut self) { if let Some(ref mut net_info_last) = self.net_info_last { if net_info_last.have_port_map { // No change. @@ -2257,7 +2162,7 @@ impl Actor { /// Calls netcheck. /// /// Note that invoking this is managed by [`DirectAddrUpdateState`] via - /// [`Actor::update_direct_addrs`] and this should never be invoked directly. Some day + /// [`Actor::refresh_direct_addrs`] and this should never be invoked directly. Some day /// this will be refactored to not allow this easy mistake to be made. #[instrument(level = "debug", skip_all)] async fn update_net_info(&mut self, why: &'static str) { @@ -2295,7 +2200,7 @@ impl Actor { .await .ok(); // The receiver of the NetcheckReport message will call - // .finalize_endpoints_update(). + // .finalize_direct_addrs_update(). }); } Err(err) => { @@ -2353,7 +2258,7 @@ impl Actor { // TODO: set link type self.call_net_info_callback(ni).await; } - self.store_direct_addr_update(report).await; + self.update_direct_addresses(report); } fn set_nearest_relay(&mut self, relay_url: Option) -> bool { @@ -2516,56 +2421,105 @@ fn bind( #[derive(derive_more::Debug, Default, Clone)] struct DiscoveredDirectAddrs { /// The last set of discovered direct addresses. - addrs: Vec, + addrs: Watchable>, /// The last time the direct addresses were updated, even if there was no change. - updated_at: Option, -} - -impl PartialEq for DiscoveredDirectAddrs { - fn eq(&self, other: &Self) -> bool { - endpoint_sets_equal(&self.addrs, &other.addrs) - } + /// + /// This is only ever None at startup. + updated_at: Arc>>, } impl DiscoveredDirectAddrs { - fn new(endpoints: Vec) -> Self { - Self { - addrs: endpoints, - updated_at: Some(Instant::now()), + /// Updates the direct addresses, returns `true` if they changed, `false` if not. + fn update(&self, addrs: BTreeSet) -> bool { + *self.updated_at.write().unwrap() = Some(Instant::now()); + let updated = self.addrs.update(addrs).is_ok(); + if updated { + event!( + target: "events.net.direct_addrs", + Level::DEBUG, + addrs = ?self.addrs.get(), + ); } + updated } - fn into_iter(self) -> impl Iterator { - self.addrs.into_iter() + fn sockaddrs(&self) -> BTreeSet { + self.addrs.read().iter().map(|da| da.addr).collect() } - fn iter(&self) -> impl Iterator + '_ { - self.addrs.iter() + /// Whether the direct addr information is considered "fresh". + /// + /// If not fresh you should probably update the direct addresses before using this info. + /// + /// Returns `Ok(())` if fresh enough and `Err(elapsed)` if not fresh enough. + /// `elapsed` is the time elapsed since the direct addresses were last updated. + /// + /// If there is no direct address information `Err(Duration::ZERO)` is returned. + fn fresh_enough(&self) -> Result<(), Duration> { + match *self.updated_at.read().expect("poisoned") { + None => Err(Duration::ZERO), + Some(time) => { + let elapsed = time.elapsed(); + if elapsed <= ENDPOINTS_FRESH_ENOUGH_DURATION { + Ok(()) + } else { + Err(elapsed) + } + } + } } - fn is_empty(&self) -> bool { - self.addrs.is_empty() + fn to_call_me_maybe_message(&self) -> disco::CallMeMaybe { + let my_numbers = self.addrs.read().iter().map(|da| da.addr).collect(); + disco::CallMeMaybe { my_numbers } } - fn fresh_enough(&self) -> bool { - match self.updated_at.as_ref() { - None => false, - Some(time) => time.elapsed() <= ENDPOINTS_FRESH_ENOUGH_DURATION, + fn updates_stream(&self) -> DirectAddrsStream { + DirectAddrsStream { + initial: Some(self.addrs.get()), + inner: self.addrs.watch().into_stream(), } } +} - fn to_call_me_maybe_message(&self) -> disco::CallMeMaybe { - let my_numbers = self.addrs.iter().map(|ep| ep.addr).collect(); - disco::CallMeMaybe { my_numbers } - } +/// Stream returning local endpoints as they change. +#[derive(Debug)] +pub struct DirectAddrsStream { + initial: Option>, + inner: watchable::WatcherStream>, +} - fn log_direct_addrs_change(&self) { - event!( - target: "events.net.direct_addrs", - Level::DEBUG, - addrs = ?self.addrs, - ); +impl Stream for DirectAddrsStream { + type Item = BTreeSet; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = &mut *self; + if let Some(addrs) = this.initial.take() { + if !addrs.is_empty() { + return Poll::Ready(Some(addrs)); + } + } + loop { + match Pin::new(&mut this.inner).poll_next(cx) { + Poll::Pending => break Poll::Pending, + Poll::Ready(Some(addrs)) => { + if addrs.is_empty() { + // When we start up we might initially have empty direct addrs as + // the magic socket has not yet figured this out. Later on this set + // should never be empty. However even if it was the magicsock + // would be in a state not very usable so skipping those events is + // probably fine. + // To make sure we install the right waker we loop rather than + // returning Poll::Pending immediately here. + continue; + } else { + break Poll::Ready(Some(addrs)); + } + } + Poll::Ready(None) => break Poll::Ready(None), + } + } } } @@ -2900,7 +2854,11 @@ mod tests { #[instrument(skip_all)] async fn mesh_stacks(stacks: Vec) -> Result { /// Registers endpoint addresses of a node to all other nodes. - fn update_direct_addrs(stacks: &[MagicStack], my_idx: usize, new_addrs: Vec) { + fn update_direct_addrs( + stacks: &[MagicStack], + my_idx: usize, + new_addrs: BTreeSet, + ) { let me = &stacks[my_idx]; for (i, m) in stacks.iter().enumerate() { if i == my_idx { @@ -3597,14 +3555,12 @@ mod tests { let ms = Handle::new(Default::default()).await.unwrap(); // See if we can get endpoints. - let mut eps0 = ms.direct_addresses().next().await.unwrap(); - eps0.sort(); + let eps0 = ms.direct_addresses().next().await.unwrap(); println!("{eps0:?}"); assert!(!eps0.is_empty()); // Getting the endpoints again immediately should give the same results. - let mut eps1 = ms.direct_addresses().next().await.unwrap(); - eps1.sort(); + let eps1 = ms.direct_addresses().next().await.unwrap(); println!("{eps1:?}"); assert_eq!(eps0, eps1); } diff --git a/iroh-net/src/magicsock/node_map.rs b/iroh-net/src/magicsock/node_map.rs index f887854f4d..9ba9f2222d 100644 --- a/iroh-net/src/magicsock/node_map.rs +++ b/iroh-net/src/magicsock/node_map.rs @@ -1,5 +1,5 @@ use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, BTreeSet, HashMap}, hash::Hash, net::{IpAddr, SocketAddr}, pin::Pin, @@ -283,10 +283,7 @@ impl NodeMap { self.inner.lock().prune_inactive(); } - pub(crate) fn on_direct_addr_discovered( - &self, - discovered: impl Iterator>, - ) { + pub(crate) fn on_direct_addr_discovered(&self, discovered: BTreeSet) { self.inner.lock().on_direct_addr_discovered(discovered); } } @@ -321,10 +318,7 @@ impl NodeMapInner { } /// Prunes direct addresses from nodes that claim to share an address we know points to us. - pub(super) fn on_direct_addr_discovered( - &mut self, - discovered: impl Iterator>, - ) { + pub(super) fn on_direct_addr_discovered(&mut self, discovered: BTreeSet) { for addr in discovered { self.remove_by_ipp(addr.into(), ClearReason::MatchesOurLocalAddr) } diff --git a/iroh/src/client/net.rs b/iroh/src/client/net.rs index ca38a2da3e..d4a11b5f01 100644 --- a/iroh/src/client/net.rs +++ b/iroh/src/client/net.rs @@ -55,7 +55,7 @@ use crate::rpc_protocol::net::{ /// // the home relay /// Some(relay_url), /// // the direct addresses -/// vec!["120.0.0.1:0".parse().unwrap()], +/// ["120.0.0.1:0".parse().unwrap()], /// ); /// net_client.add_node_addr(addr).await?; /// // Shut down the node. Passing `true` will force the shutdown, passing in diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs index fd1ce149db..7d380dc4bb 100644 --- a/iroh/tests/provide.rs +++ b/iroh/tests/provide.rs @@ -118,7 +118,10 @@ async fn empty_files() -> Result<()> { /// Create new get options with the given node id and addresses, using a /// randomly generated secret key. -fn get_options(node_id: NodeId, addrs: Vec) -> (SecretKey, NodeAddr) { +fn get_options( + node_id: NodeId, + addrs: impl IntoIterator, +) -> (SecretKey, NodeAddr) { let relay_map = default_relay_map(); let peer = iroh_net::NodeAddr::from_parts( node_id,