diff --git a/Cargo.lock b/Cargo.lock index fc470e7718..3feccf73a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1130,15 +1130,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" -[[package]] -name = "fastrand" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] - [[package]] name = "fastrand" version = "2.3.0" @@ -1151,12 +1142,6 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" -[[package]] -name = "fixedbitset" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" - [[package]] name = "flume" version = "0.11.1" @@ -1237,21 +1222,6 @@ dependencies = [ "futures-sink", ] -[[package]] -name = "futures-concurrency" -version = "7.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b724496da7c26fcce66458526ce68fc2ecf4aaaa994281cf322ded5755520c" -dependencies = [ - "fixedbitset", - "futures-buffered", - "futures-core", - "futures-lite 1.13.0", - "pin-project", - "slab", - "smallvec", -] - [[package]] name = "futures-core" version = "0.3.31" @@ -1275,28 +1245,13 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand 1.9.0", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - 
[[package]] name = "futures-lite" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cef40d21ae2c515b51041df9ed313ed21e572df340ea58a922a0aefe7e8891a1" dependencies = [ - "fastrand 2.3.0", + "fastrand", "futures-core", "futures-io", "parking", @@ -2068,8 +2023,7 @@ dependencies = [ "derive_more", "ed25519-dalek", "futures-buffered", - "futures-concurrency", - "futures-lite 2.5.0", + "futures-lite", "futures-sink", "futures-util", "governor 0.7.0", @@ -2169,7 +2123,7 @@ dependencies = [ "anyhow", "bytes", "clap", - "futures-lite 2.5.0", + "futures-lite", "hdrhistogram", "iroh", "iroh-metrics", @@ -2196,7 +2150,7 @@ dependencies = [ "criterion", "derive_more", "dirs-next", - "futures-lite 2.5.0", + "futures-lite", "governor 0.6.3", "hickory-resolver", "hickory-server", @@ -2261,7 +2215,7 @@ dependencies = [ "bytes", "derive_more", "futures-buffered", - "futures-lite 2.5.0", + "futures-lite", "hickory-resolver", "iroh-base", "iroh-metrics", @@ -2345,7 +2299,7 @@ dependencies = [ "data-encoding", "derive_more", "futures-buffered", - "futures-lite 2.5.0", + "futures-lite", "futures-sink", "futures-util", "governor 0.7.0", @@ -2804,7 +2758,7 @@ dependencies = [ "atomic-waker", "bytes", "derive_more", - "futures-lite 2.5.0", + "futures-lite", "futures-sink", "futures-util", "iroh-quinn-udp", @@ -3279,7 +3233,7 @@ dependencies = [ "base64", "bytes", "derive_more", - "futures-lite 2.5.0", + "futures-lite", "futures-util", "igd-next", "iroh-metrics", @@ -4516,7 +4470,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", - "fastrand 2.3.0", + "fastrand", "once_cell", "rustix", "windows-sys 0.59.0", @@ -5133,12 +5087,6 @@ dependencies = [ "libc", ] -[[package]] -name = "waker-fn" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" - [[package]] name = "walkdir" version = "2.5.0" diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 220700700b..39aff890d1 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -36,7 +36,6 @@ derive_more = { version = "1.0.0", features = [ ] } ed25519-dalek = "2.0" futures-buffered = "0.2.8" -futures-concurrency = "7.6" futures-lite = "2.5" futures-sink = "0.3" futures-util = "0.3" diff --git a/iroh/src/endpoint/rtt_actor.rs b/iroh/src/endpoint/rtt_actor.rs index c1a9748ef5..e9b0b1aecb 100644 --- a/iroh/src/endpoint/rtt_actor.rs +++ b/iroh/src/endpoint/rtt_actor.rs @@ -1,17 +1,14 @@ //! Actor which coordinates the congestion controller for the magic socket -use std::collections::HashMap; +use std::{pin::Pin, task::Poll}; -use futures_concurrency::stream::stream_group; -use futures_lite::StreamExt; +use futures_buffered::MergeUnbounded; +use futures_lite::{Stream, StreamExt}; use iroh_base::NodeId; use iroh_metrics::inc; -use tokio::{ - sync::{mpsc, Notify}, - time::Duration, -}; +use tokio::sync::mpsc; use tokio_util::task::AbortOnDropHandle; -use tracing::{debug, error, info_span, trace, Instrument}; +use tracing::{debug, info_span, Instrument}; use crate::{magicsock::ConnectionType, metrics::MagicsockMetrics, watchable::WatcherStream}; @@ -25,9 +22,7 @@ pub(super) struct RttHandle { impl RttHandle { pub(super) fn new() -> Self { let mut actor = RttActor { - connection_events: stream_group::StreamGroup::new().keyed(), - connections: HashMap::new(), - tick: Notify::new(), + connection_events: Default::default(), }; let (msg_tx, msg_rx) = mpsc::channel(16); let handle = tokio::spawn( @@ -61,21 +56,57 @@ pub(super) enum RttMessage { /// /// The magic socket can change the underlying network path, between two nodes. If we can /// inform the QUIC congestion controller of this event it will work much more efficiently. 
-#[derive(Debug)] +#[derive(derive_more::Debug)] struct RttActor { /// Stream of connection type changes. - connection_events: stream_group::Keyed>, - /// References to the connections. - /// - /// These are weak references so not to keep the connections alive. The key allows - /// removing the corresponding stream from `conn_type_changes`. - /// The boolean is an indiciator of whether this connection was direct before. + #[debug("MergeUnbounded>")] + connection_events: MergeUnbounded, +} + +#[derive(Debug)] +struct MappedStream { + stream: WatcherStream, + node_id: NodeId, + /// Reference to the connection. + connection: quinn::WeakConnectionHandle, + /// This is an indicator of whether this connection was direct before. /// This helps establish metrics on number of connections that became direct. - connections: HashMap, - /// A way to notify the main actor loop to run over. + was_direct_before: bool, +} + +impl Stream for MappedStream { + type Item = ConnectionType; + + /// Performs the congestion controller reset for a magic socket path change. /// - /// E.g. when a new stream was added. - tick: Notify, + /// Regardless of which kind of path we are changed to, the congestion controller needs + /// resetting. Even when switching to mixed we should reset the state as e.g. switching + /// from direct to mixed back to direct should be a rare exception and is a bug if this + /// happens commonly. 
+ fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Ready(Some(new_conn_type)) => { + if self.connection.network_path_changed() { + debug!( + node_id = %self.node_id.fmt_short(), + new_type = ?new_conn_type, + "Congestion controller state reset", + ); + if !self.was_direct_before && matches!(new_conn_type, ConnectionType::Direct(_)) + { + self.was_direct_before = true; + inc!(MagicsockMetrics, connection_became_direct); + } + } + Poll::Ready(Some(new_conn_type)) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } } impl RttActor { @@ -83,8 +114,6 @@ impl RttActor { /// /// The main loop will finish when the sender is dropped. async fn run(&mut self, mut msg_rx: mpsc::Receiver) { - let mut cleanup_interval = tokio::time::interval(Duration::from_secs(5)); - cleanup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! { biased; @@ -94,11 +123,7 @@ impl RttActor { None => break, } } - item = self.connection_events.next(), if !self.connection_events.is_empty() => { - self.do_reset_rtt(item); - } - _ = cleanup_interval.tick() => self.do_connections_cleanup(), - () = self.tick.notified() => continue, + _item = self.connection_events.next(), if !self.connection_events.is_empty() => {} } } debug!("rtt-actor finished"); @@ -124,82 +149,12 @@ impl RttActor { conn_type_changes: WatcherStream, node_id: NodeId, ) { - let key = self.connection_events.insert(conn_type_changes); - self.connections.insert(key, (connection, node_id, false)); - self.tick.notify_one(); - inc!(MagicsockMetrics, connection_handshake_success); - } - - /// Performs the congestion controller reset for a magic socket path change. - /// - /// Regardless of which kind of path we are changed to, the congestion controller needs - /// resetting. Even when switching to mixed we should reset the state as e.g. 
switching - /// from direct to mixed back to direct should be a rare exception and is a bug if this - /// happens commonly. - fn do_reset_rtt(&mut self, item: Option<(stream_group::Key, ConnectionType)>) { - match item { - Some((key, new_conn_type)) => match self.connections.get_mut(&key) { - Some((handle, node_id, was_direct_before)) => { - if handle.network_path_changed() { - debug!( - node_id = %node_id.fmt_short(), - new_type = ?new_conn_type, - "Congestion controller state reset", - ); - if !*was_direct_before && matches!(new_conn_type, ConnectionType::Direct(_)) - { - *was_direct_before = true; - inc!(MagicsockMetrics, connection_became_direct); - } - } else { - debug!( - node_id = %node_id.fmt_short(), - "removing dropped connection", - ); - self.connection_events.remove(key); - } - } - None => error!("No connection found for stream item"), - }, - None => { - trace!("No more connections"); - } - } - } - - /// Performs cleanup for closed connection. - fn do_connections_cleanup(&mut self) { - for (key, (handle, node_id, _)) in self.connections.iter() { - if !handle.is_alive() { - trace!(node_id = %node_id.fmt_short(), "removing stale connection"); - self.connection_events.remove(*key); - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_actor_mspc_close() { - let mut actor = RttActor { - connection_events: stream_group::StreamGroup::new().keyed(), - connections: HashMap::new(), - tick: Notify::new(), - }; - let (msg_tx, msg_rx) = mpsc::channel(16); - let handle = tokio::spawn(async move { - actor.run(msg_rx).await; + self.connection_events.push(MappedStream { + stream: conn_type_changes, + connection, + node_id, + was_direct_before: false, }); - - // Dropping the msg_tx should stop the actor - drop(msg_tx); - - let task_res = tokio::time::timeout(Duration::from_secs(5), handle) - .await - .expect("timeout - actor did not finish"); - assert!(task_res.is_ok()); + inc!(MagicsockMetrics, connection_handshake_success); } }