Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Update EventManager and task::run_on_core to ignore spurious wakeups if they happen #340

Merged
merged 3 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/event_manager/include/event_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class EventManager : public espp::BaseComponent {

struct SubscriberData {
std::mutex m;
bool notified = false; // Allows cv to ignore spurious wakeups
std::condition_variable cv;
std::deque<std::vector<uint8_t>> deq;
};
Expand Down
10 changes: 9 additions & 1 deletion components/event_manager/src/event_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ bool EventManager::publish(const std::string &topic, const std::vector<uint8_t>
std::unique_lock<std::mutex> lk(sub_data->m);
// push the data into the queue
sub_data->deq.push_back(data);
// update the notified flag (used to ignore spurious wakeups)
sub_data->notified = true;
}
// notify the task that there is new data in the queue
sub_data->cv.notify_all();
Expand Down Expand Up @@ -174,6 +176,10 @@ bool EventManager::remove_subscriber(const std::string &topic, const std::string
// notify the data (so the subscriber task function can stop waiting on the data cv)
{
std::lock_guard<std::recursive_mutex> lk(data_mutex_);
{
std::unique_lock<std::mutex> lk(subscriber_data_[topic].m);
subscriber_data_[topic].notified = true;
}
subscriber_data_[topic].cv.notify_all();
}
{
Expand Down Expand Up @@ -210,7 +216,7 @@ bool EventManager::subscriber_task_fn(const std::string &topic, std::mutex &m,
{
// wait on sub_data's mutex/cv
std::unique_lock<std::mutex> lk(sub_data->m);
sub_data->cv.wait(lk);
sub_data->cv.wait(lk, [&sub_data] { return sub_data->notified; });
if (sub_data->deq.empty()) {
// stop the task, we were notified, but there was no data available.
return true;
Expand All @@ -224,6 +230,8 @@ bool EventManager::subscriber_task_fn(const std::string &topic, std::mutex &m,
{
std::unique_lock<std::mutex> lk(sub_data->m);
if (sub_data->deq.empty()) {
// reset the notified flag
sub_data->notified = false;
// we've gotten all the data, so break out of the loop
break;
}
Expand Down
11 changes: 7 additions & 4 deletions components/task/include/run_on_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20
// If the core id is larger than the number of cores, run on the last core
core_id = configNUM_CORES - 1;
}
bool notified = false;
std::mutex mutex;
std::unique_lock lock(mutex); // cppcheck-suppress localMutex
std::condition_variable cv; ///< Signal for when the task is done / function is run
Expand All @@ -43,13 +44,14 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20
decltype(f()) ret_val;
auto f_task = espp::Task::make_unique(espp::Task::Config{
.name = name,
.callback = [&mutex, &cv, &f, &ret_val](auto &cb_m, auto &cb_cv) -> bool {
.callback = [&mutex, &cv, &f, &ret_val, &notified](auto &cb_m, auto &cb_cv) -> bool {
// synchronize with the main thread - block here until the main thread
// waits on the condition variable (cv), then run the function
std::unique_lock lock(mutex);
// run the function
ret_val = f();
// signal that the task is done
notified = true;
cv.notify_all();
return true; // stop the task
},
Expand All @@ -58,19 +60,20 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20
.core_id = core_id,
});
f_task->start();
cv.wait(lock);
cv.wait(lock, [&notified] { return notified; });
return ret_val;
} else {
// the function returns void
auto f_task = espp::Task::make_unique(espp::Task::Config{
.name = name,
.callback = [&mutex, &cv, &f](auto &cb_m, auto &cb_cv) -> bool {
.callback = [&mutex, &cv, &f, &notified](auto &cb_m, auto &cb_cv) -> bool {
// synchronize with the main thread - block here until the main thread
// waits on the condition variable (cv), then run the function
std::unique_lock lock(mutex);
// run the function
f();
// signal that the task is done
notified = true;
cv.notify_all();
return true; // stop the task
},
Expand All @@ -79,7 +82,7 @@ static auto run_on_core(const auto &f, int core_id, size_t stack_size_bytes = 20
.core_id = core_id,
});
f_task->start();
cv.wait(lock);
cv.wait(lock, [&notified] { return notified; });
}
}
}
Expand Down