Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(iroh-relay): removes deadlock in Clients #3099

Merged
merged 7 commits into from
Jan 7, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 56 additions & 35 deletions iroh-relay/src/server/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use iroh_metrics::inc;
use tokio::sync::mpsc::error::TrySendError;
use tracing::{debug, trace};

use super::client::{Client, Config, Packet};
use super::client::{Client, Config};
use crate::server::metrics::Metrics;

/// Manages the connections to all currently connected clients.
Expand Down Expand Up @@ -56,7 +56,14 @@ impl Clients {
/// Removes the client from the map of clients, and sends a notification
/// to each client that this peer has sent data to, to let them know that
/// the peer is gone from the network.
async fn unregister(&self, node_id: NodeId) {
///
/// Explicitly drops the reference to the client to avoid deadlock.
async fn unregister<'a>(
&self,
client: dashmap::mapref::one::Ref<'a, iroh_base::PublicKey, Client>,
node_id: NodeId,
) {
drop(client); // avoid deadlock
trace!(node_id = node_id.fmt_short(), "unregistering client");

if let Some((_, client)) = self.0.clients.remove(&node_id) {
Expand All @@ -83,42 +90,53 @@ impl Clients {
}
}

/// Attempt to send a packet to client with [`NodeId`] `dst`
/// Attempt to send a packet to client with [`NodeId`] `dst`.
pub(super) async fn send_packet(&self, dst: NodeId, data: Bytes, src: NodeId) -> Result<()> {
if let Some(client) = self.0.clients.get(&dst) {
let res = client.try_send_packet(src, data);
return self.process_result(src, dst, res).await;
let Some(client) = self.0.clients.get(&dst) else {
debug!(dst = dst.fmt_short(), "no connected client, dropped packet");
inc!(Metrics, send_packets_dropped);
return Ok(());
};
match client.try_send_packet(src, data) {
Ok(_) => {
// Record sent_to relationship
self.0.sent_to.entry(src).or_default().insert(dst);
Ok(())
}
Err(TrySendError::Full(_)) => {
debug!(
dst = dst.fmt_short(),
"client too busy to receive packet, dropping packet"
);
bail!("failed to send message: full");
}
Err(TrySendError::Closed(_)) => {
debug!(
dst = dst.fmt_short(),
"can no longer write to client, dropping message and pruning connection"
);
self.unregister(client, dst).await;
bail!("failed to send message: gone");
}
}
debug!(dst = dst.fmt_short(), "no connected client, dropped packet");
inc!(Metrics, send_packets_dropped);
Ok(())
}

/// Attempt to send a disco packet to client with [`NodeId`] `dst`.
pub(super) async fn send_disco_packet(
&self,
dst: NodeId,
data: Bytes,
src: NodeId,
) -> Result<()> {
if let Some(client) = self.0.clients.get(&dst) {
let res = client.try_send_disco_packet(src, data);
return self.process_result(src, dst, res).await;
}
debug!(
dst = dst.fmt_short(),
"no connected client, dropped disco packet"
);
inc!(Metrics, disco_packets_dropped);
Ok(())
}

async fn process_result(
&self,
src: NodeId,
dst: NodeId,
res: Result<(), TrySendError<Packet>>,
) -> Result<()> {
match res {
let Some(client) = self.0.clients.get(&dst) else {
debug!(
dst = dst.fmt_short(),
"no connected client, dropped disco packet"
);
inc!(Metrics, disco_packets_dropped);
return Ok(());
};
match client.try_send_disco_packet(src, data) {
Ok(_) => {
// Record sent_to relationship
self.0.sent_to.entry(src).or_default().insert(dst);
Expand All @@ -127,17 +145,17 @@ impl Clients {
Err(TrySendError::Full(_)) => {
debug!(
dst = dst.fmt_short(),
"client too busy to receive packet, dropping packet"
"client too busy to receive disco packet, dropping packet"
);
bail!("failed to send message");
bail!("failed to send message: full");
}
Err(TrySendError::Closed(_)) => {
debug!(
dst = dst.fmt_short(),
"can no longer write to client, dropping message and pruning connection"
"can no longer write to client, dropping disco message and pruning connection"
);
self.unregister(dst).await;
bail!("failed to send message");
self.unregister(client, dst).await;
bail!("failed to send message: gone");
}
}
}
Expand Down Expand Up @@ -212,8 +230,11 @@ mod tests {
}
);

// send peer_gone
clients.unregister(a_key).await;
let client = clients.0.clients.get(&a_key).unwrap();

// Send peer_gone. This also tests that we do not deadlock
// when unregistering.
clients.unregister(client, a_key).await;

assert!(!clients.0.clients.contains_key(&a_key));
clients.shutdown().await;
Expand Down
Loading