Skip to content

Commit

Permalink
path of least resistance refactoring to see if the slowness was due t…
Browse files Browse the repository at this point in the history
…o mis-handling dns resolutions
  • Loading branch information
“ramfox” committed Jan 10, 2025
1 parent bc523e4 commit 201ef1e
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 218 deletions.
139 changes: 139 additions & 0 deletions iroh-base/src/ip_mapped_addrs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use std::{
collections::BTreeMap,
net::{IpAddr, Ipv6Addr, SocketAddr},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

/// The dummy port used for all mapped addresses
pub const MAPPED_ADDR_PORT: u16 = 12345;

/// Can occur when converting a [`SocketAddr`] to an [`IpMappedAddr`]
#[derive(Debug, thiserror::Error)]
#[error("Failed to convert: {0}")]
pub struct IpMappedAddrError(String);

/// A mirror for the `NodeIdMappedAddr`, mapping a fake Ipv6 address with an actual IP address.
///
/// You can consider this as nothing more than a lookup key for an IP the [`MagicSock`] knows
/// about.
///
/// And in our QUIC-facing socket APIs like [`AsyncUdpSocket`] it
/// comes in as the inner [`SocketAddr`], in those interfaces we have to be careful to do
/// the conversion to this type.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct IpMappedAddr(pub(crate) SocketAddr);

/// Counter to always generate unique addresses for [`NodeIdMappedAddr`].
static IP_ADDR_COUNTER: AtomicU64 = AtomicU64::new(1);

impl IpMappedAddr {
/// The Prefix/L of our Unique Local Addresses.
const ADDR_PREFIXL: u8 = 0xfd;
/// The Global ID used in our Unique Local Addresses.
const ADDR_GLOBAL_ID: [u8; 5] = [21, 7, 10, 81, 11];
/// The Subnet ID used in our Unique Local Addresses.
const ADDR_SUBNET: [u8; 2] = [0, 1];

/// Generates a globally unique fake UDP address.
///
/// This generates and IPv6 Unique Local Address according to RFC 4193.
pub fn generate() -> Self {
let mut addr = [0u8; 16];
addr[0] = Self::ADDR_PREFIXL;
addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID);
addr[6..8].copy_from_slice(&Self::ADDR_SUBNET);

let counter = IP_ADDR_COUNTER.fetch_add(1, Ordering::Relaxed);
addr[8..16].copy_from_slice(&counter.to_be_bytes());

Self(SocketAddr::new(
IpAddr::V6(Ipv6Addr::from(addr)),
MAPPED_ADDR_PORT,
))
}

/// Return the underlying [`SocketAddr`].
pub fn addr(&self) -> SocketAddr {
self.0
}
}

impl TryFrom<SocketAddr> for IpMappedAddr {
type Error = IpMappedAddrError;

fn try_from(value: SocketAddr) -> std::result::Result<Self, Self::Error> {
match value {
SocketAddr::V4(_) => Err(IpMappedAddrError(String::from(
"IpMappedAddrs are all Ipv6, found Ipv4 address",
))),
SocketAddr::V6(addr) => {
if addr.port() != MAPPED_ADDR_PORT {
return Err(IpMappedAddrError(String::from("not mapped addr")));
}
let octets = addr.ip().octets();
if octets[6..8] != IpMappedAddr::ADDR_SUBNET {
return Err(IpMappedAddrError(String::from("not an IpMappedAddr")));
}
Ok(IpMappedAddr(value))
}
}
}
}

impl std::fmt::Display for IpMappedAddr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "IpMappedAddr({})", self.0)
}
}

#[derive(Debug, Clone)]
/// A Map of [`IpMappedAddrs`] to [`SocketAddrs`]
pub struct IpMappedAddrs(Arc<std::sync::Mutex<BTreeMap<IpMappedAddr, SocketAddr>>>);

impl IpMappedAddrs {
/// Create an empty [`IpMappedAddrs`]
pub fn new() -> Self {
Self(Arc::new(std::sync::Mutex::new(BTreeMap::new())))
}

/// Add a [`SocketAddr`] to the map and the generated [`IpMappedAddr`] it is now associated with back.
///
/// If this [`SocketAddr`] already exists in the map, it returns its associated [`IpMappedAddr`].
pub fn add(&self, ip_addr: SocketAddr) -> IpMappedAddr {
let mut map = self.0.lock().expect("poisoned");
for (mapped_addr, ip) in map.iter() {
if ip == &ip_addr {
return *mapped_addr;
}
}
let ip_mapped_addr = IpMappedAddr::generate();
map.insert(ip_mapped_addr, ip_addr);
ip_mapped_addr
}

/// Get the [`IpMappedAddr`] for the given [`SocketAddr`].
pub fn get_mapped_addr(&self, ip_addr: &SocketAddr) -> Option<IpMappedAddr> {
let map = self.0.lock().expect("poisoned");
for (mapped_addr, ip) in map.iter() {
if ip == ip_addr {
return Some(*mapped_addr);
}
}
None
}

/// Get the [`SocketAddr`] for the given [`IpMappedAddr`].
pub fn get_ip_addr(&self, mapped_addr: &IpMappedAddr) -> Option<SocketAddr> {
let map = self.0.lock().expect("poisoned");
map.get(mapped_addr).copied()
}
}

impl Default for IpMappedAddrs {
fn default() -> Self {
IpMappedAddrs::new()
}
}
4 changes: 4 additions & 0 deletions iroh-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@
#[cfg(feature = "ticket")]
pub mod ticket;

#[cfg(feature = "relay")]
mod ip_mapped_addrs;
#[cfg(feature = "key")]
mod key;
#[cfg(feature = "key")]
mod node_addr;
#[cfg(feature = "relay")]
mod relay_url;

#[cfg(feature = "relay")]
pub use self::ip_mapped_addrs::{IpMappedAddr, IpMappedAddrs, MAPPED_ADDR_PORT};
#[cfg(feature = "key")]
pub use self::key::{KeyParsingError, NodeId, PublicKey, SecretKey, Signature};
#[cfg(feature = "key")]
Expand Down
29 changes: 21 additions & 8 deletions iroh-net-report/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
use anyhow::{anyhow, Result};
use bytes::Bytes;
use hickory_resolver::TokioResolver as DnsResolver;
use iroh_base::RelayUrl;
use iroh_base::{IpMappedAddrs, RelayUrl};
#[cfg(feature = "metrics")]
use iroh_metrics::inc;
use iroh_relay::{protos::stun, RelayMap};
Expand Down Expand Up @@ -348,8 +348,12 @@ impl Client {
///
/// This starts a connected actor in the background. Once the client is dropped it will
/// stop running.
pub fn new(port_mapper: Option<portmapper::Client>, dns_resolver: DnsResolver) -> Result<Self> {
let mut actor = Actor::new(port_mapper, dns_resolver)?;
pub fn new(
port_mapper: Option<portmapper::Client>,
dns_resolver: DnsResolver,
ip_mapped_addrs: Option<IpMappedAddrs>,
) -> Result<Self> {
let mut actor = Actor::new(port_mapper, dns_resolver, ip_mapped_addrs)?;
let addr = actor.addr();
let task = tokio::spawn(
async move { actor.run().await }.instrument(info_span!("net_report.actor")),
Expand Down Expand Up @@ -566,14 +570,21 @@ struct Actor {

/// The DNS resolver to use for probes that need to perform DNS lookups
dns_resolver: DnsResolver,

/// The [`IpMappedAddrs`] that allows you to do QAD in iroh
ip_mapped_addrs: Option<IpMappedAddrs>,
}

impl Actor {
/// Creates a new actor.
///
/// This does not start the actor, see [`Actor::run`] for this. You should not
/// normally create this directly but rather create a [`Client`].
fn new(port_mapper: Option<portmapper::Client>, dns_resolver: DnsResolver) -> Result<Self> {
fn new(
port_mapper: Option<portmapper::Client>,
dns_resolver: DnsResolver,
ip_mapped_addrs: Option<IpMappedAddrs>,
) -> Result<Self> {
// TODO: consider an instrumented flume channel so we have metrics.
let (sender, receiver) = mpsc::channel(32);
Ok(Self {
Expand All @@ -584,6 +595,7 @@ impl Actor {
in_flight_stun_requests: Default::default(),
current_report_run: None,
dns_resolver,
ip_mapped_addrs,
})
}

Expand Down Expand Up @@ -687,6 +699,7 @@ impl Actor {
quic_config,
self.dns_resolver.clone(),
protocols,
self.ip_mapped_addrs.clone(),
);

self.current_report_run = Some(ReportRun {
Expand Down Expand Up @@ -1135,7 +1148,7 @@ mod tests {
stun_utils::serve("127.0.0.1".parse().unwrap()).await?;

let resolver = crate::dns::tests::resolver();
let mut client = Client::new(None, resolver.clone())?;
let mut client = Client::new(None, resolver.clone(), None)?;
let dm = stun_utils::relay_map_of([stun_addr].into_iter());

// Note that the ProbePlan will change with each iteration.
Expand Down Expand Up @@ -1183,7 +1196,7 @@ mod tests {

// Now create a client and generate a report.
let resolver = crate::dns::tests::resolver();
let mut client = Client::new(None, resolver.clone())?;
let mut client = Client::new(None, resolver.clone(), None)?;

let r = client.get_report(dm, None, None, None).await?;
let mut r: Report = (*r).clone();
Expand Down Expand Up @@ -1386,7 +1399,7 @@ mod tests {
let resolver = crate::dns::tests::resolver();
for mut tt in tests {
println!("test: {}", tt.name);
let mut actor = Actor::new(None, resolver.clone()).unwrap();
let mut actor = Actor::new(None, resolver.clone(), None).unwrap();
for s in &mut tt.steps {
// trigger the timer
time::advance(Duration::from_secs(s.after)).await;
Expand Down Expand Up @@ -1421,7 +1434,7 @@ mod tests {
dbg!(&dm);

let resolver = crate::dns::tests::resolver().clone();
let mut client = Client::new(None, resolver)?;
let mut client = Client::new(None, resolver, None)?;

// Set up an external socket to send STUN requests from, this will be discovered as
// our public socket address by STUN. We send back any packets received on this
Expand Down
40 changes: 36 additions & 4 deletions iroh-net-report/src/reportgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::{

use anyhow::{anyhow, bail, Context as _, Result};
use hickory_resolver::TokioResolver as DnsResolver;
use iroh_base::RelayUrl;
use iroh_base::{IpMappedAddrs, RelayUrl};
#[cfg(feature = "metrics")]
use iroh_metrics::inc;
use iroh_relay::{
Expand Down Expand Up @@ -94,6 +94,7 @@ impl Client {
quic_config: Option<QuicConfig>,
dns_resolver: DnsResolver,
protocols: BTreeSet<ProbeProto>,
ip_mapped_addrs: Option<IpMappedAddrs>,
) -> Self {
let (msg_tx, msg_rx) = mpsc::channel(32);
let addr = Addr {
Expand All @@ -114,6 +115,7 @@ impl Client {
outstanding_tasks: OutstandingTasks::default(),
dns_resolver,
protocols,
ip_mapped_addrs,
};
let task = tokio::spawn(
async move { actor.run().await }.instrument(info_span!("reportgen.actor")),
Expand Down Expand Up @@ -200,6 +202,8 @@ struct Actor {
/// Protocols we should attempt to create probes for, if we have the correct
/// configuration for that protocol.
protocols: BTreeSet<ProbeProto>,
/// Optional [`IpMappedAddrs`] used to enable QAD in iroh
ip_mapped_addrs: Option<IpMappedAddrs>,
}

impl Actor {
Expand Down Expand Up @@ -569,6 +573,7 @@ impl Actor {
let net_report = self.net_report.clone();
let pinger = pinger.clone();
let dns_resolver = self.dns_resolver.clone();
let ip_mapped_addrs = self.ip_mapped_addrs.clone();

set.spawn(
run_probe(
Expand All @@ -581,6 +586,7 @@ impl Actor {
net_report,
pinger,
dns_resolver,
ip_mapped_addrs,
)
.instrument(debug_span!("run_probe", %probe)),
);
Expand Down Expand Up @@ -716,6 +722,7 @@ async fn run_probe(
net_report: net_report::Addr,
pinger: Pinger,
dns_resolver: DnsResolver,
ip_mapped_addrs: Option<IpMappedAddrs>,
) -> Result<ProbeReport, ProbeError> {
if !probe.delay().is_zero() {
trace!("delaying probe");
Expand Down Expand Up @@ -749,7 +756,7 @@ async fn run_probe(
));
}

let relay_addr = get_relay_addr(&dns_resolver, &relay_node, probe.proto())
let relay_addr = get_relay_addr(&dns_resolver, &relay_node, probe.proto(), ip_mapped_addrs)
.await
.context("no relay node addr")
.map_err(|e| ProbeError::AbortSet(e, probe.clone()))?;
Expand Down Expand Up @@ -1058,6 +1065,7 @@ async fn get_relay_addr(
dns_resolver: &DnsResolver,
relay_node: &RelayNode,
proto: ProbeProto,
ip_mapped_addrs: Option<IpMappedAddrs>,
) -> Result<SocketAddr> {
if relay_node.stun_only && !matches!(proto, ProbeProto::StunIpv4 | ProbeProto::StunIpv6) {
bail!("Relay node not suitable for non-STUN probes");
Expand All @@ -1074,11 +1082,16 @@ async fn get_relay_addr(
.next()
.map(|ip| ip.to_canonical())
.map(|addr| SocketAddr::new(addr, port))
.map(|addr| maybe_to_mapped_addr(ip_mapped_addrs, proto, addr))
.ok_or(anyhow!("No suitable relay addr found")),
Err(err) => Err(err.context("No suitable relay addr found")),
}
}
Some(url::Host::Ipv4(addr)) => Ok(SocketAddr::new(addr.into(), port)),
Some(url::Host::Ipv4(addr)) => Ok(maybe_to_mapped_addr(
ip_mapped_addrs,
proto,
SocketAddr::new(addr.into(), port),
)),
Some(url::Host::Ipv6(_addr)) => Err(anyhow!("No suitable relay addr found")),
None => Err(anyhow!("No valid hostname in RelayUrl")),
}
Expand All @@ -1093,12 +1106,17 @@ async fn get_relay_addr(
.next()
.map(|ip| ip.to_canonical())
.map(|addr| SocketAddr::new(addr, port))
.map(|addr| maybe_to_mapped_addr(ip_mapped_addrs, proto, addr))
.ok_or(anyhow!("No suitable relay addr found")),
Err(err) => Err(err.context("No suitable relay addr found")),
}
}
Some(url::Host::Ipv4(_addr)) => Err(anyhow!("No suitable relay addr found")),
Some(url::Host::Ipv6(addr)) => Ok(SocketAddr::new(addr.into(), port)),
Some(url::Host::Ipv6(addr)) => Ok(maybe_to_mapped_addr(
ip_mapped_addrs,
proto,
SocketAddr::new(addr.into(), port),
)),
None => Err(anyhow!("No valid hostname in RelayUrl")),
}
}
Expand All @@ -1107,6 +1125,20 @@ async fn get_relay_addr(
}
}

fn maybe_to_mapped_addr(
ip_mapped_addrs: Option<IpMappedAddrs>,
proto: ProbeProto,
addr: SocketAddr,
) -> SocketAddr {
if !matches!(proto, ProbeProto::QuicIpv4 | ProbeProto::QuicIpv6) {
return addr;
}
if let Some(ip_mapped_addrs) = ip_mapped_addrs.as_ref() {
return ip_mapped_addrs.add(addr).addr();
}
addr
}

/// Runs an ICMP IPv4 or IPv6 probe.
///
/// The `pinger` is passed in so the ping sockets are only bound once
Expand Down
Loading

0 comments on commit 201ef1e

Please sign in to comment.