From e150738b10c00cfd4b1aabb7514b97e236c78951 Mon Sep 17 00:00:00 2001 From: pasta Date: Fri, 21 Nov 2025 14:57:07 -0600 Subject: [PATCH 1/4] refactor: enhance CSigSharesManager with multi-threaded worker pool and dispatcher --- src/llmq/signing_shares.cpp | 145 +++++++++++++++++++++++++++--------- src/llmq/signing_shares.h | 25 +++++-- 2 files changed, 128 insertions(+), 42 deletions(-) diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index ce1e3d51d05d..20d68c19602a 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -196,25 +196,45 @@ CSigSharesManager::CSigSharesManager(CConnman& connman, CChainState& chainstate, CSigSharesManager::~CSigSharesManager() = default; -void CSigSharesManager::StartWorkerThread() +void CSigSharesManager::Start() { - // can't start new thread if we have one running already - if (workThread.joinable()) { + // can't start if threads are already running + if (housekeepingThread.joinable() || dispatcherThread.joinable()) { assert(false); } - workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); }); + // Initialize worker pool + int workerCount = std::clamp(static_cast<int>(std::thread::hardware_concurrency() / 2), 1, 4); + workerPool.resize(workerCount); + RenameThreadPool(workerPool, "sigsh-work"); + + // Start housekeeping thread + housekeepingThread = std::thread(&util::TraceThread, "sigsh-maint", + [this] { HousekeepingThreadMain(); }); + + // Start dispatcher thread + dispatcherThread = std::thread(&util::TraceThread, "sigsh-dispatch", + [this] { WorkDispatcherThreadMain(); }); } -void CSigSharesManager::StopWorkerThread() +void CSigSharesManager::Stop() { // make sure to call InterruptWorkerThread() first if (!workInterrupt) { assert(false); } - if (workThread.joinable()) { - workThread.join(); + // Stop worker pool + workerPool.clear_queue(); + workerPool.stop(true); + + // Join threads + if (housekeepingThread.joinable()) { + housekeepingThread.join(); + } + 
+ if (dispatcherThread.joinable()) { + dispatcherThread.join(); } } @@ -1611,16 +1631,13 @@ void CSigSharesManager::BanNode(NodeId nodeId) nodeState.banned = true; } -void CSigSharesManager::WorkThreadMain() +void CSigSharesManager::HousekeepingThreadMain() { int64_t lastSendTime = 0; while (!workInterrupt) { RemoveBannedNodeStates(); - bool fMoreWork = ProcessPendingSigShares(); - SignPendingSigShares(); - if (TicksSinceEpoch(SystemClock::now()) - lastSendTime > 100) { SendMessages(); lastSendTime = TicksSinceEpoch(SystemClock::now()); @@ -1628,43 +1645,99 @@ void CSigSharesManager::WorkThreadMain() Cleanup(); - // TODO Wakeup when pending signing is needed? - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; + workInterrupt.sleep_for(std::chrono::milliseconds(100)); + } +} + +void CSigSharesManager::WorkDispatcherThreadMain() +{ + while (!workInterrupt) { + // Dispatch all pending signs (individual tasks) + DispatchPendingSigns(); + + // If there's processing work, spawn a helper worker + DispatchPendingProcessing(); + + // Always sleep briefly between checks + workInterrupt.sleep_for(std::chrono::milliseconds(10)); + } +} + +void CSigSharesManager::DispatchPendingSigns() +{ + // Pop and dispatch ALL pending signs until queue is empty + while (!workInterrupt) { + std::optional<PendingSignatureData> work; + { + LOCK(cs_pendingSigns); + if (pendingSigns.empty()) break; + // Move the data out of the vector + work.emplace(std::move(pendingSigns.back())); + pendingSigns.pop_back(); } + + workerPool.push([this, work = std::move(*work)](int) { + SignAndProcessSingleShare(std::move(work)); + }); } } -void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) +void CSigSharesManager::DispatchPendingProcessing() { - LOCK(cs_pendingSigns); - pendingSigns.emplace_back(std::move(quorum), id, msgHash); + // Check if there's work, spawn a helper if so + bool hasWork = false; + { + LOCK(cs); + hasWork = 
std::any_of(nodeStates.begin(), nodeStates.end(), + [](const auto& entry) { + return !entry.second.pendingIncomingSigShares.Empty(); + }); + } + + if (hasWork) { + // Work exists - spawn a worker to help! + workerPool.push([this](int) { + ProcessPendingSigSharesLoop(); + }); + } } -void CSigSharesManager::SignPendingSigShares() +void CSigSharesManager::ProcessPendingSigSharesLoop() { - std::vector<PendingSignatureData> v; - WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns)); - - for (const auto& [pQuorum, id, msgHash] : v) { - auto opt_sigShare = CreateSigShare(*pQuorum, id, msgHash); - - if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) { - auto& sigShare = *opt_sigShare; - ProcessSigShare(sigShare, pQuorum); - - if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) { - LOCK(cs); - auto& session = signedSessions[sigShare.GetSignHash()]; - session.sigShare = std::move(sigShare); - session.quorum = pQuorum; - session.nextAttemptTime = 0; - session.attempt = 0; - } + while (!workInterrupt) { + bool moreWork = ProcessPendingSigShares(); + + if (!moreWork) { + return; // No work found, exit immediately } } } +void CSigSharesManager::SignAndProcessSingleShare(PendingSignatureData work) +{ + auto opt_sigShare = CreateSigShare(*work.quorum, work.id, work.msgHash); + + if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) { + auto& sigShare = *opt_sigShare; + ProcessSigShare(sigShare, work.quorum); + + if (IsAllMembersConnectedEnabled(work.quorum->params.type, m_sporkman)) { + LOCK(cs); + auto& session = signedSessions[sigShare.GetSignHash()]; + session.sigShare = std::move(sigShare); + session.quorum = work.quorum; + session.nextAttemptTime = 0; + session.attempt = 0; + } + } +} + +void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) +{ + LOCK(cs_pendingSigns); + pendingSigns.emplace_back(std::move(quorum), id, msgHash); +} + std::optional CSigSharesManager::CreateSigShareForSingleMember(const 
CQuorum& quorum, const uint256& id, const uint256& msgHash) const { cxxtimer::Timer t(true); diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index 59fe5caff1b0..8570c3c11435 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -6,6 +6,7 @@ #define BITCOIN_LLMQ_SIGNING_SHARES_H #include +#include #include #include #include @@ -361,7 +362,7 @@ class CSignedSession int attempt{0}; }; -class CSigSharesManager : public CRecoveredSigsListener +class CSigSharesManager : public llmq::CRecoveredSigsListener { private: static constexpr int64_t SESSION_NEW_SHARES_TIMEOUT{60}; @@ -380,7 +381,9 @@ class CSigSharesManager : public CRecoveredSigsListener Mutex cs; - std::thread workThread; + mutable ctpl::thread_pool workerPool; + std::thread housekeepingThread; + std::thread dispatcherThread; CThreadInterrupt workInterrupt; SigShareMap sigShares GUARDED_BY(cs); @@ -426,8 +429,8 @@ class CSigSharesManager : public CRecoveredSigsListener const CQuorumManager& _qman, const CSporkManager& sporkman); ~CSigSharesManager() override; - void StartWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs); - void StopWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void Start() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void Stop() EXCLUSIVE_LOCKS_REQUIRED(!cs); void RegisterAsRecoveredSigsListener() EXCLUSIVE_LOCKS_REQUIRED(!cs); void UnregisterAsRecoveredSigsListener() EXCLUSIVE_LOCKS_REQUIRED(!cs); void InterruptWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs); @@ -500,8 +503,18 @@ class CSigSharesManager : public CRecoveredSigsListener void CollectSigSharesToSendConcentrated(std::unordered_map>& sigSharesToSend, const std::vector& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs); - void SignPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); - void WorkThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); + + // Thread main functions + void 
HousekeepingThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void WorkDispatcherThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); + + // Dispatcher functions + void DispatchPendingSigns() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); + void DispatchPendingProcessing() EXCLUSIVE_LOCKS_REQUIRED(!cs); + + // Worker pool task functions + void ProcessPendingSigSharesLoop() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void SignAndProcessSingleShare(PendingSignatureData work) EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); }; } // namespace llmq From 1c52425d442e50101c81e8e425bdaa90f8e13a62 Mon Sep 17 00:00:00 2001 From: pasta Date: Sat, 22 Nov 2025 09:21:55 -0600 Subject: [PATCH 2/4] refactor: streamline CSigSharesManager's message handling and improve thread safety - Removed unnecessary lastSendTime variable and simplified message sending in HousekeepingThreadMain. - Enhanced DispatchPendingSigns by swapping the entire vector of pending signs to reduce lock contention and improve performance. - Updated ActiveContext to start and stop the share manager more efficiently. 
--- src/llmq/signing_shares.cpp | 31 ++++++++++++------------------- src/masternode/active/context.cpp | 4 ++-- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 20d68c19602a..ca19cd59b20b 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -1633,16 +1633,9 @@ void CSigSharesManager::BanNode(NodeId nodeId) void CSigSharesManager::HousekeepingThreadMain() { - int64_t lastSendTime = 0; - while (!workInterrupt) { RemoveBannedNodeStates(); - - if (TicksSinceEpoch(SystemClock::now()) - lastSendTime > 100) { - SendMessages(); - lastSendTime = TicksSinceEpoch(SystemClock::now()); - } - + SendMessages(); Cleanup(); workInterrupt.sleep_for(std::chrono::milliseconds(100)); @@ -1665,18 +1658,18 @@ void CSigSharesManager::WorkDispatcherThreadMain() void CSigSharesManager::DispatchPendingSigns() { - // Pop and dispatch ALL pending signs until queue is empty - while (!workInterrupt) { - std::optional<PendingSignatureData> work; - { - LOCK(cs_pendingSigns); - if (pendingSigns.empty()) break; - // Move the data out of the vector - work.emplace(std::move(pendingSigns.back())); - pendingSigns.pop_back(); - } + // Swap out entire vector to avoid lock thrashing + std::vector<PendingSignatureData> signs; + { + LOCK(cs_pendingSigns); + signs.swap(pendingSigns); + } + + // Dispatch all signs to worker pool + for (auto& work : signs) { + if (workInterrupt) break; - workerPool.push([this, work = std::move(*work)](int) { + workerPool.push([this, work = std::move(work)](int) { SignAndProcessSingleShare(std::move(work)); }); } diff --git a/src/masternode/active/context.cpp b/src/masternode/active/context.cpp index c737d8a03d0b..8051d070591f 100644 --- a/src/masternode/active/context.cpp +++ b/src/masternode/active/context.cpp @@ -55,12 +55,12 @@ void ActiveContext::Start(CConnman& connman, PeerManager& peerman) { m_llmq_ctx.qdkgsman->StartThreads(connman, peerman); shareman->RegisterAsRecoveredSigsListener(); - 
shareman->StartWorkerThread(); + shareman->Start(); } void ActiveContext::Stop() { - shareman->StopWorkerThread(); + shareman->Stop(); shareman->UnregisterAsRecoveredSigsListener(); m_llmq_ctx.qdkgsman->StopThreads(); } From cc702038d0c4d21d58506ca55eea4bb54f22624f Mon Sep 17 00:00:00 2001 From: PastaPastaPasta <6443210+PastaPastaPasta@users.noreply.github.com> Date: Tue, 25 Nov 2025 08:09:12 -0600 Subject: [PATCH 3/4] fix: 12 char thread name --- src/llmq/signing_shares.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index ca19cd59b20b..98baaf13823a 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -213,7 +213,7 @@ void CSigSharesManager::Start() [this] { HousekeepingThreadMain(); }); // Start dispatcher thread - dispatcherThread = std::thread(&util::TraceThread, "sigsh-dispatch", + dispatcherThread = std::thread(&util::TraceThread, "sigsh-dispat", [this] { WorkDispatcherThreadMain(); }); } From f033c54fa69bc1c3235ef0329455d90c924906b2 Mon Sep 17 00:00:00 2001 From: pasta Date: Mon, 8 Dec 2025 20:57:47 -0600 Subject: [PATCH 4/4] fix: call join first --- src/llmq/signing_shares.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 98baaf13823a..e61a12283a70 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -224,18 +224,17 @@ void CSigSharesManager::Stop() assert(false); } - // Stop worker pool - workerPool.clear_queue(); - workerPool.stop(true); - - // Join threads + // Join threads FIRST to stop any pending push() calls if (housekeepingThread.joinable()) { housekeepingThread.join(); } - if (dispatcherThread.joinable()) { dispatcherThread.join(); } + + // Then stop worker pool (now safe, no more push() calls) + workerPool.clear_queue(); + workerPool.stop(true); } void CSigSharesManager::RegisterAsRecoveredSigsListener()