Skip to content

Commit

Permalink
absl: fix a priority bug in CondVar wait morphing
Browse files Browse the repository at this point in the history
Enqueue updates priority of the queued thread.
It was assumed that the queued thread is the current thread.
But it's not the case in CondVar wait morhping,
where we requeue an existing CondVar waiter on the Mutex.
As the result one thread can falsely get priority of another thread.

Fix this by not updating priority in this case.
And make the assumption explicit and checked.

PiperOrigin-RevId: 561249402
Change-Id: I9476c047757090b893a88a2839b795b85fe220ad
  • Loading branch information
Abseil Team authored and copybara-github committed Aug 30, 2023
1 parent f6fc4ef commit b06ab1f
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 16 deletions.
37 changes: 21 additions & 16 deletions absl/synchronization/mutex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ static const intptr_t kMuOne = 0x0100; // a count of one reader
// flags passed to Enqueue and LockSlow{,WithTimeout,Loop}
static const int kMuHasBlocked = 0x01; // already blocked (MUST == 1)
static const int kMuIsCond = 0x02; // conditional waiter (CV or Condition)
static const int kMuIsFer = 0x04; // wait morphing from a CondVar

static_assert(PerThreadSynch::kAlignment > kMuLow,
"PerThreadSynch::kAlignment must be greater than kMuLow");
Expand Down Expand Up @@ -920,20 +921,23 @@ static PerThreadSynch* Enqueue(PerThreadSynch* head, SynchWaitParams* waitp,
s->wake = false; // not being woken
s->cond_waiter = ((flags & kMuIsCond) != 0);
#ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM
int64_t now_cycles = CycleClock::Now();
if (s->next_priority_read_cycles < now_cycles) {
// Every so often, update our idea of the thread's priority.
// pthread_getschedparam() is 5% of the block/wakeup time;
// CycleClock::Now() is 0.5%.
int policy;
struct sched_param param;
const int err = pthread_getschedparam(pthread_self(), &policy, &param);
if (err != 0) {
ABSL_RAW_LOG(ERROR, "pthread_getschedparam failed: %d", err);
} else {
s->priority = param.sched_priority;
s->next_priority_read_cycles =
now_cycles + static_cast<int64_t>(CycleClock::Frequency());
if ((flags & kMuIsFer) == 0) {
assert(s == Synch_GetPerThread());
int64_t now_cycles = CycleClock::Now();
if (s->next_priority_read_cycles < now_cycles) {
// Every so often, update our idea of the thread's priority.
// pthread_getschedparam() is 5% of the block/wakeup time;
// CycleClock::Now() is 0.5%.
int policy;
struct sched_param param;
const int err = pthread_getschedparam(pthread_self(), &policy, &param);
if (err != 0) {
ABSL_RAW_LOG(ERROR, "pthread_getschedparam failed: %d", err);
} else {
s->priority = param.sched_priority;
s->next_priority_read_cycles =
now_cycles + static_cast<int64_t>(CycleClock::Frequency());
}
}
}
#endif
Expand Down Expand Up @@ -2436,7 +2440,8 @@ void Mutex::Fer(PerThreadSynch* w) {
} else {
if ((v & (kMuSpin | kMuWait)) == 0) { // no waiters
// This thread tries to become the one and only waiter.
PerThreadSynch* new_h = Enqueue(nullptr, w->waitp, v, kMuIsCond);
PerThreadSynch* new_h =
Enqueue(nullptr, w->waitp, v, kMuIsCond | kMuIsFer);
ABSL_RAW_CHECK(new_h != nullptr,
"Enqueue failed"); // we must queue ourselves
if (mu_.compare_exchange_strong(
Expand All @@ -2447,7 +2452,7 @@ void Mutex::Fer(PerThreadSynch* w) {
} else if ((v & kMuSpin) == 0 &&
mu_.compare_exchange_strong(v, v | kMuSpin | kMuWait)) {
PerThreadSynch* h = GetPerThreadSynch(v);
PerThreadSynch* new_h = Enqueue(h, w->waitp, v, kMuIsCond);
PerThreadSynch* new_h = Enqueue(h, w->waitp, v, kMuIsCond | kMuIsFer);
ABSL_RAW_CHECK(new_h != nullptr,
"Enqueue failed"); // we must queue ourselves
do {
Expand Down
60 changes: 60 additions & 0 deletions absl/synchronization/mutex_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/synchronization/internal/create_thread_identity.h"
#include "absl/synchronization/internal/thread_pool.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"

#ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM
#include <pthread.h>
#include <string.h>
#endif

namespace {

// TODO(dmauro): Replace with a commandline flag.
Expand Down Expand Up @@ -1868,6 +1874,60 @@ TEST(Mutex, WriterPriority) {
EXPECT_TRUE(saw_wrote.load());
}

#ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM
TEST(Mutex, CondVarPriority) {
// A regression test for a bug in condition variable wait morphing,
// which resulted in the waiting thread getting priority of the waking thread.
int err = 0;
sched_param param;
param.sched_priority = 7;
std::thread test([&]() {
err = pthread_setschedparam(pthread_self(), SCHED_FIFO, &param);
});
test.join();
if (err) {
// Setting priority usually requires special privileges.
GTEST_SKIP() << "failed to set priority: " << strerror(err);
}
absl::Mutex mu;
absl::CondVar cv;
bool locked = false;
bool notified = false;
bool waiting = false;
bool morph = false;
std::thread th([&]() {
EXPECT_EQ(0, pthread_setschedparam(pthread_self(), SCHED_FIFO, &param));
mu.Lock();
locked = true;
mu.Await(absl::Condition(&notified));
mu.Unlock();
EXPECT_EQ(absl::synchronization_internal::GetOrCreateCurrentThreadIdentity()
->per_thread_synch.priority,
param.sched_priority);
mu.Lock();
mu.Await(absl::Condition(&waiting));
morph = true;
absl::SleepFor(absl::Seconds(1));
cv.Signal();
mu.Unlock();
});
mu.Lock();
mu.Await(absl::Condition(&locked));
notified = true;
mu.Unlock();
mu.Lock();
waiting = true;
while (!morph) {
cv.Wait(&mu);
}
mu.Unlock();
th.join();
EXPECT_NE(absl::synchronization_internal::GetOrCreateCurrentThreadIdentity()
->per_thread_synch.priority,
param.sched_priority);
}
#endif

TEST(Mutex, LockWhenWithTimeoutResult) {
// Check various corner cases for Await/LockWhen return value
// with always true/always false conditions.
Expand Down

0 comments on commit b06ab1f

Please sign in to comment.