From a3006cb51a2624b47128d42e17297a6c5f04c7bd Mon Sep 17 00:00:00 2001 From: SChernykh <15806605+SChernykh@users.noreply.github.com> Date: Thu, 31 Oct 2024 12:10:10 +0100 Subject: [PATCH] Stratum: check found P2Pool shares one at a time Parallel checks could result in a deadlock --- src/common.h | 1 + src/stratum_server.cpp | 147 +++++++++++++++++++++++------------------ src/stratum_server.h | 6 +- 3 files changed, 87 insertions(+), 67 deletions(-) diff --git a/src/common.h b/src/common.h index 6851816d..51103985 100644 --- a/src/common.h +++ b/src/common.h @@ -52,6 +52,7 @@ #include #include +#include #include #include #include diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp index 3bd6fa2c..95ecb03b 100644 --- a/src/stratum_server.cpp +++ b/src/stratum_server.cpp @@ -72,13 +72,6 @@ StratumServer::StratumServer(p2pool* pool) m_extraNonce = get_random32(); - m_submittedSharesPool.resize(10); - for (size_t i = 0; i < m_submittedSharesPool.size(); ++i) { - SubmittedShare* share = new SubmittedShare{}; - ASAN_POISON_MEMORY_REGION(share, sizeof(SubmittedShare)); - m_submittedSharesPool[i] = share; - } - uv_async_init_checked(&m_loop, &m_blobsAsync, on_blobs_ready); m_blobsAsync.data = this; m_blobsQueue.reserve(2); @@ -106,11 +99,6 @@ StratumServer::~StratumServer() uv_mutex_destroy(&m_showWorkersLock); uv_mutex_destroy(&m_rngLock); uv_rwlock_destroy(&m_hashrateDataLock); - - for (SubmittedShare* share : m_submittedSharesPool) { - ASAN_UNPOISON_MEMORY_REGION(share, sizeof(SubmittedShare)); - delete share; - } } void StratumServer::on_block(const BlockTemplate& block) @@ -440,72 +428,77 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo } } - SubmittedShare* share; - - if (!m_submittedSharesPool.empty()) { - share = m_submittedSharesPool.back(); - m_submittedSharesPool.pop_back(); - ASAN_UNPOISON_MEMORY_REGION(share, sizeof(SubmittedShare)); - } - else { - share = new SubmittedShare{}; - } - if (target >= TARGET_4_BYTES_LIMIT) { // "Low diff share" fix: adjust target to the same value as XMRig would use target = std::numeric_limits::max() / (std::numeric_limits::max() / (target >> 32)); } - share->m_req.data = share; - share->m_server = this; - share->m_client = client; - share->m_clientIPv6 = client->m_isV6; - share->m_clientAddr = client->m_addr; - memcpy(share->m_clientAddrString, client->m_addrString, sizeof(share->m_clientAddrString)); - memcpy(share->m_clientCustomUser, client->m_customUser, sizeof(share->m_clientCustomUser)); - share->m_clientResetCounter = client->m_resetCounter.load(); - share->m_rpcId = client->m_rpcId; - share->m_id = id; - share->m_templateId = template_id; - share->m_nonce = nonce; - share->m_extraNonce = extra_nonce; - share->m_target = target; - share->m_resultHash = resultHash; - share->m_sidechainDifficulty = sidechain_diff; - share->m_mainchainHeight = height; - share->m_sidechainHeight = sidechain_height; - share->m_effort = -1.0; - share->m_timestamp = seconds_since_epoch(); + SubmittedShare share{}; + + share.m_req.data = &share; + share.m_allocated = false; + + share.m_server = this; + share.m_client = client; + share.m_clientIPv6 = client->m_isV6; + share.m_clientAddr = client->m_addr; + memcpy(share.m_clientAddrString, client->m_addrString, sizeof(share.m_clientAddrString)); + memcpy(share.m_clientCustomUser, client->m_customUser, sizeof(share.m_clientCustomUser)); + share.m_clientResetCounter = client->m_resetCounter.load(); + share.m_rpcId = client->m_rpcId; + share.m_id = id; + share.m_templateId = template_id; + share.m_nonce = nonce; + share.m_extraNonce = extra_nonce; + share.m_target = target; + share.m_resultHash = resultHash; + share.m_sidechainDifficulty = sidechain_diff; + share.m_mainchainHeight = height; + share.m_sidechainHeight = sidechain_height; + share.m_effort = -1.0; + share.m_timestamp = seconds_since_epoch(); uint64_t rem; - share->m_hashes = (target > 1) ? udiv128(1, 0, target, &rem) : 1; - share->m_highEnoughDifficulty = sidechain_diff.check_pow(resultHash); - share->m_score = 0; + share.m_hashes = (target > 1) ? udiv128(1, 0, target, &rem) : 1; + share.m_highEnoughDifficulty = sidechain_diff.check_pow(resultHash); + share.m_score = 0; // Don't count shares that were found during sync const SideChain& side_chain = m_pool->side_chain(); const PoolBlock* tip = side_chain.chainTip(); if (tip && (sidechain_height + side_chain.chain_window_size() < tip->m_sidechainHeight)) { - share->m_highEnoughDifficulty = false; + share.m_highEnoughDifficulty = false; } - update_auto_diff(client, share->m_timestamp, share->m_hashes); + update_auto_diff(client, share.m_timestamp, share.m_hashes); // If this share is below sidechain difficulty, process it in this thread because it'll be quick - if (!share->m_highEnoughDifficulty) { - on_share_found(&share->m_req); - on_after_share_found(&share->m_req, 0); + if (!share.m_highEnoughDifficulty) { + on_share_found(&share.m_req); + on_after_share_found(&share.m_req, 0); return true; } // Else switch to a worker thread to check PoW which can take a long time - const int err = uv_queue_work(&m_loop, &share->m_req, on_share_found, on_after_share_found); - if (err) { - LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err)); + SubmittedShare* share2 = new SubmittedShare(share); + + share2->m_req.data = share2; + share2->m_allocated = true; + + m_pendingShareChecks.push_back(share2); + LOGINFO(5, "on_submit: pending share checks count = " << m_pendingShareChecks.size()); - // If uv_queue_work failed, process this share here anyway - on_share_found(&share->m_req); - on_after_share_found(&share->m_req, 0); + // If there were no pending share checks, run on_share_found in background + // on_after_share_found will pick the remaining share checks + if (m_pendingShareChecks.size() == 1) { + const int err = uv_queue_work(&m_loop, &share2->m_req, on_share_found, on_after_share_found); + if (err) { + LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err)); + + // If uv_queue_work failed, process this share here anyway + on_share_found(&share2->m_req); + on_after_share_found(&share2->m_req, 0); + } } return true; @@ -1017,6 +1010,38 @@ void StratumServer::on_share_found(uv_work_t* req) void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/) { SubmittedShare* share = reinterpret_cast(req->data); + StratumServer* server = share->m_server; + + server->check_event_loop_thread(__func__); + + ON_SCOPE_LEAVE([share, server]() + { + if (!share->m_allocated) { + return; + } + + auto it = std::find(server->m_pendingShareChecks.begin(), server->m_pendingShareChecks.end(), share); + if (it != server->m_pendingShareChecks.end()) { + server->m_pendingShareChecks.erase(it); + } + + delete share; + + if (!server->m_pendingShareChecks.empty()) { + SubmittedShare* share2 = server->m_pendingShareChecks.front(); + + const int err = uv_queue_work(&server->m_loop, &share2->m_req, on_share_found, on_after_share_found); + if (err) { + LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err)); + + // If uv_queue_work failed, process this share here anyway + server->on_share_found(&share2->m_req); + server->on_after_share_found(&share2->m_req, 0); + } + } + + LOGINFO(5, "on_after_share_found: pending share checks count = " << server->m_pendingShareChecks.size()); + }); if (share->m_highEnoughDifficulty) { const char* s = share->m_clientCustomUser; @@ -1040,14 +1065,6 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/) BACKGROUND_JOB_STOP(StratumServer::on_share_found); } - StratumServer* server = share->m_server; - - ON_SCOPE_LEAVE([share, server]() - { - ASAN_POISON_MEMORY_REGION(share, sizeof(SubmittedShare)); - server->m_submittedSharesPool.push_back(share); - }); - const bool bad_share = (share->m_result == SubmittedShare::Result::LOW_DIFF) || (share->m_result == SubmittedShare::Result::INVALID_POW); StratumClient* client = share->m_client; diff --git a/src/stratum_server.h b/src/stratum_server.h index babbb243..e92d875f 100644 --- a/src/stratum_server.h +++ b/src/stratum_server.h @@ -147,6 +147,8 @@ class StratumServer : public TCPServer struct SubmittedShare { uv_work_t m_req; + bool m_allocated; + StratumServer* m_server; StratumClient* m_client; bool m_clientIPv6; @@ -180,8 +182,6 @@ class StratumServer : public TCPServer } m_result; }; - std::vector m_submittedSharesPool; - struct HashrateData { uint64_t m_timestamp; @@ -204,6 +204,8 @@ class StratumServer : public TCPServer std::atomic m_apiLastUpdateTime; + std::deque m_pendingShareChecks; + void update_hashrate_data(uint64_t hashes, uint64_t timestamp); void api_update_local_stats(uint64_t timestamp);