Skip to content

Commit

Permalink
Merge branch 'remove-docs-cli' into remove-gossip-cli
Browse files Browse the repository at this point in the history
# Conflicts:
#	iroh-cli/src/commands.rs
  • Loading branch information
rklaehn committed Nov 18, 2024
2 parents a3cfb8a + 62e9d6c commit 2209fe4
Show file tree
Hide file tree
Showing 15 changed files with 271 additions and 247 deletions.
6 changes: 3 additions & 3 deletions iroh-cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ pub(crate) mod doctor;
pub(crate) mod net;
pub(crate) mod rpc;
pub(crate) mod start;
pub use iroh_blobs::{cli as blobs, cli::tags};
pub use iroh_docs::{cli as docs, cli::authors};
pub use iroh_gossip::cli as gossip;
pub(crate) use iroh_blobs::{cli as blobs, cli::tags};
pub(crate) use iroh_docs::{cli as docs, cli::authors};
pub(crate) use iroh_gossip::cli as gossip;

/// iroh is a tool for building distributed apps.
///
Expand Down
25 changes: 12 additions & 13 deletions iroh-dns-server/src/http/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,8 @@ impl TlsAcceptor {
let cert_path = dir.join(format!("{keyname}.crt"));
let key_path = dir.join(format!("{keyname}.key"));

let (certs, secret_key) = tokio::task::spawn_blocking(move || {
let certs = load_certs(cert_path)?;
let key = load_secret_key(key_path)?;
anyhow::Ok((certs, key))
})
.await??;
let certs = load_certs(cert_path).await?;
let secret_key = load_secret_key(key_path).await?;

let config = config.with_single_cert(certs, secret_key)?;
let config = RustlsConfig::from_config(Arc::new(config));
Expand Down Expand Up @@ -136,23 +132,26 @@ impl TlsAcceptor {
}
}

fn load_certs(
async fn load_certs(
filename: impl AsRef<Path>,
) -> Result<Vec<rustls::pki_types::CertificateDer<'static>>> {
let certfile = std::fs::File::open(filename).context("cannot open certificate file")?;
let mut reader = std::io::BufReader::new(certfile);

let certfile = tokio::fs::read(filename)
.await
.context("cannot open certificate file")?;
let mut reader = std::io::Cursor::new(certfile);
let certs: Result<Vec<_>, std::io::Error> = rustls_pemfile::certs(&mut reader).collect();
let certs = certs?;

Ok(certs)
}

fn load_secret_key(
async fn load_secret_key(
filename: impl AsRef<Path>,
) -> Result<rustls::pki_types::PrivateKeyDer<'static>> {
let keyfile = std::fs::File::open(filename.as_ref()).context("cannot open secret key file")?;
let mut reader = std::io::BufReader::new(keyfile);
let keyfile = tokio::fs::read(filename.as_ref())
.await
.context("cannot open secret key file")?;
let mut reader = std::io::Cursor::new(keyfile);

loop {
match rustls_pemfile::read_one(&mut reader).context("cannot parse secret key .pem file")? {
Expand Down
14 changes: 2 additions & 12 deletions iroh-dns-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,11 @@
#![allow(unused_imports)]

use std::{
future::Future,
net::{Ipv4Addr, SocketAddr},
path::PathBuf,
};
use std::path::PathBuf;

use anyhow::Result;
use axum::{routing::get, Router};
use clap::Parser;
use futures_lite::FutureExt;
use iroh_dns_server::{
config::Config, metrics::init_metrics, server::run_with_config_until_ctrl_c,
};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, error_span, Instrument, Span};
use tracing::debug;

#[derive(Parser, Debug)]
struct Cli {
Expand Down
1 change: 1 addition & 0 deletions iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ iroh-net = { path = "." }
serde_json = "1.0.107"
testresult = "0.4.0"
mainline = "2.0.1"
iroh-relay = { version = "0.28", path = "../iroh-relay", features = ["test-utils", "server"] }

[[bench]]
name = "key"
Expand Down
41 changes: 34 additions & 7 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1767,10 +1767,22 @@ impl Actor {
discovery_events = events;
}
}

let mut receiver_closed = false;
let mut portmap_watcher_closed = false;
let mut link_change_closed = false;
loop {
inc!(Metrics, actor_tick_main);
tokio::select! {
Some(msg) = self.msg_receiver.recv() => {
msg = self.msg_receiver.recv(), if !receiver_closed => {
let Some(msg) = msg else {
trace!("tick: magicsock receiver closed");
inc!(Metrics, actor_tick_other);

receiver_closed = true;
continue;
};

trace!(?msg, "tick: msg");
inc!(Metrics, actor_tick_msg);
if self.handle_actor_message(msg).await {
Expand All @@ -1782,7 +1794,15 @@ impl Actor {
inc!(Metrics, actor_tick_re_stun);
self.msock.re_stun("periodic");
}
Ok(()) = portmap_watcher.changed() => {
change = portmap_watcher.changed(), if !portmap_watcher_closed => {
if change.is_err() {
trace!("tick: portmap watcher closed");
inc!(Metrics, actor_tick_other);

portmap_watcher_closed = true;
continue;
}

trace!("tick: portmap changed");
inc!(Metrics, actor_tick_portmap_changed);
let new_external_address = *portmap_watcher.borrow();
Expand All @@ -1809,22 +1829,29 @@ impl Actor {
self.refresh_direct_addrs(reason).await;
}
}
Some(is_major) = link_change_r.recv() => {
is_major = link_change_r.recv(), if !link_change_closed => {
let Some(is_major) = is_major else {
trace!("tick: link change receiver closed");
inc!(Metrics, actor_tick_other);

link_change_closed = true;
continue;
};

trace!("tick: link change {}", is_major);
inc!(Metrics, actor_link_change);
self.handle_network_change(is_major).await;
}
// Even if `discovery_events` yields `None`, it could begin to yield
// `Some` again in the future, so we don't want to disable this branch
// forever like we do with the other branches that yield `Option`s
Some(discovery_item) = discovery_events.next() => {
trace!("tick: discovery event, address discovered: {discovery_item:?}");
let node_addr = NodeAddr {node_id: discovery_item.node_id, info: discovery_item.addr_info};
if let Err(e) = self.msock.add_node_addr(node_addr.clone(), Source::Discovery { name: discovery_item.provenance.into() }) {
warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}");
}
}
else => {
trace!("tick: other");
inc!(Metrics, actor_tick_other);
}
}
}
}
Expand Down
46 changes: 30 additions & 16 deletions iroh-net/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ impl ActiveRelay {
self.relay_client.connect().await.context("keepalive")?;
}
tokio::select! {
Some(msg) = inbox.recv() => {
msg = inbox.recv() => {
let Some(msg) = msg else {
debug!("all clients closed");
break;
};

trace!("tick: inbox: {:?}", msg);
match msg {
ActiveRelayMessage::GetLastWrite(r) => {
Expand Down Expand Up @@ -144,6 +149,7 @@ impl ActiveRelay {
}
}
}

msg = self.relay_client_receiver.recv() => {
trace!("tick: relay_client_receiver");
if let Some(msg) = msg {
Expand All @@ -153,10 +159,6 @@ impl ActiveRelay {
}
}
}
else => {
debug!("all clients closed");
break;
}
}
}
debug!("exiting");
Expand Down Expand Up @@ -301,25 +303,37 @@ impl RelayActor {
trace!("shutting down");
break;
}
Some(Ok((url, ping_success))) = self.ping_tasks.join_next() => {
if !ping_success {
with_cancel(
self.cancel_token.child_token(),
self.close_or_reconnect_relay(&url, "rebind-ping-fail")
).await;
// `ping_tasks` being empty is a normal situation - in fact it starts empty
// until a `MaybeCloseRelaysOnRebind` message is received.
Some(task_result) = self.ping_tasks.join_next() => {
match task_result {
Ok((url, ping_success)) => {
if !ping_success {
with_cancel(
self.cancel_token.child_token(),
self.close_or_reconnect_relay(&url, "rebind-ping-fail")
).await;
}
}

Err(err) => {
warn!("ping task error: {:?}", err);
}
}
}
Some(msg) = receiver.recv() => {

msg = receiver.recv() => {
let Some(msg) = msg else {
trace!("shutting down relay recv loop");
break;
};

with_cancel(self.cancel_token.child_token(), self.handle_msg(msg)).await;
}
_ = cleanup_timer.tick() => {
trace!("tick: cleanup");
with_cancel(self.cancel_token.child_token(), self.clean_stale_relay()).await;
}
else => {
trace!("shutting down relay recv loop");
break;
}
}
}

Expand Down
99 changes: 41 additions & 58 deletions iroh-net/src/netcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,46 @@ pub(crate) fn os_has_ipv6() -> bool {
UdpSocket::bind_local_v6(0).is_ok()
}

#[cfg(test)]
mod test_utils {
//! Creates a relay server against which to perform tests
use std::sync::Arc;

use iroh_relay::server;

use crate::RelayNode;

pub(crate) async fn relay() -> (server::Server, Arc<RelayNode>) {
let server = server::Server::spawn(server::testing::server_config())
.await
.expect("should serve relay");
let node_desc = RelayNode {
url: server.https_url().expect("should work as relay"),
stun_only: false, // the checks above and below guarantee both stun and relay
stun_port: server.stun_addr().expect("server should serve stun").port(),
};

(server, Arc::new(node_desc))
}

/// Create a [`crate::RelayMap`] of the given size.
///
/// This function uses [`relay`]. Note that the returned map uses internal order that will
/// often _not_ match the order of the servers.
pub(crate) async fn relay_map(relays: usize) -> (Vec<server::Server>, crate::RelayMap) {
let mut servers = Vec::with_capacity(relays);
let mut nodes = Vec::with_capacity(relays);
for _ in 0..relays {
let (relay_server, node) = relay().await;
servers.push(relay_server);
nodes.push(node);
}
let map = crate::RelayMap::from_nodes(nodes).expect("unuque urls");
(servers, map)
}
}

#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;
Expand All @@ -778,11 +818,7 @@ mod tests {
use tracing::info;

use super::*;
use crate::{
defaults::{staging::EU_RELAY_HOSTNAME, DEFAULT_STUN_PORT},
ping::Pinger,
RelayNode,
};
use crate::ping::Pinger;

mod stun_utils {
//! Utils for testing that expose a simple stun server.
Expand All @@ -799,8 +835,6 @@ mod tests {
use super::*;
use crate::{RelayMap, RelayNode, RelayUrl};

// TODO: make all this private

/// A drop guard to clean up test infrastructure.
///
/// After dropping the test infrastructure will asynchronously shutdown and release its
Expand Down Expand Up @@ -956,57 +990,6 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_iroh_computer_stun() -> Result<()> {
let _guard = iroh_test::logging::setup();

let resolver = crate::dns::default_resolver().clone();
let mut client = Client::new(None, resolver).context("failed to create netcheck client")?;
let url: RelayUrl = format!("https://{}", EU_RELAY_HOSTNAME).parse().unwrap();

let dm = RelayMap::from_nodes([RelayNode {
url: url.clone(),
stun_only: true,
stun_port: DEFAULT_STUN_PORT,
}])
.expect("hardcoded");

for i in 0..10 {
println!("starting report {}", i + 1);
let now = Instant::now();

let r = client
.get_report(dm.clone(), None, None)
.await
.context("failed to get netcheck report")?;

if r.udp {
assert_eq!(
r.relay_latency.len(),
1,
"expected 1 key in RelayLatency; got {}",
r.relay_latency.len()
);
assert!(
r.relay_latency.iter().next().is_some(),
"expected key 1 in RelayLatency; got {:?}",
r.relay_latency
);
assert!(
r.global_v4.is_some() || r.global_v6.is_some(),
"expected at least one of global_v4 or global_v6"
);
assert!(r.preferred_relay.is_some());
} else {
eprintln!("missing UDP, probe not returned by network");
}

println!("report {} done in {:?}", i + 1, now.elapsed());
}

Ok(())
}

#[tokio::test]
async fn test_udp_blocked() -> Result<()> {
let _guard = iroh_test::logging::setup();
Expand Down
Loading

0 comments on commit 2209fe4

Please sign in to comment.