Skip to content

Commit

Permalink
Implement poll-based backend
Browse files Browse the repository at this point in the history
Introduces a new backend for UNIX using the level triggered poll()
syscall instead of epoll or kqueue.  This support is crucial for
embedded systems like the esp32 family but also for alternative
operating systems like Haiku.

This diff does not introduce any new platform support targets itself but
provides the core technical implementation necessary to support these
other targets.  Future PRs will introduce specific platform support
however due to reasons outlined in tokio-rs#1602 (many thanks for this initial
effort BTW!) it is not possible to automate tests for those platforms.
We will instead rely on the fact that existing strong POSIX platforms
like Linux can serve as a proxy to prove that the mio code is working
nominally.
  • Loading branch information
jasta committed Jul 11, 2023
1 parent ec0776f commit fafe7d4
Show file tree
Hide file tree
Showing 19 changed files with 1,032 additions and 81 deletions.
10 changes: 3 additions & 7 deletions src/io_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ where
) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.associate(registry)?;
registry
.selector()
.register(self.inner.as_raw_fd(), token, interests)
self.state.register(registry, token, interests, self.inner.as_raw_fd())
}

fn reregister(
Expand All @@ -155,15 +153,13 @@ where
) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.check_association(registry)?;
registry
.selector()
.reregister(self.inner.as_raw_fd(), token, interests)
self.state.reregister(registry, token, interests, self.inner.as_raw_fd())
}

fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.remove_association(registry)?;
registry.selector().deregister(self.inner.as_raw_fd())
self.state.deregister(registry, self.inner.as_raw_fd())
}
}

Expand Down
57 changes: 57 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,63 @@ macro_rules! cfg_any_os_ext {
}
}

/// The current platform supports epoll.
macro_rules! cfg_epoll_selector {
($($item:item)*) => {
$(
#[cfg(all(
not(mio_unsupported_force_poll_poll),
any(
target_os = "android",
target_os = "illumos",
target_os = "linux",
target_os = "redox",
)))]
$item
)*
}
}

macro_rules! cfg_poll_selector {
($($item:item)*) => {
$(
#[cfg(mio_unsupported_force_poll_poll)]
$item
)*
}
}

/// The current platform supports kqueue.
macro_rules! cfg_kqueue_selector {
($($item:item)*) => {
$(
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd",
target_os = "tvos",
target_os = "watchos",
))]
$item
)*
}
}

macro_rules! cfg_selector_has_fd {
($($item:item)*) => {
$(
#[cfg(all(
unix,
not(mio_unsupported_force_poll_poll),
))]
$item
)*
}
}

macro_rules! trace {
($($t:tt)*) => {
log!(trace, $($t)*)
Expand Down
30 changes: 18 additions & 12 deletions src/poll.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{event, sys, Events, Interest, Token};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd};
cfg_selector_has_fd! {
use std::os::unix::io::{AsRawFd, RawFd};
}
use std::time::Duration;
use std::{fmt, io};

Expand Down Expand Up @@ -411,7 +412,10 @@ impl Poll {
}
}

#[cfg(unix)]
#[cfg(all(
unix,
not(mio_unsupported_force_poll_poll),
))]
impl AsRawFd for Poll {
fn as_raw_fd(&self) -> RawFd {
self.registry.as_raw_fd()
Expand Down Expand Up @@ -696,18 +700,20 @@ impl fmt::Debug for Registry {
}
}

#[cfg(unix)]
impl AsRawFd for Registry {
fn as_raw_fd(&self) -> RawFd {
self.selector.as_raw_fd()
cfg_selector_has_fd! {
impl AsRawFd for Registry {
fn as_raw_fd(&self) -> RawFd {
self.selector.as_raw_fd()
}
}
}

cfg_os_poll! {
#[cfg(unix)]
#[test]
pub fn as_raw_fd() {
let poll = Poll::new().unwrap();
assert!(poll.as_raw_fd() > 0);
cfg_selector_has_fd! {
#[test]
pub fn as_raw_fd() {
let poll = Poll::new().unwrap();
assert!(poll.as_raw_fd() > 0);
}
}
}
1 change: 1 addition & 0 deletions src/sys/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//! methods.
//! * `tcp` and `udp` modules: see the [`crate::net`] module.
//! * `Waker`: see [`crate::Waker`].
//! * `WakerRegistrar`: state for `Waker` type.
cfg_os_poll! {
macro_rules! debug_detail {
Expand Down
21 changes: 2 additions & 19 deletions src/sys/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ macro_rules! syscall {
cfg_os_poll! {
mod selector;
pub(crate) use self::selector::{event, Event, Events, Selector};
pub(crate) use self::selector::WakerRegistrar;

mod sourcefd;
pub use self::sourcefd::SourceFd;
Expand All @@ -33,25 +34,7 @@ cfg_os_poll! {
}

cfg_io_source! {
use std::io;

// Both `kqueue` and `epoll` don't need to hold any user space state.
pub(crate) struct IoSourceState;

impl IoSourceState {
pub fn new() -> IoSourceState {
IoSourceState
}

pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
where
F: FnOnce(&T) -> io::Result<R>,
{
// We don't hold state, so we can just call the function and
// return.
f(io)
}
}
pub(crate) use self::selector::{IoSourceState};
}

cfg_os_ext! {
Expand Down
47 changes: 47 additions & 0 deletions src/sys/unix/selector/adapters/edge_triggered/io_source_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::{Interest, Registry, Token};
use std::io;
use std::os::unix::io::RawFd;

pub(crate) struct IoSourceState;

impl IoSourceState {
pub fn new() -> IoSourceState {
IoSourceState
}

pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
where
F: FnOnce(&T) -> io::Result<R>,
{
// We don't hold state, so we can just call the function and
// return.
f(io)
}

pub fn register(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
fd: RawFd,
) -> io::Result<()> {
// Pass through, we don't have any state
registry.selector().register(fd, token, interests)
}

pub fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
fd: RawFd,
) -> io::Result<()> {
// Pass through, we don't have any state
registry.selector().reregister(fd, token, interests)
}

pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> {
// Pass through, we don't have any state
registry.selector().deregister(fd)
}
}
7 changes: 7 additions & 0 deletions src/sys/unix/selector/adapters/edge_triggered/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! Implementation details for when we have an edge-triggered backend (i.e. epoll and kqueue).
cfg_io_source! {
pub(super) mod io_source_state;
}

pub(super) mod waker_registrar;
19 changes: 19 additions & 0 deletions src/sys/unix/selector/adapters/edge_triggered/waker_registrar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::io;
use std::os::fd::RawFd;
use crate::{Interest, Token};
use crate::sys::Selector;

#[derive(Debug)]
pub(crate) struct WakerRegistrar;

impl WakerRegistrar {
pub fn register(selector: &Selector, fd: RawFd, token: Token) -> io::Result<Self> {
selector.register(fd, token, Interest::READABLE)?;
Ok(Self)
}

pub fn prepare_to_wake(&self) -> io::Result<()> {
// Nothing to do in the case that we are using an edge-triggered API
Ok(())
}
}
108 changes: 108 additions & 0 deletions src/sys/unix/selector/adapters/level_triggered/io_source_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use crate::sys::Selector;
use crate::{Interest, Registry, Token};
use std::io;
use std::os::unix::io::RawFd;
use std::sync::Arc;
use crate::sys::unix::selector::poll::RegistrationRecord;

struct InternalState {
selector: Selector,
token: Token,
interests: Interest,
fd: RawFd,
shared_record: Arc<RegistrationRecord>,
}

impl Drop for InternalState {
fn drop(&mut self) {
if self.shared_record.is_registered() {
let _ = self.selector.deregister(self.fd);
}
}
}

pub(crate) struct IoSourceState {
inner: Option<Box<InternalState>>,
}

impl IoSourceState {
pub fn new() -> IoSourceState {
IoSourceState { inner: None }
}

pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
where
F: FnOnce(&T) -> io::Result<R>,
{
let result = f(io);

if let Err(err) = &result {
println!("err={err:?}");

if err.kind() == io::ErrorKind::WouldBlock {
self.inner.as_ref().map_or(Ok(()), |state| {
state
.selector
.reregister(state.fd, state.token, state.interests)
})?;
}
}

result
}

pub fn register(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
fd: RawFd,
) -> io::Result<()> {
if self.inner.is_some() {
Err(io::ErrorKind::AlreadyExists.into())
} else {
let selector = registry.selector().try_clone()?;

selector.register_internal(fd, token, interests).map(move |shared_record| {
let state = InternalState {
selector,
token,
interests,
fd,
shared_record,
};

self.inner = Some(Box::new(state));
})
}
}

pub fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
fd: RawFd,
) -> io::Result<()> {
match self.inner.as_mut() {
Some(state) => registry
.selector()
.reregister(fd, token, interests)
.map(|()| {
state.token = token;
state.interests = interests;
}),
None => Err(io::ErrorKind::NotFound.into()),
}
}

pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> {
if let Some(state) = self.inner.take() {
// Marking unregistered will short circuit the drop behaviour of calling
// deregister so the call to deregister below is strictly required.
state.shared_record.mark_unregistered();
}

registry.selector().deregister(fd)
}
}
8 changes: 8 additions & 0 deletions src/sys/unix/selector/adapters/level_triggered/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! Implementation details for when we need to mimic an edge-triggered backend but actually have a
//! level-triggered backend (e.g. poll).
cfg_io_source! {
pub(super) mod io_source_state;
}

pub(super) mod waker_registrar;
Loading

0 comments on commit fafe7d4

Please sign in to comment.