Skip to content

Commit

Permalink
Stratum: check found P2Pool shares one at a time
Browse files Browse the repository at this point in the history
Parallel checks could result in a deadlock
  • Loading branch information
SChernykh committed Oct 31, 2024
1 parent 031a1c2 commit a3006cb
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 67 deletions.
1 change: 1 addition & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

#include <array>
#include <vector>
#include <deque>
#include <string>
#include <algorithm>
#include <atomic>
Expand Down
147 changes: 82 additions & 65 deletions src/stratum_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ StratumServer::StratumServer(p2pool* pool)

m_extraNonce = get_random32();

m_submittedSharesPool.resize(10);
for (size_t i = 0; i < m_submittedSharesPool.size(); ++i) {
SubmittedShare* share = new SubmittedShare{};
ASAN_POISON_MEMORY_REGION(share, sizeof(SubmittedShare));
m_submittedSharesPool[i] = share;
}

uv_async_init_checked(&m_loop, &m_blobsAsync, on_blobs_ready);
m_blobsAsync.data = this;
m_blobsQueue.reserve(2);
Expand Down Expand Up @@ -106,11 +99,6 @@ StratumServer::~StratumServer()
uv_mutex_destroy(&m_showWorkersLock);
uv_mutex_destroy(&m_rngLock);
uv_rwlock_destroy(&m_hashrateDataLock);

for (SubmittedShare* share : m_submittedSharesPool) {
ASAN_UNPOISON_MEMORY_REGION(share, sizeof(SubmittedShare));
delete share;
}
}

void StratumServer::on_block(const BlockTemplate& block)
Expand Down Expand Up @@ -440,72 +428,77 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo
}
}

SubmittedShare* share;

if (!m_submittedSharesPool.empty()) {
share = m_submittedSharesPool.back();
m_submittedSharesPool.pop_back();
ASAN_UNPOISON_MEMORY_REGION(share, sizeof(SubmittedShare));
}
else {
share = new SubmittedShare{};
}

if (target >= TARGET_4_BYTES_LIMIT) {
// "Low diff share" fix: adjust target to the same value as XMRig would use
target = std::numeric_limits<uint64_t>::max() / (std::numeric_limits<uint32_t>::max() / (target >> 32));
}

share->m_req.data = share;
share->m_server = this;
share->m_client = client;
share->m_clientIPv6 = client->m_isV6;
share->m_clientAddr = client->m_addr;
memcpy(share->m_clientAddrString, client->m_addrString, sizeof(share->m_clientAddrString));
memcpy(share->m_clientCustomUser, client->m_customUser, sizeof(share->m_clientCustomUser));
share->m_clientResetCounter = client->m_resetCounter.load();
share->m_rpcId = client->m_rpcId;
share->m_id = id;
share->m_templateId = template_id;
share->m_nonce = nonce;
share->m_extraNonce = extra_nonce;
share->m_target = target;
share->m_resultHash = resultHash;
share->m_sidechainDifficulty = sidechain_diff;
share->m_mainchainHeight = height;
share->m_sidechainHeight = sidechain_height;
share->m_effort = -1.0;
share->m_timestamp = seconds_since_epoch();
SubmittedShare share{};

share.m_req.data = &share;
share.m_allocated = false;

share.m_server = this;
share.m_client = client;
share.m_clientIPv6 = client->m_isV6;
share.m_clientAddr = client->m_addr;
memcpy(share.m_clientAddrString, client->m_addrString, sizeof(share.m_clientAddrString));
memcpy(share.m_clientCustomUser, client->m_customUser, sizeof(share.m_clientCustomUser));
share.m_clientResetCounter = client->m_resetCounter.load();
share.m_rpcId = client->m_rpcId;
share.m_id = id;
share.m_templateId = template_id;
share.m_nonce = nonce;
share.m_extraNonce = extra_nonce;
share.m_target = target;
share.m_resultHash = resultHash;
share.m_sidechainDifficulty = sidechain_diff;
share.m_mainchainHeight = height;
share.m_sidechainHeight = sidechain_height;
share.m_effort = -1.0;
share.m_timestamp = seconds_since_epoch();

uint64_t rem;
share->m_hashes = (target > 1) ? udiv128(1, 0, target, &rem) : 1;
share->m_highEnoughDifficulty = sidechain_diff.check_pow(resultHash);
share->m_score = 0;
share.m_hashes = (target > 1) ? udiv128(1, 0, target, &rem) : 1;
share.m_highEnoughDifficulty = sidechain_diff.check_pow(resultHash);
share.m_score = 0;

// Don't count shares that were found during sync
const SideChain& side_chain = m_pool->side_chain();
const PoolBlock* tip = side_chain.chainTip();
if (tip && (sidechain_height + side_chain.chain_window_size() < tip->m_sidechainHeight)) {
share->m_highEnoughDifficulty = false;
share.m_highEnoughDifficulty = false;
}

update_auto_diff(client, share->m_timestamp, share->m_hashes);
update_auto_diff(client, share.m_timestamp, share.m_hashes);

// If this share is below sidechain difficulty, process it in this thread because it'll be quick
if (!share->m_highEnoughDifficulty) {
on_share_found(&share->m_req);
on_after_share_found(&share->m_req, 0);
if (!share.m_highEnoughDifficulty) {
on_share_found(&share.m_req);
on_after_share_found(&share.m_req, 0);
return true;
}

// Else switch to a worker thread to check PoW which can take a long time
const int err = uv_queue_work(&m_loop, &share->m_req, on_share_found, on_after_share_found);
if (err) {
LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err));
SubmittedShare* share2 = new SubmittedShare(share);

share2->m_req.data = share2;
share2->m_allocated = true;

m_pendingShareChecks.push_back(share2);
LOGINFO(5, "on_submit: pending share checks count = " << m_pendingShareChecks.size());

// If uv_queue_work failed, process this share here anyway
on_share_found(&share->m_req);
on_after_share_found(&share->m_req, 0);
// If there were no pending share checks, run on_share_found in background
// on_after_share_found will pick the remaining share checks
if (m_pendingShareChecks.size() == 1) {
const int err = uv_queue_work(&m_loop, &share2->m_req, on_share_found, on_after_share_found);
if (err) {
LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err));

// If uv_queue_work failed, process this share here anyway
on_share_found(&share2->m_req);
on_after_share_found(&share2->m_req, 0);
}
}

return true;
Expand Down Expand Up @@ -1017,6 +1010,38 @@ void StratumServer::on_share_found(uv_work_t* req)
void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)
{
SubmittedShare* share = reinterpret_cast<SubmittedShare*>(req->data);
StratumServer* server = share->m_server;

server->check_event_loop_thread(__func__);

ON_SCOPE_LEAVE([share, server]()
{
if (!share->m_allocated) {
return;
}

auto it = std::find(server->m_pendingShareChecks.begin(), server->m_pendingShareChecks.end(), share);
if (it != server->m_pendingShareChecks.end()) {
server->m_pendingShareChecks.erase(it);
}

delete share;

if (!server->m_pendingShareChecks.empty()) {
SubmittedShare* share2 = server->m_pendingShareChecks.front();

const int err = uv_queue_work(&server->m_loop, &share2->m_req, on_share_found, on_after_share_found);
if (err) {
LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err));

// If uv_queue_work failed, process this share here anyway
server->on_share_found(&share2->m_req);
server->on_after_share_found(&share2->m_req, 0);
}
}

LOGINFO(5, "on_after_share_found: pending share checks count = " << server->m_pendingShareChecks.size());
});

if (share->m_highEnoughDifficulty) {
const char* s = share->m_clientCustomUser;
Expand All @@ -1040,14 +1065,6 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)
BACKGROUND_JOB_STOP(StratumServer::on_share_found);
}

StratumServer* server = share->m_server;

ON_SCOPE_LEAVE([share, server]()
{
ASAN_POISON_MEMORY_REGION(share, sizeof(SubmittedShare));
server->m_submittedSharesPool.push_back(share);
});

const bool bad_share = (share->m_result == SubmittedShare::Result::LOW_DIFF) || (share->m_result == SubmittedShare::Result::INVALID_POW);

StratumClient* client = share->m_client;
Expand Down
6 changes: 4 additions & 2 deletions src/stratum_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class StratumServer : public TCPServer
struct SubmittedShare
{
uv_work_t m_req;
bool m_allocated;

StratumServer* m_server;
StratumClient* m_client;
bool m_clientIPv6;
Expand Down Expand Up @@ -180,8 +182,6 @@ class StratumServer : public TCPServer
} m_result;
};

std::vector<SubmittedShare*> m_submittedSharesPool;

struct HashrateData
{
uint64_t m_timestamp;
Expand All @@ -204,6 +204,8 @@ class StratumServer : public TCPServer

std::atomic<uint64_t> m_apiLastUpdateTime;

std::deque<SubmittedShare*> m_pendingShareChecks;

void update_hashrate_data(uint64_t hashes, uint64_t timestamp);
void api_update_local_stats(uint64_t timestamp);

Expand Down

0 comments on commit a3006cb

Please sign in to comment.