diff --git a/Cargo.lock b/Cargo.lock index 5b427a876e..774cca321e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,12 +53,6 @@ dependencies = [ "bitflags 2.3.3", ] -[[package]] -name = "async-task" -version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" - [[package]] name = "atomic-polyfill" version = "0.1.11" @@ -218,22 +212,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "futures-core" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" - -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "futures-core", - "pin-project-lite", -] - [[package]] name = "generic_once_cell" version = "0.1.1" @@ -365,13 +343,11 @@ dependencies = [ "ahash", "align-address", "arm-gic", - "async-task", "bit_field", "bitflags 2.3.3", "crossbeam-utils", "dyn-clone", "float-cmp", - "futures-lite", "hashbrown", "hermit-dtb", "hermit-entry", @@ -640,12 +616,6 @@ dependencies = [ "siphasher", ] -[[package]] -name = "pin-project-lite" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" - [[package]] name = "plain" version = "0.2.3" diff --git a/Cargo.toml b/Cargo.toml index 6f82ba8bfa..8c86f06199 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,8 +57,6 @@ smp = ["include-transformed"] fsgsbase = [] trace = [] tcp = [ - "async-task", - "futures-lite", "smoltcp", ] dhcpv4 = [ @@ -85,8 +83,6 @@ pflock = "0.2" shell-words = { version = "1.1", default-features = false } qemu-exit = "3.0" rand_chacha = { version = "0.3", default-features = false } -futures-lite = { version = "1.11", default-features = false, optional = true } -async-task = { version = "4.4", default-features = false, optional = true } lock_api = "0.4" num = { version = "0.4", default-features = false } num-traits = { version = "0.2", default-features = false } diff --git a/src/arch/aarch64/kernel/core_local.rs b/src/arch/aarch64/kernel/core_local.rs index dc553a960c..b41a6a24f3 100644 --- a/src/arch/aarch64/kernel/core_local.rs +++ b/src/arch/aarch64/kernel/core_local.rs @@ -1,14 +1,16 @@ use alloc::boxed::Box; +use alloc::vec::Vec; use core::arch::asm; -use core::cell::Cell; +use core::cell::{Cell, RefCell, RefMut}; use core::ptr; use core::sync::atomic::Ordering; use super::interrupts::{IrqStatistics, IRQ_COUNTERS}; use super::CPU_ONLINE; +use crate::executor::task::AsyncTask; use crate::scheduler::{CoreId, PerCoreScheduler}; -pub struct CoreLocal { +pub(crate) struct CoreLocal { this: *const Self, /// ID of the current Core. core_id: CoreId, @@ -16,6 +18,8 @@ pub struct CoreLocal { scheduler: Cell<*mut PerCoreScheduler>, /// Interface to the interrupt counters irq_statistics: &'static IrqStatistics, + /// Queue of async tasks + async_tasks: RefCell>, } impl CoreLocal { @@ -30,6 +34,7 @@ impl CoreLocal { core_id, scheduler: Cell::new(ptr::null_mut()), irq_statistics, + async_tasks: RefCell::new(Vec::new()), }; let this = Box::leak(Box::new(this)); this.this = &*this; @@ -50,7 +55,7 @@ impl CoreLocal { } #[inline] -pub fn core_id() -> CoreId { +pub(crate) fn core_id() -> CoreId { if cfg!(target_os = "none") { CoreLocal::get().core_id } else { @@ -59,14 +64,18 @@ pub fn core_id() -> CoreId { } #[inline] -pub fn core_scheduler() -> &'static mut PerCoreScheduler { +pub(crate) fn core_scheduler() -> &'static mut PerCoreScheduler { unsafe { &mut *CoreLocal::get().scheduler.get() } } -pub fn set_core_scheduler(scheduler: *mut PerCoreScheduler) { +pub(crate) fn async_tasks() -> RefMut<'static, Vec> { + CoreLocal::get().async_tasks.borrow_mut() +} + +pub(crate) fn set_core_scheduler(scheduler: *mut PerCoreScheduler) { CoreLocal::get().scheduler.set(scheduler); } -pub fn increment_irq_counter(irq_no: u8) { +pub(crate) fn increment_irq_counter(irq_no: u8) { CoreLocal::get().irq_statistics.inc(irq_no); } diff --git a/src/arch/x86_64/kernel/core_local.rs b/src/arch/x86_64/kernel/core_local.rs index 118d2b1a2b..5aec27b29d 100644 --- a/src/arch/x86_64/kernel/core_local.rs +++ b/src/arch/x86_64/kernel/core_local.rs @@ -1,6 +1,7 @@ use alloc::boxed::Box; +use alloc::vec::Vec; use core::arch::asm; -use core::cell::Cell; +use core::cell::{Cell, RefCell, RefMut}; use core::ptr; use core::sync::atomic::Ordering; @@ -10,10 +11,11 @@ use x86_64::VirtAddr; use super::interrupts::{IrqStatistics, IRQ_COUNTERS}; use super::CPU_ONLINE; +use crate::executor::task::AsyncTask; use crate::scheduler::{CoreId, PerCoreScheduler}; #[repr(C)] -pub struct CoreLocal { +pub(crate) struct CoreLocal { this: *const Self, /// Sequential ID of this CPU Core. core_id: CoreId, @@ -25,6 +27,8 @@ pub struct CoreLocal { pub kernel_stack: Cell, /// Interface to the interrupt counters irq_statistics: &'static IrqStatistics, + /// Queue of async tasks + async_tasks: RefCell>, } impl CoreLocal { @@ -43,6 +47,7 @@ impl CoreLocal { tss: Cell::new(ptr::null_mut()), kernel_stack: Cell::new(0), irq_statistics, + async_tasks: RefCell::new(Vec::new()), }; let this = Box::leak(Box::new(this)); this.this = &*this; @@ -50,6 +55,7 @@ impl CoreLocal { GsBase::write(VirtAddr::from_ptr(this)); } + #[inline] pub fn get() -> &'static Self { debug_assert_ne!(VirtAddr::zero(), GsBase::read()); unsafe { @@ -60,7 +66,7 @@ impl CoreLocal { } } -pub fn core_id() -> CoreId { +pub(crate) fn core_id() -> CoreId { if cfg!(target_os = "none") { CoreLocal::get().core_id } else { @@ -68,14 +74,18 @@ pub fn core_id() -> CoreId { } } -pub fn core_scheduler() -> &'static mut PerCoreScheduler { +pub(crate) fn core_scheduler() -> &'static mut PerCoreScheduler { unsafe { &mut *CoreLocal::get().scheduler.get() } } -pub fn set_core_scheduler(scheduler: *mut PerCoreScheduler) { +pub(crate) fn async_tasks() -> RefMut<'static, Vec> { + CoreLocal::get().async_tasks.borrow_mut() +} + +pub(crate) fn set_core_scheduler(scheduler: *mut PerCoreScheduler) { CoreLocal::get().scheduler.set(scheduler); } -pub fn increment_irq_counter(irq_no: u8) { +pub(crate) fn increment_irq_counter(irq_no: u8) { CoreLocal::get().irq_statistics.inc(irq_no); } diff --git a/src/drivers/net/mod.rs b/src/drivers/net/mod.rs index bbd2b7cff9..d5626b3f34 100644 --- a/src/drivers/net/mod.rs +++ b/src/drivers/net/mod.rs @@ -47,37 +47,30 @@ pub trait NetworkInterface { #[inline] fn _irqhandler() -> bool { - if let Some(driver) = hardware::get_network_driver() { + let result = if let Some(driver) = hardware::get_network_driver() { driver.lock().handle_interrupt() } else { debug!("Unable to handle interrupt!"); false - } + }; + + // TODO: do we need it? + crate::executor::run(); + + result } #[cfg(target_arch = "aarch64")] pub(crate) fn network_irqhandler(_state: &State) -> bool { debug!("Receive network interrupt"); - let has_packet = _irqhandler(); - - if has_packet { - #[cfg(feature = "tcp")] - core_scheduler().wakeup_async_tasks(); - } - - has_packet + _irqhandler() } #[cfg(target_arch = "x86_64")] pub(crate) extern "x86-interrupt" fn network_irqhandler(_stack_frame: ExceptionStackFrame) { debug!("Receive network interrupt"); apic::eoi(); - let has_packet = _irqhandler(); + let _ = _irqhandler(); - if has_packet { - let core_scheduler = core_scheduler(); - #[cfg(feature = "tcp")] - core_scheduler.wakeup_async_tasks(); - core_scheduler.reschedule(); - } + core_scheduler().reschedule(); } diff --git a/src/drivers/net/virtio_net.rs b/src/drivers/net/virtio_net.rs index 0a0e525151..bf0db81cd1 100644 --- a/src/drivers/net/virtio_net.rs +++ b/src/drivers/net/virtio_net.rs @@ -10,6 +10,7 @@ use core::cell::RefCell; use core::cmp::Ordering; use core::mem; use core::result::Result; +use core::str::FromStr; use pci_types::InterruptLine; use zerocopy::AsBytes; @@ -517,7 +518,13 @@ impl NetworkInterface for VirtioNetDriver { /// Currently, if VIRTIO_NET_F_MAC is not set // MTU is set static to 1500 bytes. fn get_mtu(&self) -> u16 { - if self.dev_cfg.features.is_feature(Features::VIRTIO_NET_F_MTU) { + if let Some(my_mtu) = hermit_var!("HERMIT_MTU") { + warn!( + "Using value of the environment variable HERMIT_MTU ({}) as MTU", + my_mtu + ); + u16::from_str(&my_mtu).unwrap() + } else if self.dev_cfg.features.is_feature(Features::VIRTIO_NET_F_MTU) { self.dev_cfg.raw.get_mtu() } else { 1500 diff --git a/src/env.rs b/src/env.rs index b7da72f817..5ca1667b8b 100644 --- a/src/env.rs +++ b/src/env.rs @@ -108,7 +108,6 @@ pub fn freq() -> Option { CLI.get().unwrap().freq } -#[cfg(all(feature = "tcp", not(feature = "dhcpv4")))] pub fn var(key: &str) -> Option<&String> { CLI.get().unwrap().env_vars.get(key) } diff --git a/src/executor/device.rs b/src/executor/device.rs index 40cfce4d44..476f019a09 100644 --- a/src/executor/device.rs +++ b/src/executor/device.rs @@ -13,14 +13,12 @@ use smoltcp::wire::{EthernetAddress, HardwareAddress}; #[cfg(not(feature = "dhcpv4"))] use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address}; +use super::network::{NetworkInterface, NetworkState}; use crate::arch; #[cfg(not(feature = "pci"))] use crate::arch::kernel::mmio as hardware; #[cfg(feature = "pci")] use crate::drivers::pci as hardware; -#[cfg(not(feature = "dhcpv4"))] -use crate::env; -use crate::executor::{NetworkInterface, NetworkState}; /// Data type to determine the mac address #[derive(Debug, Copy, Clone)] @@ -35,33 +33,6 @@ impl HermitNet { } } -/// Returns the value of the specified environment variable. -/// -/// The value is fetched from the current runtime environment and, if not -/// present, falls back to the same environment variable set at compile time -/// (might not be present as well). -#[cfg(not(feature = "dhcpv4"))] -macro_rules! hermit_var { - ($name:expr) => {{ - use alloc::borrow::Cow; - - match env::var($name) { - Some(val) => Some(Cow::from(val)), - None => option_env!($name).map(Cow::Borrowed), - } - }}; -} - -/// Tries to fetch the specified environment variable with a default value. -/// -/// Fetches according to [`hermit_var`] or returns the specified default value. -#[cfg(not(feature = "dhcpv4"))] -macro_rules! hermit_var_or { - ($name:expr, $default:expr) => {{ - hermit_var!($name).as_deref().unwrap_or($default) - }}; -} - impl<'a> NetworkInterface<'a> { #[cfg(feature = "dhcpv4")] pub(crate) fn create() -> NetworkState<'a> { @@ -89,7 +60,7 @@ impl<'a> NetworkInterface<'a> { config.hardware_addr = hardware_addr; } - let iface = Interface::new(config, &mut device, crate::executor::now()); + let iface = Interface::new(config, &mut device, crate::executor::network::now()); let mut sockets = SocketSet::new(vec![]); let dhcp_handle = sockets.add(dhcp); @@ -154,7 +125,7 @@ impl<'a> NetworkInterface<'a> { config.hardware_addr = hardware_addr; } - let mut iface = Interface::new(config, &mut device, crate::executor::now()); + let mut iface = Interface::new(config, &mut device, crate::executor::network::now()); iface.update_ip_addrs(|ip_addrs| { ip_addrs .push(IpCidr::new( diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 5cb02614a3..d8eaf7ca51 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -1,211 +1,71 @@ +#![allow(dead_code)] + +#[cfg(feature = "tcp")] mod device; -pub(crate) mod runtime; +#[cfg(feature = "tcp")] +pub(crate) mod network; +pub(crate) mod task; -use alloc::boxed::Box; -use core::ops::DerefMut; -use core::sync::atomic::{AtomicU16, Ordering}; -use core::task::Poll; +use alloc::sync::Arc; +use alloc::task::Wake; +use core::future::Future; +use core::task::{Context, Poll, Waker}; -use futures_lite::future; -use hermit_sync::InterruptTicketMutex; -use smoltcp::iface::{SocketHandle, SocketSet}; -#[cfg(feature = "dhcpv4")] -use smoltcp::socket::dhcpv4; -use smoltcp::socket::{tcp, udp, AnySocket}; -use smoltcp::time::{Duration, Instant}; -#[cfg(feature = "dhcpv4")] -use smoltcp::wire::{IpCidr, Ipv4Address, Ipv4Cidr}; +use hermit_sync::without_interrupts; -use crate::arch; -use crate::executor::device::HermitNet; -use crate::executor::runtime::spawn; +use crate::arch::core_local::*; +use crate::executor::task::AsyncTask; +use crate::scheduler::task::TaskHandle; -pub(crate) enum NetworkState<'a> { - Missing, - InitializationFailed, - Initialized(Box>), +struct TaskNotify { + /// The single task executor . + handle: TaskHandle, } -impl<'a> NetworkState<'a> { - pub fn as_nic_mut(&mut self) -> Result<&mut NetworkInterface<'a>, &'static str> { - match self { - NetworkState::Initialized(nic) => Ok(nic), - _ => Err("Network is not initialized!"), +impl TaskNotify { + pub fn new() -> Self { + Self { + handle: core_scheduler().get_current_task_handle(), } } } -pub(crate) type Handle = SocketHandle; - -static LOCAL_ENDPOINT: AtomicU16 = AtomicU16::new(0); -pub(crate) static NIC: InterruptTicketMutex> = - InterruptTicketMutex::new(NetworkState::Missing); - -pub(crate) struct NetworkInterface<'a> { - iface: smoltcp::iface::Interface, - sockets: SocketSet<'a>, - device: HermitNet, - #[cfg(feature = "dhcpv4")] - dhcp_handle: SocketHandle, -} - -#[cfg(target_arch = "x86_64")] -fn start_endpoint() -> u16 { - ((unsafe { core::arch::x86_64::_rdtsc() }) % (u16::MAX as u64)) - .try_into() - .unwrap() -} - -#[cfg(target_arch = "aarch64")] -fn start_endpoint() -> u16 { - use core::arch::asm; - let value: u64; - - unsafe { - asm!( - "mrs {value}, cntpct_el0", - value = out(reg) value, - options(nostack), - ); +impl Wake for TaskNotify { + fn wake(self: Arc) { + self.wake_by_ref() } - (value % (u16::MAX as u64)).try_into().unwrap() -} - -#[inline] -pub(crate) fn now() -> Instant { - let microseconds = arch::processor::get_timer_ticks() + arch::get_boot_time(); - Instant::from_micros_const(microseconds.try_into().unwrap()) -} - -async fn network_run() { - future::poll_fn(|cx| match NIC.lock().deref_mut() { - NetworkState::Initialized(nic) => { - nic.poll_common(now()); - - // this background task will never stop - // => wakeup ourself - cx.waker().clone().wake(); - - Poll::Pending - } - _ => Poll::Ready(()), - }) - .await -} - -pub(crate) fn init() { - info!("Try to initialize network!"); - - // initialize variable, which contains the next local endpoint - LOCAL_ENDPOINT.store(start_endpoint(), Ordering::Relaxed); - - let mut guard = NIC.lock(); - - *guard = NetworkInterface::create(); - - if let NetworkState::Initialized(nic) = guard.deref_mut() { - let time = now(); - nic.poll_common(time); - let wakeup_time = nic - .poll_delay(time) - .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); - crate::core_scheduler().add_network_timer(wakeup_time); - - spawn(network_run()).detach(); + fn wake_by_ref(self: &Arc) { + trace!("Wakeup task {}", self.handle.get_id()); + core_scheduler().custom_wakeup(self.handle); } } -impl<'a> NetworkInterface<'a> { - pub(crate) fn create_udp_handle(&mut self) -> Result { - // Must fit mDNS payload of at least one packet - let udp_rx_buffer = - udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY; 4], vec![0; 1024]); - // Will not send mDNS - let udp_tx_buffer = udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY], vec![0; 0]); - let udp_socket = udp::Socket::new(udp_rx_buffer, udp_tx_buffer); - let udp_handle = self.sockets.add(udp_socket); - - Ok(udp_handle) - } - - pub(crate) fn create_tcp_handle(&mut self) -> Result { - let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 65535]); - let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 65535]); - let mut tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer); - tcp_socket.set_nagle_enabled(true); - let tcp_handle = self.sockets.add(tcp_socket); - - Ok(tcp_handle) - } +pub(crate) fn run() { + let waker = Waker::noop(); + let mut cx = Context::from_waker(&waker); - pub(crate) fn poll_common(&mut self, timestamp: Instant) { - let _ = self - .iface - .poll(timestamp, &mut self.device, &mut self.sockets); + without_interrupts(|| { + async_tasks().retain_mut(|task| { + trace!("Run async task {}", task.id()); - #[cfg(feature = "dhcpv4")] - match self - .sockets - .get_mut::>(self.dhcp_handle) - .poll() - { - None => {} - Some(dhcpv4::Event::Configured(config)) => { - info!("DHCP config acquired!"); - info!("IP address: {}", config.address); - self.iface.update_ip_addrs(|addrs| { - if let Some(dest) = addrs.iter_mut().next() { - *dest = IpCidr::Ipv4(config.address); - } else if addrs.push(IpCidr::Ipv4(config.address)).is_err() { - info!("Unable to update IP address"); - } - }); - if let Some(router) = config.router { - info!("Default gateway: {}", router); - self.iface - .routes_mut() - .add_default_ipv4_route(router) - .unwrap(); - } else { - info!("Default gateway: None"); - self.iface.routes_mut().remove_default_ipv4_route(); - } - - for (i, s) in config.dns_servers.iter().enumerate() { - info!("DNS server {}: {}", i, s); - } - } - Some(dhcpv4::Event::Deconfigured) => { - info!("DHCP lost config!"); - let cidr = Ipv4Cidr::new(Ipv4Address::UNSPECIFIED, 0); - self.iface.update_ip_addrs(|addrs| { - if let Some(dest) = addrs.iter_mut().next() { - *dest = IpCidr::Ipv4(cidr); - } - }); - self.iface.routes_mut().remove_default_ipv4_route(); + match task.poll(&mut cx) { + Poll::Ready(()) => false, + Poll::Pending => true, } - }; - } - - pub(crate) fn poll_delay(&mut self, timestamp: Instant) -> Option { - self.iface.poll_delay(timestamp, &self.sockets) - } - - #[allow(dead_code)] - pub(crate) fn get_socket>(&self, handle: SocketHandle) -> &T { - self.sockets.get(handle) - } + }) + }); +} - pub(crate) fn get_mut_socket>(&mut self, handle: SocketHandle) -> &mut T { - self.sockets.get_mut(handle) - } +/// Spawns a future on the executor. +pub(crate) fn spawn(future: F) +where + F: Future + Send + 'static, +{ + without_interrupts(|| async_tasks().push(AsyncTask::new(future))); +} - pub(crate) fn get_socket_and_context>( - &mut self, - handle: SocketHandle, - ) -> (&mut T, &mut smoltcp::iface::Context) { - (self.sockets.get_mut(handle), self.iface.context()) - } +pub fn init() { + #[cfg(all(feature = "tcp", not(feature = "newlib")))] + crate::executor::network::init(); } diff --git a/src/executor/network.rs b/src/executor/network.rs new file mode 100644 index 0000000000..d7e6e27d2b --- /dev/null +++ b/src/executor/network.rs @@ -0,0 +1,376 @@ +use alloc::boxed::Box; +use alloc::sync::Arc; +use core::future; +use core::future::Future; +use core::ops::DerefMut; +use core::sync::atomic::{AtomicU16, Ordering}; +use core::task::{Context, Poll}; + +use hermit_sync::InterruptTicketMutex; +use smoltcp::iface::{SocketHandle, SocketSet}; +#[cfg(feature = "dhcpv4")] +use smoltcp::socket::dhcpv4; +use smoltcp::socket::{tcp, udp, AnySocket}; +use smoltcp::time::{Duration, Instant}; +#[cfg(feature = "dhcpv4")] +use smoltcp::wire::{IpCidr, Ipv4Address, Ipv4Cidr}; + +use crate::arch::core_local::*; +use crate::arch::{self, interrupts}; +use crate::executor::device::HermitNet; +use crate::executor::{spawn, TaskNotify}; + +pub(crate) enum NetworkState<'a> { + Missing, + InitializationFailed, + Initialized(Box>), +} + +impl<'a> NetworkState<'a> { + pub fn as_nic_mut(&mut self) -> Result<&mut NetworkInterface<'a>, &'static str> { + match self { + NetworkState::Initialized(nic) => Ok(nic), + _ => Err("Network is not initialized!"), + } + } +} + +pub(crate) type Handle = SocketHandle; + +static LOCAL_ENDPOINT: AtomicU16 = AtomicU16::new(0); +pub(crate) static NIC: InterruptTicketMutex> = + InterruptTicketMutex::new(NetworkState::Missing); + +pub(crate) struct NetworkInterface<'a> { + pub(super) iface: smoltcp::iface::Interface, + pub(super) sockets: SocketSet<'a>, + pub(super) device: HermitNet, + #[cfg(feature = "dhcpv4")] + pub(super) dhcp_handle: SocketHandle, +} + +#[cfg(target_arch = "x86_64")] +fn start_endpoint() -> u16 { + ((unsafe { core::arch::x86_64::_rdtsc() }) % (u16::MAX as u64)) + .try_into() + .unwrap() +} + +#[cfg(target_arch = "aarch64")] +fn start_endpoint() -> u16 { + use core::arch::asm; + let value: u64; + + unsafe { + asm!( + "mrs {value}, cntpct_el0", + value = out(reg) value, + options(nostack), + ); + } + + (value % (u16::MAX as u64)).try_into().unwrap() +} + +#[inline] +pub(crate) fn now() -> Instant { + let microseconds = arch::processor::get_timer_ticks() + arch::get_boot_time(); + Instant::from_micros_const(microseconds.try_into().unwrap()) +} + +async fn network_run() { + future::poll_fn(|_cx| match NIC.lock().deref_mut() { + NetworkState::Initialized(nic) => { + nic.poll_common(now()); + Poll::Pending + } + _ => Poll::Ready(()), + }) + .await +} + +pub(crate) fn init() { + info!("Try to initialize network!"); + + // initialize variable, which contains the next local endpoint + LOCAL_ENDPOINT.store(start_endpoint(), Ordering::Relaxed); + + let mut guard = NIC.lock(); + + *guard = NetworkInterface::create(); + + if let NetworkState::Initialized(nic) = guard.deref_mut() { + let time = now(); + nic.poll_common(time); + let wakeup_time = nic + .poll_delay(time) + .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); + crate::core_scheduler().add_network_timer(wakeup_time); + + spawn(network_run()); + } +} + +impl<'a> NetworkInterface<'a> { + pub(crate) fn create_udp_handle(&mut self) -> Result { + // Must fit mDNS payload of at least one packet + let udp_rx_buffer = + udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY; 4], vec![0; 1024]); + // Will not send mDNS + let udp_tx_buffer = udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY], vec![0; 0]); + let udp_socket = udp::Socket::new(udp_rx_buffer, udp_tx_buffer); + let udp_handle = self.sockets.add(udp_socket); + + Ok(udp_handle) + } + + pub(crate) fn create_tcp_handle(&mut self) -> Result { + let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 65535]); + let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 65535]); + let mut tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer); + tcp_socket.set_nagle_enabled(true); + let tcp_handle = self.sockets.add(tcp_socket); + + Ok(tcp_handle) + } + + pub(crate) fn poll_common(&mut self, timestamp: Instant) { + let _ = self + .iface + .poll(timestamp, &mut self.device, &mut self.sockets); + + #[cfg(feature = "dhcpv4")] + match self + .sockets + .get_mut::>(self.dhcp_handle) + .poll() + { + None => {} + Some(dhcpv4::Event::Configured(config)) => { + info!("DHCP config acquired!"); + info!("IP address: {}", config.address); + self.iface.update_ip_addrs(|addrs| { + if let Some(dest) = addrs.iter_mut().next() { + *dest = IpCidr::Ipv4(config.address); + } else if addrs.push(IpCidr::Ipv4(config.address)).is_err() { + info!("Unable to update IP address"); + } + }); + if let Some(router) = config.router { + info!("Default gateway: {}", router); + self.iface + .routes_mut() + .add_default_ipv4_route(router) + .unwrap(); + } else { + info!("Default gateway: None"); + self.iface.routes_mut().remove_default_ipv4_route(); + } + + for (i, s) in config.dns_servers.iter().enumerate() { + info!("DNS server {}: {}", i, s); + } + } + Some(dhcpv4::Event::Deconfigured) => { + info!("DHCP lost config!"); + let cidr = Ipv4Cidr::new(Ipv4Address::UNSPECIFIED, 0); + self.iface.update_ip_addrs(|addrs| { + if let Some(dest) = addrs.iter_mut().next() { + *dest = IpCidr::Ipv4(cidr); + } + }); + self.iface.routes_mut().remove_default_ipv4_route(); + } + }; + } + + pub(crate) fn poll_delay(&mut self, timestamp: Instant) -> Option { + self.iface.poll_delay(timestamp, &self.sockets) + } + + #[allow(dead_code)] + pub(crate) fn get_socket>(&self, handle: SocketHandle) -> &T { + self.sockets.get(handle) + } + + pub(crate) fn get_mut_socket>(&mut self, handle: SocketHandle) -> &mut T { + self.sockets.get_mut(handle) + } + + pub(crate) fn get_socket_and_context>( + &mut self, + handle: SocketHandle, + ) -> (&mut T, &mut smoltcp::iface::Context) { + (self.sockets.get_mut(handle), self.iface.context()) + } +} + +/// set driver in polling mode +#[inline] +fn set_polling_mode(value: bool) { + #[cfg(feature = "pci")] + if let Some(driver) = crate::drivers::pci::get_network_driver() { + driver.lock().set_polling_mode(value) + } +} + +#[inline] +fn network_delay(timestamp: Instant) -> Option { + crate::executor::network::NIC + .lock() + .as_nic_mut() + .unwrap() + .poll_delay(timestamp) +} + +#[inline] +fn network_poll(timestamp: Instant) { + crate::executor::network::NIC + .lock() + .as_nic_mut() + .unwrap() + .poll_common(timestamp); +} + +/// Blocks the current thread on `f`, running the executor when idling. +pub(crate) fn block_on(future: F, timeout: Option) -> Result +where + F: Future>, +{ + // Enter polling mode => no NIC interrupts + set_polling_mode(true); + + let mut counter: u16 = 0; + let mut blocking_time = 1000; + let start = crate::executor::network::now(); + let task_notify = Arc::new(TaskNotify::new()); + let waker = task_notify.into(); + let mut cx = Context::from_waker(&waker); + let mut future = future; + let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) }; + + loop { + // run background tasks + crate::executor::run(); + + if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { + let wakeup_time = network_delay(crate::executor::network::now()) + .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); + core_scheduler().add_network_timer(wakeup_time); + + // allow network interrupts + set_polling_mode(false); + + return t; + } + + if let Some(duration) = timeout { + if crate::executor::network::now() >= start + duration { + let wakeup_time = network_delay(crate::executor::network::now()) + .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); + core_scheduler().add_network_timer(wakeup_time); + + // allow network interrupts + set_polling_mode(false); + + return Err(-crate::errno::ETIME); + } + } + + counter += 1; + // besure that we are not interrupted by a timer, which is able + // to call `reschedule` + interrupts::disable(); + let now = crate::executor::network::now(); + let delay = network_delay(now).map(|d| d.total_micros()); + if counter > 200 && delay.unwrap_or(10_000_000) > 100_000 { + // add additional check before the task will block + if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { + // allow network interrupts + set_polling_mode(false); + // enable interrupts + interrupts::enable(); + + return t; + } + + let ticks = crate::arch::processor::get_timer_ticks(); + let wakeup_time = timeout + .map(|duration| { + core::cmp::min( + u64::try_from((start + duration).total_micros()).unwrap(), + ticks + delay.unwrap_or(blocking_time), + ) + }) + .or(Some(ticks + delay.unwrap_or(blocking_time))); + let network_timer = delay.map(|d| ticks + d); + let core_scheduler = core_scheduler(); + blocking_time *= 2; + + core_scheduler.add_network_timer(network_timer); + core_scheduler.block_current_task(wakeup_time); + + // allow network interrupts + set_polling_mode(false); + + // enable interrupts + interrupts::enable(); + + // switch to another task + core_scheduler.reschedule(); + + // reset polling counter + counter = 0; + + // Enter polling mode => no NIC interrupts + set_polling_mode(true); + } else { + // enable interrupts + interrupts::enable(); + } + } +} + +/// Blocks the current thread on `f`, running the executor when idling. +pub(crate) fn poll_on(future: F, timeout: Option) -> Result +where + F: Future>, +{ + // Enter polling mode => no NIC interrupts + set_polling_mode(true); + + let start = crate::executor::network::now(); + let waker = core::task::Waker::noop(); + let mut cx = Context::from_waker(&waker); + let mut future = future; + let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) }; + + loop { + // run background tasks + crate::executor::run(); + + if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { + let wakeup_time = network_delay(crate::executor::network::now()) + .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); + core_scheduler().add_network_timer(wakeup_time); + + // allow interrupts => NIC thread is able to run + set_polling_mode(false); + + return t; + } + + if let Some(duration) = timeout { + if crate::executor::network::now() >= start + duration { + let wakeup_time = network_delay(crate::executor::network::now()) + .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); + core_scheduler().add_network_timer(wakeup_time); + + // allow interrupts => NIC thread is able to run + set_polling_mode(false); + + return Err(-crate::errno::ETIME); + } + } + } +} diff --git a/src/executor/runtime.rs b/src/executor/runtime.rs deleted file mode 100644 index f9352cdbf7..0000000000 --- a/src/executor/runtime.rs +++ /dev/null @@ -1,165 +0,0 @@ -use alloc::sync::Arc; -use alloc::task::Wake; -use alloc::vec::Vec; -use core::future::Future; -use core::sync::atomic::{AtomicBool, Ordering}; -use core::task::{Context, Poll}; - -use async_task::{Runnable, Task}; -use futures_lite::pin; -use hermit_sync::InterruptTicketMutex; -use smoltcp::time::{Duration, Instant}; - -use crate::core_scheduler; -use crate::scheduler::task::TaskHandle; - -static QUEUE: InterruptTicketMutex> = InterruptTicketMutex::new(Vec::new()); - -/// set driver in polling mode -#[inline] -fn set_polling_mode(value: bool) { - #[cfg(feature = "pci")] - if let Some(driver) = crate::drivers::pci::get_network_driver() { - driver.lock().set_polling_mode(value) - } -} - -#[inline] -fn network_delay(timestamp: Instant) -> Option { - crate::executor::NIC - .lock() - .as_nic_mut() - .unwrap() - .poll_delay(timestamp) -} - -fn run_executor_once() { - let mut guard = QUEUE.lock(); - let mut runnables = Vec::with_capacity(guard.len()); - - while let Some(runnable) = guard.pop() { - runnables.push(runnable); - } - - drop(guard); - - for runnable in runnables { - runnable.run(); - } -} - -/// Spawns a future on the executor. -pub(crate) fn spawn(future: F) -> Task -where - F: Future + Send + 'static, - T: Send + 'static, -{ - let schedule = |runnable| QUEUE.lock().push(runnable); - let (runnable, task) = async_task::spawn(future, schedule); - runnable.schedule(); - task -} - -struct TaskNotify { - /// The single task executor . - handle: TaskHandle, - /// A flag to ensure a wakeup is not "forgotten" before the next `block_current_task` - unparked: AtomicBool, -} - -impl TaskNotify { - pub fn new() -> Self { - Self { - handle: core_scheduler().get_current_task_handle(), - unparked: AtomicBool::new(false), - } - } -} - -impl Drop for TaskNotify { - fn drop(&mut self) { - debug!("Dropping ThreadNotify!"); - } -} - -impl Wake for TaskNotify { - fn wake(self: Arc) { - self.wake_by_ref() - } - - fn wake_by_ref(self: &Arc) { - // Make sure the wakeup is remembered until the next `park()`. - let unparked = self.unparked.swap(true, Ordering::AcqRel); - if !unparked { - core_scheduler().custom_wakeup(self.handle); - } - } -} - -/// Blocks the current thread on `f`, running the executor when idling. -pub(crate) fn block_on(future: F, timeout: Option) -> Result -where - F: Future>, -{ - // Enter polling mode => no NIC interrupts - set_polling_mode(true); - - let mut counter: u16 = 0; - let start = crate::executor::now(); - let task_notify = Arc::new(TaskNotify::new()); - let waker = task_notify.clone().into(); - let mut cx = Context::from_waker(&waker); - pin!(future); - - loop { - // run background tasks - run_executor_once(); - - if let Poll::Ready(t) = future.as_mut().poll(&mut cx) { - let wakeup_time = network_delay(crate::executor::now()) - .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); - core_scheduler().add_network_timer(wakeup_time); - - // allow interrupts => NIC thread is able to run - set_polling_mode(false); - - return t; - } - - if let Some(duration) = timeout { - if crate::executor::now() >= start + duration { - let wakeup_time = network_delay(crate::executor::now()) - .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); - core_scheduler().add_network_timer(wakeup_time); - - // allow interrupts => NIC thread is able to run - set_polling_mode(false); - - return Err(-crate::errno::ETIME); - } - } - - counter += 1; - let now = crate::executor::now(); - let delay = network_delay(now).map(|d| d.total_micros()); - if counter > 200 && delay.unwrap_or(10_000_000) > 100_000 { - let unparked = task_notify.unparked.swap(false, Ordering::AcqRel); - if !unparked { - let core_scheduler = core_scheduler(); - core_scheduler.add_network_timer( - delay.map(|d| crate::arch::processor::get_timer_ticks() + d), - ); - let wakeup_time = delay.map(|d| crate::arch::processor::get_timer_ticks() + d); - core_scheduler.add_network_timer(wakeup_time); - core_scheduler.block_current_async_task(); - // allow interrupts => NIC thread is able to run - set_polling_mode(false); - // switch to another task - core_scheduler.reschedule(); - // Polling mode => no NIC interrupts => NIC thread should not run - set_polling_mode(true); - counter = 0; - } - } - } -} diff --git a/src/executor/task.rs b/src/executor/task.rs new file mode 100644 index 0000000000..b0ef843d3a --- /dev/null +++ b/src/executor/task.rs @@ -0,0 +1,54 @@ +use alloc::boxed::Box; +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::sync::atomic::{AtomicU32, Ordering}; +use core::task::{Context, Poll}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct AsyncTaskId(u32); + +impl AsyncTaskId { + pub const fn into(self) -> u32 { + self.0 + } + + pub const fn from(x: u32) -> Self { + AsyncTaskId(x) + } +} + +impl fmt::Display for AsyncTaskId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl AsyncTaskId { + fn new() -> Self { + static NEXT_ID: AtomicU32 = AtomicU32::new(0); + AsyncTaskId(NEXT_ID.fetch_add(1, Ordering::Relaxed)) + } +} + +pub(crate) struct AsyncTask { + id: AsyncTaskId, + future: Pin>>, +} + +impl AsyncTask { + pub fn new(future: impl Future + 'static) -> AsyncTask { + AsyncTask { + id: AsyncTaskId::new(), + future: Box::pin(future), + } + } + + pub fn id(&self) -> AsyncTaskId { + self.id + } + + pub fn poll(&mut self, context: &mut Context<'_>) -> Poll<()> { + self.future.as_mut().poll(context) + } +} diff --git a/src/fd/socket/mod.rs b/src/fd/socket/mod.rs index f25c51404a..5d89effea8 100644 --- a/src/fd/socket/mod.rs +++ b/src/fd/socket/mod.rs @@ -4,7 +4,7 @@ use core::ops::DerefMut; use core::sync::atomic::Ordering; use crate::errno::*; -use crate::executor::{NetworkState, NIC}; +use crate::executor::network::{NetworkState, NIC}; use crate::fd::{get_object, insert_object, FD_COUNTER, OBJECT_MAP}; use crate::syscalls::net::*; diff --git a/src/fd/socket/tcp.rs b/src/fd/socket/tcp.rs index 3f23e570fa..acf7680e79 100644 --- a/src/fd/socket/tcp.rs +++ b/src/fd/socket/tcp.rs @@ -1,19 +1,18 @@ use core::ffi::c_void; +use core::future; use core::marker::PhantomData; use core::mem::size_of; use core::ops::DerefMut; use core::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use core::task::Poll; -use futures_lite::future; use smoltcp::iface; use smoltcp::socket::tcp; use smoltcp::time::Duration; use smoltcp::wire::IpAddress; use crate::errno::*; -use crate::executor::runtime::block_on; -use crate::executor::{now, Handle, NetworkState, NIC}; +use crate::executor::network::{block_on, now, poll_on, Handle, NetworkState, NIC}; use crate::fd::ObjectInterface; use crate::syscalls::net::*; use crate::DEFAULT_KEEP_ALIVE_INTERVAL; @@ -73,8 +72,11 @@ impl Socket { if socket.can_recv() { return Poll::Ready( socket - .recv_slice(buffer) - .map(|x| isize::try_from(x).unwrap()) + .recv(|data| { + let len = core::cmp::min(buffer.len(), data.len()); + buffer[..len].copy_from_slice(&data[..len]); + (len, isize::try_from(len).unwrap()) + }) .map_err(|_| -crate::errno::EIO), ); } @@ -85,15 +87,14 @@ impl Socket { | tcp::State::Closed | tcp::State::Closing | tcp::State::CloseWait - | tcp::State::TimeWait => Poll::Ready(Err(-crate::errno::EIO)), + | tcp::State::TimeWait => { + warn!("async_read: socket closed"); + Poll::Ready(Err(-crate::errno::EIO)) + } _ => { if socket.can_recv() { - Poll::Ready( - socket - .recv_slice(buffer) - .map(|x| isize::try_from(x).unwrap()) - .map_err(|_| -crate::errno::EIO), - ) + warn!("async_read: Unable to consume data"); + Poll::Ready(Ok(0)) } else { socket.register_recv_waker(cx.waker()); Poll::Pending @@ -131,10 +132,18 @@ impl Socket { | tcp::State::Closed | tcp::State::Closing | tcp::State::CloseWait - | tcp::State::TimeWait => Poll::Ready(Err(-crate::errno::EIO)), + | tcp::State::TimeWait => { + warn!("async_write: socket closed"); + Poll::Ready(Err(-crate::errno::EIO)) + } _ => { - socket.register_send_waker(cx.waker()); - Poll::Pending + if socket.can_send() { + warn!("async_write: Unable to consume data"); + Poll::Ready(Ok(0)) + } else { + socket.register_send_waker(cx.waker()); + Poll::Pending + } } } }) @@ -293,7 +302,7 @@ impl Socket { let slice = unsafe { core::slice::from_raw_parts(buf, len) }; if self.nonblocking.load(Ordering::Acquire) { - block_on(self.async_write(slice), Some(Duration::ZERO)).unwrap_or_else(|x| { + poll_on(self.async_write(slice), Some(Duration::ZERO)).unwrap_or_else(|x| { if x == -ETIME { (-EAGAIN).try_into().unwrap() } else { @@ -301,7 +310,7 @@ impl Socket { } }) } else { - block_on(self.async_write(slice), None).unwrap_or_else(|x| x.try_into().unwrap()) + poll_on(self.async_write(slice), None).unwrap_or_else(|x| x.try_into().unwrap()) } } diff --git a/src/fd/socket/udp.rs b/src/fd/socket/udp.rs index ffdccb1b3f..0d9543af57 100644 --- a/src/fd/socket/udp.rs +++ b/src/fd/socket/udp.rs @@ -1,4 +1,4 @@ -use crate::executor::Handle; +use crate::executor::network::Handle; use crate::fd::ObjectInterface; #[derive(Debug, Clone)] diff --git a/src/lib.rs b/src/lib.rs index 630bf30c8a..d5c9734038 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ #![feature(linked_list_cursors)] #![feature(maybe_uninit_slice)] #![feature(naked_functions)] +#![feature(noop_waker)] #![cfg_attr(target_arch = "aarch64", feature(specialization))] #![feature(strict_provenance)] #![cfg_attr(target_os = "none", no_std)] @@ -70,7 +71,6 @@ mod drivers; mod entropy; mod env; pub mod errno; -#[cfg(feature = "tcp")] mod executor; pub(crate) mod fd; pub(crate) mod fs; @@ -260,7 +260,6 @@ extern "C" fn initd(_arg: usize) { // Initialize Drivers arch::init_drivers(); - #[cfg(all(feature = "tcp", not(feature = "newlib")))] crate::executor::init(); syscalls::init(); @@ -359,6 +358,7 @@ fn application_processor_main() -> ! { info!("Entering idle loop for application processor"); synch_all_cores(); + crate::executor::init(); let core_scheduler = core_scheduler(); // Run the scheduler loop. diff --git a/src/macros.rs b/src/macros.rs index 0f61c01cf8..197ebb635f 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -108,3 +108,30 @@ macro_rules! kernel_function { $f($($x)*) }}; } + +/// Returns the value of the specified environment variable. +/// +/// The value is fetched from the current runtime environment and, if not +/// present, falls back to the same environment variable set at compile time +/// (might not be present as well). +#[allow(unused_macros)] +macro_rules! hermit_var { + ($name:expr) => {{ + use alloc::borrow::Cow; + + match crate::env::var($name) { + Some(val) => Some(Cow::from(val)), + None => option_env!($name).map(Cow::Borrowed), + } + }}; +} + +/// Tries to fetch the specified environment variable with a default value. +/// +/// Fetches according to [`hermit_var`] or returns the specified default value. +#[allow(unused_macros)] +macro_rules! hermit_var_or { + ($name:expr, $default:expr) => {{ + hermit_var!($name).as_deref().unwrap_or($default) + }}; +} diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 094e080640..082001c454 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -4,8 +4,6 @@ use alloc::rc::Rc; #[cfg(feature = "smp")] use alloc::vec::Vec; use core::cell::RefCell; -#[cfg(feature = "tcp")] -use core::ops::DerefMut; use core::sync::atomic::{AtomicU32, Ordering}; use crossbeam_utils::Backoff; @@ -75,9 +73,6 @@ pub struct PerCoreScheduler { finished_tasks: VecDeque>>, /// Queue of blocked tasks, sorted by wakeup time. blocked_tasks: BlockedTaskQueue, - /// Queue of blocked tasks, sorted by wakeup time. - #[cfg(feature = "tcp")] - blocked_async_tasks: VecDeque, /// Queues to handle incoming requests from the other cores #[cfg(feature = "smp")] input: InterruptTicketMutex, @@ -301,9 +296,8 @@ impl PerCoreScheduler { #[inline] pub fn handle_waiting_tasks(&mut self) { without_interrupts(|| { - #[cfg(feature = "tcp")] - self.wakeup_async_tasks(); - self.blocked_tasks.handle_waiting_tasks() + crate::executor::run(); + self.blocked_tasks.handle_waiting_tasks(); }); } @@ -340,42 +334,6 @@ impl PerCoreScheduler { without_interrupts(|| self.blocked_tasks.add_network_timer(wakeup_time)) } - #[cfg(feature = "tcp")] - #[inline] - pub fn block_current_async_task(&mut self) { - without_interrupts(|| { - self.blocked_async_tasks - .push_back(self.get_current_task_handle()); - self.blocked_tasks.add(self.current_task.clone(), None) - }); - } - - #[cfg(feature = "tcp")] - #[inline] - pub fn wakeup_async_tasks(&mut self) { - let mut has_tasks = false; - - without_interrupts(|| { - while let Some(task) = self.blocked_async_tasks.pop_front() { - has_tasks = true; - self.custom_wakeup(task) - } - - if !has_tasks { - if let Some(mut guard) = crate::executor::NIC.try_lock() { - if let crate::executor::NetworkState::Initialized(nic) = guard.deref_mut() { - let time = crate::executor::now(); - nic.poll_common(time); - let wakeup_time = nic - .poll_delay(time) - .map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros()); - self.add_network_timer(wakeup_time); - } - } - } - }); - } - #[inline] pub fn get_current_task_handle(&self) -> TaskHandle { without_interrupts(|| { @@ -564,6 +522,10 @@ impl PerCoreScheduler { loop { interrupts::disable(); + + // run async tasks + crate::executor::run(); + // do housekeeping self.cleanup_tasks(); @@ -591,6 +553,9 @@ impl PerCoreScheduler { /// Triggers the scheduler to reschedule the tasks. /// Interrupt flag must be cleared before calling this function. pub fn scheduler(&mut self) -> Option<*mut usize> { + // run background tasks + crate::executor::run(); + // Someone wants to give up the CPU // => we have time to cleanup the system self.cleanup_tasks(); @@ -725,8 +690,6 @@ pub fn add_current_core() { ready_queue: PriorityTaskQueue::new(), finished_tasks: VecDeque::new(), blocked_tasks: BlockedTaskQueue::new(), - #[cfg(feature = "tcp")] - blocked_async_tasks: VecDeque::new(), #[cfg(feature = "smp")] input: InterruptTicketMutex::new(SchedulerInput::new()), }); diff --git a/src/scheduler/task.rs b/src/scheduler/task.rs index d58e0261f6..7a3802c62d 100644 --- a/src/scheduler/task.rs +++ b/src/scheduler/task.rs @@ -639,9 +639,9 @@ impl BlockedTaskQueue { let mut cursor = self.list.cursor_front_mut(); #[cfg(feature = "tcp")] - if let Some(mut guard) = crate::executor::NIC.try_lock() { - if let crate::executor::NetworkState::Initialized(nic) = guard.deref_mut() { - let now = crate::executor::now(); + if let Some(mut guard) = crate::executor::network::NIC.try_lock() { + if let crate::executor::network::NetworkState::Initialized(nic) = guard.deref_mut() { + let now = crate::executor::network::now(); nic.poll_common(now); self.network_wakeup_time = nic.poll_delay(now).map(|d| d.total_micros() + time); }