fix: Try improving relay_datagram_send_channel() (#3118)
## Description

Closes #3067 

Since multiple `Connection`s can each have multiple `AsyncUdpSocket`
`IpPollers`, it's incorrect to assume that a single `AtomicWaker` can
wake up all of these tasks correctly.
With only one `AtomicWaker` shared by all of them, only the last
registered waker is woken when there's capacity again.

I'm changing this so that `poll_writable` adds the current task's waker
to a list if it hasn't been added yet, and successfully `recv`ing an
item wakes all registered tasks.
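
As a rough illustration of the waker-list idea, here is a minimal standalone sketch using only the standard library. `WakerList`, `register`, `wake_all` and `CountingWaker` are hypothetical names invented for this example and are not part of iroh; the actual change stores the `Vec<Waker>` directly on the channel halves.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical stand-in for the shared waker list (not an iroh type).
#[derive(Clone, Default)]
struct WakerList {
    wakers: Arc<Mutex<Vec<Waker>>>,
}

impl WakerList {
    /// Stores `waker` unless an equivalent waker is already in the list.
    /// `will_wake` is best effort: it may report `false` for two wakers of
    /// the same task, which only costs an extra entry in the list.
    fn register(&self, waker: &Waker) {
        let mut wakers = self.wakers.lock().expect("poisoned");
        if !wakers.iter().any(|w| w.will_wake(waker)) {
            wakers.push(waker.clone());
        }
    }

    /// Wakes and removes every stored waker; returns how many were woken.
    fn wake_all(&self) -> usize {
        let mut wakers = self.wakers.lock().expect("poisoned");
        let count = wakers.len();
        wakers.drain(..).for_each(Waker::wake);
        count
    }
}

/// Helper for the example: a waker that counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

With a single `AtomicWaker`, registering a second task would replace the first one's waker; with the list, both stay registered until the next `wake_all`.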

A second issue was that *between* the call to `self.sender.capacity()`
(returning 0) and `self.waker.register`, another thread might have
successfully `recv`d an item *and* called `self.waker.wake()`.
The first thread would then register its waker and return
`Poll::Pending` even though the capacity is actually non-zero. Not
terrible (it'll very likely be woken by the next `recv`, when the
capacity jumps to 2), but also not great.
The fix is to re-check the capacity after registering the waker.
The downside is that the waker stays registered when the re-check
succeeds, so we potentially get a spurious wakeup later, but that's fine.
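
The register-then-re-check pattern can be sketched in isolation as follows. This is an illustration under assumptions, not the iroh code: `Slots`, `release` and `WakeCounter` are hypothetical, and an `AtomicUsize` stands in for the capacity of the real `mpsc` channel.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical model of the channel: a free-slot counter plus the waker list.
struct Slots {
    capacity: AtomicUsize,
    wakers: Mutex<Vec<Waker>>,
}

impl Slots {
    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<()> {
        if self.capacity.load(Ordering::SeqCst) != 0 {
            return Poll::Ready(());
        }
        {
            let mut wakers = self.wakers.lock().expect("poisoned");
            if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
                wakers.push(cx.waker().clone());
            }
        } // lock released here
        // Re-check: a slot may have been freed between the first check and
        // the registration. Returning Ready here leaves the waker in the
        // list, so a later spurious wake-up is possible.
        if self.capacity.load(Ordering::SeqCst) != 0 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }

    /// Models a successful `recv`: frees one slot, then wakes all waiters.
    fn release(&self) {
        self.capacity.fetch_add(1, Ordering::SeqCst);
        let mut wakers = self.wakers.lock().expect("poisoned");
        wakers.drain(..).for_each(Waker::wake);
    }
}

/// Helper for the example: counts wake-ups.
struct WakeCounter(AtomicUsize);

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Because the waker is registered before the second capacity check, a slot freed at any point is either seen by that check or followed by a wake-up of the already registered waker.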

## Notes & open questions

This... "fixes" the concerns, but I'd actually like to write a loom test
for the concerns eventually.

It's just... a *lot* of work, and I'd rather prioritize other things,
but also improve this part of the code at the same time.

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [ ] Tests if relevant.
- [x] All breaking changes documented.
matheus23 authored Jan 10, 2025
1 parent 870c76e commit 594b861
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions iroh/src/magicsock.rs
@@ -25,7 +25,7 @@ use std::{
         atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicUsize, Ordering},
         Arc, RwLock,
     },
-    task::{Context, Poll},
+    task::{Context, Poll, Waker},
     time::{Duration, Instant},
 };

@@ -1548,7 +1548,7 @@ impl Handle {

         let (actor_sender, actor_receiver) = mpsc::channel(256);
         let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256);
-        let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_sender();
+        let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_send_channel();
         let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256);

         // load the node data
@@ -1743,27 +1743,17 @@ enum DiscoBoxError {
 ///
 /// These includes the waker coordination required to support [`AsyncUdpSocket::try_send`]
 /// and [`quinn::UdpPoller::poll_writable`].
-///
-/// Note that this implementation has several bugs in them, but they have existed for rather
-/// a while:
-///
-/// - There can be multiple senders, which all have to be woken if they were blocked. But
-/// only the last sender to install the waker is unblocked.
-///
-/// - poll_writable may return blocking when it doesn't need to. Leaving the sender stuck
-/// until another recv is called (which hopefully would happen soon given that the channel
-/// is probably still rather full, but still).
-fn relay_datagram_sender() -> (
+fn relay_datagram_send_channel() -> (
     RelayDatagramSendChannelSender,
     RelayDatagramSendChannelReceiver,
 ) {
     let (sender, receiver) = mpsc::channel(256);
-    let waker = Arc::new(AtomicWaker::new());
+    let wakers = Arc::new(std::sync::Mutex::new(Vec::new()));
     let tx = RelayDatagramSendChannelSender {
         sender,
-        waker: waker.clone(),
+        wakers: wakers.clone(),
     };
-    let rx = RelayDatagramSendChannelReceiver { receiver, waker };
+    let rx = RelayDatagramSendChannelReceiver { receiver, wakers };
     (tx, rx)
 }

@@ -1774,7 +1764,7 @@ fn relay_datagram_sender() -> (
 #[derive(Debug, Clone)]
 struct RelayDatagramSendChannelSender {
     sender: mpsc::Sender<RelaySendItem>,
-    waker: Arc<AtomicWaker>,
+    wakers: Arc<std::sync::Mutex<Vec<Waker>>>,
 }

 impl RelayDatagramSendChannelSender {
@@ -1788,8 +1778,18 @@ impl RelayDatagramSendChannelSender {
     fn poll_writable(&self, cx: &mut Context) -> Poll<io::Result<()>> {
         match self.sender.capacity() {
             0 => {
-                self.waker.register(cx.waker());
-                Poll::Pending
+                let mut wakers = self.wakers.lock().expect("poisoned");
+                if !wakers.iter().any(|waker| waker.will_wake(cx.waker())) {
+                    wakers.push(cx.waker().clone());
+                }
+                drop(wakers);
+                if self.sender.capacity() != 0 {
+                    // We "risk" a spurious wake-up in this case, but rather that
+                    // than potentially skipping a receive.
+                    Poll::Ready(Ok(()))
+                } else {
+                    Poll::Pending
+                }
             }
             _ => Poll::Ready(Ok(())),
         }
@@ -1803,13 +1803,14 @@ impl RelayDatagramSendChannelSender {
 #[derive(Debug)]
 struct RelayDatagramSendChannelReceiver {
     receiver: mpsc::Receiver<RelaySendItem>,
-    waker: Arc<AtomicWaker>,
+    wakers: Arc<std::sync::Mutex<Vec<Waker>>>,
 }

 impl RelayDatagramSendChannelReceiver {
     async fn recv(&mut self) -> Option<RelaySendItem> {
         let item = self.receiver.recv().await;
-        self.waker.wake();
+        let mut wakers = self.wakers.lock().expect("poisoned");
+        wakers.drain(..).for_each(Waker::wake);
         item
     }
 }