From 594b86182da2481aa5b6885a38d6c19a16db25df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 10 Jan 2025 15:30:43 +0100 Subject: [PATCH] fix: Try improving `relay_datagram_send_channel()` (#3118) ## Description Closes #3067 Since multiple `Connection`s can have multiple `AsyncUdpSocket` `IpPollers`, it's incorrect to assume that a single `AtomicWaker` can wake up these tasks correctly. Instead, if there's only one `AtomicWaker` for all of them, only the last registered waker will be woken when there's capacity again. I'm changing this such that `poll_writable` will actually add the current task's waker to a list, if it hasn't been added yet. And successfully `recv`ing an item will wake all tasks. A second issue was that *between* the call to `self.sender.capacity()` (it being 0) and `self.waker.register`, another thread might have successfully `recv`d an item *and* called `self.waker.wake()`. This means that the first thread could've registered a waker, while the capacity is actually non-zero. Not terrible (it's very likely that it'll be woken up when the capacity jumps to 2 this time), but also not great. The fix is to re-check the capacity after registering the waker. The downside is that we keep the waker and thus potentially get a spurious wakeup, but that's fine. ## Notes & open questions This... "fixes" the concerns, but I'd actually like to write a loom test for the concerns eventually. It's just... a *lot* of work, and I'd rather prioritize other things, but also improve this part of the code at the same time. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [x] All breaking changes documented. --- iroh/src/magicsock.rs | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 0bef9bd7b7..66c1c0c4ff 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -25,7 +25,7 @@ use std::{ atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, }, - task::{Context, Poll}, + task::{Context, Poll, Waker}, time::{Duration, Instant}, }; @@ -1548,7 +1548,7 @@ impl Handle { let (actor_sender, actor_receiver) = mpsc::channel(256); let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256); - let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_sender(); + let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_send_channel(); let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256); // load the node data @@ -1743,27 +1743,17 @@ enum DiscoBoxError { /// /// These includes the waker coordination required to support [`AsyncUdpSocket::try_send`] /// and [`quinn::UdpPoller::poll_writable`]. -/// -/// Note that this implementation has several bugs in them, but they have existed for rather -/// a while: -/// -/// - There can be multiple senders, which all have to be woken if they were blocked. But -/// only the last sender to install the waker is unblocked. -/// -/// - poll_writable may return blocking when it doesn't need to. Leaving the sender stuck -/// until another recv is called (which hopefully would happen soon given that the channel -/// is probably still rather full, but still). -fn relay_datagram_sender() -> ( +fn relay_datagram_send_channel() -> ( RelayDatagramSendChannelSender, RelayDatagramSendChannelReceiver, ) { let (sender, receiver) = mpsc::channel(256); - let waker = Arc::new(AtomicWaker::new()); + let wakers = Arc::new(std::sync::Mutex::new(Vec::new())); let tx = RelayDatagramSendChannelSender { sender, - waker: waker.clone(), + wakers: wakers.clone(), }; - let rx = RelayDatagramSendChannelReceiver { receiver, waker }; + let rx = RelayDatagramSendChannelReceiver { receiver, wakers }; (tx, rx) } @@ -1774,7 +1764,7 @@ fn relay_datagram_sender() -> ( #[derive(Debug, Clone)] struct RelayDatagramSendChannelSender { sender: mpsc::Sender, - waker: Arc, + wakers: Arc>>, } impl RelayDatagramSendChannelSender { @@ -1788,8 +1778,18 @@ impl RelayDatagramSendChannelSender { fn poll_writable(&self, cx: &mut Context) -> Poll> { match self.sender.capacity() { 0 => { - self.waker.register(cx.waker()); - Poll::Pending + let mut wakers = self.wakers.lock().expect("poisoned"); + if !wakers.iter().any(|waker| waker.will_wake(cx.waker())) { + wakers.push(cx.waker().clone()); + } + drop(wakers); + if self.sender.capacity() != 0 { + // We "risk" a spurious wake-up in this case, but rather that + // than potentially skipping a receive. + Poll::Ready(Ok(())) + } else { + Poll::Pending + } } _ => Poll::Ready(Ok(())), } @@ -1803,13 +1803,14 @@ impl RelayDatagramSendChannelSender { #[derive(Debug)] struct RelayDatagramSendChannelReceiver { receiver: mpsc::Receiver, - waker: Arc, + wakers: Arc>>, } impl RelayDatagramSendChannelReceiver { async fn recv(&mut self) -> Option { let item = self.receiver.recv().await; - self.waker.wake(); + let mut wakers = self.wakers.lock().expect("poisoned"); + wakers.drain(..).for_each(Waker::wake); item } }