diff --git a/components/event_manager/include/event_manager.hpp b/components/event_manager/include/event_manager.hpp index 816ea46de..8c12da76f 100644 --- a/components/event_manager/include/event_manager.hpp +++ b/components/event_manager/include/event_manager.hpp @@ -134,6 +134,7 @@ class EventManager : public espp::BaseComponent { struct SubscriberData { std::mutex m; + bool notified = false; // Allows cv to ignore spurious wakeups std::condition_variable cv; std::deque> deq; }; diff --git a/components/event_manager/src/event_manager.cpp b/components/event_manager/src/event_manager.cpp index 9c1348aa0..7b2dfa694 100644 --- a/components/event_manager/src/event_manager.cpp +++ b/components/event_manager/src/event_manager.cpp @@ -98,6 +98,8 @@ bool EventManager::publish(const std::string &topic, const std::vector std::unique_lock lk(sub_data->m); // push the data into the queue sub_data->deq.push_back(data); + // update the notified flag (used to ignore spurious wakeups) + sub_data->notified = true; } // notify the task that there is new data in the queue sub_data->cv.notify_all(); @@ -174,6 +176,10 @@ bool EventManager::remove_subscriber(const std::string &topic, const std::string // notify the data (so the subscriber task function can stop waiting on the data cv) { std::lock_guard lk(data_mutex_); + { + std::unique_lock lk(subscriber_data_[topic].m); + subscriber_data_[topic].notified = true; + } subscriber_data_[topic].cv.notify_all(); } { @@ -210,7 +216,7 @@ bool EventManager::subscriber_task_fn(const std::string &topic, std::mutex &m, { // wait on sub_data's mutex/cv std::unique_lock lk(sub_data->m); - sub_data->cv.wait(lk); + sub_data->cv.wait(lk, [&sub_data] { return sub_data->notified; }); if (sub_data->deq.empty()) { // stop the task, we were notified, but there was no data available. return true; @@ -224,6 +230,8 @@ bool EventManager::subscriber_task_fn(const std::string &topic, std::mutex &m, { std::unique_lock lk(sub_data->m); if (sub_data->deq.empty()) { + // reset the notified flag + sub_data->notified = false; // we've gotten all the data, so break out of the loop break; } diff --git a/components/task/include/run_on_core.hpp b/components/task/include/run_on_core.hpp index b3ee0cec8..5a240eec4 100644 --- a/components/task/include/run_on_core.hpp +++ b/components/task/include/run_on_core.hpp @@ -35,6 +35,7 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20 // If the core id is larger than the number of cores, run on the last core core_id = configNUM_CORES - 1; } + bool notified = false; std::mutex mutex; std::unique_lock lock(mutex); // cppcheck-suppress localMutex std::condition_variable cv; ///< Signal for when the task is done / function is run @@ -43,13 +44,14 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20 decltype(f()) ret_val; auto f_task = espp::Task::make_unique(espp::Task::Config{ .name = name, - .callback = [&mutex, &cv, &f, &ret_val](auto &cb_m, auto &cb_cv) -> bool { + .callback = [&mutex, &cv, &f, &ret_val, ¬ified](auto &cb_m, auto &cb_cv) -> bool { // synchronize with the main thread - block here until the main thread // waits on the condition variable (cv), then run the function std::unique_lock lock(mutex); // run the function ret_val = f(); // signal that the task is done + notified = true; cv.notify_all(); return true; // stop the task }, @@ -58,19 +60,20 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20 .core_id = core_id, }); f_task->start(); - cv.wait(lock); + cv.wait(lock, [¬ified] { return notified; }); return ret_val; } else { // the function returns void auto f_task = espp::Task::make_unique(espp::Task::Config{ .name = name, - .callback = [&mutex, &cv, &f](auto &cb_m, auto &cb_cv) -> bool { + .callback = [&mutex, &cv, &f, ¬ified](auto &cb_m, auto &cb_cv) -> bool { // synchronize with the main thread - block here until the main thread // waits on the condition variable (cv), then run the function std::unique_lock lock(mutex); // run the function f(); // signal that the task is done + notified = true; cv.notify_all(); return true; // stop the task }, @@ -79,7 +82,7 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20 .core_id = core_id, }); f_task->start(); - cv.wait(lock); + cv.wait(lock, [¬ified] { return notified; }); } } }