From ed1578f29fb5c06493b9c440e0668ab9d2ac38ad Mon Sep 17 00:00:00 2001 From: LyokoJeremie Date: Fri, 1 Sep 2023 02:50:12 +0800 Subject: [PATCH] use std::atomic_ --- src/StateMonitorServer.cpp | 127 +++++++++++++++++++++------------ src/TcpRelaySession.cpp | 2 + src/TcpRelayStatisticsInfo.cpp | 8 ++- src/TcpRelayStatisticsInfo.h | 13 ++-- src/UpstreamPool.cpp | 20 +++--- src/UpstreamPool.h | 22 +++--- 6 files changed, 118 insertions(+), 74 deletions(-) diff --git a/src/StateMonitorServer.cpp b/src/StateMonitorServer.cpp index f28f6a7..9b508ea 100644 --- a/src/StateMonitorServer.cpp +++ b/src/StateMonitorServer.cpp @@ -108,24 +108,24 @@ std::string HttpConnectSession::createJsonString() { n.put("name", a->name); n.put("host", a->host); n.put("port", a->port); - n.put("isOffline", a->isOffline); - n.put("lastConnectFailed", a->lastConnectFailed); - n.put("isManualDisable", a->isManualDisable); - n.put("disable", a->disable); + n.put("isOffline", a->isOffline.load()); + n.put("lastConnectFailed", a->lastConnectFailed.load()); + n.put("isManualDisable", a->isManualDisable.load()); + n.put("disable", a->disable.load()); n.put("lastConnectCheckResult", a->lastConnectCheckResult); n.put("connectCount", a->connectCount.load()); n.put("lastOnlinePing", ( - a->lastOnlinePing.count() != -1 ? - boost::lexical_cast(a->lastOnlinePing.count()) : "")); + a->lastOnlinePing.load().count() != -1 ? + boost::lexical_cast(a->lastOnlinePing.load().count()) : "")); n.put("lastConnectPing", ( - a->lastConnectPing.count() != -1 ? - boost::lexical_cast(a->lastConnectPing.count()) : "")); + a->lastConnectPing.load().count() != -1 ? + boost::lexical_cast(a->lastConnectPing.load().count()) : "")); n.put("lastOnlineTime", ( - a->lastOnlineTime.has_value() ? - printUpstreamTimePoint(a->lastOnlineTime.value()) : "")); + a->lastOnlineTime.load() != UpstreamTimePoint{} ? + printUpstreamTimePoint(a->lastOnlineTime.load()) : "")); n.put("lastConnectTime", ( - a->lastConnectTime.has_value() ? - printUpstreamTimePoint(a->lastConnectTime.value()) : "")); + a->lastConnectTime.load() != UpstreamTimePoint{} ? + printUpstreamTimePoint(a->lastConnectTime.load()) : "")); n.put("isWork", upstreamPool->checkServer(a)); if (info) { @@ -194,7 +194,10 @@ std::string HttpConnectSession::createJsonString() { n.put("byteUpChangeMax", a.second->byteUpChangeMax); n.put("byteDownChangeMax", a.second->byteDownChangeMax); n.put("rule", ruleEnum2string(a.second->rule)); - n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex); + { + std::lock_guard lgLM{a.second->lastUseUpstreamIndexMtx}; + n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex); + } pS.push_back(std::make_pair("", n)); } @@ -214,7 +217,10 @@ std::string HttpConnectSession::createJsonString() { n.put("byteUpChangeMax", a.second->byteUpChangeMax); n.put("byteDownChangeMax", a.second->byteDownChangeMax); n.put("rule", ruleEnum2string(a.second->rule)); - n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex); + { + std::lock_guard lgLM{a.second->lastUseUpstreamIndexMtx}; + n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex); + } pC.push_back(std::make_pair("", n)); } @@ -234,7 +240,10 @@ std::string HttpConnectSession::createJsonString() { n.put("byteUpChangeMax", a.second->byteUpChangeMax); n.put("byteDownChangeMax", a.second->byteDownChangeMax); n.put("rule", ruleEnum2string(a.second->rule)); - n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex); + { + std::lock_guard lgLM{a.second->lastUseUpstreamIndexMtx}; + n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex); + } pL.push_back(std::make_pair("", n)); } @@ -255,7 +264,10 @@ std::string HttpConnectSession::createJsonString() { n.put("byteUpChangeMax", a.second->byteUpChangeMax); n.put("byteDownChangeMax", a.second->byteDownChangeMax); n.put("rule", ruleEnum2string(a.second->rule)); - n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex); + { + std::lock_guard lgLM{a.second->lastUseUpstreamIndexMtx}; + n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex); + } auto au = am->getById(a.first); boost::property_tree::ptree pAU; @@ -396,6 +408,7 @@ void HttpConnectSession::path_op(HttpConnectSession::QueryPairsType &queryPairs) if (ptr->getStatisticsInfo()) { auto info = ptr->getStatisticsInfo()->getInfoClient(target->second); if (info) { + std::lock_guard lg{info->lastUseUpstreamIndexMtx}; info->lastUseUpstreamIndex = index; } } @@ -403,6 +416,16 @@ void HttpConnectSession::path_op(HttpConnectSession::QueryPairsType &queryPairs) if (ptr->getStatisticsInfo()) { auto info = ptr->getStatisticsInfo()->getInfoListen(target->second); if (info) { + std::lock_guard lg{info->lastUseUpstreamIndexMtx}; + info->lastUseUpstreamIndex = index; + } + } + } else if ("auth" == t) { + if (ptr->getStatisticsInfo()) { + auto info = ptr->getStatisticsInfo()->getInfoAuthUser( + boost::lexical_cast(target->second)); + if (info) { + std::lock_guard lg{info->lastUseUpstreamIndexMtx}; info->lastUseUpstreamIndex = index; } } @@ -446,9 +469,9 @@ void HttpConnectSession::path_op(HttpConnectSession::QueryPairsType &queryPairs) for (auto &a: upstreamPool->pool()) { a->isOffline = false; a->lastConnectFailed = false; - a->lastOnlineTime.reset(); + a->lastOnlineTime = UpstreamTimePoint{}; a->lastOnlinePing = std::chrono::milliseconds{-1}; - a->lastConnectTime.reset(); + a->lastConnectTime = UpstreamTimePoint{}; a->lastConnectPing = std::chrono::milliseconds{-1}; } // recheck @@ -643,24 +666,24 @@ void HttpConnectSession::path_per_info(HttpConnectSession::QueryPairsType &query n.put("name", a->name); n.put("host", a->host); n.put("port", a->port); - n.put("isOffline", a->isOffline); - n.put("lastConnectFailed", a->lastConnectFailed); - n.put("isManualDisable", a->isManualDisable); - n.put("disable", a->disable); + n.put("isOffline", a->isOffline.load()); + n.put("lastConnectFailed", a->lastConnectFailed.load()); + n.put("isManualDisable", a->isManualDisable.load()); + n.put("disable", a->disable.load()); n.put("lastConnectCheckResult", a->lastConnectCheckResult); n.put("connectCount", a->connectCount.load()); n.put("lastOnlinePing", ( - a->lastOnlinePing.count() != -1 ? - boost::lexical_cast(a->lastOnlinePing.count()) : "")); + a->lastOnlinePing.load().count() != -1 ? + boost::lexical_cast(a->lastOnlinePing.load().count()) : "")); n.put("lastConnectPing", ( - a->lastConnectPing.count() != -1 ? - boost::lexical_cast(a->lastConnectPing.count()) : "")); + a->lastConnectPing.load().count() != -1 ? + boost::lexical_cast(a->lastConnectPing.load().count()) : "")); n.put("lastOnlineTime", ( - a->lastOnlineTime.has_value() ? - printUpstreamTimePoint(a->lastOnlineTime.value()) : "")); + a->lastOnlineTime.load() != UpstreamTimePoint{} ? + printUpstreamTimePoint(a->lastOnlineTime.load()) : "")); n.put("lastConnectTime", ( - a->lastConnectTime.has_value() ? - printUpstreamTimePoint(a->lastConnectTime.value()) : "")); + a->lastConnectTime.load() != UpstreamTimePoint{} ? + printUpstreamTimePoint(a->lastConnectTime.load()) : "")); n.put("isWork", upstreamPool->checkServer(a)); if (info) { @@ -710,7 +733,10 @@ void HttpConnectSession::path_per_info(HttpConnectSession::QueryPairsType &query n.put("byteUpChangeMax", a.second->byteUpChangeMax); n.put("byteDownChangeMax", a.second->byteDownChangeMax); n.put("rule", ruleEnum2string(a.second->rule)); - n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex); + { + std::lock_guard lgLM{a.second->lastUseUpstreamIndexMtx}; + n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex); + } pS.push_back(std::make_pair("", n)); } @@ -755,7 +781,10 @@ void HttpConnectSession::path_per_info(HttpConnectSession::QueryPairsType &query baseInfo.put("byteUpChangeMax", in->byteUpChangeMax); baseInfo.put("byteDownChangeMax", in->byteDownChangeMax); baseInfo.put("rule", ruleEnum2string(in->rule)); - baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex); + { + std::lock_guard lgLM{in->lastUseUpstreamIndexMtx}; + baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex); + } root.add_child("BaseInfo", baseInfo); } } @@ -798,7 +827,10 @@ void HttpConnectSession::path_per_info(HttpConnectSession::QueryPairsType &query baseInfo.put("byteUpChangeMax", in->byteUpChangeMax); baseInfo.put("byteDownChangeMax", in->byteDownChangeMax); baseInfo.put("rule", ruleEnum2string(in->rule)); - baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex); + { + std::lock_guard lgLM{in->lastUseUpstreamIndexMtx}; + baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex); + } root.add_child("BaseInfo", baseInfo); } } @@ -841,7 +873,10 @@ void HttpConnectSession::path_per_info(HttpConnectSession::QueryPairsType &query baseInfo.put("byteUpChangeMax", in->byteUpChangeMax); baseInfo.put("byteDownChangeMax", in->byteDownChangeMax); baseInfo.put("rule", ruleEnum2string(in->rule)); - baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex); + { + std::lock_guard lgLM{in->lastUseUpstreamIndexMtx}; + baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex); + } root.add_child("BaseInfo", baseInfo); } } @@ -899,24 +934,24 @@ boost::property_tree::ptree HttpConnectSession::makeUpstreamServerDI(const Upstr n.put("name", a->name); n.put("host", a->host); n.put("port", a->port); - n.put("isOffline", a->isOffline); - n.put("lastConnectFailed", a->lastConnectFailed); - n.put("isManualDisable", a->isManualDisable); - n.put("disable", a->disable); + n.put("isOffline", a->isOffline.load()); + n.put("lastConnectFailed", a->lastConnectFailed.load()); + n.put("isManualDisable", a->isManualDisable.load()); + n.put("disable", a->disable.load()); n.put("lastConnectCheckResult", a->lastConnectCheckResult); n.put("connectCount", a->connectCount.load()); n.put("lastOnlinePing", ( - a->lastOnlinePing.count() != -1 ? - boost::lexical_cast(a->lastOnlinePing.count()) : "")); + a->lastOnlinePing.load().count() != -1 ? + boost::lexical_cast(a->lastOnlinePing.load().count()) : "")); n.put("lastConnectPing", ( - a->lastConnectPing.count() != -1 ? - boost::lexical_cast(a->lastConnectPing.count()) : "")); + a->lastConnectPing.load().count() != -1 ? + boost::lexical_cast(a->lastConnectPing.load().count()) : "")); n.put("lastOnlineTime", ( - a->lastOnlineTime.has_value() ? - printUpstreamTimePoint(a->lastOnlineTime.value()) : "")); + a->lastOnlineTime.load() != UpstreamTimePoint{} ? + printUpstreamTimePoint(a->lastOnlineTime.load()) : "")); n.put("lastConnectTime", ( - a->lastConnectTime.has_value() ? - printUpstreamTimePoint(a->lastConnectTime.value()) : "")); + a->lastConnectTime.load() != UpstreamTimePoint{} ? + printUpstreamTimePoint(a->lastConnectTime.load()) : "")); n.put("isWork", upstreamPool->checkServer(a)); auto info = pT->getStatisticsInfo(); diff --git a/src/TcpRelaySession.cpp b/src/TcpRelaySession.cpp index b5c9a51..af7a7b5 100644 --- a/src/TcpRelaySession.cpp +++ b/src/TcpRelaySession.cpp @@ -104,6 +104,7 @@ void TcpRelaySession::try_connect_upstream() { if (ic && ic->rule != RuleEnum::inherit) { BOOST_LOG_S5B_ID(relayId, trace) << "TcpRelaySession::try_connect_upstream() get by client getServerByHint"; + std::lock_guard lg{ic->lastUseUpstreamIndexMtx}; s = upstreamPool->getServerByHint(ic->rule, ic->lastUseUpstreamIndex, relayId); } if (!s) { @@ -112,6 +113,7 @@ void TcpRelaySession::try_connect_upstream() { if (il && il->rule != RuleEnum::inherit) { BOOST_LOG_S5B_ID(relayId, trace) << "TcpRelaySession::try_connect_upstream() get by listen getServerByHint"; + std::lock_guard lg{il->lastUseUpstreamIndexMtx}; s = upstreamPool->getServerByHint(il->rule, il->lastUseUpstreamIndex, relayId); } } diff --git a/src/TcpRelayStatisticsInfo.cpp b/src/TcpRelayStatisticsInfo.cpp index d66164c..1f1938e 100644 --- a/src/TcpRelayStatisticsInfo.cpp +++ b/src/TcpRelayStatisticsInfo.cpp @@ -56,10 +56,10 @@ void TcpRelayStatisticsInfo::Info::calcByte() { byteUpLast = newByteUp; byteDownLast = newByteDown; if (byteUpChange > byteUpChangeMax) { - byteUpChangeMax = byteUpChange; + byteUpChangeMax = byteUpChange.load(); } if (byteDownChange > byteDownChangeMax) { - byteDownChangeMax = byteDownChange; + byteDownChangeMax = byteDownChange.load(); } } @@ -130,6 +130,7 @@ void TcpRelayStatisticsInfo::addSession(size_t index, const std::shared_ptrsessions.get().end()); n->sessions.emplace_back(s); if (auto ns = ptr->getNowServer()) { + std::lock_guard lgLM{n->lastUseUpstreamIndexMtx}; n->lastUseUpstreamIndex = ns->index; } } @@ -159,6 +160,7 @@ void TcpRelayStatisticsInfo::addSessionClient(const std::shared_ptrsessions.get().end()); n->sessions.emplace_back(s); if (auto ns = ptr->getNowServer()) { + std::lock_guard lgLM{n->lastUseUpstreamIndexMtx}; n->lastUseUpstreamIndex = ns->index; } } @@ -188,6 +190,7 @@ void TcpRelayStatisticsInfo::addSessionListen(const std::shared_ptrsessions.get().end()); n->sessions.emplace_back(s); if (auto ns = ptr->getNowServer()) { + std::lock_guard lgLM{n->lastUseUpstreamIndexMtx}; n->lastUseUpstreamIndex = ns->index; } } @@ -220,6 +223,7 @@ void TcpRelayStatisticsInfo::addSessionAuthUser(const std::shared_ptrsessions.get().end()); n->sessions.emplace_back(s); // if (auto ns = ptr->getNowServer()) { +// std::lock_guard lgLM{n->lastUseUpstreamIndexMtx}; // n->lastUseUpstreamIndex = ns->index; // } } diff --git a/src/TcpRelayStatisticsInfo.h b/src/TcpRelayStatisticsInfo.h index 2d6628d..c69971c 100644 --- a/src/TcpRelayStatisticsInfo.h +++ b/src/TcpRelayStatisticsInfo.h @@ -147,16 +147,17 @@ class TcpRelayStatisticsInfo : public std::enable_shared_from_thisisManualDisable; } return u - && u->lastConnectTime.has_value() - && u->lastOnlineTime.has_value() + && u->lastConnectTime.load().time_since_epoch() != UpstreamTimePoint::duration::zero() + && u->lastOnlineTime.load().time_since_epoch() != UpstreamTimePoint::duration::zero() && !u->lastConnectFailed && !u->isOffline && !u->isManualDisable; @@ -205,7 +205,7 @@ auto UpstreamPool::getServerByHint( case RuleEnum::change_by_time: { UpstreamTimePoint t = UpstreamTimePointNow(); const auto &d = _configLoader->config.serverChangeTime; - if ((t - lastChangeUpstreamTime) > d) { + if ((t - lastChangeUpstreamTime.load()) > d) { s = getNextServer(_lastUseUpstreamIndex); lastChangeUpstreamTime = UpstreamTimePointNow(); } else { @@ -309,11 +309,11 @@ std::string UpstreamPool::print() { << "\t" << "isOffline :" << r->isOffline << "\n" << "\t" << "lastConnectFailed :" << r->lastConnectFailed << "\n" << "\t" << "lastOnlineTime :" << ( - r->lastOnlineTime.has_value() ? - printUpstreamTimePoint(r->lastOnlineTime.value()) : "empty") << "\n" + r->lastOnlineTime.load() != UpstreamTimePoint{} ? + printUpstreamTimePoint(r->lastOnlineTime.load()) : "empty") << "\n" << "\t" << "lastConnectTime :" << ( - r->lastConnectTime.has_value() ? - printUpstreamTimePoint(r->lastConnectTime.value()) : "empty") << "\n" + r->lastConnectTime.load() != UpstreamTimePoint{} ? + printUpstreamTimePoint(r->lastConnectTime.load()) : "empty") << "\n" << "\t" << "lastConnectCheckResult :" << r->lastConnectCheckResult << "\n" << "\t" << "disable :" << r->disable << "\n" << "\t" << "isManualDisable :" << r->isManualDisable << "\n" @@ -418,7 +418,7 @@ void UpstreamPool::do_AdditionTimer() { } ); if (isAllDown) { - if ((UpstreamTimePointNow() - lastConnectComeTime) <= _configLoader->config.sleepTime) { + if ((UpstreamTimePointNow() - lastConnectComeTime.load()) <= _configLoader->config.sleepTime) { do_AdditionTimer_impl(); } } @@ -459,7 +459,7 @@ void UpstreamPool::do_tcpCheckerTimer() { // BOOST_LOG_S5B(trace) << "do_tcpCheckerTimer()"; // BOOST_LOG_S5B(trace) << print(); - if ((UpstreamTimePointNow() - lastConnectComeTime) <= _configLoader->config.sleepTime) { + if ((UpstreamTimePointNow() - lastConnectComeTime.load()) <= _configLoader->config.sleepTime) { do_tcpCheckerTimer_impl(); } @@ -567,7 +567,7 @@ void UpstreamPool::do_connectCheckerTimer() { } // BOOST_LOG_S5B(trace) << "do_connectCheckerTimer()"; - if ((UpstreamTimePointNow() - lastConnectComeTime) <= _configLoader->config.sleepTime) { + if ((UpstreamTimePointNow() - lastConnectComeTime.load()) <= _configLoader->config.sleepTime) { do_connectCheckerTimer_impl(); } diff --git a/src/UpstreamPool.h b/src/UpstreamPool.h index c97956c..abb2176 100644 --- a/src/UpstreamPool.h +++ b/src/UpstreamPool.h @@ -60,18 +60,20 @@ struct UpstreamServer : public std::enable_shared_from_this { std::string authUser; std::string authPwd; - std::optional lastOnlineTime; - std::optional lastConnectTime; - bool lastConnectFailed = true; + std::atomic lastOnlineTime{UpstreamTimePoint{UpstreamTimePoint::duration::zero()}}; + std::atomic lastConnectTime{UpstreamTimePoint{UpstreamTimePoint::duration::zero()}}; + std::atomic_bool lastConnectFailed = true; + std::string lastConnectCheckResult; - bool isOffline = true; + + std::atomic_bool isOffline = true; std::atomic_size_t connectCount{0}; - bool isManualDisable = false; - bool disable = false; + std::atomic_bool isManualDisable = false; + std::atomic_bool disable = false; bool slowImpl = false; - std::chrono::milliseconds lastOnlinePing{-1}; - std::chrono::milliseconds lastConnectPing{-1}; + std::atomic lastOnlinePing{std::chrono::milliseconds{-1}}; + std::atomic lastConnectPing{std::chrono::milliseconds{-1}}; bool traditionTcpRelay; std::shared_ptr delayCollect; @@ -117,9 +119,9 @@ class UpstreamPool : public std::enable_shared_from_this { std::default_random_engine randomGenerator; - UpstreamTimePoint lastChangeUpstreamTime; + std::atomic lastChangeUpstreamTime; - UpstreamTimePoint lastConnectComeTime; + std::atomic lastConnectComeTime; std::shared_ptr tcpTest; std::shared_ptr connectTestHttps;