diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index ce1e3d51d05d..e61a12283a70 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -196,26 +196,45 @@ CSigSharesManager::CSigSharesManager(CConnman& connman, CChainState& chainstate, CSigSharesManager::~CSigSharesManager() = default; -void CSigSharesManager::StartWorkerThread() +void CSigSharesManager::Start() { - // can't start new thread if we have one running already - if (workThread.joinable()) { + // can't start if threads are already running + if (housekeepingThread.joinable() || dispatcherThread.joinable()) { assert(false); } - workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); }); + // Initialize worker pool + int workerCount = std::clamp(static_cast(std::thread::hardware_concurrency() / 2), 1, 4); + workerPool.resize(workerCount); + RenameThreadPool(workerPool, "sigsh-work"); + + // Start housekeeping thread + housekeepingThread = std::thread(&util::TraceThread, "sigsh-maint", + [this] { HousekeepingThreadMain(); }); + + // Start dispatcher thread + dispatcherThread = std::thread(&util::TraceThread, "sigsh-dispat", + [this] { WorkDispatcherThreadMain(); }); } -void CSigSharesManager::StopWorkerThread() +void CSigSharesManager::Stop() { // make sure to call InterruptWorkerThread() first if (!workInterrupt) { assert(false); } - if (workThread.joinable()) { - workThread.join(); + // Join threads FIRST to stop any pending push() calls + if (housekeepingThread.joinable()) { + housekeepingThread.join(); } + if (dispatcherThread.joinable()) { + dispatcherThread.join(); + } + + // Then stop worker pool (now safe, no more push() calls) + workerPool.clear_queue(); + workerPool.stop(true); } void CSigSharesManager::RegisterAsRecoveredSigsListener() @@ -1611,60 +1630,106 @@ void CSigSharesManager::BanNode(NodeId nodeId) nodeState.banned = true; } -void CSigSharesManager::WorkThreadMain() +void CSigSharesManager::HousekeepingThreadMain() { - int64_t lastSendTime = 0; - while (!workInterrupt) { RemoveBannedNodeStates(); + SendMessages(); + Cleanup(); - bool fMoreWork = ProcessPendingSigShares(); - SignPendingSigShares(); + workInterrupt.sleep_for(std::chrono::milliseconds(100)); + } +} - if (TicksSinceEpoch(SystemClock::now()) - lastSendTime > 100) { - SendMessages(); - lastSendTime = TicksSinceEpoch(SystemClock::now()); - } +void CSigSharesManager::WorkDispatcherThreadMain() +{ + while (!workInterrupt) { + // Dispatch all pending signs (individual tasks) + DispatchPendingSigns(); - Cleanup(); + // If there's processing work, spawn a helper worker + DispatchPendingProcessing(); - // TODO Wakeup when pending signing is needed? - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; - } + // Always sleep briefly between checks + workInterrupt.sleep_for(std::chrono::milliseconds(10)); } } -void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) +void CSigSharesManager::DispatchPendingSigns() { - LOCK(cs_pendingSigns); - pendingSigns.emplace_back(std::move(quorum), id, msgHash); + // Swap out entire vector to avoid lock thrashing + std::vector signs; + { + LOCK(cs_pendingSigns); + signs.swap(pendingSigns); + } + + // Dispatch all signs to worker pool + for (auto& work : signs) { + if (workInterrupt) break; + + workerPool.push([this, work = std::move(work)](int) { + SignAndProcessSingleShare(std::move(work)); + }); + } } -void CSigSharesManager::SignPendingSigShares() +void CSigSharesManager::DispatchPendingProcessing() { - std::vector v; - WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns)); - - for (const auto& [pQuorum, id, msgHash] : v) { - auto opt_sigShare = CreateSigShare(*pQuorum, id, msgHash); - - if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) { - auto& sigShare = *opt_sigShare; - ProcessSigShare(sigShare, pQuorum); - - if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) { - LOCK(cs); - auto& session = signedSessions[sigShare.GetSignHash()]; - session.sigShare = std::move(sigShare); - session.quorum = pQuorum; - session.nextAttemptTime = 0; - session.attempt = 0; - } + // Check if there's work, spawn a helper if so + bool hasWork = false; + { + LOCK(cs); + hasWork = std::any_of(nodeStates.begin(), nodeStates.end(), + [](const auto& entry) { + return !entry.second.pendingIncomingSigShares.Empty(); + }); + } + + if (hasWork) { + // Work exists - spawn a worker to help! + workerPool.push([this](int) { + ProcessPendingSigSharesLoop(); + }); + } +} + +void CSigSharesManager::ProcessPendingSigSharesLoop() +{ + while (!workInterrupt) { + bool moreWork = ProcessPendingSigShares(); + + if (!moreWork) { + return; // No work found, exit immediately + } + } +} + +void CSigSharesManager::SignAndProcessSingleShare(PendingSignatureData work) +{ + auto opt_sigShare = CreateSigShare(*work.quorum, work.id, work.msgHash); + + if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) { + auto& sigShare = *opt_sigShare; + ProcessSigShare(sigShare, work.quorum); + + if (IsAllMembersConnectedEnabled(work.quorum->params.type, m_sporkman)) { + LOCK(cs); + auto& session = signedSessions[sigShare.GetSignHash()]; + session.sigShare = std::move(sigShare); + session.quorum = work.quorum; + session.nextAttemptTime = 0; + session.attempt = 0; } } } +void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) +{ + LOCK(cs_pendingSigns); + pendingSigns.emplace_back(std::move(quorum), id, msgHash); +} + std::optional CSigSharesManager::CreateSigShareForSingleMember(const CQuorum& quorum, const uint256& id, const uint256& msgHash) const { cxxtimer::Timer t(true); diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index 59fe5caff1b0..8570c3c11435 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -6,6 +6,7 @@ #define BITCOIN_LLMQ_SIGNING_SHARES_H #include +#include #include #include #include @@ -361,7 +362,7 @@ class CSignedSession int attempt{0}; }; -class CSigSharesManager : public CRecoveredSigsListener +class CSigSharesManager : public llmq::CRecoveredSigsListener { private: static constexpr int64_t SESSION_NEW_SHARES_TIMEOUT{60}; @@ -380,7 +381,9 @@ class CSigSharesManager : public CRecoveredSigsListener Mutex cs; - std::thread workThread; + mutable ctpl::thread_pool workerPool; + std::thread housekeepingThread; + std::thread dispatcherThread; CThreadInterrupt workInterrupt; SigShareMap sigShares GUARDED_BY(cs); @@ -426,8 +429,8 @@ class CSigSharesManager : public CRecoveredSigsListener const CQuorumManager& _qman, const CSporkManager& sporkman); ~CSigSharesManager() override; - void StartWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs); - void StopWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void Start() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void Stop() EXCLUSIVE_LOCKS_REQUIRED(!cs); void RegisterAsRecoveredSigsListener() EXCLUSIVE_LOCKS_REQUIRED(!cs); void UnregisterAsRecoveredSigsListener() EXCLUSIVE_LOCKS_REQUIRED(!cs); void InterruptWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs); @@ -500,8 +503,18 @@ class CSigSharesManager : public CRecoveredSigsListener void CollectSigSharesToSendConcentrated(std::unordered_map>& sigSharesToSend, const std::vector& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs); - void SignPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); - void WorkThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); + + // Thread main functions + void HousekeepingThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void WorkDispatcherThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); + + // Dispatcher functions + void DispatchPendingSigns() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); + void DispatchPendingProcessing() EXCLUSIVE_LOCKS_REQUIRED(!cs); + + // Worker pool task functions + void ProcessPendingSigSharesLoop() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void SignAndProcessSingleShare(PendingSignatureData work) EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); }; } // namespace llmq diff --git a/src/masternode/active/context.cpp b/src/masternode/active/context.cpp index c737d8a03d0b..8051d070591f 100644 --- a/src/masternode/active/context.cpp +++ b/src/masternode/active/context.cpp @@ -55,12 +55,12 @@ void ActiveContext::Start(CConnman& connman, PeerManager& peerman) { m_llmq_ctx.qdkgsman->StartThreads(connman, peerman); shareman->RegisterAsRecoveredSigsListener(); - shareman->StartWorkerThread(); + shareman->Start(); } void ActiveContext::Stop() { - shareman->StopWorkerThread(); + shareman->Stop(); shareman->UnregisterAsRecoveredSigsListener(); m_llmq_ctx.qdkgsman->StopThreads(); }