diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs index 2bbf6b95ab..863c47cfcd 100644 --- a/iroh-cli/src/commands.rs +++ b/iroh-cli/src/commands.rs @@ -17,9 +17,9 @@ pub(crate) mod doctor; pub(crate) mod net; pub(crate) mod rpc; pub(crate) mod start; -pub use iroh_blobs::{cli as blobs, cli::tags}; -pub use iroh_docs::{cli as docs, cli::authors}; -pub use iroh_gossip::cli as gossip; +pub(crate) use iroh_blobs::{cli as blobs, cli::tags}; +pub(crate) use iroh_docs::{cli as docs, cli::authors}; +pub(crate) use iroh_gossip::cli as gossip; /// iroh is a tool for building distributed apps. /// diff --git a/iroh-dns-server/src/http/tls.rs b/iroh-dns-server/src/http/tls.rs index ef26549935..f1a78fe2e3 100644 --- a/iroh-dns-server/src/http/tls.rs +++ b/iroh-dns-server/src/http/tls.rs @@ -90,12 +90,8 @@ impl TlsAcceptor { let cert_path = dir.join(format!("{keyname}.crt")); let key_path = dir.join(format!("{keyname}.key")); - let (certs, secret_key) = tokio::task::spawn_blocking(move || { - let certs = load_certs(cert_path)?; - let key = load_secret_key(key_path)?; - anyhow::Ok((certs, key)) - }) - .await??; + let certs = load_certs(cert_path).await?; + let secret_key = load_secret_key(key_path).await?; let config = config.with_single_cert(certs, secret_key)?; let config = RustlsConfig::from_config(Arc::new(config)); @@ -136,23 +132,26 @@ impl TlsAcceptor { } } -fn load_certs( +async fn load_certs( filename: impl AsRef, ) -> Result>> { - let certfile = std::fs::File::open(filename).context("cannot open certificate file")?; - let mut reader = std::io::BufReader::new(certfile); - + let certfile = tokio::fs::read(filename) + .await + .context("cannot open certificate file")?; + let mut reader = std::io::Cursor::new(certfile); let certs: Result, std::io::Error> = rustls_pemfile::certs(&mut reader).collect(); let certs = certs?; Ok(certs) } -fn load_secret_key( +async fn load_secret_key( filename: impl AsRef, ) -> Result> { - let keyfile = std::fs::File::open(filename.as_ref()).context("cannot open secret key file")?; - let mut reader = std::io::BufReader::new(keyfile); + let keyfile = tokio::fs::read(filename.as_ref()) + .await + .context("cannot open secret key file")?; + let mut reader = std::io::Cursor::new(keyfile); loop { match rustls_pemfile::read_one(&mut reader).context("cannot parse secret key .pem file")? { diff --git a/iroh-dns-server/src/main.rs b/iroh-dns-server/src/main.rs index 60d88bcf28..e9575ff416 100644 --- a/iroh-dns-server/src/main.rs +++ b/iroh-dns-server/src/main.rs @@ -1,21 +1,11 @@ -#![allow(unused_imports)] - -use std::{ - future::Future, - net::{Ipv4Addr, SocketAddr}, - path::PathBuf, -}; +use std::path::PathBuf; use anyhow::Result; -use axum::{routing::get, Router}; use clap::Parser; -use futures_lite::FutureExt; use iroh_dns_server::{ config::Config, metrics::init_metrics, server::run_with_config_until_ctrl_c, }; -use tokio::task::JoinSet; -use tokio_util::sync::CancellationToken; -use tracing::{debug, debug_span, error, error_span, Instrument, Span}; +use tracing::debug; #[derive(Parser, Debug)] struct Cli { diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 2e20fc3245..c47409976d 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -163,6 +163,7 @@ iroh-net = { path = "." } serde_json = "1.0.107" testresult = "0.4.0" mainline = "2.0.1" +iroh-relay = { version = "0.28", path = "../iroh-relay", features = ["test-utils", "server"] } [[bench]] name = "key" diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 4dc720d721..446faafee7 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -1767,10 +1767,22 @@ impl Actor { discovery_events = events; } } + + let mut receiver_closed = false; + let mut portmap_watcher_closed = false; + let mut link_change_closed = false; loop { inc!(Metrics, actor_tick_main); tokio::select! { - Some(msg) = self.msg_receiver.recv() => { + msg = self.msg_receiver.recv(), if !receiver_closed => { + let Some(msg) = msg else { + trace!("tick: magicsock receiver closed"); + inc!(Metrics, actor_tick_other); + + receiver_closed = true; + continue; + }; + trace!(?msg, "tick: msg"); inc!(Metrics, actor_tick_msg); if self.handle_actor_message(msg).await { @@ -1782,7 +1794,15 @@ impl Actor { inc!(Metrics, actor_tick_re_stun); self.msock.re_stun("periodic"); } - Ok(()) = portmap_watcher.changed() => { + change = portmap_watcher.changed(), if !portmap_watcher_closed => { + if change.is_err() { + trace!("tick: portmap watcher closed"); + inc!(Metrics, actor_tick_other); + + portmap_watcher_closed = true; + continue; + } + trace!("tick: portmap changed"); inc!(Metrics, actor_tick_portmap_changed); let new_external_address = *portmap_watcher.borrow(); @@ -1809,11 +1829,22 @@ impl Actor { self.refresh_direct_addrs(reason).await; } } - Some(is_major) = link_change_r.recv() => { + is_major = link_change_r.recv(), if !link_change_closed => { + let Some(is_major) = is_major else { + trace!("tick: link change receiver closed"); + inc!(Metrics, actor_tick_other); + + link_change_closed = true; + continue; + }; + trace!("tick: link change {}", is_major); inc!(Metrics, actor_link_change); self.handle_network_change(is_major).await; } + // Even if `discovery_events` yields `None`, it could begin to yield + // `Some` again in the future, so we don't want to disable this branch + // forever like we do with the other branches that yield `Option`s Some(discovery_item) = discovery_events.next() => { trace!("tick: discovery event, address discovered: {discovery_item:?}"); let node_addr = NodeAddr {node_id: discovery_item.node_id, info: discovery_item.addr_info}; @@ -1821,10 +1852,6 @@ impl Actor { warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}"); } } - else => { - trace!("tick: other"); - inc!(Metrics, actor_tick_other); - } } } } diff --git a/iroh-net/src/magicsock/relay_actor.rs b/iroh-net/src/magicsock/relay_actor.rs index 4c93581021..b1f56a7686 100644 --- a/iroh-net/src/magicsock/relay_actor.rs +++ b/iroh-net/src/magicsock/relay_actor.rs @@ -111,7 +111,12 @@ impl ActiveRelay { self.relay_client.connect().await.context("keepalive")?; } tokio::select! { - Some(msg) = inbox.recv() => { + msg = inbox.recv() => { + let Some(msg) = msg else { + debug!("all clients closed"); + break; + }; + trace!("tick: inbox: {:?}", msg); match msg { ActiveRelayMessage::GetLastWrite(r) => { @@ -144,6 +149,7 @@ impl ActiveRelay { } } } + msg = self.relay_client_receiver.recv() => { trace!("tick: relay_client_receiver"); if let Some(msg) = msg { @@ -153,10 +159,6 @@ impl ActiveRelay { } } } - else => { - debug!("all clients closed"); - break; - } } } debug!("exiting"); @@ -301,25 +303,37 @@ impl RelayActor { trace!("shutting down"); break; } - Some(Ok((url, ping_success))) = self.ping_tasks.join_next() => { - if !ping_success { - with_cancel( - self.cancel_token.child_token(), - self.close_or_reconnect_relay(&url, "rebind-ping-fail") - ).await; + // `ping_tasks` being empty is a normal situation - in fact it starts empty + // until a `MaybeCloseRelaysOnRebind` message is received. + Some(task_result) = self.ping_tasks.join_next() => { + match task_result { + Ok((url, ping_success)) => { + if !ping_success { + with_cancel( + self.cancel_token.child_token(), + self.close_or_reconnect_relay(&url, "rebind-ping-fail") + ).await; + } + } + + Err(err) => { + warn!("ping task error: {:?}", err); + } } } - Some(msg) = receiver.recv() => { + + msg = receiver.recv() => { + let Some(msg) = msg else { + trace!("shutting down relay recv loop"); + break; + }; + with_cancel(self.cancel_token.child_token(), self.handle_msg(msg)).await; } _ = cleanup_timer.tick() => { trace!("tick: cleanup"); with_cancel(self.cancel_token.child_token(), self.clean_stale_relay()).await; } - else => { - trace!("shutting down relay recv loop"); - break; - } } } diff --git a/iroh-net/src/netcheck.rs b/iroh-net/src/netcheck.rs index 1675889548..54994c2a0b 100644 --- a/iroh-net/src/netcheck.rs +++ b/iroh-net/src/netcheck.rs @@ -769,6 +769,46 @@ pub(crate) fn os_has_ipv6() -> bool { UdpSocket::bind_local_v6(0).is_ok() } +#[cfg(test)] +mod test_utils { + //! Creates a relay server against which to perform tests + + use std::sync::Arc; + + use iroh_relay::server; + + use crate::RelayNode; + + pub(crate) async fn relay() -> (server::Server, Arc) { + let server = server::Server::spawn(server::testing::server_config()) + .await + .expect("should serve relay"); + let node_desc = RelayNode { + url: server.https_url().expect("should work as relay"), + stun_only: false, // the checks above and below guarantee both stun and relay + stun_port: server.stun_addr().expect("server should serve stun").port(), + }; + + (server, Arc::new(node_desc)) + } + + /// Create a [`crate::RelayMap`] of the given size. + /// + /// This function uses [`relay`]. Note that the returned map uses internal order that will + /// often _not_ match the order of the servers. + pub(crate) async fn relay_map(relays: usize) -> (Vec, crate::RelayMap) { + let mut servers = Vec::with_capacity(relays); + let mut nodes = Vec::with_capacity(relays); + for _ in 0..relays { + let (relay_server, node) = relay().await; + servers.push(relay_server); + nodes.push(node); + } + let map = crate::RelayMap::from_nodes(nodes).expect("unuque urls"); + (servers, map) + } +} + #[cfg(test)] mod tests { use std::net::Ipv4Addr; @@ -778,11 +818,7 @@ mod tests { use tracing::info; use super::*; - use crate::{ - defaults::{staging::EU_RELAY_HOSTNAME, DEFAULT_STUN_PORT}, - ping::Pinger, - RelayNode, - }; + use crate::ping::Pinger; mod stun_utils { //! Utils for testing that expose a simple stun server. @@ -799,8 +835,6 @@ mod tests { use super::*; use crate::{RelayMap, RelayNode, RelayUrl}; - // TODO: make all this private - /// A drop guard to clean up test infrastructure. /// /// After dropping the test infrastructure will asynchronously shutdown and release its @@ -956,57 +990,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_iroh_computer_stun() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let resolver = crate::dns::default_resolver().clone(); - let mut client = Client::new(None, resolver).context("failed to create netcheck client")?; - let url: RelayUrl = format!("https://{}", EU_RELAY_HOSTNAME).parse().unwrap(); - - let dm = RelayMap::from_nodes([RelayNode { - url: url.clone(), - stun_only: true, - stun_port: DEFAULT_STUN_PORT, - }]) - .expect("hardcoded"); - - for i in 0..10 { - println!("starting report {}", i + 1); - let now = Instant::now(); - - let r = client - .get_report(dm.clone(), None, None) - .await - .context("failed to get netcheck report")?; - - if r.udp { - assert_eq!( - r.relay_latency.len(), - 1, - "expected 1 key in RelayLatency; got {}", - r.relay_latency.len() - ); - assert!( - r.relay_latency.iter().next().is_some(), - "expected key 1 in RelayLatency; got {:?}", - r.relay_latency - ); - assert!( - r.global_v4.is_some() || r.global_v6.is_some(), - "expected at least one of global_v4 or global_v6" - ); - assert!(r.preferred_relay.is_some()); - } else { - eprintln!("missing UDP, probe not returned by network"); - } - - println!("report {} done in {:?}", i + 1, now.elapsed()); - } - - Ok(()) - } - #[tokio::test] async fn test_udp_blocked() -> Result<()> { let _guard = iroh_test::logging::setup(); diff --git a/iroh-net/src/netcheck/reportgen.rs b/iroh-net/src/netcheck/reportgen.rs index 34fcb89393..8e843b1a9b 100644 --- a/iroh-net/src/netcheck/reportgen.rs +++ b/iroh-net/src/netcheck/reportgen.rs @@ -1177,21 +1177,18 @@ mod tests { use testresult::TestResult; - use super::*; - use crate::{ - defaults::staging::{default_eu_relay_node, default_na_relay_node}, - test_utils, - }; + use super::{super::test_utils, *}; - #[test] - fn test_update_report_stun_working() { - let eu_relayer = Arc::new(default_eu_relay_node()); - let na_relayer = Arc::new(default_na_relay_node()); + #[tokio::test] + async fn test_update_report_stun_working() { + let _logging = iroh_test::logging::setup(); + let (_server_a, relay_a) = test_utils::relay().await; + let (_server_b, relay_b) = test_utils::relay().await; let mut report = Report::default(); - // A STUN IPv4 probe from the EU relay server. - let probe_report_eu = ProbeReport { + // A STUN IPv4 probe from the the first relay server. + let probe_report_a = ProbeReport { ipv4_can_send: true, ipv6_can_send: false, icmpv4: None, @@ -1199,49 +1196,49 @@ mod tests { latency: Some(Duration::from_millis(5)), probe: Probe::StunIpv4 { delay: Duration::ZERO, - node: eu_relayer.clone(), + node: relay_a.clone(), }, addr: Some((Ipv4Addr::new(203, 0, 113, 1), 1234).into()), }; - update_report(&mut report, probe_report_eu.clone()); + update_report(&mut report, probe_report_a.clone()); assert!(report.udp); assert_eq!( - report.relay_latency.get(&eu_relayer.url).unwrap(), + report.relay_latency.get(&relay_a.url).unwrap(), Duration::from_millis(5) ); assert_eq!( - report.relay_v4_latency.get(&eu_relayer.url).unwrap(), + report.relay_v4_latency.get(&relay_a.url).unwrap(), Duration::from_millis(5) ); assert!(report.ipv4_can_send); assert!(!report.ipv6_can_send); // A second STUN IPv4 probe, same external IP detected but slower. - let probe_report_na = ProbeReport { + let probe_report_b = ProbeReport { latency: Some(Duration::from_millis(8)), probe: Probe::StunIpv4 { delay: Duration::ZERO, - node: na_relayer.clone(), + node: relay_b.clone(), }, - ..probe_report_eu + ..probe_report_a }; - update_report(&mut report, probe_report_na); + update_report(&mut report, probe_report_b); assert!(report.udp); assert_eq!( - report.relay_latency.get(&eu_relayer.url).unwrap(), + report.relay_latency.get(&relay_a.url).unwrap(), Duration::from_millis(5) ); assert_eq!( - report.relay_v4_latency.get(&eu_relayer.url).unwrap(), + report.relay_v4_latency.get(&relay_a.url).unwrap(), Duration::from_millis(5) ); assert!(report.ipv4_can_send); assert!(!report.ipv6_can_send); // A STUN IPv6 probe, this one is faster. - let probe_report_eu_ipv6 = ProbeReport { + let probe_report_a_ipv6 = ProbeReport { ipv4_can_send: false, ipv6_can_send: true, icmpv4: None, @@ -1249,29 +1246,30 @@ mod tests { latency: Some(Duration::from_millis(4)), probe: Probe::StunIpv6 { delay: Duration::ZERO, - node: eu_relayer.clone(), + node: relay_a.clone(), }, addr: Some((Ipv6Addr::new(2001, 0xdb8, 0, 0, 0, 0, 0, 1), 1234).into()), }; - update_report(&mut report, probe_report_eu_ipv6); + update_report(&mut report, probe_report_a_ipv6); assert!(report.udp); assert_eq!( - report.relay_latency.get(&eu_relayer.url).unwrap(), + report.relay_latency.get(&relay_a.url).unwrap(), Duration::from_millis(4) ); assert_eq!( - report.relay_v6_latency.get(&eu_relayer.url).unwrap(), + report.relay_v6_latency.get(&relay_a.url).unwrap(), Duration::from_millis(4) ); assert!(report.ipv4_can_send); assert!(report.ipv6_can_send); } - #[test] - fn test_update_report_icmp() { - let eu_relayer = Arc::new(default_eu_relay_node()); - let na_relayer = Arc::new(default_na_relay_node()); + #[tokio::test] + async fn test_update_report_icmp() { + let _logging = iroh_test::logging::setup(); + let (_server_a, relay_a) = test_utils::relay().await; + let (_server_b, relay_b) = test_utils::relay().await; let mut report = Report::default(); @@ -1284,7 +1282,7 @@ mod tests { latency: Some(Duration::from_millis(5)), probe: Probe::IcmpV4 { delay: Duration::ZERO, - node: eu_relayer.clone(), + node: relay_a.clone(), }, addr: Some((Ipv4Addr::new(203, 0, 113, 1), 1234).into()), }; @@ -1303,7 +1301,7 @@ mod tests { latency: None, probe: Probe::IcmpV4 { delay: Duration::ZERO, - node: na_relayer.clone(), + node: relay_b.clone(), }, addr: None, }; @@ -1320,7 +1318,7 @@ mod tests { latency: Some(Duration::from_millis(5)), probe: Probe::StunIpv4 { delay: Duration::ZERO, - node: eu_relayer.clone(), + node: relay_a.clone(), }, addr: Some((Ipv4Addr::new(203, 0, 113, 1), 1234).into()), }; @@ -1378,18 +1376,14 @@ mod tests { // // TODO: Not sure what about IPv6 pings using sysctl. #[tokio::test] - async fn test_icmpk_probe_eu_relayer() { + async fn test_icmpk_probe() { let _logging_guard = iroh_test::logging::setup(); let pinger = Pinger::new(); - let relay = default_eu_relay_node(); - let resolver = crate::dns::default_resolver(); - let addr = get_relay_addr(resolver, &relay, ProbeProto::IcmpV4) - .await - .map_err(|err| format!("{err:#}")) - .unwrap(); + let (server, node) = test_utils::relay().await; + let addr = server.stun_addr().expect("test relay serves stun"); let probe = Probe::IcmpV4 { delay: Duration::from_secs(0), - node: Arc::new(relay), + node, }; // A single ICMP packet might get lost. Try several and take the first. @@ -1434,20 +1428,16 @@ mod tests { #[tokio::test] async fn test_measure_https_latency() -> TestResult { let _logging_guard = iroh_test::logging::setup(); - let (_relay_map, relay_url, server) = test_utils::run_relay_server().await?; + let (server, relay) = test_utils::relay().await; let dns_resolver = crate::dns::resolver(); - warn!(?relay_url, "RELAY_URL"); - let node = RelayNode { - stun_only: false, - stun_port: 0, - url: relay_url.clone(), - }; + tracing::info!(relay_url = ?relay.url , "RELAY_URL"); let (latency, ip) = - measure_https_latency(dns_resolver, &node, server.certificates()).await?; + measure_https_latency(dns_resolver, &relay, server.certificates()).await?; assert!(latency > Duration::ZERO); - let relay_url_ip = relay_url + let relay_url_ip = relay + .url .host_str() .context("host")? .parse::()?; diff --git a/iroh-net/src/netcheck/reportgen/probes.rs b/iroh-net/src/netcheck/reportgen/probes.rs index 4850a0e49c..f2cd400aad 100644 --- a/iroh-net/src/netcheck/reportgen/probes.rs +++ b/iroh-net/src/netcheck/reportgen/probes.rs @@ -473,7 +473,7 @@ mod tests { use pretty_assertions::assert_eq; use super::*; - use crate::{defaults::staging::default_relay_map, netcheck::RelayLatencies}; + use crate::netcheck::{test_utils, RelayLatencies}; /// Shorthand which declares a new ProbeSet. /// @@ -497,7 +497,7 @@ mod tests { #[tokio::test] async fn test_initial_probeplan() { - let relay_map = default_relay_map(); + let (_servers, relay_map) = test_utils::relay_map(2).await; let relay_node_1 = relay_map.nodes().next().unwrap(); let relay_node_2 = relay_map.nodes().nth(1).unwrap(); let if_state = interfaces::State::fake(); @@ -590,12 +590,14 @@ mod tests { #[tokio::test] async fn test_plan_with_report() { + let _logging = iroh_test::logging::setup(); + let (_servers, relay_map) = test_utils::relay_map(2).await; + let relay_node_1 = relay_map.nodes().next().unwrap().clone(); + let relay_node_2 = relay_map.nodes().nth(1).unwrap().clone(); + let if_state = interfaces::State::fake(); + for i in 0..10 { println!("round {}", i); - let relay_map = default_relay_map(); - let relay_node_1 = relay_map.nodes().next().unwrap().clone(); - let relay_node_2 = relay_map.nodes().nth(1).unwrap().clone(); - let if_state = interfaces::State::fake(); let mut latencies = RelayLatencies::new(); latencies.update_relay(relay_node_1.url.clone(), Duration::from_millis(2)); latencies.update_relay(relay_node_2.url.clone(), Duration::from_millis(2)); @@ -744,9 +746,10 @@ mod tests { } } - #[test] - fn test_relay_sort_two_latencies() { - let relay_map = default_relay_map(); + #[tokio::test] + async fn test_relay_sort_two_latencies() { + let _logging = iroh_test::logging::setup(); + let (_servers, relay_map) = test_utils::relay_map(2).await; let r1 = relay_map.nodes().next().unwrap(); let r2 = relay_map.nodes().nth(1).unwrap(); let last_report = create_last_report( @@ -762,9 +765,10 @@ mod tests { assert_eq!(sorted, vec![&r1.url, &r2.url]); } - #[test] - fn test_relay_sort_equal_latencies() { - let relay_map = default_relay_map(); + #[tokio::test] + async fn test_relay_sort_equal_latencies() { + let _logging = iroh_test::logging::setup(); + let (_servers, relay_map) = test_utils::relay_map(2).await; let r1 = relay_map.nodes().next().unwrap(); let r2 = relay_map.nodes().nth(1).unwrap(); let last_report = create_last_report( @@ -780,9 +784,9 @@ mod tests { assert_eq!(sorted, vec![&r1.url, &r2.url]); } - #[test] - fn test_relay_sort_missing_latency() { - let relay_map = default_relay_map(); + #[tokio::test] + async fn test_relay_sort_missing_latency() { + let (_servers, relay_map) = test_utils::relay_map(2).await; let r1 = relay_map.nodes().next().unwrap(); let r2 = relay_map.nodes().nth(1).unwrap(); @@ -803,9 +807,10 @@ mod tests { assert_eq!(sorted, vec![&r1.url, &r2.url]); } - #[test] - fn test_relay_sort_no_latency() { - let relay_map = default_relay_map(); + #[tokio::test] + async fn test_relay_sort_no_latency() { + let _logging = iroh_test::logging::setup(); + let (_servers, relay_map) = test_utils::relay_map(2).await; let r1 = relay_map.nodes().next().unwrap(); let r2 = relay_map.nodes().nth(1).unwrap(); diff --git a/iroh-net/src/relay_map.rs b/iroh-net/src/relay_map.rs index bb48408086..727ccf61c2 100644 --- a/iroh-net/src/relay_map.rs +++ b/iroh-net/src/relay_map.rs @@ -112,11 +112,12 @@ impl RelayMap { } /// Constructs the [`RelayMap`] from an iterator of [`RelayNode`]s. - pub fn from_nodes(value: impl IntoIterator) -> Result { + pub fn from_nodes>>(value: impl IntoIterator) -> Result { let mut map = BTreeMap::new(); for node in value.into_iter() { + let node = node.into(); ensure!(!map.contains_key(&node.url), "Duplicate node url"); - map.insert(node.url.clone(), node.into()); + map.insert(node.url.clone(), node); } Ok(RelayMap { nodes: map.into() }) } diff --git a/iroh-relay/src/client.rs b/iroh-relay/src/client.rs index a6d52427dc..148c08463c 100644 --- a/iroh-relay/src/client.rs +++ b/iroh-relay/src/client.rs @@ -506,7 +506,13 @@ impl Actor { } msg_sender.send(res).await.ok(); } - Some(msg) = inbox.recv() => { + msg = inbox.recv() => { + let Some(msg) = msg else { + // Shutting down + self.close().await; + break; + }; + match msg { ActorMessage::Connect(s) => { let res = self.connect("actor msg").await.map(|(client, _)| (client)); @@ -546,11 +552,6 @@ impl Actor { }, } } - else => { - // Shutting down - self.close().await; - break; - } } } } diff --git a/iroh-relay/src/server.rs b/iroh-relay/src/server.rs index 9edad1cda4..bbe16fe090 100644 --- a/iroh-relay/src/server.rs +++ b/iroh-relay/src/server.rs @@ -24,6 +24,8 @@ use http::{ response::Builder as ResponseBuilder, HeaderMap, Method, Request, Response, StatusCode, }; use hyper::body::Incoming; +#[cfg(feature = "test-utils")] +use iroh_base::node_addr::RelayUrl; use iroh_metrics::inc; use tokio::{ net::{TcpListener, UdpSocket}, @@ -40,6 +42,8 @@ mod clients; mod http_server; mod metrics; pub(crate) mod streams; +#[cfg(feature = "test-utils")] +pub mod testing; pub use self::{ metrics::{Metrics, StunMetrics}, @@ -385,6 +389,30 @@ impl Server { pub fn certificates(&self) -> Option>> { self.certificates.clone() } + + /// Get the server's https [`RelayUrl`]. + /// + /// This uses [`Self::https_addr`] so it's mostly useful for local development. + #[cfg(feature = "test-utils")] + pub fn https_url(&self) -> Option { + self.https_addr.map(|addr| { + url::Url::parse(&format!("https://{addr}")) + .expect("valid url") + .into() + }) + } + + /// Get the server's http [`RelayUrl`]. + /// + /// This uses [`Self::http_addr`] so it's mostly useful for local development. + #[cfg(feature = "test-utils")] + pub fn http_url(&self) -> Option { + self.http_addr.map(|addr| { + url::Url::parse(&format!("http://{addr}")) + .expect("valid url") + .into() + }) + } } /// Supervisor for the relay server tasks. diff --git a/iroh-relay/src/server/testing.rs b/iroh-relay/src/server/testing.rs index 580d44addb..65508f1d38 100644 --- a/iroh-relay/src/server/testing.rs +++ b/iroh-relay/src/server/testing.rs @@ -1,75 +1,58 @@ -//! Internal utilities to support testing. +//! Exposes functions to quickly configure a server suitable for testing. use std::net::Ipv4Addr; -use anyhow::Result; -use tokio::sync::oneshot; +use super::{CertConfig, RelayConfig, ServerConfig, StunConfig, TlsConfig}; -use super::{CertConfig, RelayConfig, Server, ServerConfig, StunConfig, TlsConfig}; -use crate::{defaults::DEFAULT_STUN_PORT, RelayMap, RelayNode, RelayUrl}; - -/// A drop guard to clean up test infrastructure. -/// -/// After dropping the test infrastructure will asynchronously shutdown and release its -/// resources. -// Nightly sees the sender as dead code currently, but we only rely on Drop of the -// sender. -#[derive(Debug)] -#[allow(dead_code)] -pub struct CleanupDropGuard(pub(crate) oneshot::Sender<()>); - -/// Runs a relay server with STUN enabled suitable for tests. +/// Creates a [`StunConfig`] suitable for testing. /// -/// The returned `Url` is the url of the relay server in the returned [`RelayMap`]. -/// When dropped, the returned [`Server`] does will stop running. -pub async fn run_relay_server() -> Result<(RelayMap, RelayUrl, Server)> { - run_relay_server_with(Some(StunConfig { +/// To ensure port availability for testing, the port is configured to be assigned by the OS. +pub fn stun_config() -> StunConfig { + StunConfig { bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), - })) - .await + } } -/// Runs a relay server. +/// Creates a [`TlsConfig`] suitable for testing. /// -/// `stun` can be set to `None` to disable stun, or set to `Some` `StunConfig`, -/// to enable stun on a specific socket. -/// -/// The return value is similar to [`run_relay_server`]. -pub async fn run_relay_server_with( - stun: Option, -) -> Result<(RelayMap, RelayUrl, Server)> { +/// - Uses a self signed certificate valid for the `"localhost"` and `"127.0.0.1"` domains. +/// - Configures https to be served on an OS assigned port on ipv4. +pub fn tls_config() -> TlsConfig<()> { let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()]) .expect("valid"); - let rustls_cert = rustls::pki_types::CertificateDer::from(cert.serialize_der().unwrap()); - let private_key = - rustls::pki_types::PrivatePkcs8KeyDer::from(cert.get_key_pair().serialize_der()); + let rustls_cert = cert.cert.der(); + let private_key = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); let private_key = rustls::pki_types::PrivateKeyDer::from(private_key); + let certs = vec![rustls_cert.clone()]; + TlsConfig { + cert: CertConfig::<(), ()>::Manual { private_key, certs }, + https_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + } +} - let config = ServerConfig { - relay: Some(RelayConfig { - http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), - tls: Some(TlsConfig { - cert: CertConfig::<(), ()>::Manual { - private_key, - certs: vec![rustls_cert], - }, - https_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), - }), - limits: Default::default(), - }), - stun, +/// Creates a [`RelayConfig`] suitable for testing. +/// +/// - Binds http to an OS assigned port on ipv4. +/// - Uses [`tls_config`] to enable TLS. +/// - Uses default limits. +pub fn relay_config() -> RelayConfig<()> { + RelayConfig { + http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(), + tls: Some(tls_config()), + limits: Default::default(), + } +} + +/// Creates a [`ServerConfig`] suitable for testing. +/// +/// - Relaying is enabled using [`relay_config`] +/// - Stun is enabled using [`stun_config`] +/// - Metrics are not enabled. +pub fn server_config() -> ServerConfig<()> { + ServerConfig { + relay: Some(relay_config()), + stun: Some(stun_config()), #[cfg(feature = "metrics")] metrics_addr: None, - }; - let server = Server::spawn(config).await.unwrap(); - let url: RelayUrl = format!("https://{}", server.https_addr().expect("configured")) - .parse() - .unwrap(); - let m = RelayMap::from_nodes([RelayNode { - url: url.clone(), - stun_only: false, - stun_port: server.stun_addr().map_or(DEFAULT_STUN_PORT, |s| s.port()), - }]) - .unwrap(); - Ok((m, url, server)) + } } diff --git a/iroh-router/src/router.rs b/iroh-router/src/router.rs index bf4d290623..c650795222 100644 --- a/iroh-router/src/router.rs +++ b/iroh-router/src/router.rs @@ -117,7 +117,11 @@ impl RouterBuilder { break; }, // handle incoming p2p connections. - Some(incoming) = endpoint.accept() => { + incoming = endpoint.accept() => { + let Some(incoming) = incoming else { + break; + }; + let protocols = protocols.clone(); join_set.spawn(async move { handle_connection(incoming, protocols).await; @@ -144,7 +148,6 @@ impl RouterBuilder { _ => {} } }, - else => break, } } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 7012865b61..bb5f40e376 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -433,7 +433,6 @@ impl NodeInner { _ => {} } }, - else => break, } }