Skip to content

Commit

Permalink
Fix a bug in assigning mutex to next owner
Browse files Browse the repository at this point in the history
Summary: When we release a "lock" in XReqSync and there is already another pending waiter, we need to set the new waiter as the new owner before waking up the thread with success.

Reviewed By: paulbiss

Differential Revision: D67980743

fbshipit-source-id: b5bdbda85005fef22db62490de5b307788442d82
  • Loading branch information
metaduv authored and facebook-github-bot committed Jan 9, 2025
1 parent 258d4ca commit 9e251bb
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 9 deletions.
11 changes: 4 additions & 7 deletions hphp/runtime/ext/xreqsync/ext_xreqsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,8 @@ void XReqAsioBoolEvent::unserialize(TypedValue& result) {
///////////////////////////////////////////////////////////////////////////////
// XReqCallback

XReqCallback::XReqCallback(XReqAsioBoolEvent* event) :
m_event(event),
m_invalidated(false),
m_expireAt(AsioSession::TimePoint::min()) {}

XReqCallback::XReqCallback(XReqAsioBoolEvent* event, AsioSession::TimePoint expireAt) :
XReqCallback::XReqCallback(XReqAsioBoolEvent* event, AsioSession::TimePoint expireAt, req_id waiterId) :
m_waiterId(waiterId),
m_event(event),
m_invalidated(false),
m_expireAt(expireAt) {}
Expand Down Expand Up @@ -116,6 +112,7 @@ bool XReqSyncImpl::mutex_try_unlock(req_id unlocker) {
auto next = std::move(m_waiters.back());
m_waiters.pop_back();
if (next->isValid()) {
m_mutex_owner.store(next->getWaiterId());
next->call();
break;
}
Expand Down Expand Up @@ -284,7 +281,7 @@ c_Awaitable* XReqSync::genLock(int64_t timeout_ms) {
auto expiration = timeout_ms > 0
? AsioSession::TimePoint::clock::now() + std::chrono::milliseconds(timeout_ms)
: AsioSession::TimePoint::min();
callback = std::make_shared<XReqCallback>(event, expiration);
callback = std::make_shared<XReqCallback>(event, expiration, m_self_id);

// Store the callback in per-request queue for invalidation when we die
this->m_waiters.push_back(callback);
Expand Down
5 changes: 3 additions & 2 deletions hphp/runtime/ext/xreqsync/ext_xreqsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,16 @@ struct XReqAsioBoolEvent : XReqAsioEvent<bool> {
*/
class XReqCallback {
public:
explicit XReqCallback(XReqAsioBoolEvent* event);
XReqCallback(XReqAsioBoolEvent* event, AsioSession::TimePoint expireAt);
XReqCallback(XReqAsioBoolEvent* event, AsioSession::TimePoint expireAt, req_id waiterId);
void call();
bool isValid();
void invalidate() { m_invalidated = true; }
req_id getWaiterId() { return m_waiterId; }
AsioSession::TimePoint getExpireAt() { return m_expireAt; }
static bool earlier(std::shared_ptr<XReqCallback> x, const std::shared_ptr<XReqCallback> y);

private:
req_id m_waiterId;
XReqAsioBoolEvent* m_event;
bool m_invalidated;
AsioSession::TimePoint m_expireAt;
Expand Down
74 changes: 74 additions & 0 deletions hphp/test/slow/xreqsync/stress.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?hh

<<__EntryPoint>>
function main(): void {
$ctx = HH\execution_context();
if ($ctx === 'xbox') {
return;
}

HH\Asio\join(genMain());
}

function getAPCKey(string $lockname) {
return $lockname.'_count';
}

async function reportProgress(string $lockname): Awaitable<void> {
$apc_key = getAPCKey($lockname);

// For up to 20 seconds at 0.1 intervals
$last_finished = 0;
for ($i = 0; $i < 200; $i++) {
$success = false;
$count = apc_fetch($apc_key, inout $success);
if ($count !== $last_finished) {
$last_finished = $count;
echo "Threads finished: $count\n";
if ($count == 8) {
$seconds = $i * 0.1;
if ($seconds > 15) {
echo "Overall time over 15s\n";
} else {
echo "Overall time under 15s\n";
}
return;
}
}
// 100ms
await SleepWaitHandle::create(1000 * 100);
}
}

async function genMain(): Awaitable<void> {
$lockname = 'lock'.time().rand();
$apc_key = getAPCKey($lockname);
apc_store($apc_key, 0);

// 8 Threads, 1s each. Should take 8s to complete.
concurrent {
await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]);
await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]);
await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]);
await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]);
await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]);
await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]);
await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]);
await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]);
await reportProgress($lockname);
}
}

<<__DynamicallyCallable>>
function thread(string $lockname) {
echo "Thread started\n";
usleep(1000 * 500);
$lock = HH\XReqSync::get($lockname);
HH\Asio\join($lock->genLock());
echo "Thread got lock\n";
sleep(1);
echo "Thread releasing lock\n";
$lock->unlock();
$success = false;
apc_inc(getAPCKey($lockname), 1, inout $success);
}
33 changes: 33 additions & 0 deletions hphp/test/slow/xreqsync/stress.php.expect
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
Thread started
Thread started
Thread started
Thread started
Thread started
Thread started
Thread started
Thread started
Thread got lock
Thread releasing lock
Thread got lock
Threads finished: 1
Thread releasing lock
Thread got lock
Threads finished: 2
Thread releasing lock
Thread got lock
Threads finished: 3
Thread releasing lock
Thread got lock
Threads finished: 4
Thread releasing lock
Thread got lock
Threads finished: 5
Thread releasing lock
Thread got lock
Threads finished: 6
Thread releasing lock
Thread got lock
Threads finished: 7
Thread releasing lock
Threads finished: 8
Overall time under 15s
Empty file.

0 comments on commit 9e251bb

Please sign in to comment.