diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 0f84b413cb..bf05943f69 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -9,25 +9,34 @@ pub(crate) mod task; use alloc::sync::Arc; use alloc::task::Wake; use core::future::Future; +use core::sync::atomic::AtomicU32; use core::task::{Context, Poll, Waker}; use hermit_sync::without_interrupts; use crate::arch::core_local::*; use crate::executor::task::AsyncTask; -use crate::scheduler::task::TaskHandle; +use crate::synch::futex::*; struct TaskNotify { - /// The single task executor . - handle: TaskHandle, + /// Futex to wakeup a single task + futex: AtomicU32, } impl TaskNotify { - pub fn new() -> Self { + pub const fn new() -> Self { Self { - handle: core_scheduler().get_current_task_handle(), + futex: AtomicU32::new(0), } } + + pub fn wait(&self, timeout: Option) { + // Wait for a futex and reset the value to zero. If the value + // is not zero, someone already wanted to wakeup a taks and stored another + // value to the futex address. In this case, the function directly returns + // and doesn't block. + let _ = futex_wait_and_set(&self.futex, 0, timeout, Flags::RELATIVE, 0); + } } impl Wake for TaskNotify { @@ -36,8 +45,7 @@ impl Wake for TaskNotify { } fn wake_by_ref(self: &Arc) { - trace!("Wakeup task {}", self.handle.get_id()); - core_scheduler().custom_wakeup(self.handle); + let _ = futex_wake_or_set(&self.futex, 1, u32::MAX); } } diff --git a/src/executor/network.rs b/src/executor/network.rs index 3b700a33d8..edaa04a956 100644 --- a/src/executor/network.rs +++ b/src/executor/network.rs @@ -7,7 +7,7 @@ use core::sync::atomic::{AtomicU16, Ordering}; use core::task::{Context, Poll}; use crossbeam_utils::Backoff; -use hermit_sync::{without_interrupts, InterruptTicketMutex}; +use hermit_sync::InterruptTicketMutex; use smoltcp::iface::{SocketHandle, SocketSet}; #[cfg(feature = "dhcpv4")] use smoltcp::socket::dhcpv4; @@ -16,8 +16,8 @@ use smoltcp::time::{Duration, Instant}; #[cfg(feature = "dhcpv4")] use smoltcp::wire::{IpCidr, Ipv4Address, Ipv4Cidr}; +use crate::arch; use crate::arch::core_local::*; -use crate::arch::{self, interrupts}; #[cfg(not(feature = "pci"))] use crate::drivers::mmio::get_network_driver; use crate::drivers::net::NetworkDriver; @@ -238,10 +238,9 @@ where get_network_driver().unwrap().lock().set_polling_mode(true); let backoff = Backoff::new(); - let mut blocking_time = 1000; let start = now(); let task_notify = Arc::new(TaskNotify::new()); - let waker = task_notify.into(); + let waker = task_notify.clone().into(); let mut cx = Context::from_waker(&waker); let mut future = future; let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) }; @@ -250,8 +249,10 @@ where // run background tasks crate::executor::run(); + let now = crate::executor::network::now(); + if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { - let network_timer = network_delay(crate::executor::network::now()) + let network_timer = network_delay(now) .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); core_scheduler().add_network_timer(network_timer); @@ -263,7 +264,7 @@ where if let Some(duration) = timeout { if crate::executor::network::now() >= start + duration { - let network_timer = network_delay(crate::executor::network::now()) + let network_timer = network_delay(now) .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); core_scheduler().add_network_timer(network_timer); @@ -274,53 +275,24 @@ where } } - // disable all interrupts - interrupts::disable(); - let now = crate::executor::network::now(); let delay = network_delay(now).map(|d| d.total_micros()); if backoff.is_completed() && delay.unwrap_or(10_000_000) > 10_000 { - // add additional check before the task will block - if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { - // allow network interrupts - get_network_driver().unwrap().lock().set_polling_mode(false); - // enable interrupts - interrupts::enable(); - - return t; - } - let ticks = crate::arch::processor::get_timer_ticks(); - let wakeup_time = timeout - .map(|duration| { - core::cmp::min( - u64::try_from((start + duration).total_micros()).unwrap(), - ticks + delay.unwrap_or(blocking_time), - ) - }) - .or(Some(ticks + delay.unwrap_or(blocking_time))); + let wakeup_time = + timeout.map(|duration| u64::try_from((start + duration).total_micros()).unwrap()); let network_timer = delay.map(|d| ticks + d); - let core_scheduler = core_scheduler(); - blocking_time *= 2; - - core_scheduler.add_network_timer(network_timer); - core_scheduler.block_current_task(wakeup_time); + core_scheduler().add_network_timer(network_timer); // allow network interrupts get_network_driver().unwrap().lock().set_polling_mode(false); - // enable interrupts - interrupts::enable(); - // switch to another task - core_scheduler.reschedule(); + task_notify.wait(wakeup_time); // restore default values get_network_driver().unwrap().lock().set_polling_mode(true); backoff.reset(); } else { - // enable interrupts - interrupts::enable(); - backoff.snooze(); } } @@ -331,23 +303,32 @@ pub(crate) fn poll_on(future: F, timeout: Option) -> Result>, { - // be sure that we are not interrupted by a timer, which is able - // to call `reschedule` - without_interrupts(|| { - // avoid network interrupts - get_network_driver().unwrap().lock().set_polling_mode(true); - - let start = now(); - let waker = core::task::Waker::noop(); - let mut cx = Context::from_waker(&waker); - let mut future = future; - let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) }; - - loop { - // run background tasks - crate::executor::run(); - - if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { + // avoid network interrupts + get_network_driver().unwrap().lock().set_polling_mode(true); + + let start = now(); + let waker = core::task::Waker::noop(); + let mut cx = Context::from_waker(&waker); + let mut future = future; + let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) }; + + loop { + // run background tasks + crate::executor::run(); + + if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { + let wakeup_time = network_delay(now()) + .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); + core_scheduler().add_network_timer(wakeup_time); + + // allow network interrupts + get_network_driver().unwrap().lock().set_polling_mode(false); + + return t; + } + + if let Some(duration) = timeout { + if crate::executor::network::now() >= start + duration { let wakeup_time = network_delay(now()) .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); core_scheduler().add_network_timer(wakeup_time); @@ -355,21 +336,8 @@ where // allow network interrupts get_network_driver().unwrap().lock().set_polling_mode(false); - return t; - } - - if let Some(duration) = timeout { - if crate::executor::network::now() >= start + duration { - let wakeup_time = network_delay(now()) - .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); - core_scheduler().add_network_timer(wakeup_time); - - // allow network interrupts - get_network_driver().unwrap().lock().set_polling_mode(false); - - return Err(-crate::errno::ETIME); - } + return Err(-crate::errno::ETIME); } } - }) + } } diff --git a/src/synch/futex.rs b/src/synch/futex.rs index 0e2a4f50a9..db1196d281 100644 --- a/src/synch/futex.rs +++ b/src/synch/futex.rs @@ -33,7 +33,12 @@ fn addr(addr: &AtomicU32) -> usize { /// The timeout is given in microseconds. If [`Flags::RELATIVE`] is given, it is interpreted as /// relative to the current time. Otherwise it is understood to be an absolute time /// (see `get_timer_ticks`). -pub fn futex_wait(address: &AtomicU32, expected: u32, timeout: Option, flags: Flags) -> i32 { +pub(crate) fn futex_wait( + address: &AtomicU32, + expected: u32, + timeout: Option, + flags: Flags, +) -> i32 { let mut parking_lot = PARKING_LOT.lock(); // Check the futex value after locking the parking lot so that all changes are observed. if address.load(SeqCst) != expected { @@ -89,10 +94,79 @@ pub fn futex_wait(address: &AtomicU32, expected: u32, timeout: Option, flag } } +/// If the value at address matches the expected value, park the current thread until it is either +/// woken up with `futex_wake` (returns 0) or the specified timeout elapses (returns -ETIMEDOUT). +/// In addition, the value `new_value` will stored at address. +/// +/// The timeout is given in microseconds. If [`Flags::RELATIVE`] is given, it is interpreted as +/// relative to the current time. Otherwise it is understood to be an absolute time +/// (see `get_timer_ticks`). +pub(crate) fn futex_wait_and_set( + address: &AtomicU32, + expected: u32, + timeout: Option, + flags: Flags, + new_value: u32, +) -> i32 { + let mut parking_lot = PARKING_LOT.lock(); + // Check the futex value after locking the parking lot so that all changes are observed. + if address.swap(new_value, SeqCst) != expected { + return -EAGAIN; + } + + let wakeup_time = if flags.contains(Flags::RELATIVE) { + timeout.and_then(|t| get_timer_ticks().checked_add(t)) + } else { + timeout + }; + + let scheduler = core_scheduler(); + scheduler.block_current_task(wakeup_time); + let handle = scheduler.get_current_task_handle(); + parking_lot.entry(addr(address)).or_default().push(handle); + drop(parking_lot); + + loop { + scheduler.reschedule(); + + let mut parking_lot = PARKING_LOT.lock(); + if matches!(wakeup_time, Some(t) if t <= get_timer_ticks()) { + let mut wakeup = true; + // Timeout occurred, try to remove ourselves from the waiting queue. + if let Entry::Occupied(mut queue) = parking_lot.entry(addr(address)) { + // If we are not in the waking queue, this must have been a wakeup. + wakeup = !queue.get_mut().remove(handle); + if queue.get().is_empty() { + queue.remove(); + } + } + + if wakeup { + return 0; + } else { + return -ETIMEDOUT; + } + } else { + // If we are not in the waking queue, this must have been a wakeup. + let wakeup = !matches!(parking_lot + .get(&addr(address)), Some(queue) if queue.contains(handle)); + + if wakeup { + return 0; + } else { + // A spurious wakeup occurred, sleep again. + // Tasks do not change core, so the handle in the parking lot is still current. + scheduler.block_current_task(wakeup_time); + } + } + drop(parking_lot); + } +} + /// Wake `count` threads waiting on the futex at address. Returns the number of threads /// woken up (saturates to `i32::MAX`). If `count` is `i32::MAX`, wake up all matching /// waiting threads. If `count` is negative, returns -EINVAL. -pub fn futex_wake(address: &AtomicU32, count: i32) -> i32 { +pub(crate) fn futex_wake(address: &AtomicU32, count: i32) -> i32 { if count < 0 { return -EINVAL; } @@ -119,3 +193,42 @@ pub fn futex_wake(address: &AtomicU32, count: i32) -> i32 { woken } + +/// Wake `count` threads waiting on the futex at address. Returns the number of threads +/// woken up (saturates to `i32::MAX`). If `count` is `i32::MAX`, wake up all matching +/// waiting threads. If `count` is negative, returns -EINVAL. If no thread is available, +/// the futex at address will set to `new_value`. +pub(crate) fn futex_wake_or_set(address: &AtomicU32, count: i32, new_value: u32) -> i32 { + if count < 0 { + return -EINVAL; + } + + let mut parking_lot = PARKING_LOT.lock(); + let mut queue = match parking_lot.entry(addr(address)) { + Entry::Occupied(entry) => entry, + Entry::Vacant(_) => { + address.store(new_value, SeqCst); + return 0; + } + }; + + let scheduler = core_scheduler(); + let mut woken = 0; + while woken != count || count == i32::MAX { + match queue.get_mut().pop() { + Some(handle) => scheduler.custom_wakeup(handle), + None => break, + } + woken = woken.saturating_add(1); + } + + if queue.get().is_empty() { + queue.remove(); + } + + if woken == 0 { + address.store(new_value, SeqCst); + } + + woken +}