Skip to content

Commit

Permalink
Merge pull request #819 from stlankes/net
Browse files Browse the repository at this point in the history
using a futex to wakeup network tasks
  • Loading branch information
stlankes authored Aug 7, 2023
2 parents 6d07976 + 47bc146 commit 22f4068
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 80 deletions.
22 changes: 15 additions & 7 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,34 @@ pub(crate) mod task;
use alloc::sync::Arc;
use alloc::task::Wake;
use core::future::Future;
use core::sync::atomic::AtomicU32;
use core::task::{Context, Poll, Waker};

use hermit_sync::without_interrupts;

use crate::arch::core_local::*;
use crate::executor::task::AsyncTask;
use crate::scheduler::task::TaskHandle;
use crate::synch::futex::*;

struct TaskNotify {
/// The single task executor .
handle: TaskHandle,
/// Futex to wakeup a single task
futex: AtomicU32,
}

impl TaskNotify {
pub fn new() -> Self {
pub const fn new() -> Self {
Self {
handle: core_scheduler().get_current_task_handle(),
futex: AtomicU32::new(0),
}
}

pub fn wait(&self, timeout: Option<u64>) {
// Wait for a futex and reset the value to zero. If the value
// is not zero, someone already wanted to wakeup a taks and stored another
// value to the futex address. In this case, the function directly returns
// and doesn't block.
let _ = futex_wait_and_set(&self.futex, 0, timeout, Flags::RELATIVE, 0);
}
}

impl Wake for TaskNotify {
Expand All @@ -36,8 +45,7 @@ impl Wake for TaskNotify {
}

fn wake_by_ref(self: &Arc<Self>) {
trace!("Wakeup task {}", self.handle.get_id());
core_scheduler().custom_wakeup(self.handle);
let _ = futex_wake_or_set(&self.futex, 1, u32::MAX);
}
}

Expand Down
110 changes: 39 additions & 71 deletions src/executor/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use core::sync::atomic::{AtomicU16, Ordering};
use core::task::{Context, Poll};

use crossbeam_utils::Backoff;
use hermit_sync::{without_interrupts, InterruptTicketMutex};
use hermit_sync::InterruptTicketMutex;
use smoltcp::iface::{SocketHandle, SocketSet};
#[cfg(feature = "dhcpv4")]
use smoltcp::socket::dhcpv4;
Expand All @@ -16,8 +16,8 @@ use smoltcp::time::{Duration, Instant};
#[cfg(feature = "dhcpv4")]
use smoltcp::wire::{IpCidr, Ipv4Address, Ipv4Cidr};

use crate::arch;
use crate::arch::core_local::*;
use crate::arch::{self, interrupts};
#[cfg(not(feature = "pci"))]
use crate::drivers::mmio::get_network_driver;
use crate::drivers::net::NetworkDriver;
Expand Down Expand Up @@ -238,10 +238,9 @@ where
get_network_driver().unwrap().lock().set_polling_mode(true);

let backoff = Backoff::new();
let mut blocking_time = 1000;
let start = now();
let task_notify = Arc::new(TaskNotify::new());
let waker = task_notify.into();
let waker = task_notify.clone().into();
let mut cx = Context::from_waker(&waker);
let mut future = future;
let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) };
Expand All @@ -250,8 +249,10 @@ where
// run background tasks
crate::executor::run();

let now = crate::executor::network::now();

if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
let network_timer = network_delay(crate::executor::network::now())
let network_timer = network_delay(now)
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(network_timer);

Expand All @@ -263,7 +264,7 @@ where

if let Some(duration) = timeout {
if crate::executor::network::now() >= start + duration {
let network_timer = network_delay(crate::executor::network::now())
let network_timer = network_delay(now)
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(network_timer);

Expand All @@ -274,53 +275,24 @@ where
}
}

// disable all interrupts
interrupts::disable();
let now = crate::executor::network::now();
let delay = network_delay(now).map(|d| d.total_micros());
if backoff.is_completed() && delay.unwrap_or(10_000_000) > 10_000 {
// add additional check before the task will block
if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
// allow network interrupts
get_network_driver().unwrap().lock().set_polling_mode(false);
// enable interrupts
interrupts::enable();

return t;
}

let ticks = crate::arch::processor::get_timer_ticks();
let wakeup_time = timeout
.map(|duration| {
core::cmp::min(
u64::try_from((start + duration).total_micros()).unwrap(),
ticks + delay.unwrap_or(blocking_time),
)
})
.or(Some(ticks + delay.unwrap_or(blocking_time)));
let wakeup_time =
timeout.map(|duration| u64::try_from((start + duration).total_micros()).unwrap());
let network_timer = delay.map(|d| ticks + d);
let core_scheduler = core_scheduler();
blocking_time *= 2;

core_scheduler.add_network_timer(network_timer);
core_scheduler.block_current_task(wakeup_time);
core_scheduler().add_network_timer(network_timer);

// allow network interrupts
get_network_driver().unwrap().lock().set_polling_mode(false);

// enable interrupts
interrupts::enable();

// switch to another task
core_scheduler.reschedule();
task_notify.wait(wakeup_time);

// restore default values
get_network_driver().unwrap().lock().set_polling_mode(true);
backoff.reset();
} else {
// enable interrupts
interrupts::enable();

backoff.snooze();
}
}
Expand All @@ -331,45 +303,41 @@ pub(crate) fn poll_on<F, T>(future: F, timeout: Option<Duration>) -> Result<T, i
where
F: Future<Output = Result<T, i32>>,
{
// be sure that we are not interrupted by a timer, which is able
// to call `reschedule`
without_interrupts(|| {
// avoid network interrupts
get_network_driver().unwrap().lock().set_polling_mode(true);

let start = now();
let waker = core::task::Waker::noop();
let mut cx = Context::from_waker(&waker);
let mut future = future;
let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) };

loop {
// run background tasks
crate::executor::run();

if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
// avoid network interrupts
get_network_driver().unwrap().lock().set_polling_mode(true);

let start = now();
let waker = core::task::Waker::noop();
let mut cx = Context::from_waker(&waker);
let mut future = future;
let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) };

loop {
// run background tasks
crate::executor::run();

if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
let wakeup_time = network_delay(now())
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(wakeup_time);

// allow network interrupts
get_network_driver().unwrap().lock().set_polling_mode(false);

return t;
}

if let Some(duration) = timeout {
if crate::executor::network::now() >= start + duration {
let wakeup_time = network_delay(now())
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(wakeup_time);

// allow network interrupts
get_network_driver().unwrap().lock().set_polling_mode(false);

return t;
}

if let Some(duration) = timeout {
if crate::executor::network::now() >= start + duration {
let wakeup_time = network_delay(now())
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(wakeup_time);

// allow network interrupts
get_network_driver().unwrap().lock().set_polling_mode(false);

return Err(-crate::errno::ETIME);
}
return Err(-crate::errno::ETIME);
}
}
})
}
}
117 changes: 115 additions & 2 deletions src/synch/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ fn addr(addr: &AtomicU32) -> usize {
/// The timeout is given in microseconds. If [`Flags::RELATIVE`] is given, it is interpreted as
/// relative to the current time. Otherwise it is understood to be an absolute time
/// (see `get_timer_ticks`).
pub fn futex_wait(address: &AtomicU32, expected: u32, timeout: Option<u64>, flags: Flags) -> i32 {
pub(crate) fn futex_wait(
address: &AtomicU32,
expected: u32,
timeout: Option<u64>,
flags: Flags,
) -> i32 {
let mut parking_lot = PARKING_LOT.lock();
// Check the futex value after locking the parking lot so that all changes are observed.
if address.load(SeqCst) != expected {
Expand Down Expand Up @@ -89,10 +94,79 @@ pub fn futex_wait(address: &AtomicU32, expected: u32, timeout: Option<u64>, flag
}
}

/// If the value at address matches the expected value, park the current thread until it is either
/// woken up with `futex_wake` (returns 0) or the specified timeout elapses (returns -ETIMEDOUT).
/// In addition, the value `new_value` will stored at address.
///
/// The timeout is given in microseconds. If [`Flags::RELATIVE`] is given, it is interpreted as
/// relative to the current time. Otherwise it is understood to be an absolute time
/// (see `get_timer_ticks`).
pub(crate) fn futex_wait_and_set(
address: &AtomicU32,
expected: u32,
timeout: Option<u64>,
flags: Flags,
new_value: u32,
) -> i32 {
let mut parking_lot = PARKING_LOT.lock();
// Check the futex value after locking the parking lot so that all changes are observed.
if address.swap(new_value, SeqCst) != expected {
return -EAGAIN;
}

let wakeup_time = if flags.contains(Flags::RELATIVE) {
timeout.and_then(|t| get_timer_ticks().checked_add(t))
} else {
timeout
};

let scheduler = core_scheduler();
scheduler.block_current_task(wakeup_time);
let handle = scheduler.get_current_task_handle();
parking_lot.entry(addr(address)).or_default().push(handle);
drop(parking_lot);

loop {
scheduler.reschedule();

let mut parking_lot = PARKING_LOT.lock();
if matches!(wakeup_time, Some(t) if t <= get_timer_ticks()) {
let mut wakeup = true;
// Timeout occurred, try to remove ourselves from the waiting queue.
if let Entry::Occupied(mut queue) = parking_lot.entry(addr(address)) {
// If we are not in the waking queue, this must have been a wakeup.
wakeup = !queue.get_mut().remove(handle);
if queue.get().is_empty() {
queue.remove();
}
}

if wakeup {
return 0;
} else {
return -ETIMEDOUT;
}
} else {
// If we are not in the waking queue, this must have been a wakeup.
let wakeup = !matches!(parking_lot
.get(&addr(address)), Some(queue) if queue.contains(handle));

if wakeup {
return 0;
} else {
// A spurious wakeup occurred, sleep again.
// Tasks do not change core, so the handle in the parking lot is still current.
scheduler.block_current_task(wakeup_time);
}
}
drop(parking_lot);
}
}

/// Wake `count` threads waiting on the futex at address. Returns the number of threads
/// woken up (saturates to `i32::MAX`). If `count` is `i32::MAX`, wake up all matching
/// waiting threads. If `count` is negative, returns -EINVAL.
pub fn futex_wake(address: &AtomicU32, count: i32) -> i32 {
pub(crate) fn futex_wake(address: &AtomicU32, count: i32) -> i32 {
if count < 0 {
return -EINVAL;
}
Expand All @@ -119,3 +193,42 @@ pub fn futex_wake(address: &AtomicU32, count: i32) -> i32 {

woken
}

/// Wake `count` threads waiting on the futex at address. Returns the number of threads
/// woken up (saturates to `i32::MAX`). If `count` is `i32::MAX`, wake up all matching
/// waiting threads. If `count` is negative, returns -EINVAL. If no thread is available,
/// the futex at address will set to `new_value`.
pub(crate) fn futex_wake_or_set(address: &AtomicU32, count: i32, new_value: u32) -> i32 {
if count < 0 {
return -EINVAL;
}

let mut parking_lot = PARKING_LOT.lock();
let mut queue = match parking_lot.entry(addr(address)) {
Entry::Occupied(entry) => entry,
Entry::Vacant(_) => {
address.store(new_value, SeqCst);
return 0;
}
};

let scheduler = core_scheduler();
let mut woken = 0;
while woken != count || count == i32::MAX {
match queue.get_mut().pop() {
Some(handle) => scheduler.custom_wakeup(handle),
None => break,
}
woken = woken.saturating_add(1);
}

if queue.get().is_empty() {
queue.remove();
}

if woken == 0 {
address.store(new_value, SeqCst);
}

woken
}

0 comments on commit 22f4068

Please sign in to comment.