Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make Endpoint::close infallible #3112

Merged
merged 3 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ let response = recv.read_to_end(1000).await?;
assert_eq!(&response, b"Hello, world!");

// Close the endpoint and all its connections
endpoint.close().await?;
endpoint.close().await;
```

And on the accepting side:
Expand Down
2 changes: 1 addition & 1 deletion iroh-relay/src/server/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Clients {
/// peer is gone from the network.
///
/// Must be passed a matching connection_id.
pub(super) async fn unregister<'a>(&self, connection_id: u64, node_id: NodeId) {
pub(super) async fn unregister(&self, connection_id: u64, node_id: NodeId) {
trace!(
node_id = node_id.fmt_short(),
connection_id,
Expand Down
7 changes: 3 additions & 4 deletions iroh/bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,16 @@ pub enum EndpointSelector {
}

impl EndpointSelector {
pub async fn close(self) -> Result<()> {
pub async fn close(self) {
match self {
EndpointSelector::Iroh(endpoint) => {
endpoint.close().await?;
endpoint.close().await;
}
#[cfg(not(any(target_os = "freebsd", target_os = "openbsd", target_os = "netbsd")))]
EndpointSelector::Quinn(endpoint) => {
endpoint.close(0u32.into(), b"");
}
}
Ok(())
}
}

Expand Down Expand Up @@ -255,7 +254,7 @@ pub async fn client_handler(
// to `Arc`ing them
connection.close(0u32, b"Benchmark done");

endpoint.close().await?;
endpoint.close().await;

if opt.stats {
println!("\nClient connection stats:\n{:#?}", connection.stats());
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ async fn main() -> anyhow::Result<()> {

// We received the last message: close all connections and allow for the close
// message to be sent.
endpoint.close().await?;
endpoint.close().await;
Ok(())
}
8 changes: 1 addition & 7 deletions iroh/examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,7 @@ async fn fetch(ticket: &str, relay_url: Option<String>, relay_only: bool) -> any

// We received the last message: close all connections and allow for the close
// message to be sent.
tokio::time::timeout(Duration::from_secs(3), async move {
let res = endpoint.close().await;
if res.is_err() {
println!("failed to close connection: {res:#?}");
}
})
.await?;
tokio::time::timeout(Duration::from_secs(3), endpoint.close()).await?;

let duration = start.elapsed();
println!(
Expand Down
14 changes: 5 additions & 9 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,21 +972,17 @@ impl Endpoint {
/// Be aware however that the underlying UDP sockets are only closed
/// on [`Drop`], bearing in mind the [`Endpoint`] is only dropped once all the clones
/// are dropped.
///
/// Returns an error if closing the magic socket failed.
/// TODO: Document error cases.
pub async fn close(&self) -> Result<()> {
pub async fn close(&self) {
if self.is_closed() {
return Ok(());
return;
}

tracing::debug!("Closing connections");
self.endpoint.close(0u16.into(), b"");
self.endpoint.wait_idle().await;

tracing::debug!("Connections closed");
self.msock.close().await?;
Ok(())
self.msock.close().await;
}

/// Check if this endpoint is still alive, or already closed.
Expand Down Expand Up @@ -1594,7 +1590,7 @@ mod tests {

info!("closing endpoint");
// close the endpoint and restart it
endpoint.close().await.unwrap();
endpoint.close().await;

info!("restarting endpoint");
// now restart it and check the addressing info of the peer
Expand Down Expand Up @@ -1693,7 +1689,7 @@ mod tests {
send.stopped().await.unwrap();
recv.read_to_end(0).await.unwrap();
info!("client finished");
ep.close().await.unwrap();
ep.close().await;
info!("client closed");
}
.instrument(error_span!("client", %i))
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
//!
//! // Gracefully close the connection and endpoint.
//! conn.close(1u8.into(), b"done");
//! ep.close().await?;
//! ep.close().await;
//! println!("Client closed");
//! Ok(())
//! }
Expand Down Expand Up @@ -202,7 +202,7 @@
//!
//! // Wait for the client to close the connection and gracefully close the endpoint.
//! conn.closed().await;
//! ep.close().await?;
//! ep.close().await;
//! Ok(())
//! }
//! ```
Expand Down
18 changes: 11 additions & 7 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1654,12 +1654,18 @@ impl Handle {
/// Polling the socket ([`AsyncUdpSocket::poll_recv`]) will return [`Poll::Pending`]
/// indefinitely after this call.
#[instrument(skip_all, fields(me = %self.msock.me))]
pub(crate) async fn close(&self) -> Result<()> {
pub(crate) async fn close(&self) {
if self.msock.is_closed() {
return Ok(());
return;
}
self.msock.closing.store(true, Ordering::Relaxed);
self.msock.actor_sender.send(ActorMessage::Shutdown).await?;
// If this fails, then there's no receiver listening for shutdown messages,
// so nothing to shut down anyways.
self.msock
.actor_sender
.send(ActorMessage::Shutdown)
.await
.ok();
self.msock.closed.store(true, Ordering::SeqCst);

let mut tasks = self.actor_tasks.lock().await;
Expand All @@ -1681,8 +1687,6 @@ impl Handle {
debug!("aborting remaining {}/3 tasks", tasks.len());
tasks.shutdown().await;
}

Ok(())
}
}

Expand Down Expand Up @@ -3408,8 +3412,8 @@ mod tests {
println!("closing endpoints");
let msock1 = m1.endpoint.magic_sock();
let msock2 = m2.endpoint.magic_sock();
m1.endpoint.close().await?;
m2.endpoint.close().await?;
m1.endpoint.close().await;
m2.endpoint.close().await;

assert!(msock1.msock.is_closed());
assert!(msock2.msock.is_closed());
Expand Down
Loading