diff --git a/absl/synchronization/mutex.cc b/absl/synchronization/mutex.cc index 3aa5560a32f..353a8280644 100644 --- a/absl/synchronization/mutex.cc +++ b/absl/synchronization/mutex.cc @@ -682,6 +682,7 @@ static const intptr_t kMuOne = 0x0100; // a count of one reader // flags passed to Enqueue and LockSlow{,WithTimeout,Loop} static const int kMuHasBlocked = 0x01; // already blocked (MUST == 1) static const int kMuIsCond = 0x02; // conditional waiter (CV or Condition) +static const int kMuIsFer = 0x04; // wait morphing from a CondVar static_assert(PerThreadSynch::kAlignment > kMuLow, "PerThreadSynch::kAlignment must be greater than kMuLow"); @@ -920,20 +921,23 @@ static PerThreadSynch* Enqueue(PerThreadSynch* head, SynchWaitParams* waitp, s->wake = false; // not being woken s->cond_waiter = ((flags & kMuIsCond) != 0); #ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM - int64_t now_cycles = CycleClock::Now(); - if (s->next_priority_read_cycles < now_cycles) { - // Every so often, update our idea of the thread's priority. - // pthread_getschedparam() is 5% of the block/wakeup time; - // CycleClock::Now() is 0.5%. - int policy; - struct sched_param param; - const int err = pthread_getschedparam(pthread_self(), &policy, ¶m); - if (err != 0) { - ABSL_RAW_LOG(ERROR, "pthread_getschedparam failed: %d", err); - } else { - s->priority = param.sched_priority; - s->next_priority_read_cycles = - now_cycles + static_cast(CycleClock::Frequency()); + if ((flags & kMuIsFer) == 0) { + assert(s == Synch_GetPerThread()); + int64_t now_cycles = CycleClock::Now(); + if (s->next_priority_read_cycles < now_cycles) { + // Every so often, update our idea of the thread's priority. + // pthread_getschedparam() is 5% of the block/wakeup time; + // CycleClock::Now() is 0.5%. + int policy; + struct sched_param param; + const int err = pthread_getschedparam(pthread_self(), &policy, ¶m); + if (err != 0) { + ABSL_RAW_LOG(ERROR, "pthread_getschedparam failed: %d", err); + } else { + s->priority = param.sched_priority; + s->next_priority_read_cycles = + now_cycles + static_cast(CycleClock::Frequency()); + } } } #endif @@ -2436,7 +2440,8 @@ void Mutex::Fer(PerThreadSynch* w) { } else { if ((v & (kMuSpin | kMuWait)) == 0) { // no waiters // This thread tries to become the one and only waiter. - PerThreadSynch* new_h = Enqueue(nullptr, w->waitp, v, kMuIsCond); + PerThreadSynch* new_h = + Enqueue(nullptr, w->waitp, v, kMuIsCond | kMuIsFer); ABSL_RAW_CHECK(new_h != nullptr, "Enqueue failed"); // we must queue ourselves if (mu_.compare_exchange_strong( @@ -2447,7 +2452,7 @@ void Mutex::Fer(PerThreadSynch* w) { } else if ((v & kMuSpin) == 0 && mu_.compare_exchange_strong(v, v | kMuSpin | kMuWait)) { PerThreadSynch* h = GetPerThreadSynch(v); - PerThreadSynch* new_h = Enqueue(h, w->waitp, v, kMuIsCond); + PerThreadSynch* new_h = Enqueue(h, w->waitp, v, kMuIsCond | kMuIsFer); ABSL_RAW_CHECK(new_h != nullptr, "Enqueue failed"); // we must queue ourselves do { diff --git a/absl/synchronization/mutex_test.cc b/absl/synchronization/mutex_test.cc index b585c342e6b..0bca46c5a54 100644 --- a/absl/synchronization/mutex_test.cc +++ b/absl/synchronization/mutex_test.cc @@ -36,10 +36,16 @@ #include "absl/log/check.h" #include "absl/log/log.h" #include "absl/memory/memory.h" +#include "absl/synchronization/internal/create_thread_identity.h" #include "absl/synchronization/internal/thread_pool.h" #include "absl/time/clock.h" #include "absl/time/time.h" +#ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM +#include +#include +#endif + namespace { // TODO(dmauro): Replace with a commandline flag. @@ -1868,6 +1874,60 @@ TEST(Mutex, WriterPriority) { EXPECT_TRUE(saw_wrote.load()); } +#ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM +TEST(Mutex, CondVarPriority) { + // A regression test for a bug in condition variable wait morphing, + // which resulted in the waiting thread getting priority of the waking thread. + int err = 0; + sched_param param; + param.sched_priority = 7; + std::thread test([&]() { + err = pthread_setschedparam(pthread_self(), SCHED_FIFO, ¶m); + }); + test.join(); + if (err) { + // Setting priority usually requires special privileges. + GTEST_SKIP() << "failed to set priority: " << strerror(err); + } + absl::Mutex mu; + absl::CondVar cv; + bool locked = false; + bool notified = false; + bool waiting = false; + bool morph = false; + std::thread th([&]() { + EXPECT_EQ(0, pthread_setschedparam(pthread_self(), SCHED_FIFO, ¶m)); + mu.Lock(); + locked = true; + mu.Await(absl::Condition(¬ified)); + mu.Unlock(); + EXPECT_EQ(absl::synchronization_internal::GetOrCreateCurrentThreadIdentity() + ->per_thread_synch.priority, + param.sched_priority); + mu.Lock(); + mu.Await(absl::Condition(&waiting)); + morph = true; + absl::SleepFor(absl::Seconds(1)); + cv.Signal(); + mu.Unlock(); + }); + mu.Lock(); + mu.Await(absl::Condition(&locked)); + notified = true; + mu.Unlock(); + mu.Lock(); + waiting = true; + while (!morph) { + cv.Wait(&mu); + } + mu.Unlock(); + th.join(); + EXPECT_NE(absl::synchronization_internal::GetOrCreateCurrentThreadIdentity() + ->per_thread_synch.priority, + param.sched_priority); +} +#endif + TEST(Mutex, LockWhenWithTimeoutResult) { // Check various corner cases for Await/LockWhen return value // with always true/always false conditions.