From beae3c72a5eb4c5893dd113aafccf2957e47f32f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Mon, 6 Jan 2025 23:26:16 -0500 Subject: [PATCH 1/7] fix(iroh-relay): removes deadlock in `Clients` Also downgrades "stream terminated" from a `warn` to a `trace`, so we don not get an warning everytime a connection to the relay closes --- iroh-relay/src/server/client.rs | 5 ++++- iroh-relay/src/server/clients.rs | 10 ++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/iroh-relay/src/server/client.rs b/iroh-relay/src/server/client.rs index f941b9dd0c9..b7d8c30784a 100644 --- a/iroh-relay/src/server/client.rs +++ b/iroh-relay/src/server/client.rs @@ -299,7 +299,10 @@ impl Actor { trace!(?maybe_frame, "handle incoming frame"); let frame = match maybe_frame { Some(frame) => frame?, - None => anyhow::bail!("stream terminated"), + None => { + tracing::trace!("stream terminated"); + return Ok(()); + } }; match frame { Frame::SendPacket { dst_key, packet } => { diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs index 607f7960b9e..b7d48c9a220 100644 --- a/iroh-relay/src/server/clients.rs +++ b/iroh-relay/src/server/clients.rs @@ -85,8 +85,11 @@ impl Clients { /// Attempt to send a packet to client with [`NodeId`] `dst` pub(super) async fn send_packet(&self, dst: NodeId, data: Bytes, src: NodeId) -> Result<()> { + let mut res = None; if let Some(client) = self.0.clients.get(&dst) { - let res = client.try_send_packet(src, data); + res = Some(client.try_send_packet(src, data)); + } + if let Some(res) = res { return self.process_result(src, dst, res).await; } debug!(dst = dst.fmt_short(), "no connected client, dropped packet"); @@ -100,8 +103,11 @@ impl Clients { data: Bytes, src: NodeId, ) -> Result<()> { + let mut res = None; if let Some(client) = self.0.clients.get(&dst) { - let res = client.try_send_disco_packet(src, data); + res = Some(client.try_send_disco_packet(src, data)); + } + if let Some(res) = res { return self.process_result(src, dst, res).await; } debug!( From 87d39ecd1951dcc24f15072fb721130bd5fdc551 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Mon, 6 Jan 2025 23:31:59 -0500 Subject: [PATCH 2/7] add a comment --- iroh-relay/src/server/clients.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs index b7d48c9a220..07851daa6cf 100644 --- a/iroh-relay/src/server/clients.rs +++ b/iroh-relay/src/server/clients.rs @@ -90,6 +90,8 @@ impl Clients { res = Some(client.try_send_packet(src, data)); } if let Some(res) = res { + // `process_result` may call `unregister`, which itself calls `self.0.clients` + // so we can't nest this under the above call to `self.0.clients` return self.process_result(src, dst, res).await; } debug!(dst = dst.fmt_short(), "no connected client, dropped packet"); @@ -108,6 +110,8 @@ impl Clients { res = Some(client.try_send_disco_packet(src, data)); } if let Some(res) = res { + // `process_result` may call `unregister`, which itself calls `self.0.clients` + // so we can't nest this under the above call to `self.0.clients` return self.process_result(src, dst, res).await; } debug!( From e58586071ce63f53ce8c7d6c7f37ec4609d36707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Tue, 7 Jan 2025 00:27:11 -0500 Subject: [PATCH 3/7] add back handle frame error / warning --- iroh-relay/src/server/client.rs | 3 +-- iroh/src/endpoint.rs | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh-relay/src/server/client.rs b/iroh-relay/src/server/client.rs index b7d8c30784a..e5f505cc9aa 100644 --- a/iroh-relay/src/server/client.rs +++ b/iroh-relay/src/server/client.rs @@ -300,8 +300,7 @@ impl Actor { let frame = match maybe_frame { Some(frame) => frame?, None => { - tracing::trace!("stream terminated"); - return Ok(()); + anyhow::bail!("stream terminated"); } }; match frame { diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 02493efafd4..f4ae2d46f02 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -1663,6 +1663,7 @@ mod tests { send.finish().unwrap(); send.stopped().await.unwrap(); recv.read_to_end(0).await.unwrap(); + conn.close(0u16.into(), b""); info!(%i, peer = %peer_id.fmt_short(), "finished"); info!("[server] round {i} done in {:?}", round_start.elapsed()); } From 909d072f3b9153c39483ff96e5932d66bf8b1444 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Tue, 7 Jan 2025 18:54:31 +0100 Subject: [PATCH 4/7] refactor(iroh-relay): simplify structure in clients (#3100) --- iroh-relay/src/server/clients.rs | 85 +++++++++++++++++--------------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs index 07851daa6cf..e130572638a 100644 --- a/iroh-relay/src/server/clients.rs +++ b/iroh-relay/src/server/clients.rs @@ -11,7 +11,7 @@ use iroh_metrics::inc; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, trace}; -use super::client::{Client, Config, Packet}; +use super::client::{Client, Config}; use crate::server::metrics::Metrics; /// Manages the connections to all currently connected clients. @@ -83,52 +83,54 @@ impl Clients { } } - /// Attempt to send a packet to client with [`NodeId`] `dst` + /// Attempt to send a packet to client with [`NodeId`] `dst`. pub(super) async fn send_packet(&self, dst: NodeId, data: Bytes, src: NodeId) -> Result<()> { - let mut res = None; - if let Some(client) = self.0.clients.get(&dst) { - res = Some(client.try_send_packet(src, data)); - } - if let Some(res) = res { - // `process_result` may call `unregister`, which itself calls `self.0.clients` - // so we can't nest this under the above call to `self.0.clients` - return self.process_result(src, dst, res).await; + let Some(client) = self.0.clients.get(&dst) else { + debug!(dst = dst.fmt_short(), "no connected client, dropped packet"); + inc!(Metrics, send_packets_dropped); + return Ok(()); + }; + match client.try_send_packet(src, data) { + Ok(_) => { + // Record sent_to relationship + self.0.sent_to.entry(src).or_default().insert(dst); + Ok(()) + } + Err(TrySendError::Full(_)) => { + debug!( + dst = dst.fmt_short(), + "client too busy to receive packet, dropping packet" + ); + bail!("failed to send message: full"); + } + Err(TrySendError::Closed(_)) => { + debug!( + dst = dst.fmt_short(), + "can no longer write to client, dropping message and pruning connection" + ); + drop(client); // avoid deadlock + self.unregister(dst).await; + bail!("failed to send message: gone"); + } } - debug!(dst = dst.fmt_short(), "no connected client, dropped packet"); - inc!(Metrics, send_packets_dropped); - Ok(()) } + /// Attempt to send a disco packet to client with [`NodeId`] `dst`. pub(super) async fn send_disco_packet( &self, dst: NodeId, data: Bytes, src: NodeId, ) -> Result<()> { - let mut res = None; - if let Some(client) = self.0.clients.get(&dst) { - res = Some(client.try_send_disco_packet(src, data)); - } - if let Some(res) = res { - // `process_result` may call `unregister`, which itself calls `self.0.clients` - // so we can't nest this under the above call to `self.0.clients` - return self.process_result(src, dst, res).await; - } - debug!( - dst = dst.fmt_short(), - "no connected client, dropped disco packet" - ); - inc!(Metrics, disco_packets_dropped); - Ok(()) - } - - async fn process_result( - &self, - src: NodeId, - dst: NodeId, - res: Result<(), TrySendError>, - ) -> Result<()> { - match res { + let Some(client) = self.0.clients.get(&dst) else { + debug!( + dst = dst.fmt_short(), + "no connected client, dropped disco packet" + ); + inc!(Metrics, disco_packets_dropped); + return Ok(()); + }; + match client.try_send_disco_packet(src, data) { Ok(_) => { // Record sent_to relationship self.0.sent_to.entry(src).or_default().insert(dst); @@ -137,17 +139,18 @@ impl Clients { Err(TrySendError::Full(_)) => { debug!( dst = dst.fmt_short(), - "client too busy to receive packet, dropping packet" + "client too busy to receive disco packet, dropping packet" ); - bail!("failed to send message"); + bail!("failed to send message: full"); } Err(TrySendError::Closed(_)) => { debug!( dst = dst.fmt_short(), - "can no longer write to client, dropping message and pruning connection" + "can no longer write to client, dropping disco message and pruning connection" ); + drop(client); // avoid deadlock self.unregister(dst).await; - bail!("failed to send message"); + bail!("failed to send message: gone"); } } } From 725f7299480fef8dfe52b31bc9ab57a41ce7794d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Tue, 7 Jan 2025 12:56:17 -0500 Subject: [PATCH 5/7] fix fmt --- iroh-relay/src/server/client.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/iroh-relay/src/server/client.rs b/iroh-relay/src/server/client.rs index e5f505cc9aa..f941b9dd0c9 100644 --- a/iroh-relay/src/server/client.rs +++ b/iroh-relay/src/server/client.rs @@ -299,9 +299,7 @@ impl Actor { trace!(?maybe_frame, "handle incoming frame"); let frame = match maybe_frame { Some(frame) => frame?, - None => { - anyhow::bail!("stream terminated"); - } + None => anyhow::bail!("stream terminated"), }; match frame { Frame::SendPacket { dst_key, packet } => { From 7d99b48b879ae83e6cb295b1169e81c4a426f8b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Tue, 7 Jan 2025 12:57:20 -0500 Subject: [PATCH 6/7] revert unnecessary test change --- iroh/src/endpoint.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index f4ae2d46f02..02493efafd4 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -1663,7 +1663,6 @@ mod tests { send.finish().unwrap(); send.stopped().await.unwrap(); recv.read_to_end(0).await.unwrap(); - conn.close(0u16.into(), b""); info!(%i, peer = %peer_id.fmt_short(), "finished"); info!("[server] round {i} done in {:?}", round_start.elapsed()); } From 0ef80930eb4002ce76a22d73b57e636b00b1d449 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Tue, 7 Jan 2025 13:23:13 -0500 Subject: [PATCH 7/7] add `client` param that gets explicitly dropped in `register` --- iroh-relay/src/server/clients.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs index e130572638a..322df35b789 100644 --- a/iroh-relay/src/server/clients.rs +++ b/iroh-relay/src/server/clients.rs @@ -56,7 +56,14 @@ impl Clients { /// Removes the client from the map of clients, & sends a notification /// to each client that peers has sent data to, to let them know that /// peer is gone from the network. - async fn unregister(&self, node_id: NodeId) { + /// + /// Explicitly drops the reference to the client to avoid deadlock. + async fn unregister<'a>( + &self, + client: dashmap::mapref::one::Ref<'a, iroh_base::PublicKey, Client>, + node_id: NodeId, + ) { + drop(client); // avoid deadlock trace!(node_id = node_id.fmt_short(), "unregistering client"); if let Some((_, client)) = self.0.clients.remove(&node_id) { @@ -108,8 +115,7 @@ impl Clients { dst = dst.fmt_short(), "can no longer write to client, dropping message and pruning connection" ); - drop(client); // avoid deadlock - self.unregister(dst).await; + self.unregister(client, dst).await; bail!("failed to send message: gone"); } } @@ -148,8 +154,7 @@ impl Clients { dst = dst.fmt_short(), "can no longer write to client, dropping disco message and pruning connection" ); - drop(client); // avoid deadlock - self.unregister(dst).await; + self.unregister(client, dst).await; bail!("failed to send message: gone"); } } @@ -225,8 +230,11 @@ mod tests { } ); - // send peer_gone - clients.unregister(a_key).await; + let client = clients.0.clients.get(&a_key).unwrap(); + + // send peer_gone. Also, tests that we do not get a deadlock + // when unregistering. + clients.unregister(client, a_key).await; assert!(!clients.0.clients.contains_key(&a_key)); clients.shutdown().await;