diff --git a/src/kyron/src/safety.rs b/src/kyron/src/safety.rs index 0e5ae37..7e303d9 100644 --- a/src/kyron/src/safety.rs +++ b/src/kyron/src/safety.rs @@ -46,7 +46,8 @@ pub fn ensure_safety_enabled() { /// /// # Safety /// This API is intended to provide a way to ensure that user can react on errors within a `task` independent of other workers state (ie. being busy looping etc). -/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in `SafetyWorker`. +/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in either `SafetyWorker` or regular worker. +/// Assumption of Use is that the task that is running on SafetyWorker never blocks. /// pub fn spawn(future: F) -> JoinHandle where @@ -64,7 +65,8 @@ where /// /// # Safety /// This API is intended to provide a way to ensure that user can react on errors within a `task` independent of other workers state (ie. being busy looping etc). -/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in `SafetyWorker`. +/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in either `SafetyWorker` or regular worker. +/// Assumption of Use is that the task that is running on SafetyWorker never blocks. /// pub fn spawn_from_boxed(boxed: FutureBox>) -> JoinHandle> where @@ -88,7 +90,8 @@ where /// /// # Safety /// This API is intended to provide a way to ensure that user can react on errors within a `task` independent of other workers state (ie. being busy looping etc). -/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in `SafetyWorker`. +/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in either `SafetyWorker` or regular worker. +/// Assumption of Use is that the task that is running on SafetyWorker never blocks. /// pub fn spawn_from_reusable(reusable: ReusableBoxFuture>) -> JoinHandle> where @@ -113,7 +116,8 @@ where /// /// # Safety /// This API is intended to provide a way to ensure that user can react on errors within a `task` independent of other workers state (ie. being busy looping etc). -/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in `SafetyWorker`. +/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in either `SafetyWorker` or regular worker. +/// Assumption of Use is that the task that is running on SafetyWorker never blocks. /// pub fn spawn_on_dedicated(future: F, worker_id: UniqueWorkerId) -> JoinHandle where @@ -131,7 +135,8 @@ where /// /// # Safety /// This API is intended to provide a way to ensure that user can react on errors within a `task` independent of other workers state (ie. being busy looping etc). -/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in `SafetyWorker`. +/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in either `SafetyWorker` or regular worker. +/// Assumption of Use is that the task that is running on SafetyWorker never blocks. /// pub fn spawn_from_boxed_on_dedicated( boxed: FutureBox>, @@ -158,7 +163,8 @@ where /// /// # Safety /// This API is intended to provide a way to ensure that user can react on errors within a `task` independent of other workers state (ie. being busy looping etc). -/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in `SafetyWorker`. +/// This means that if the `task` (aka provided Future) will return Err(_), then the task that is awaiting on JoinHandle will be woken up in either `SafetyWorker` or regular worker. +/// Assumption of Use is that the task that is running on SafetyWorker never blocks. /// pub fn spawn_from_reusable_on_dedicated( reusable: ReusableBoxFuture>, diff --git a/src/kyron/src/scheduler/context.rs b/src/kyron/src/scheduler/context.rs index 123ddcf..a2ebd50 100644 --- a/src/kyron/src/scheduler/context.rs +++ b/src/kyron/src/scheduler/context.rs @@ -542,6 +542,7 @@ pub(crate) fn ctx_get_drivers() -> Drivers { .unwrap() } +#[allow(dead_code)] // Mock function is used instead of this if mock runtime feature is enabled /// /// Sets currently running `task` /// @@ -559,6 +560,7 @@ pub(super) fn ctx_set_running_task(task: TaskRef) { }); } +#[allow(dead_code)] // Mock function is used instead of this if mock runtime feature is enabled /// /// Clears currently running `task` /// @@ -574,6 +576,7 @@ pub(super) fn ctx_unset_running_task() { .map_err(|_| {}); } +#[allow(dead_code)] // Mock function is used instead of this if mock runtime feature is enabled /// /// Gets currently running `task id` /// @@ -592,6 +595,27 @@ pub(crate) fn ctx_get_running_task_id() -> Option { }) } +#[allow(dead_code)] // Mock function is used instead of this if mock runtime feature is enabled +/// +/// Returns `true` if the running task resulted in safety error +/// +pub(crate) fn ctx_get_task_safety_error() -> bool { + CTX.try_with(|ctx| { + // This funcation can be called from a thread outside of Kyron runtime through wake()/wake_by_ref(), so we need to check for ctx presence + if let Some(cx) = ctx.borrow().as_ref() { + cx.running_task + .borrow() + .as_ref() + .is_some_and(|task| task.get_task_safety_error()) + } else { + false + } + }) + .unwrap_or_else(|e| { + panic!("Something is really bad here, error {}!", e); + }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/kyron/src/scheduler/execution_engine.rs b/src/kyron/src/scheduler/execution_engine.rs index c55275d..b39b494 100644 --- a/src/kyron/src/scheduler/execution_engine.rs +++ b/src/kyron/src/scheduler/execution_engine.rs @@ -541,7 +541,8 @@ mod tests { #[test] #[cfg(not(miri))] // Provenance issues fn create_engine_with_worker_and_verify_ids() { - use crate::scheduler::context::{ctx_get_running_task_id, ctx_get_worker_id}; + use crate::scheduler::context::ctx_get_worker_id; + use crate::testing::mock_context::ctx_get_running_task_id; let mut engine = ExecutionEngineBuilder::new() .workers(1) .task_queue_size(16) diff --git a/src/kyron/src/scheduler/join_handle.rs b/src/kyron/src/scheduler/join_handle.rs index 889f489..e1ee85f 100644 --- a/src/kyron/src/scheduler/join_handle.rs +++ b/src/kyron/src/scheduler/join_handle.rs @@ -65,7 +65,7 @@ impl Future for JoinHandle { /// fn poll(self: ::core::pin::Pin<&mut Self>, cx: &mut ::core::task::Context<'_>) -> Poll { let res: FutureInternalReturn> = match self.state { - FutureState::New => { + FutureState::New | FutureState::Polled => { let waker = cx.waker(); // Set the waker, return values tells what have happen and took care about correct synchronization @@ -80,6 +80,7 @@ impl Future for JoinHandle { match ret { Ok(v) => FutureInternalReturn::ready(Ok(v)), + Err(CommonErrors::NoData) => FutureInternalReturn::polled(), Err(CommonErrors::OperationAborted) => { FutureInternalReturn::ready(Err(CommonErrors::OperationAborted)) }, @@ -89,23 +90,6 @@ impl Future for JoinHandle { } } }, - FutureState::Polled => { - // Safety belows forms AqrRel so waker is really written before we do marking - let mut ret: Result = Err(CommonErrors::NoData); - let ret_as_ptr = &mut ret as *mut _; - self.for_task.get_return_val(ret_as_ptr as *mut u8); - - match ret { - Ok(v) => FutureInternalReturn::ready(Ok(v)), - Err(CommonErrors::NoData) => FutureInternalReturn::polled(), - Err(CommonErrors::OperationAborted) => { - FutureInternalReturn::ready(Err(CommonErrors::OperationAborted)) - }, - Err(e) => { - not_recoverable_error!(with e, "There has been an error in a task that is not recoverable ({})!"); - }, - } - }, FutureState::Finished => { not_recoverable_error!("Future polled after it finished!"); }, @@ -293,6 +277,45 @@ mod tests { assert_eq!(poller.poll(), ::core::task::Poll::Ready(Ok(0))); } } + + #[test] + fn test_join_handle_waker_is_set_in_polled_state_also() { + let scheduler = create_mock_scheduler(); + + { + // Data is present before first poll of join handle + let worker_id = create_mock_worker_id(0, 1); + let task = ArcInternal::new(AsyncTask::new( + box_future(test_function::()), + &worker_id, + scheduler.clone(), + )); + + let handle = JoinHandle::::new(TaskRef::new(task.clone())); + + let mut poller = TestingFuturePoller::new(handle); + + let waker_mock1 = TrackableWaker::new(); + let waker1 = waker_mock1.get_waker(); + + let waker_mock2 = TrackableWaker::new(); + let waker2 = waker_mock2.get_waker(); + + let _ = poller.poll_with_waker(&waker1); + // Now in polled state, poll again with waker2 + let _ = poller.poll_with_waker(&waker2); + { + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + task.poll(&mut cx); // task done + } + + assert!(!waker_mock1.was_waked()); + // this should be TRUE + assert!(waker_mock2.was_waked()); + assert_eq!(poller.poll(), ::core::task::Poll::Ready(Ok(0))); + } + } } #[cfg(test)] @@ -315,8 +338,9 @@ mod tests { #[test] fn test_join_handler_mt_get_result() { - let builder = Builder::new(); - + let mut builder = Builder::new(); + // Limit preemption to avoid loom error "Model exceeded maximum number of branches." + builder.preemption_bound = Some(4); builder.check(|| { let scheduler = create_mock_scheduler(); @@ -342,22 +366,17 @@ mod tests { let waker_mock = TrackableWaker::new(); let waker = waker_mock.get_waker(); - let mut was_pending = false; - loop { match poller.poll_with_waker(&waker) { Poll::Ready(v) => { assert_eq!(v, Ok(1234)); - - if was_pending { - assert!(waker_mock.was_waked()); - } + // Note: + // Cannot check whether the waker was woken or not since the waker is set in the join handle poll every time if task is not yet done. + // So depending on the interleaving, the task may finish before the waker is set. break; }, - Poll::Pending => { - was_pending = true; - }, + Poll::Pending => {}, } loom::hint::spin_loop(); } diff --git a/src/kyron/src/scheduler/safety_waker.rs b/src/kyron/src/scheduler/safety_waker.rs index 06dbf2c..845cb77 100644 --- a/src/kyron/src/scheduler/safety_waker.rs +++ b/src/kyron/src/scheduler/safety_waker.rs @@ -55,11 +55,9 @@ static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_waker, wake, wake_by_r /// /// Waker will store internally a pointer to the ref counted Task. /// -pub(crate) unsafe fn create_safety_waker(waker: Waker) -> Waker { - let raw_waker = RawWaker::new(waker.data(), &VTABLE); - - // Forget original as we took over the ownership, so ref count - ::core::mem::forget(waker); +pub(crate) fn create_safety_waker(ptr: TaskRef) -> Waker { + let ptr = TaskRef::into_raw(ptr); // Extracts the pointer from TaskRef not decreasing it's reference count. Since we have a clone here, ref cnt was already increased + let raw_waker = RawWaker::new(ptr as *const (), &VTABLE); // Convert RawWaker to Waker unsafe { Waker::from_raw(raw_waker) } diff --git a/src/kyron/src/scheduler/task/async_task.rs b/src/kyron/src/scheduler/task/async_task.rs index 83863a9..09683ed 100644 --- a/src/kyron/src/scheduler/task/async_task.rs +++ b/src/kyron/src/scheduler/task/async_task.rs @@ -13,9 +13,9 @@ use super::task_state::*; use crate::core::types::*; -use crate::scheduler::safety_waker::create_safety_waker; use crate::scheduler::scheduler_mt::SchedulerTrait; use crate::scheduler::workers::worker_types::WorkerId; +use ::core::cell::Cell; use ::core::future::Future; use ::core::mem; use ::core::ops::{Deref, DerefMut}; @@ -82,7 +82,7 @@ pub(crate) enum TaskStage { pub(crate) struct TaskHeader { pub(in crate::scheduler) state: TaskState, id: TaskId, - + is_safety_error: Cell, // Flag to indicate whether task resulted in safety error vtable: &'static TaskVTable, // API entrypoint to typed task } @@ -97,6 +97,7 @@ impl TaskHeader { Self { state: TaskState::new(), id: TaskId::new(worker_id), + is_safety_error: Cell::new(false), vtable: create_task_vtable::(), } } @@ -111,9 +112,18 @@ impl TaskHeader { Self { state: TaskState::new(), id: TaskId::new(worker_id), + is_safety_error: Cell::new(false), vtable: create_task_s_vtable::(), } } + + pub(crate) fn set_safety_error(&self) { + self.is_safety_error.set(true); + } + + pub(crate) fn get_safety_error(&self) -> bool { + self.is_safety_error.get() + } } #[derive(PartialEq, Debug)] @@ -210,13 +220,20 @@ where } pub(crate) fn set_waker(&self, waker: Waker) -> bool { - unsafe { - self.handle_waker.with_mut(|ptr| { - *ptr = Some(waker); - }) + // Safety: Unset join handle flag before setting waker, the flag would have been set previously in the first poll of join handle. + // If flag is not cleared, another worker finishing the task will see the flag set and + // read the waker to call wake() while it is written here + if self.header.state.unset_join_handle() { + unsafe { + self.handle_waker.with_mut(|ptr| { + *ptr = Some(waker); + }) + }; + + return self.header.state.set_join_handle(); } - self.header.state.set_waker() // Safety: makes sure storing waker is not reordered behind this operation + false } /// @@ -316,12 +333,10 @@ where .with_mut(|ptr: *mut Option| match unsafe { (*ptr).take() } { Some(v) => { if is_safety_err && self.is_with_safety { - unsafe { - create_safety_waker(v).wake(); - } - } else { - v.wake(); + // Set saftey error flag which will be checked in wake()/wkae_by_ref() to schedule parent task into safety worker + self.header.set_safety_error(); } + v.wake(); }, None => { not_recoverable_error!("We shall never be here if we have HadConnectedJoinHandle set!") @@ -624,6 +639,10 @@ impl TaskRef { snapshot.is_completed() || snapshot.is_canceled() } + pub(crate) fn get_task_safety_error(&self) -> bool { + unsafe { self.header.as_ref().get_safety_error() } + } + pub(crate) fn id(&self) -> TaskId { unsafe { self.header.as_ref().id } } @@ -651,7 +670,10 @@ mod tests { safety::SafetyResult, scheduler::{ scheduler_mt::SchedulerTrait, - task::async_task::{TaskId, TaskRef}, + task::{ + async_task::{TaskId, TaskRef}, + task_context::TaskContextGuard, + }, }, testing::*, }; @@ -788,6 +810,7 @@ mod tests { safety_task_ref.set_join_handle_waker(waker.clone()); // Mimic that JoinHandler is set let mut ctx = Context::from_waker(&waker); + let _guard = TaskContextGuard::new(safety_task_ref.clone()); assert_eq!(safety_task_ref.poll(&mut ctx), TaskPollResult::Done); let mut result: SafetyResult = Ok(true); @@ -816,6 +839,7 @@ mod tests { safety_task_ref.set_join_handle_waker(waker.clone()); // Mimic that JoinHandler is set let mut ctx = Context::from_waker(&waker); + let _guard = TaskContextGuard::new(safety_task_ref.clone()); assert_eq!(safety_task_ref.poll(&mut ctx), TaskPollResult::Done); let mut result: SafetyResult = Ok(true); @@ -844,6 +868,7 @@ mod tests { safety_task_ref.set_join_handle_waker(waker.clone()); // Mimic that JoinHandler is set let mut ctx = Context::from_waker(&waker); + let _guard = TaskContextGuard::new(safety_task_ref.clone()); assert_eq!(safety_task_ref.poll(&mut ctx), TaskPollResult::Done); let mut result: SafetyResult = Ok(true); @@ -872,6 +897,7 @@ mod tests { safety_task_ref.set_join_handle_waker(waker.clone()); // Mimic that JoinHandler is set let mut ctx = Context::from_waker(&waker); + let _guard = TaskContextGuard::new(safety_task_ref.clone()); assert_eq!(safety_task_ref.poll(&mut ctx), TaskPollResult::Done); let mut result: SafetyResult = Ok(true); diff --git a/src/kyron/src/scheduler/task/task_context.rs b/src/kyron/src/scheduler/task/task_context.rs index 181ee5a..f94ceb8 100644 --- a/src/kyron/src/scheduler/task/task_context.rs +++ b/src/kyron/src/scheduler/task/task_context.rs @@ -11,12 +11,17 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* +#[cfg(not(any(test, feature = "runtime-api-mock")))] +use crate::scheduler::context::{ + ctx_get_running_task_id, ctx_get_task_safety_error, ctx_set_running_task, ctx_unset_running_task, +}; +#[cfg(any(test, feature = "runtime-api-mock"))] +use crate::testing::mock_context::{ + ctx_get_running_task_id, ctx_get_task_safety_error, ctx_set_running_task, ctx_unset_running_task, +}; use crate::{ core::types::TaskId, - scheduler::{ - context::{ctx_get_running_task_id, ctx_get_worker_id, ctx_set_running_task, ctx_unset_running_task}, - workers::worker_types::WorkerId, - }, + scheduler::{context::ctx_get_worker_id, workers::worker_types::WorkerId}, TaskRef, }; @@ -33,6 +38,11 @@ impl TaskContext { pub fn task_id() -> Option { ctx_get_running_task_id() } + + /// Check whether the running task resulted in safety error to schedule parent into safety worker + pub(crate) fn should_wake_task_into_safety() -> bool { + ctx_get_task_safety_error() + } } /// A guard that sets the task on creation and unsets it on drop. diff --git a/src/kyron/src/scheduler/task/task_state.rs b/src/kyron/src/scheduler/task/task_state.rs index 053a523..7f060d4 100644 --- a/src/kyron/src/scheduler/task/task_state.rs +++ b/src/kyron/src/scheduler/task/task_state.rs @@ -127,6 +127,11 @@ impl TaskStateSnapshot { self.0 |= TASK_JOIN_HANDLE_ATTACHED; } + #[inline(always)] + pub(crate) fn unset_join_handle(&mut self) { + self.0 &= !TASK_JOIN_HANDLE_ATTACHED; + } + #[inline(always)] pub(crate) fn set_running(&mut self) { let mask = TASK_STATE_RUNNING | TASK_STATE_IDLE; @@ -274,7 +279,7 @@ impl TaskState { /// /// Returns result of transition /// - pub(crate) fn set_waker(&self) -> bool { + pub(crate) fn set_join_handle(&self) -> bool { self.fetch_update_with_return(|old: TaskStateSnapshot| { if old.is_completed() || old.is_canceled() { return (None, false); @@ -286,6 +291,25 @@ impl TaskState { }) } + /// + /// Unset join handle attached flag if task is not already completed or canceled. + /// + pub(crate) fn unset_join_handle(&self) -> bool { + self.fetch_update_with_return(|old: TaskStateSnapshot| { + if old.is_completed() || old.is_canceled() { + return (None, false); + } + + if !old.has_join_handle() { + return (None, true); + } + + let mut new = old; + new.unset_join_handle(); + (Some(new), true) + }) + } + /// /// Return true if task was notified before, otherwise false /// diff --git a/src/kyron/src/scheduler/waker.rs b/src/kyron/src/scheduler/waker.rs index ffbd025..78967a2 100644 --- a/src/kyron/src/scheduler/waker.rs +++ b/src/kyron/src/scheduler/waker.rs @@ -14,6 +14,7 @@ use kyron_foundation::prelude::FoundationAtomicPtr; use super::task::async_task::*; +use crate::scheduler::task::task_context::TaskContext; use core::task::{RawWaker, RawWakerVTable, Waker}; fn clone_waker(data: *const ()) -> RawWaker { @@ -31,15 +32,21 @@ fn clone_waker(data: *const ()) -> RawWaker { fn wake(data: *const ()) { let task_header_ptr = data as *const TaskHeader; let task_ref = unsafe { TaskRef::from_raw(task_header_ptr) }; - - task_ref.schedule(); + if TaskContext::should_wake_task_into_safety() { + task_ref.schedule_safety(); + } else { + task_ref.schedule(); + } } fn wake_by_ref(data: *const ()) { let task_header_ptr = data as *const TaskHeader; let task_ref = unsafe { TaskRef::from_raw(task_header_ptr) }; - - task_ref.schedule_by_ref(); + if TaskContext::should_wake_task_into_safety() { + task_ref.schedule_safety_by_ref(); + } else { + task_ref.schedule_by_ref(); + } ::core::mem::forget(task_ref); // don't touch refcount from our data since this is done by drop_waker } diff --git a/src/kyron/src/scheduler/workers/safety_worker.rs b/src/kyron/src/scheduler/workers/safety_worker.rs index 27b8ef5..ee38cbe 100644 --- a/src/kyron/src/scheduler/workers/safety_worker.rs +++ b/src/kyron/src/scheduler/workers/safety_worker.rs @@ -24,9 +24,9 @@ use crate::{ scheduler::{ context::{ctx_initialize, ContextBuilder}, driver::Drivers, + safety_waker::create_safety_waker, scheduler_mt::{AsyncScheduler, DedicatedScheduler}, task::{async_task::TaskPollResult, task_context::TaskContextGuard}, - waker::create_waker, }, TaskRef, }; @@ -176,7 +176,7 @@ impl WorkerInner { } fn run_task(&mut self, task: TaskRef) { - let waker = create_waker(task.clone()); + let waker = create_safety_waker(task.clone()); let mut ctx = Context::from_waker(&waker); let _guard = TaskContextGuard::new(task.clone()); match task.poll(&mut ctx) { diff --git a/src/kyron/src/testing/mock.rs b/src/kyron/src/testing/mock.rs index bc1b49b..6c814e1 100644 --- a/src/kyron/src/testing/mock.rs +++ b/src/kyron/src/testing/mock.rs @@ -16,7 +16,11 @@ use crate::{ core::types::{box_future, FutureBox, UniqueWorkerId}, futures::reusable_box_future::ReusableBoxFuture, - scheduler::{join_handle::JoinHandle, task::async_task::TaskRef, waker::create_waker}, + scheduler::{ + join_handle::JoinHandle, + task::{async_task::TaskRef, task_context::TaskContextGuard}, + waker::create_waker, + }, testing::*, }; use ::core::{cell::RefCell, future::Future, sync::atomic, task::Context}; @@ -95,7 +99,7 @@ pub mod runtime { while let Some(task) = dequeue_task() { let waker = create_waker(task.clone()); let mut ctx = Context::from_waker(&waker); - + let _guard = TaskContextGuard::new(task.clone()); task.poll(&mut ctx); if !task.is_done() { to_enqueue.push(task); diff --git a/src/kyron/src/testing/mock_context.rs b/src/kyron/src/testing/mock_context.rs new file mode 100644 index 0000000..132010f --- /dev/null +++ b/src/kyron/src/testing/mock_context.rs @@ -0,0 +1,53 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::core::types::TaskId; +use crate::TaskRef; + +// Thread-local storage to set/get running task in mock context +use ::core::cell::RefCell; +thread_local! { + static MOCK_TASK_CTX: RefCell> = const { RefCell::new(None) }; +} + +/// +/// Sets currently running `task` +/// +pub(crate) fn ctx_set_running_task(task: TaskRef) { + let _ = MOCK_TASK_CTX.try_with(|ctx| ctx.replace(Some(task))); +} + +/// +/// Clears currently running `task` +/// +pub(crate) fn ctx_unset_running_task() { + let _ = MOCK_TASK_CTX.try_with(|ctx| ctx.replace(None)); +} + +/// +/// Returns `true` if the running task resulted in safety error +/// +pub(crate) fn ctx_get_task_safety_error() -> bool { + MOCK_TASK_CTX + .try_with(|ctx| ctx.borrow().as_ref().is_some_and(|task| task.get_task_safety_error())) + .unwrap_or_default() +} + +/// +/// Gets currently running `task id` +/// +pub(crate) fn ctx_get_running_task_id() -> Option { + MOCK_TASK_CTX + .try_with(|ctx| ctx.borrow().as_ref().map(|task| task.id())) + .unwrap_or_default() +} diff --git a/src/kyron/src/testing/mod.rs b/src/kyron/src/testing/mod.rs index 04c1ceb..e588728 100644 --- a/src/kyron/src/testing/mod.rs +++ b/src/kyron/src/testing/mod.rs @@ -33,6 +33,8 @@ use crate::{ #[cfg(any(test, feature = "runtime-api-mock"))] pub mod mock; +#[cfg(any(test, feature = "runtime-api-mock"))] +pub mod mock_context; #[derive(Default)] pub struct SchedulerSyncMock { diff --git a/tests/test_cases/tests/runtime/worker/test_safety_worker.py b/tests/test_cases/tests/runtime/worker/test_safety_worker.py index 6008f9e..51af86d 100644 --- a/tests/test_cases/tests/runtime/worker/test_safety_worker.py +++ b/tests/test_cases/tests/runtime/worker/test_safety_worker.py @@ -120,7 +120,6 @@ def test_safety_enabled( # region safety worker core -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestTaskHandling(CitScenario): @pytest.fixture(scope="class") def scenario_name(self) -> str: @@ -169,7 +168,6 @@ def test_safety_worker_uniqueness( ) -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestNestedTaskHandling(CitScenario): @pytest.fixture(scope="class") def scenario_name(self) -> str: @@ -425,7 +423,6 @@ def results( @pytest.mark.root_required @pytest.mark.skipif("WSL" in platform(), reason="Not supported on WSL") -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestSafeWorkerThreadParameters(ThreadParametersBase): @pytest.fixture(scope="class") def scheduler(self) -> str: @@ -481,7 +478,6 @@ def test_safety_worker_thread_params( @pytest.mark.root_required @pytest.mark.skipif("WSL" in platform(), reason="Not supported on WSL") -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestSafeWorkerMissingThreadParameterScheduler(ThreadParametersNegativeBase): @pytest.fixture(scope="class") def priority(self) -> int: @@ -502,7 +498,6 @@ def test_config(self, priority: int) -> dict[str, Any]: @pytest.mark.root_required @pytest.mark.skipif("WSL" in platform(), reason="Not supported on WSL") -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestSafeWorkerMissingThreadParameterPriority(ThreadParametersNegativeBase): @pytest.fixture(scope="class") def scheduler(self) -> str: @@ -523,7 +518,6 @@ def test_config(self, scheduler: str) -> dict[str, Any]: @pytest.mark.root_required @pytest.mark.skipif("WSL" in platform(), reason="Not supported on WSL") -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestSafeWorkerMissingThreadParameters(ThreadParametersNegativeBase): @pytest.fixture(scope="class") def test_config(self) -> dict[str, Any]: @@ -540,7 +534,6 @@ def test_config(self) -> dict[str, Any]: # region spawn methods -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestSpawnFromBoxed(CitScenario): @pytest.fixture(scope="class") def scenario_name(self) -> str: @@ -569,14 +562,12 @@ def test_safety_worker_handler( ) -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestSpawnFromReusable(TestSpawnFromBoxed): @pytest.fixture(scope="class") def scenario_name(self) -> str: return "runtime.worker.safety_worker.spawn_from_reusable" -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestSpawnOnDedicated(CitScenario): @pytest.fixture(scope="class") def scenario_name(self) -> str: @@ -610,14 +601,12 @@ def test_task_to_worker_assignment( assert len(all_thread_ids) == 3, "Each task should be executed on its own thread" -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestSpawnFromBoxedOnDedicated(CitScenario): @pytest.fixture(scope="class") def scenario_name(self) -> str: return "runtime.worker.safety_worker.spawn_from_boxed_on_dedicated" -@pytest.mark.xfail(reason="https://github.com/qorix-group/inc_orchestrator_internal/issues/397") class TestSpawnFromReusableOnDedicated(CitScenario): @pytest.fixture(scope="class") def scenario_name(self) -> str: diff --git a/tests/test_scenarios/rust/src/scenarios/runtime/worker/safety_worker.rs b/tests/test_scenarios/rust/src/scenarios/runtime/worker/safety_worker.rs index e452459..fcdd533 100644 --- a/tests/test_scenarios/rust/src/scenarios/runtime/worker/safety_worker.rs +++ b/tests/test_scenarios/rust/src/scenarios/runtime/worker/safety_worker.rs @@ -15,6 +15,7 @@ use crate::internals::runtime_helper::Runtime; use crate::internals::thread_params::{current_thread_priority_params, ThreadPriorityParams}; use kyron::core::types::UniqueWorkerId; use kyron::futures::reusable_box_future::ReusableBoxFuturePool; +use kyron::futures::sleep; use kyron::{safety, spawn}; use serde::Deserialize; use serde_json::Value; @@ -68,6 +69,10 @@ impl Scenario for EnsureSafetyEnabledOutisdeAsyncContext { async fn failing_task() -> Result<(), String> { info!(name = "failing_task"); + // Note: The task that is awaiting on JoinHandle will be woken up either in safety worker or regular worker. + // If the execution of spawned task is completed before awaiting on the JoinHandle, then the awaiting task will be woken up in regular worker. + // Simulate work, this helps to wake awaiting task into safety worker after this task fails. + sleep::sleep(::core::time::Duration::from_millis(1)).await; Err("Intentional failure".to_string()) } struct SafetyWorkerFailedTaskHandling;