From 2fc0831d405edf1a7ddd1e2e7ce10b728f04acd9 Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Thu, 17 Dec 2020 18:56:48 +0000 Subject: [PATCH 01/14] Use poll on VxWorks, Fuchsia and other Unix systems --- Cargo.toml | 2 +- src/lib.rs | 10 +- src/poll.rs | 238 +++++++++++++++++++++++++++++++++++++++++++++ tests/precision.rs | 28 ++++-- 4 files changed, 270 insertions(+), 8 deletions(-) create mode 100644 src/poll.rs diff --git a/Cargo.toml b/Cargo.toml index c150280..10b2e95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ cfg-if = "0.1.10" log = "0.4.11" [target."cfg(unix)".dependencies] -libc = "0.2.77" +libc = { version = "0.2.77", features = ["extra_traits"] } [target.'cfg(windows)'.dependencies] wepoll-sys = "3.0.0" diff --git a/src/lib.rs b/src/lib.rs index 58f37ea..b70ab11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ //! - [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, FreeBSD, NetBSD, OpenBSD, //! DragonFly BSD //! - [event ports](https://illumos.org/man/port_create): illumos, Solaris +//! - [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, other Unix systems //! - [wepoll](https://github.com/piscisaureus/wepoll): Windows //! //! Polling is done in oneshot mode, which means interest in I/O events needs to be re-enabled @@ -92,6 +93,13 @@ cfg_if! { ))] { mod kqueue; use kqueue as sys; + } else if #[cfg(any( + target_os = "vxworks", + target_os = "fuchsia", + unix, + ))] { + mod poll; + use poll as sys; } else if #[cfg(target_os = "windows")] { mod wepoll; use wepoll as sys; @@ -104,7 +112,7 @@ cfg_if! { const NOTIFY_KEY: usize = std::usize::MAX; /// Indicates that a file descriptor or socket can read or write without blocking. -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct Event { /// Key identifying the file descriptor or socket. pub key: usize, diff --git a/src/poll.rs b/src/poll.rs new file mode 100644 index 0000000..360fce0 --- /dev/null +++ b/src/poll.rs @@ -0,0 +1,238 @@ +//! Bindings to poll (VxWorks, Fuchsia, other Unix systems). + +use std::collections::HashMap; +use std::convert::TryInto; +use std::io; +use std::sync::Mutex; +use std::time::Duration; + +// std::os::unix doesn't exist on Fuchsia +use libc::c_int as RawFd; + +use crate::Event; + +/// Special value for an fd in a pollfd to signal that it should be removed. +const REMOVE_FD: RawFd = -2; + +/// Interface to poll. +#[derive(Debug)] +pub struct Poller { + /// File descriptors to poll. + fds: Mutex, + /// The file descriptor of the read half of the notify pipe. This is also stored as the first + /// file descriptor in `fds.poll_fds`. + notify_read: RawFd, + /// The file descriptor of the write half of the notify pipe. + notify_write: RawFd, +} + +/// The file descriptors to poll in a `Poller`. +#[derive(Debug)] +struct Fds { + /// The list of `pollfds` taken by poll. + /// + /// The first file descriptor is always present and is used to notify the poller. It is also + /// stored in `notify_read`. + /// + /// If the fd stored in here is `REMOVE_FD`, it should be removed. + poll_fds: Vec, + /// The map of each file descriptor to data associated with it. This does not include the file + /// descriptors `notify_read` or `notify_write`. + fd_data: HashMap, +} + +/// Data associated with a file descriptor in a poller. +#[derive(Debug)] +struct FdData { + /// The index into `poll_fds` this file descriptor is. + poll_fds_index: usize, + /// The key of the `Event` associated with this file descriptor. + key: usize, +} + +impl Poller { + /// Creates a new poller. + pub fn new() -> io::Result { + // Create the notification pipe. + let mut notify_pipe = [0; 2]; + syscall!(pipe(notify_pipe.as_mut_ptr()))?; + + // Put the reading side into non-blocking mode. + let notify_read_flags = syscall!(fcntl(notify_pipe[0], libc::F_GETFL))?; + syscall!(fcntl( + notify_pipe[0], + libc::F_SETFL, + notify_read_flags | libc::O_NONBLOCK + ))?; + + Ok(Self { + fds: Mutex::new(Fds { + poll_fds: vec![libc::pollfd { + fd: notify_pipe[0], + events: libc::POLLRDNORM, + revents: 0, + }], + fd_data: HashMap::new(), + }), + notify_read: notify_pipe[0], + notify_write: notify_pipe[1], + }) + } + + /// Adds a new file descriptor. + pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> { + if fd == self.notify_read || fd == self.notify_write { + return Err(io::Error::from(io::ErrorKind::InvalidInput)); + } + + let mut fds = self.fds.lock().unwrap(); + + if fds.fd_data.contains_key(&fd) { + return Err(io::Error::from(io::ErrorKind::AlreadyExists)); + } + + let poll_fds_index = fds.poll_fds.len(); + fds.fd_data.insert( + fd, + FdData { + poll_fds_index, + key: ev.key, + }, + ); + + fds.poll_fds.push(libc::pollfd { + fd, + events: poll_events(ev), + revents: 0, + }); + + Ok(()) + } + + /// Modifies an existing file descriptor. + pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> { + let mut fds = self.fds.lock().unwrap(); + + let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; + data.key = ev.key; + let poll_fds_index = data.poll_fds_index; + fds.poll_fds[poll_fds_index].events = poll_events(ev); + + Ok(()) + } + + /// Deletes a file descriptor. + pub fn delete(&self, fd: RawFd) -> io::Result<()> { + let mut fds = self.fds.lock().unwrap(); + + let data = fds.fd_data.remove(&fd).ok_or(io::ErrorKind::NotFound)?; + fds.poll_fds[data.poll_fds_index].fd = REMOVE_FD; + + Ok(()) + } + + /// Waits for I/O events with an optional timeout. + pub fn wait(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + events.inner.clear(); + + let timeout_ms = timeout + .map(|timeout| { + // Round up to a whole millisecond. + let mut ms = timeout.as_millis().try_into().unwrap_or(std::u64::MAX); + if Duration::from_millis(ms) < timeout { + ms += 1; + } + ms.try_into().unwrap_or(std::i32::MAX) + }) + .unwrap_or(-1); + + let mut fds = self.fds.lock().unwrap(); + let fds = &mut *fds; + + // Remove all fds that have been marked to be removed. + fds.poll_fds.retain(|poll_fd| poll_fd.fd != REMOVE_FD); + + let num_events = loop { + match syscall!(poll( + fds.poll_fds.as_mut_ptr(), + fds.poll_fds.len() as u64, + timeout_ms, + )) { + Ok(num_events) => break num_events as usize, + // EAGAIN is translated into WouldBlock, and EWOULDBLOCK cannot be returned by + // poll. + Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => return Err(e), + }; + }; + + // Store any events that occured and remove interest. + events.inner.reserve(num_events); + for fd_data in fds.fd_data.values_mut() { + let mut poll_fd = fds.poll_fds[fd_data.poll_fds_index]; + if poll_fd.revents != 0 { + events.inner.push(Event { + key: fd_data.key, + readable: poll_fd.revents & READ_REVENTS != 0, + writable: poll_fd.revents & WRITE_REVENTS != 0, + }); + poll_fd.events = 0; + } + } + + // Read all notifications. + while syscall!(read(self.notify_read, &mut [0; 64] as *mut _ as *mut _, 64)).is_ok() {} + + Ok(()) + } + + /// Sends a notification to wake up the current or next `wait()` call. + pub fn notify(&self) -> io::Result<()> { + syscall!(write(self.notify_write, &0_u8 as *const _ as *const _, 1))?; + Ok(()) + } +} + +impl Drop for Poller { + fn drop(&mut self) { + let _ = syscall!(close(self.notify_read)); + let _ = syscall!(close(self.notify_write)); + } +} + +/// Get the input poll events for the given event. +fn poll_events(ev: Event) -> libc::c_short { + (if ev.readable { + libc::POLLIN | libc::POLLPRI + } else { + 0 + }) | (if ev.writable { + libc::POLLOUT | libc::POLLWRBAND + } else { + 0 + }) +} + +/// Returned poll events for reading. +const READ_REVENTS: libc::c_short = libc::POLLIN | libc::POLLPRI | libc::POLLHUP | libc::POLLERR; + +/// Returned poll events for writing. +const WRITE_REVENTS: libc::c_short = + libc::POLLOUT | libc::POLLWRBAND | libc::POLLHUP | libc::POLLERR; + +/// A list of reported I/O events. +pub struct Events { + inner: Vec, +} + +impl Events { + /// Creates an empty list. + pub fn new() -> Events { + Self { inner: Vec::new() } + } + + /// Iterates over I/O events. + pub fn iter(&self) -> impl Iterator + '_ { + self.inner.iter().copied() + } +} diff --git a/tests/precision.rs b/tests/precision.rs index 3f6316a..9a3cac4 100644 --- a/tests/precision.rs +++ b/tests/precision.rs @@ -22,11 +22,16 @@ fn below_ms() -> io::Result<()> { lowest = lowest.min(elapsed); } - if cfg!(not(any( - windows, - target_os = "illumos", - target_os = "solaris" - ))) { + if cfg!(any( + target_os = "linux", + target_os = "android", + target_os = "macos", + target_os = "ios", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", + )) { assert!(lowest < dur + margin); } Ok(()) @@ -51,7 +56,18 @@ fn above_ms() -> io::Result<()> { lowest = lowest.min(elapsed); } - if cfg!(not(windows)) { + if cfg!(any( + target_os = "linux", + target_os = "android", + target_os = "illumos", + target_os = "solaris", + target_os = "macos", + target_os = "ios", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly", + )) { assert!(lowest < dur + margin); } Ok(()) From 0f2f6ed15a585c58d6f3ebeb2d5c2ddc25979166 Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Thu, 17 Dec 2020 19:08:08 +0000 Subject: [PATCH 02/14] Don't set CURRENT_WEEK in CI --- .github/workflows/build-and-test.yaml | 8 -------- .github/workflows/lint.yaml | 3 --- .github/workflows/security.yaml | 3 --- 3 files changed, 14 deletions(-) diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml index 87a26f7..5d41d60 100644 --- a/.github/workflows/build-and-test.yaml +++ b/.github/workflows/build-and-test.yaml @@ -17,14 +17,6 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set current week of the year in environnement - if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS') - run: echo "::set-env name=CURRENT_WEEK::$(date +%V)" - - - name: Set current week of the year in environnement - if: startsWith(matrix.os, 'windows') - run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)" - - name: Install latest ${{ matrix.rust }} uses: actions-rs/toolchain@v1 with: diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index bc3d50a..d1b9f87 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -12,9 +12,6 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set current week of the year in environnement - run: echo "::set-env name=CURRENT_WEEK::$(date +%V)" - - uses: actions-rs/toolchain@v1 with: toolchain: stable diff --git a/.github/workflows/security.yaml b/.github/workflows/security.yaml index 8f722e7..3de4c44 100644 --- a/.github/workflows/security.yaml +++ b/.github/workflows/security.yaml @@ -12,9 +12,6 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set current week of the year in environnement - run: echo "::set-env name=CURRENT_WEEK::$(date +%V)" - - uses: actions-rs/audit-check@v1 with: token: ${{ secrets.GITHUB_TOKEN }} From e0789a8ee03ae45878be34825d225b86aeed790f Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Fri, 18 Dec 2020 14:18:03 +0000 Subject: [PATCH 03/14] Make poll's poller modifications interrupt `wait` --- src/lib.rs | 4 +- src/poll.rs | 247 +++++++++++++++++++++++++++++++++++----------------- 2 files changed, 170 insertions(+), 81 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b70ab11..973c8d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,10 +74,10 @@ macro_rules! syscall { } cfg_if! { - if #[cfg(any(target_os = "linux", target_os = "android"))] { + /*if #[cfg(any(target_os = "linux", target_os = "android"))] { mod epoll; use epoll as sys; - } else if #[cfg(any( + } else*/ if #[cfg(any( target_os = "illumos", target_os = "solaris", ))] { diff --git a/src/poll.rs b/src/poll.rs index 360fce0..ddc4467 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -3,8 +3,9 @@ use std::collections::HashMap; use std::convert::TryInto; use std::io; -use std::sync::Mutex; -use std::time::Duration; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Condvar, Mutex}; +use std::time::{Duration, Instant}; // std::os::unix doesn't exist on Fuchsia use libc::c_int as RawFd; @@ -19,11 +20,27 @@ const REMOVE_FD: RawFd = -2; pub struct Poller { /// File descriptors to poll. fds: Mutex, + /// The file descriptor of the read half of the notify pipe. This is also stored as the first /// file descriptor in `fds.poll_fds`. notify_read: RawFd, /// The file descriptor of the write half of the notify pipe. + /// + /// Data is written to this to wake up the current instance of `wait`, which can occur when the + /// user notifies it (in which case `notified` would have been set) or when an operation needs + /// to occur (in which case `waiting_operations` would have been incremented). notify_write: RawFd, + + /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the + /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero + /// again. + waiting_operations: AtomicUsize, + /// The condition variable that gets notified when `waiting_operations` reaches zero. This is + /// used with the `fds` mutex. + operations_complete: Condvar, + + /// Whether `wait` has been notified by the user. + notified: AtomicBool, } /// The file descriptors to poll in a `Poller`. @@ -76,6 +93,9 @@ impl Poller { }), notify_read: notify_pipe[0], notify_write: notify_pipe[1], + waiting_operations: AtomicUsize::new(0), + operations_complete: Condvar::new(), + notified: AtomicBool::new(false), }) } @@ -85,112 +105,155 @@ impl Poller { return Err(io::Error::from(io::ErrorKind::InvalidInput)); } - let mut fds = self.fds.lock().unwrap(); - - if fds.fd_data.contains_key(&fd) { - return Err(io::Error::from(io::ErrorKind::AlreadyExists)); - } - - let poll_fds_index = fds.poll_fds.len(); - fds.fd_data.insert( - fd, - FdData { - poll_fds_index, - key: ev.key, - }, - ); - - fds.poll_fds.push(libc::pollfd { - fd, - events: poll_events(ev), - revents: 0, - }); + self.modify_fds(|fds| { + if fds.fd_data.contains_key(&fd) { + return Err(io::Error::from(io::ErrorKind::AlreadyExists)); + } - Ok(()) + let poll_fds_index = fds.poll_fds.len(); + fds.fd_data.insert( + fd, + FdData { + poll_fds_index, + key: ev.key, + }, + ); + + fds.poll_fds.push(libc::pollfd { + fd, + events: poll_events(ev), + revents: 0, + }); + + Ok(()) + }) } /// Modifies an existing file descriptor. pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> { - let mut fds = self.fds.lock().unwrap(); + self.modify_fds(|fds| { + let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; + data.key = ev.key; + let poll_fds_index = data.poll_fds_index; + fds.poll_fds[poll_fds_index].events = poll_events(ev); - let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; - data.key = ev.key; - let poll_fds_index = data.poll_fds_index; - fds.poll_fds[poll_fds_index].events = poll_events(ev); - - Ok(()) + Ok(()) + }) } /// Deletes a file descriptor. pub fn delete(&self, fd: RawFd) -> io::Result<()> { - let mut fds = self.fds.lock().unwrap(); + self.modify_fds(|fds| { + let data = fds.fd_data.remove(&fd).ok_or(io::ErrorKind::NotFound)?; + fds.poll_fds[data.poll_fds_index].fd = REMOVE_FD; - let data = fds.fd_data.remove(&fd).ok_or(io::ErrorKind::NotFound)?; - fds.poll_fds[data.poll_fds_index].fd = REMOVE_FD; - - Ok(()) + Ok(()) + }) } /// Waits for I/O events with an optional timeout. pub fn wait(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + let deadline = timeout.map(|t| Instant::now() + t); + events.inner.clear(); - let timeout_ms = timeout - .map(|timeout| { - // Round up to a whole millisecond. - let mut ms = timeout.as_millis().try_into().unwrap_or(std::u64::MAX); - if Duration::from_millis(ms) < timeout { - ms += 1; + let mut fds = self.fds.lock().unwrap(); + + loop { + // Complete all current operations. + while self.waiting_operations.load(Ordering::SeqCst) != 0 { + fds = self.operations_complete.wait(fds).unwrap(); + } + + // Remove all fds that have been marked to be removed. + fds.poll_fds.retain(|poll_fd| poll_fd.fd != REMOVE_FD); + + // Perform the poll. + let num_events = poll(&mut fds.poll_fds, deadline)?; + let notified = fds.poll_fds[0].revents != 0; + let num_fd_events = if notified { num_events - 1 } else { num_events }; + + // Read all notifications. + if notified { + while syscall!(read(self.notify_read, &mut [0; 64] as *mut _ as *mut _, 64)).is_ok() + { } - ms.try_into().unwrap_or(std::i32::MAX) - }) - .unwrap_or(-1); + } - let mut fds = self.fds.lock().unwrap(); - let fds = &mut *fds; - - // Remove all fds that have been marked to be removed. - fds.poll_fds.retain(|poll_fd| poll_fd.fd != REMOVE_FD); - - let num_events = loop { - match syscall!(poll( - fds.poll_fds.as_mut_ptr(), - fds.poll_fds.len() as u64, - timeout_ms, - )) { - Ok(num_events) => break num_events as usize, - // EAGAIN is translated into WouldBlock, and EWOULDBLOCK cannot be returned by - // poll. - Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue, - Err(e) => return Err(e), - }; - }; - - // Store any events that occured and remove interest. - events.inner.reserve(num_events); - for fd_data in fds.fd_data.values_mut() { - let mut poll_fd = fds.poll_fds[fd_data.poll_fds_index]; - if poll_fd.revents != 0 { - events.inner.push(Event { - key: fd_data.key, - readable: poll_fd.revents & READ_REVENTS != 0, - writable: poll_fd.revents & WRITE_REVENTS != 0, - }); - poll_fd.events = 0; + // If the only event that occurred during polling was notification and it wasn't to + // exit, another thread is trying to perform an operation on the fds. Continue the + // loop. + if !self.notified.swap(false, Ordering::SeqCst) && num_fd_events == 0 && notified { + continue; } - } - // Read all notifications. - while syscall!(read(self.notify_read, &mut [0; 64] as *mut _ as *mut _, 64)).is_ok() {} + // Store the events if there were any. + if num_fd_events > 0 { + let fds = &mut *fds; + + events.inner.reserve(num_fd_events); + for fd_data in fds.fd_data.values_mut() { + let mut poll_fd = fds.poll_fds[fd_data.poll_fds_index]; + if poll_fd.revents != 0 { + // Store event + events.inner.push(Event { + key: fd_data.key, + readable: poll_fd.revents & READ_REVENTS != 0, + writable: poll_fd.revents & WRITE_REVENTS != 0, + }); + // Remove interest + poll_fd.events = 0; + + if events.inner.len() == num_fd_events { + break; + } + } + } + } + + break; + } Ok(()) } /// Sends a notification to wake up the current or next `wait()` call. pub fn notify(&self) -> io::Result<()> { - syscall!(write(self.notify_write, &0_u8 as *const _ as *const _, 1))?; + if !self.notified.swap(true, Ordering::SeqCst) { + self.notify_inner()?; + } + Ok(()) } + + /// Perform a modification on `fds`, interrupting the current caller of `wait` if it's running. + fn modify_fds(&self, f: impl FnOnce(&mut Fds) -> io::Result<()>) -> io::Result<()> { + self.waiting_operations.fetch_add(1, Ordering::SeqCst); + + // Wake up the current caller of `wait` if there is one. + let sent_notification = self.notify_inner().is_ok(); + + let mut fds = self.fds.lock().unwrap(); + + // If there was no caller of `wait` our byte was not removed from the pipe, so attempt to + // remove one byte from the pipe. + if sent_notification { + let _ = syscall!(read(self.notify_read, &mut [0; 1] as *mut _ as *mut _, 1)); + } + + let res = f(&mut *fds); + + if self.waiting_operations.fetch_sub(1, Ordering::SeqCst) == 1 { + self.operations_complete.notify_one(); + } + + res + } + + /// Wake the current thread that is calling `wait`. + fn notify_inner(&self) -> io::Result<()> { + syscall!(write(self.notify_write, &0_u8 as *const _ as *const _, 1)).map(drop) + } } impl Drop for Poller { @@ -236,3 +299,29 @@ impl Events { self.inner.iter().copied() } } + +/// Helper function to call poll. +fn poll(fds: &mut [libc::pollfd], deadline: Option) -> io::Result { + loop { + // Convert the timeout to milliseconds. + let timeout_ms = deadline + .map(|deadline| { + let timeout = deadline.saturating_duration_since(Instant::now()); + + // Round up to a whole millisecond. + let mut ms = timeout.as_millis().try_into().unwrap_or(std::u64::MAX); + if Duration::from_millis(ms) < timeout { + ms += 1; + } + ms.try_into().unwrap_or(std::i32::MAX) + }) + .unwrap_or(-1); + + match syscall!(poll(fds.as_mut_ptr(), fds.len() as u64, timeout_ms,)) { + Ok(num_events) => break Ok(num_events as usize), + // poll returns EAGAIN if we can retry it. + Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => return Err(e), + } + } +} From dd15b4cd8a83c944a203fa6a05e21d902bc55f6d Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Fri, 18 Dec 2020 14:33:05 +0000 Subject: [PATCH 04/14] Add logging to poll backend --- src/lib.rs | 4 ++-- src/poll.rs | 16 +++++++++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 973c8d6..b70ab11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,10 +74,10 @@ macro_rules! syscall { } cfg_if! { - /*if #[cfg(any(target_os = "linux", target_os = "android"))] { + if #[cfg(any(target_os = "linux", target_os = "android"))] { mod epoll; use epoll as sys; - } else*/ if #[cfg(any( + } else if #[cfg(any( target_os = "illumos", target_os = "solaris", ))] { diff --git a/src/poll.rs b/src/poll.rs index ddc4467..759ee3b 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -82,6 +82,8 @@ impl Poller { notify_read_flags | libc::O_NONBLOCK ))?; + log::trace!("new: notify_read={}, notify_write={}", notify_pipe[0], notify_pipe[1]); + Ok(Self { fds: Mutex::new(Fds { poll_fds: vec![libc::pollfd { @@ -105,6 +107,8 @@ impl Poller { return Err(io::Error::from(io::ErrorKind::InvalidInput)); } + log::trace!("add: notify_read={}, fd={}, ev={:?}", self.notify_read, fd, ev); + self.modify_fds(|fds| { if fds.fd_data.contains_key(&fd) { return Err(io::Error::from(io::ErrorKind::AlreadyExists)); @@ -131,6 +135,8 @@ impl Poller { /// Modifies an existing file descriptor. pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> { + log::trace!("modify: notify_read={}, fd={}, ev={:?}", self.notify_read, fd, ev); + self.modify_fds(|fds| { let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; data.key = ev.key; @@ -143,6 +149,8 @@ impl Poller { /// Deletes a file descriptor. pub fn delete(&self, fd: RawFd) -> io::Result<()> { + log::trace!("delete: notify_read={}, fd={}", self.notify_read, fd); + self.modify_fds(|fds| { let data = fds.fd_data.remove(&fd).ok_or(io::ErrorKind::NotFound)?; fds.poll_fds[data.poll_fds_index].fd = REMOVE_FD; @@ -153,6 +161,8 @@ impl Poller { /// Waits for I/O events with an optional timeout. pub fn wait(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + log::trace!("wait: notify_read={}, timeout={:?}", self.notify_read, timeout); + let deadline = timeout.map(|t| Instant::now() + t); events.inner.clear(); @@ -172,6 +182,7 @@ impl Poller { let num_events = poll(&mut fds.poll_fds, deadline)?; let notified = fds.poll_fds[0].revents != 0; let num_fd_events = if notified { num_events - 1 } else { num_events }; + log::trace!("new events: notify_read={}, num={}", self.notify_read, num_events); // Read all notifications. if notified { @@ -219,6 +230,8 @@ impl Poller { /// Sends a notification to wake up the current or next `wait()` call. pub fn notify(&self) -> io::Result<()> { + log::trace!("notify: notify_read={}", self.notify_read); + if !self.notified.swap(true, Ordering::SeqCst) { self.notify_inner()?; } @@ -258,6 +271,7 @@ impl Poller { impl Drop for Poller { fn drop(&mut self) { + log::trace!("drop: notify_read={}", self.notify_read); let _ = syscall!(close(self.notify_read)); let _ = syscall!(close(self.notify_write)); } @@ -320,7 +334,7 @@ fn poll(fds: &mut [libc::pollfd], deadline: Option) -> io::Result break Ok(num_events as usize), // poll returns EAGAIN if we can retry it. - Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue, Err(e) => return Err(e), } } From 20c1e19c46d00607d952b7d9bbf196b9bd334a09 Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Fri, 18 Dec 2020 15:03:59 +0000 Subject: [PATCH 05/14] Have notifications break poll's operation loop --- src/poll.rs | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/src/poll.rs b/src/poll.rs index 759ee3b..a4cd9a3 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -35,12 +35,13 @@ pub struct Poller { /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero /// again. waiting_operations: AtomicUsize, - /// The condition variable that gets notified when `waiting_operations` reaches zero. This is - /// used with the `fds` mutex. - operations_complete: Condvar, - /// Whether `wait` has been notified by the user. notified: AtomicBool, + /// The condition variable that gets notified when `waiting_operations` reaches zero or + /// `notified` becomes true. + /// + /// This is used with the `fds` mutex. + operations_complete: Condvar, } /// The file descriptors to poll in a `Poller`. @@ -171,7 +172,15 @@ impl Poller { loop { // Complete all current operations. - while self.waiting_operations.load(Ordering::SeqCst) != 0 { + loop { + if self.notified.swap(false, Ordering::SeqCst) { + // `notify` will have sent a notification in case we were polling. We weren't, + // so remove it. + return self.pop_notification(); + } else if self.waiting_operations.load(Ordering::SeqCst) == 0 { + break; + } + fds = self.operations_complete.wait(fds).unwrap(); } @@ -234,6 +243,7 @@ impl Poller { if !self.notified.swap(true, Ordering::SeqCst) { self.notify_inner()?; + self.operations_complete.notify_one(); } Ok(()) @@ -248,10 +258,9 @@ impl Poller { let mut fds = self.fds.lock().unwrap(); - // If there was no caller of `wait` our byte was not removed from the pipe, so attempt to - // remove one byte from the pipe. + // If there was no caller of `wait` our notification was not removed from the pipe. if sent_notification { - let _ = syscall!(read(self.notify_read, &mut [0; 1] as *mut _ as *mut _, 1)); + let _ = self.pop_notification(); } let res = f(&mut *fds); @@ -265,7 +274,14 @@ impl Poller { /// Wake the current thread that is calling `wait`. fn notify_inner(&self) -> io::Result<()> { - syscall!(write(self.notify_write, &0_u8 as *const _ as *const _, 1)).map(drop) + syscall!(write(self.notify_write, &0_u8 as *const _ as *const _, 1))?; + Ok(()) + } + + /// Remove a notification created by `notify_inner`. + fn pop_notification(&self) -> io::Result<()> { + syscall!(read(self.notify_read, &mut [0; 1] as *mut _ as *mut _, 1))?; + Ok(()) } } From e0c0032cc02c88d304a88625d4e0fb22e57a31cf Mon Sep 17 00:00:00 2001 From: Kestrer Date: Wed, 26 May 2021 10:25:24 +0100 Subject: [PATCH 06/14] swap_remove deleted file descriptors in poll backend --- src/poll.rs | 48 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/src/poll.rs b/src/poll.rs index a4cd9a3..5c1b90a 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -12,9 +12,6 @@ use libc::c_int as RawFd; use crate::Event; -/// Special value for an fd in a pollfd to signal that it should be removed. -const REMOVE_FD: RawFd = -2; - /// Interface to poll. #[derive(Debug)] pub struct Poller { @@ -51,8 +48,6 @@ struct Fds { /// /// The first file descriptor is always present and is used to notify the poller. It is also /// stored in `notify_read`. - /// - /// If the fd stored in here is `REMOVE_FD`, it should be removed. poll_fds: Vec, /// The map of each file descriptor to data associated with it. This does not include the file /// descriptors `notify_read` or `notify_write`. @@ -83,7 +78,11 @@ impl Poller { notify_read_flags | libc::O_NONBLOCK ))?; - log::trace!("new: notify_read={}, notify_write={}", notify_pipe[0], notify_pipe[1]); + log::trace!( + "new: notify_read={}, notify_write={}", + notify_pipe[0], + notify_pipe[1] + ); Ok(Self { fds: Mutex::new(Fds { @@ -108,7 +107,12 @@ impl Poller { return Err(io::Error::from(io::ErrorKind::InvalidInput)); } - log::trace!("add: notify_read={}, fd={}, ev={:?}", self.notify_read, fd, ev); + log::trace!( + "add: notify_read={}, fd={}, ev={:?}", + self.notify_read, + fd, + ev + ); self.modify_fds(|fds| { if fds.fd_data.contains_key(&fd) { @@ -136,7 +140,12 @@ impl Poller { /// Modifies an existing file descriptor. pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> { - log::trace!("modify: notify_read={}, fd={}, ev={:?}", self.notify_read, fd, ev); + log::trace!( + "modify: notify_read={}, fd={}, ev={:?}", + self.notify_read, + fd, + ev + ); self.modify_fds(|fds| { let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; @@ -154,7 +163,13 @@ impl Poller { self.modify_fds(|fds| { let data = fds.fd_data.remove(&fd).ok_or(io::ErrorKind::NotFound)?; - fds.poll_fds[data.poll_fds_index].fd = REMOVE_FD; + fds.poll_fds.swap_remove(data.poll_fds_index); + if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) { + fds.fd_data + .get_mut(&swapped_pollfd.fd) + .unwrap() + .poll_fds_index = data.poll_fds_index; + } Ok(()) }) @@ -162,7 +177,11 @@ impl Poller { /// Waits for I/O events with an optional timeout. pub fn wait(&self, events: &mut Events, timeout: Option) -> io::Result<()> { - log::trace!("wait: notify_read={}, timeout={:?}", self.notify_read, timeout); + log::trace!( + "wait: notify_read={}, timeout={:?}", + self.notify_read, + timeout + ); let deadline = timeout.map(|t| Instant::now() + t); @@ -184,14 +203,15 @@ impl Poller { fds = self.operations_complete.wait(fds).unwrap(); } - // Remove all fds that have been marked to be removed. - fds.poll_fds.retain(|poll_fd| poll_fd.fd != REMOVE_FD); - // Perform the poll. let num_events = poll(&mut fds.poll_fds, deadline)?; let notified = fds.poll_fds[0].revents != 0; let num_fd_events = if notified { num_events - 1 } else { num_events }; - log::trace!("new events: notify_read={}, num={}", self.notify_read, num_events); + log::trace!( + "new events: notify_read={}, num={}", + self.notify_read, + num_events + ); // Read all notifications. if notified { From c174c9b7bb773cf57e29995b1e2a7fe96162eed1 Mon Sep 17 00:00:00 2001 From: Kestrer Date: Wed, 26 May 2021 10:33:24 +0100 Subject: [PATCH 07/14] Prevent timeout overflow for poll and wepoll --- src/poll.rs | 2 +- src/wepoll.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/poll.rs b/src/poll.rs index 5c1b90a..70b6ddd 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -361,7 +361,7 @@ fn poll(fds: &mut [libc::pollfd], deadline: Option) -> io::Result Date: Wed, 26 May 2021 13:57:38 +0100 Subject: [PATCH 08/14] Depend on libc on Fuchsia and VxWorks --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8f18c07..ec5f7d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ std = [] cfg-if = "1" log = "0.4.11" -[target."cfg(unix)".dependencies] +[target.'cfg(any(unix, target_os = "fuchsia", target_os = "vxworks"))'.dependencies] libc = { version = "0.2.77", features = ["extra_traits"] } [target.'cfg(windows)'.dependencies] From 9c257ccef66ac44a1186e1229b6c4ccf330c11f5 Mon Sep 17 00:00:00 2001 From: Kestrer Date: Sat, 4 Sep 2021 17:47:54 +0100 Subject: [PATCH 09/14] Cast to `nfds_t` instead of `u64` in `poll` --- src/poll.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/poll.rs b/src/poll.rs index 70b6ddd..3a7bda3 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -367,7 +367,7 @@ fn poll(fds: &mut [libc::pollfd], deadline: Option) -> io::Result break Ok(num_events as usize), // poll returns EAGAIN if we can retry it. Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue, From ee13911717235a9e0cfd4eafed36c4dad9343eb1 Mon Sep 17 00:00:00 2001 From: Kestrer Date: Sat, 4 Sep 2021 17:58:56 +0100 Subject: [PATCH 10/14] Remove `extra_traits` libc feature --- Cargo.toml | 2 +- src/lib.rs | 2 +- src/poll.rs | 42 +++++++++++++++++++++++++++++++----------- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ec5f7d8..85ac8f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ cfg-if = "1" log = "0.4.11" [target.'cfg(any(unix, target_os = "fuchsia", target_os = "vxworks"))'.dependencies] -libc = { version = "0.2.77", features = ["extra_traits"] } +libc = "0.2.77" [target.'cfg(windows)'.dependencies] wepoll-sys = "3.0.0" diff --git a/src/lib.rs b/src/lib.rs index 9ae0b7b..bbd273d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,7 +74,7 @@ macro_rules! syscall { } cfg_if! { - if #[cfg(any(target_os = "linux", target_os = "android"))] { + if #[cfg(any(/*target_os = "linux",*/ target_os = "android"))] { mod epoll; use epoll as sys; } else if #[cfg(any( diff --git a/src/poll.rs b/src/poll.rs index 3a7bda3..63fee66 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::convert::TryInto; +use std::fmt::{self, Debug, Formatter}; use std::io; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Condvar, Mutex}; @@ -48,12 +49,27 @@ struct Fds { /// /// The first file descriptor is always present and is used to notify the poller. It is also /// stored in `notify_read`. - poll_fds: Vec, + poll_fds: Vec, /// The map of each file descriptor to data associated with it. This does not include the file /// descriptors `notify_read` or `notify_write`. fd_data: HashMap, } +/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the +/// `extra_traits` feature of `libc`. +#[repr(transparent)] +struct PollFd(libc::pollfd); + +impl Debug for PollFd { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("pollfd") + .field("fd", &self.0.fd) + .field("events", &self.0.events) + .field("revents", &self.0.revents) + .finish() + } +} + /// Data associated with a file descriptor in a poller. #[derive(Debug)] struct FdData { @@ -86,11 +102,11 @@ impl Poller { Ok(Self { fds: Mutex::new(Fds { - poll_fds: vec![libc::pollfd { + poll_fds: vec![PollFd(libc::pollfd { fd: notify_pipe[0], events: libc::POLLRDNORM, revents: 0, - }], + })], fd_data: HashMap::new(), }), notify_read: notify_pipe[0], @@ -128,11 +144,11 @@ impl Poller { }, ); - fds.poll_fds.push(libc::pollfd { + fds.poll_fds.push(PollFd(libc::pollfd { fd, events: poll_events(ev), revents: 0, - }); + })); Ok(()) }) @@ -151,7 +167,7 @@ impl Poller { let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; data.key = ev.key; let poll_fds_index = data.poll_fds_index; - fds.poll_fds[poll_fds_index].events = poll_events(ev); + fds.poll_fds[poll_fds_index].0.events = poll_events(ev); Ok(()) }) @@ -166,7 +182,7 @@ impl Poller { fds.poll_fds.swap_remove(data.poll_fds_index); if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) { fds.fd_data - .get_mut(&swapped_pollfd.fd) + .get_mut(&swapped_pollfd.0.fd) .unwrap() .poll_fds_index = data.poll_fds_index; } @@ -205,7 +221,7 @@ impl Poller { // Perform the poll. let num_events = poll(&mut fds.poll_fds, deadline)?; - let notified = fds.poll_fds[0].revents != 0; + let notified = fds.poll_fds[0].0.revents != 0; let num_fd_events = if notified { num_events - 1 } else { num_events }; log::trace!( "new events: notify_read={}, num={}", @@ -233,7 +249,7 @@ impl Poller { events.inner.reserve(num_fd_events); for fd_data in fds.fd_data.values_mut() { - let mut poll_fd = fds.poll_fds[fd_data.poll_fds_index]; + let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; if poll_fd.revents != 0 { // Store event events.inner.push(Event { @@ -351,7 +367,7 @@ impl Events { } /// Helper function to call poll. -fn poll(fds: &mut [libc::pollfd], deadline: Option) -> io::Result { +fn poll(fds: &mut [PollFd], deadline: Option) -> io::Result { loop { // Convert the timeout to milliseconds. let timeout_ms = deadline @@ -367,7 +383,11 @@ fn poll(fds: &mut [libc::pollfd], deadline: Option) -> io::Result break Ok(num_events as usize), // poll returns EAGAIN if we can retry it. Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue, From 1930801c402be671a09e3f25abf997c80f9aa011 Mon Sep 17 00:00:00 2001 From: Kestrer Date: Sat, 4 Sep 2021 18:00:25 +0100 Subject: [PATCH 11/14] Document `poll` support in README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 44a0e8b..9a1bec6 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Supported platforms: - [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD - [event ports](https://illumos.org/man/port_create): illumos, Solaris +- [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, other Unix systems - [wepoll](https://github.com/piscisaureus/wepoll): Windows Polling is done in oneshot mode, which means interest in I/O events needs to be reset after From bbce346140f4d260c35891f4b6244f04c6e4537d Mon Sep 17 00:00:00 2001 From: Kestrer Date: Sat, 4 Sep 2021 18:01:56 +0100 Subject: [PATCH 12/14] Add Fuchsia to cross workflow on CI --- .github/workflows/cross.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/cross.yaml b/.github/workflows/cross.yaml index 381ce5c..ba0f396 100644 --- a/.github/workflows/cross.yaml +++ b/.github/workflows/cross.yaml @@ -46,6 +46,10 @@ jobs: if: startsWith(matrix.os, 'ubuntu') run: cross check --target x86_64-unknown-linux-gnux32 + - name: Fuchsia + if: startsWith(matrix.os, 'ubuntu') + run: cross build --target x86_64-fuchsia + # - name: illumos # if: startsWith(matrix.os, 'ubuntu') # run: cross build --target x86_64-unknown-illumos From 597b6aed8687fa74b289c6332625db8ec036fef7 Mon Sep 17 00:00:00 2001 From: Kestrer Date: Sat, 4 Sep 2021 19:19:02 +0100 Subject: [PATCH 13/14] Add concurrent modification tests --- src/lib.rs | 4 +- tests/concurrent_modification.rs | 69 ++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 tests/concurrent_modification.rs diff --git a/src/lib.rs b/src/lib.rs index bbd273d..5591ca7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,7 +74,7 @@ macro_rules! syscall { } cfg_if! { - if #[cfg(any(/*target_os = "linux",*/ target_os = "android"))] { + if #[cfg(any(target_os = "linux", target_os = "android"))] { mod epoll; use epoll as sys; } else if #[cfg(any( @@ -112,7 +112,7 @@ cfg_if! { const NOTIFY_KEY: usize = std::usize::MAX; /// Indicates that a file descriptor or socket can read or write without blocking. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct Event { /// Key identifying the file descriptor or socket. pub key: usize, diff --git a/tests/concurrent_modification.rs b/tests/concurrent_modification.rs new file mode 100644 index 0000000..be28195 --- /dev/null +++ b/tests/concurrent_modification.rs @@ -0,0 +1,69 @@ +use std::time::Duration; +use std::net::{TcpListener, TcpStream}; +use std::thread; +use std::io::{self, Write}; + +use easy_parallel::Parallel; +use polling::{Poller, Event}; + +#[test] +fn concurrent_add() -> io::Result<()> { + let (reader, mut writer) = tcp_pair()?; + let poller = Poller::new()?; + + let mut events = Vec::new(); + + Parallel::new() + .add(|| { + poller.wait(&mut events, None)?; + Ok(()) + }) + .add(|| { + thread::sleep(Duration::from_millis(100)); + poller.add(&reader, Event::readable(0))?; + writer.write_all(&[1])?; + Ok(()) + }) + .run() + .into_iter() + .collect::>()?; + + assert_eq!(events, [Event::readable(0)]); + + Ok(()) +} + +#[test] +fn concurrent_modify() -> io::Result<()> { + let (reader, mut writer) = tcp_pair()?; + let poller = Poller::new()?; + poller.add(&reader, Event::none(0))?; + + let mut events = Vec::new(); + + Parallel::new() + .add(|| { + poller.wait(&mut events, None)?; + Ok(()) + }) + .add(|| { + thread::sleep(Duration::from_millis(100)); + poller.modify(&reader, Event::readable(0))?; + writer.write_all(&[1])?; + Ok(()) + }) + .run() + .into_iter() + .collect::>()?; + + assert_eq!(events, [Event::readable(0)]); + + Ok(()) +} + +fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> { + let listener = TcpListener::bind("127.0.0.1:0")?; + let a = TcpStream::connect(listener.local_addr()?)?; + let (b, _) = listener.accept()?; + Ok((a, b)) +} From e902924621e19c6ee8af3d9e41bb199f7ee9ff0b Mon Sep 17 00:00:00 2001 From: Kestrer Date: Sat, 4 Sep 2021 19:52:59 +0100 Subject: [PATCH 14/14] Add Fuchsia target before crosscompiling in CI --- .github/workflows/cross.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/cross.yaml b/.github/workflows/cross.yaml index ba0f396..cbcfb17 100644 --- a/.github/workflows/cross.yaml +++ b/.github/workflows/cross.yaml @@ -46,6 +46,10 @@ jobs: if: startsWith(matrix.os, 'ubuntu') run: cross check --target x86_64-unknown-linux-gnux32 + - name: Add fuchsia target + if: startsWith(matrix.os, 'ubuntu') + run: rustup target add x86_64-fuchsia + - name: Fuchsia if: startsWith(matrix.os, 'ubuntu') run: cross build --target x86_64-fuchsia