diff --git a/hphp/runtime/ext/xreqsync/ext_xreqsync.cpp b/hphp/runtime/ext/xreqsync/ext_xreqsync.cpp index a685d584c48295..70647dac4ab9b8 100644 --- a/hphp/runtime/ext/xreqsync/ext_xreqsync.cpp +++ b/hphp/runtime/ext/xreqsync/ext_xreqsync.cpp @@ -46,12 +46,8 @@ void XReqAsioBoolEvent::unserialize(TypedValue& result) { /////////////////////////////////////////////////////////////////////////////// // XReqCallback -XReqCallback::XReqCallback(XReqAsioBoolEvent* event) : - m_event(event), - m_invalidated(false), - m_expireAt(AsioSession::TimePoint::min()) {} - -XReqCallback::XReqCallback(XReqAsioBoolEvent* event, AsioSession::TimePoint expireAt) : +XReqCallback::XReqCallback(XReqAsioBoolEvent* event, AsioSession::TimePoint expireAt, req_id waiterId) : + m_waiterId(waiterId), m_event(event), m_invalidated(false), m_expireAt(expireAt) {} @@ -116,6 +112,7 @@ bool XReqSyncImpl::mutex_try_unlock(req_id unlocker) { auto next = std::move(m_waiters.back()); m_waiters.pop_back(); if (next->isValid()) { + m_mutex_owner.store(next->getWaiterId()); next->call(); break; } @@ -284,7 +281,7 @@ c_Awaitable* XReqSync::genLock(int64_t timeout_ms) { auto expiration = timeout_ms > 0 ? AsioSession::TimePoint::clock::now() + std::chrono::milliseconds(timeout_ms) : AsioSession::TimePoint::min(); - callback = std::make_shared(event, expiration); + callback = std::make_shared(event, expiration, m_self_id); // Store the callback in per-request queue for invalidation when we die this->m_waiters.push_back(callback); diff --git a/hphp/runtime/ext/xreqsync/ext_xreqsync.h b/hphp/runtime/ext/xreqsync/ext_xreqsync.h index b7af0c6de025a3..e3b533b0a46b0a 100644 --- a/hphp/runtime/ext/xreqsync/ext_xreqsync.h +++ b/hphp/runtime/ext/xreqsync/ext_xreqsync.h @@ -86,15 +86,16 @@ struct XReqAsioBoolEvent : XReqAsioEvent { */ class XReqCallback { public: - explicit XReqCallback(XReqAsioBoolEvent* event); - XReqCallback(XReqAsioBoolEvent* event, AsioSession::TimePoint expireAt); + XReqCallback(XReqAsioBoolEvent* event, AsioSession::TimePoint expireAt, req_id waiterId); void call(); bool isValid(); void invalidate() { m_invalidated = true; } + req_id getWaiterId() { return m_waiterId; } AsioSession::TimePoint getExpireAt() { return m_expireAt; } static bool earlier(std::shared_ptr x, const std::shared_ptr y); private: + req_id m_waiterId; XReqAsioBoolEvent* m_event; bool m_invalidated; AsioSession::TimePoint m_expireAt; diff --git a/hphp/test/slow/xreqsync/stress.php b/hphp/test/slow/xreqsync/stress.php new file mode 100644 index 00000000000000..846fceba486da5 --- /dev/null +++ b/hphp/test/slow/xreqsync/stress.php @@ -0,0 +1,74 @@ +> +function main(): void { + $ctx = HH\execution_context(); + if ($ctx === 'xbox') { + return; + } + + HH\Asio\join(genMain()); +} + +function getAPCKey(string $lockname) { + return $lockname.'_count'; +} + +async function reportProgress(string $lockname): Awaitable { + $apc_key = getAPCKey($lockname); + + // For up to 20 seconds at 0.1 intervals + $last_finished = 0; + for ($i = 0; $i < 200; $i++) { + $success = false; + $count = apc_fetch($apc_key, inout $success); + if ($count !== $last_finished) { + $last_finished = $count; + echo "Threads finished: $count\n"; + if ($count == 8) { + $seconds = $i * 0.1; + if ($seconds > 15) { + echo "Overall time over 15s\n"; + } else { + echo "Overall time under 15s\n"; + } + return; + } + } + // 100ms + await SleepWaitHandle::create(1000 * 100); + } +} + +async function genMain(): Awaitable { + $lockname = 'lock'.time().rand(); + $apc_key = getAPCKey($lockname); + apc_store($apc_key, 0); + + // 8 Threads, 1s each. Should take 8s to complete. + concurrent { + await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]); + await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]); + await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]); + await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]); + await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]); + await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]); + await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]); + await fb_gen_user_func_array(__FILE__, 'thread', vec[$lockname]); + await reportProgress($lockname); + } +} + +<<__DynamicallyCallable>> +function thread(string $lockname) { + echo "Thread started\n"; + usleep(1000 * 500); + $lock = HH\XReqSync::get($lockname); + HH\Asio\join($lock->genLock()); + echo "Thread got lock\n"; + sleep(1); + echo "Thread releasing lock\n"; + $lock->unlock(); + $success = false; + apc_inc(getAPCKey($lockname), 1, inout $success); +} diff --git a/hphp/test/slow/xreqsync/stress.php.expect b/hphp/test/slow/xreqsync/stress.php.expect new file mode 100644 index 00000000000000..3e308c507125ac --- /dev/null +++ b/hphp/test/slow/xreqsync/stress.php.expect @@ -0,0 +1,33 @@ +Thread started +Thread started +Thread started +Thread started +Thread started +Thread started +Thread started +Thread started +Thread got lock +Thread releasing lock +Thread got lock +Threads finished: 1 +Thread releasing lock +Thread got lock +Threads finished: 2 +Thread releasing lock +Thread got lock +Threads finished: 3 +Thread releasing lock +Thread got lock +Threads finished: 4 +Thread releasing lock +Thread got lock +Threads finished: 5 +Thread releasing lock +Thread got lock +Threads finished: 6 +Thread releasing lock +Thread got lock +Threads finished: 7 +Thread releasing lock +Threads finished: 8 +Overall time under 15s diff --git a/hphp/test/slow/xreqsync/stress.php.norepo b/hphp/test/slow/xreqsync/stress.php.norepo new file mode 100644 index 00000000000000..e69de29bb2d1d6