Skip to content

Commit

Permalink
use std::atomic_
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyoko-Jeremie committed Aug 31, 2023
1 parent fbff3d3 commit ed1578f
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 74 deletions.
127 changes: 81 additions & 46 deletions src/StateMonitorServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,24 @@ std::string HttpConnectSession::createJsonString() {
n.put("name", a->name);
n.put("host", a->host);
n.put("port", a->port);
n.put("isOffline", a->isOffline);
n.put("lastConnectFailed", a->lastConnectFailed);
n.put("isManualDisable", a->isManualDisable);
n.put("disable", a->disable);
n.put("isOffline", a->isOffline.load());
n.put("lastConnectFailed", a->lastConnectFailed.load());
n.put("isManualDisable", a->isManualDisable.load());
n.put("disable", a->disable.load());
n.put("lastConnectCheckResult", a->lastConnectCheckResult);
n.put("connectCount", a->connectCount.load());
n.put("lastOnlinePing", (
a->lastOnlinePing.count() != -1 ?
boost::lexical_cast<std::string>(a->lastOnlinePing.count()) : "<empty>"));
a->lastOnlinePing.load().count() != -1 ?
boost::lexical_cast<std::string>(a->lastOnlinePing.load().count()) : "<empty>"));
n.put("lastConnectPing", (
a->lastConnectPing.count() != -1 ?
boost::lexical_cast<std::string>(a->lastConnectPing.count()) : "<empty>"));
a->lastConnectPing.load().count() != -1 ?
boost::lexical_cast<std::string>(a->lastConnectPing.load().count()) : "<empty>"));
n.put("lastOnlineTime", (
a->lastOnlineTime.has_value() ?
printUpstreamTimePoint(a->lastOnlineTime.value()) : "<empty>"));
a->lastOnlineTime.load() != UpstreamTimePoint{} ?
printUpstreamTimePoint(a->lastOnlineTime.load()) : "<empty>"));
n.put("lastConnectTime", (
a->lastConnectTime.has_value() ?
printUpstreamTimePoint(a->lastConnectTime.value()) : "<empty>"));
a->lastConnectTime.load() != UpstreamTimePoint{} ?
printUpstreamTimePoint(a->lastConnectTime.load()) : "<empty>"));
n.put("isWork", upstreamPool->checkServer(a));

if (info) {
Expand Down Expand Up @@ -194,7 +194,10 @@ std::string HttpConnectSession::createJsonString() {
n.put("byteUpChangeMax", a.second->byteUpChangeMax);
n.put("byteDownChangeMax", a.second->byteDownChangeMax);
n.put("rule", ruleEnum2string(a.second->rule));
n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex);
{
std::lock_guard lgLM{a.second->lastUseUpstreamIndexMtx};
n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex);
}

pS.push_back(std::make_pair("", n));
}
Expand All @@ -214,7 +217,10 @@ std::string HttpConnectSession::createJsonString() {
n.put("byteUpChangeMax", a.second->byteUpChangeMax);
n.put("byteDownChangeMax", a.second->byteDownChangeMax);
n.put("rule", ruleEnum2string(a.second->rule));
n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex);
{
std::lock_guard lgLM{a.second->lastUseUpstreamIndexMtx};
n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex);
}

pC.push_back(std::make_pair("", n));
}
Expand All @@ -234,7 +240,10 @@ std::string HttpConnectSession::createJsonString() {
n.put("byteUpChangeMax", a.second->byteUpChangeMax);
n.put("byteDownChangeMax", a.second->byteDownChangeMax);
n.put("rule", ruleEnum2string(a.second->rule));
n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex);
{
std::lock_guard lgLM{a.second->lastUseUpstreamIndexMtx};
n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex);
}

pL.push_back(std::make_pair("", n));
}
Expand All @@ -255,7 +264,10 @@ std::string HttpConnectSession::createJsonString() {
n.put("byteUpChangeMax", a.second->byteUpChangeMax);
n.put("byteDownChangeMax", a.second->byteDownChangeMax);
n.put("rule", ruleEnum2string(a.second->rule));
n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex);
{
std::lock_guard lgLM{a.second->lastUseUpstreamIndexMtx};
n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex);
}

auto au = am->getById(a.first);
boost::property_tree::ptree pAU;
Expand Down Expand Up @@ -396,13 +408,24 @@ void HttpConnectSession::path_op(HttpConnectSession::QueryPairsType &queryPairs)
if (ptr->getStatisticsInfo()) {
auto info = ptr->getStatisticsInfo()->getInfoClient(target->second);
if (info) {
std::lock_guard lg{info->lastUseUpstreamIndexMtx};
info->lastUseUpstreamIndex = index;
}
}
} else if ("listen" == t) {
if (ptr->getStatisticsInfo()) {
auto info = ptr->getStatisticsInfo()->getInfoListen(target->second);
if (info) {
std::lock_guard lg{info->lastUseUpstreamIndexMtx};
info->lastUseUpstreamIndex = index;
}
}
} else if ("auth" == t) {
if (ptr->getStatisticsInfo()) {
auto info = ptr->getStatisticsInfo()->getInfoAuthUser(
boost::lexical_cast<size_t>(target->second));
if (info) {
std::lock_guard lg{info->lastUseUpstreamIndexMtx};
info->lastUseUpstreamIndex = index;
}
}
Expand Down Expand Up @@ -446,9 +469,9 @@ void HttpConnectSession::path_op(HttpConnectSession::QueryPairsType &queryPairs)
for (auto &a: upstreamPool->pool()) {
a->isOffline = false;
a->lastConnectFailed = false;
a->lastOnlineTime.reset();
a->lastOnlineTime = UpstreamTimePoint{};
a->lastOnlinePing = std::chrono::milliseconds{-1};
a->lastConnectTime.reset();
a->lastConnectTime = UpstreamTimePoint{};
a->lastConnectPing = std::chrono::milliseconds{-1};
}
// recheck
Expand Down Expand Up @@ -643,24 +666,24 @@ void HttpConnectSession::path_per_info(HttpConnectSession::QueryPairsType &query
n.put("name", a->name);
n.put("host", a->host);
n.put("port", a->port);
n.put("isOffline", a->isOffline);
n.put("lastConnectFailed", a->lastConnectFailed);
n.put("isManualDisable", a->isManualDisable);
n.put("disable", a->disable);
n.put("isOffline", a->isOffline.load());
n.put("lastConnectFailed", a->lastConnectFailed.load());
n.put("isManualDisable", a->isManualDisable.load());
n.put("disable", a->disable.load());
n.put("lastConnectCheckResult", a->lastConnectCheckResult);
n.put("connectCount", a->connectCount.load());
n.put("lastOnlinePing", (
a->lastOnlinePing.count() != -1 ?
boost::lexical_cast<std::string>(a->lastOnlinePing.count()) : "<empty>"));
a->lastOnlinePing.load().count() != -1 ?
boost::lexical_cast<std::string>(a->lastOnlinePing.load().count()) : "<empty>"));
n.put("lastConnectPing", (
a->lastConnectPing.count() != -1 ?
boost::lexical_cast<std::string>(a->lastConnectPing.count()) : "<empty>"));
a->lastConnectPing.load().count() != -1 ?
boost::lexical_cast<std::string>(a->lastConnectPing.load().count()) : "<empty>"));
n.put("lastOnlineTime", (
a->lastOnlineTime.has_value() ?
printUpstreamTimePoint(a->lastOnlineTime.value()) : "<empty>"));
a->lastOnlineTime.load() != UpstreamTimePoint{} ?
printUpstreamTimePoint(a->lastOnlineTime.load()) : "<empty>"));
n.put("lastConnectTime", (
a->lastConnectTime.has_value() ?
printUpstreamTimePoint(a->lastConnectTime.value()) : "<empty>"));
a->lastConnectTime.load() != UpstreamTimePoint{} ?
printUpstreamTimePoint(a->lastConnectTime.load()) : "<empty>"));
n.put("isWork", upstreamPool->checkServer(a));

if (info) {
Expand Down Expand Up @@ -710,7 +733,10 @@ void HttpConnectSession::path_per_info(HttpConnectSession::QueryPairsType &query
n.put("byteUpChangeMax", a.second->byteUpChangeMax);
n.put("byteDownChangeMax", a.second->byteDownChangeMax);
n.put("rule", ruleEnum2string(a.second->rule));
n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex);
{
std::lock_guard lgLM{a.second->lastUseUpstreamIndexMtx};
n.put("lastUseUpstreamIndex", a.second->lastUseUpstreamIndex);
}

pS.push_back(std::make_pair("", n));
}
Expand Down Expand Up @@ -755,7 +781,10 @@ void HttpConnectSession::path_per_info(HttpConnectSession::QueryPairsType &query
baseInfo.put("byteUpChangeMax", in->byteUpChangeMax);
baseInfo.put("byteDownChangeMax", in->byteDownChangeMax);
baseInfo.put("rule", ruleEnum2string(in->rule));
baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex);
{
std::lock_guard lgLM{in->lastUseUpstreamIndexMtx};
baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex);
}
root.add_child("BaseInfo", baseInfo);
}
}
Expand Down Expand Up @@ -798,7 +827,10 @@ void HttpConnectSession::path_per_info(HttpConnectSession::QueryPairsType &query
baseInfo.put("byteUpChangeMax", in->byteUpChangeMax);
baseInfo.put("byteDownChangeMax", in->byteDownChangeMax);
baseInfo.put("rule", ruleEnum2string(in->rule));
baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex);
{
std::lock_guard lgLM{in->lastUseUpstreamIndexMtx};
baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex);
}
root.add_child("BaseInfo", baseInfo);
}
}
Expand Down Expand Up @@ -841,7 +873,10 @@ void HttpConnectSession::path_per_info(HttpConnectSession::QueryPairsType &query
baseInfo.put("byteUpChangeMax", in->byteUpChangeMax);
baseInfo.put("byteDownChangeMax", in->byteDownChangeMax);
baseInfo.put("rule", ruleEnum2string(in->rule));
baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex);
{
std::lock_guard lgLM{in->lastUseUpstreamIndexMtx};
baseInfo.put("lastUseUpstreamIndex", in->lastUseUpstreamIndex);
}
root.add_child("BaseInfo", baseInfo);
}
}
Expand Down Expand Up @@ -899,24 +934,24 @@ boost::property_tree::ptree HttpConnectSession::makeUpstreamServerDI(const Upstr
n.put("name", a->name);
n.put("host", a->host);
n.put("port", a->port);
n.put("isOffline", a->isOffline);
n.put("lastConnectFailed", a->lastConnectFailed);
n.put("isManualDisable", a->isManualDisable);
n.put("disable", a->disable);
n.put("isOffline", a->isOffline.load());
n.put("lastConnectFailed", a->lastConnectFailed.load());
n.put("isManualDisable", a->isManualDisable.load());
n.put("disable", a->disable.load());
n.put("lastConnectCheckResult", a->lastConnectCheckResult);
n.put("connectCount", a->connectCount.load());
n.put("lastOnlinePing", (
a->lastOnlinePing.count() != -1 ?
boost::lexical_cast<std::string>(a->lastOnlinePing.count()) : "<empty>"));
a->lastOnlinePing.load().count() != -1 ?
boost::lexical_cast<std::string>(a->lastOnlinePing.load().count()) : "<empty>"));
n.put("lastConnectPing", (
a->lastConnectPing.count() != -1 ?
boost::lexical_cast<std::string>(a->lastConnectPing.count()) : "<empty>"));
a->lastConnectPing.load().count() != -1 ?
boost::lexical_cast<std::string>(a->lastConnectPing.load().count()) : "<empty>"));
n.put("lastOnlineTime", (
a->lastOnlineTime.has_value() ?
printUpstreamTimePoint(a->lastOnlineTime.value()) : "<empty>"));
a->lastOnlineTime.load() != UpstreamTimePoint{} ?
printUpstreamTimePoint(a->lastOnlineTime.load()) : "<empty>"));
n.put("lastConnectTime", (
a->lastConnectTime.has_value() ?
printUpstreamTimePoint(a->lastConnectTime.value()) : "<empty>"));
a->lastConnectTime.load() != UpstreamTimePoint{} ?
printUpstreamTimePoint(a->lastConnectTime.load()) : "<empty>"));
n.put("isWork", upstreamPool->checkServer(a));

auto info = pT->getStatisticsInfo();
Expand Down
2 changes: 2 additions & 0 deletions src/TcpRelaySession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ void TcpRelaySession::try_connect_upstream() {
if (ic && ic->rule != RuleEnum::inherit) {
BOOST_LOG_S5B_ID(relayId, trace)
<< "TcpRelaySession::try_connect_upstream() get by client getServerByHint";
std::lock_guard lg{ic->lastUseUpstreamIndexMtx};
s = upstreamPool->getServerByHint(ic->rule, ic->lastUseUpstreamIndex, relayId);
}
if (!s) {
Expand All @@ -112,6 +113,7 @@ void TcpRelaySession::try_connect_upstream() {
if (il && il->rule != RuleEnum::inherit) {
BOOST_LOG_S5B_ID(relayId, trace)
<< "TcpRelaySession::try_connect_upstream() get by listen getServerByHint";
std::lock_guard lg{il->lastUseUpstreamIndexMtx};
s = upstreamPool->getServerByHint(il->rule, il->lastUseUpstreamIndex, relayId);
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/TcpRelayStatisticsInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ void TcpRelayStatisticsInfo::Info::calcByte() {
byteUpLast = newByteUp;
byteDownLast = newByteDown;
if (byteUpChange > byteUpChangeMax) {
byteUpChangeMax = byteUpChange;
byteUpChangeMax = byteUpChange.load();
}
if (byteDownChange > byteDownChangeMax) {
byteDownChangeMax = byteDownChange;
byteDownChangeMax = byteDownChange.load();
}
}

Expand Down Expand Up @@ -130,6 +130,7 @@ void TcpRelayStatisticsInfo::addSession(size_t index, const std::shared_ptr<TcpR
)) == n->sessions.get<SessionInfo::ListenClientAddrPortPair>().end());
n->sessions.emplace_back(s);
if (auto ns = ptr->getNowServer()) {
std::lock_guard lgLM{n->lastUseUpstreamIndexMtx};
n->lastUseUpstreamIndex = ns->index;
}
}
Expand Down Expand Up @@ -159,6 +160,7 @@ void TcpRelayStatisticsInfo::addSessionClient(const std::shared_ptr<TcpRelaySess
)) == n->sessions.get<SessionInfo::ListenClientAddrPortPair>().end());
n->sessions.emplace_back(s);
if (auto ns = ptr->getNowServer()) {
std::lock_guard lgLM{n->lastUseUpstreamIndexMtx};
n->lastUseUpstreamIndex = ns->index;
}
}
Expand Down Expand Up @@ -188,6 +190,7 @@ void TcpRelayStatisticsInfo::addSessionListen(const std::shared_ptr<TcpRelaySess
)) == n->sessions.get<SessionInfo::ListenClientAddrPortPair>().end());
n->sessions.emplace_back(s);
if (auto ns = ptr->getNowServer()) {
std::lock_guard lgLM{n->lastUseUpstreamIndexMtx};
n->lastUseUpstreamIndex = ns->index;
}
}
Expand Down Expand Up @@ -220,6 +223,7 @@ void TcpRelayStatisticsInfo::addSessionAuthUser(const std::shared_ptr<TcpRelaySe
)) == n->sessions.get<SessionInfo::ListenClientAddrPortPair>().end());
n->sessions.emplace_back(s);
// if (auto ns = ptr->getNowServer()) {
// std::lock_guard lgLM{n->lastUseUpstreamIndexMtx};
// n->lastUseUpstreamIndex = ns->index;
// }
}
Expand Down
13 changes: 7 additions & 6 deletions src/TcpRelayStatisticsInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,17 @@ class TcpRelayStatisticsInfo : public std::enable_shared_from_this<TcpRelayStati

std::atomic_size_t byteUp = 0;
std::atomic_size_t byteDown = 0;
size_t byteUpLast = 0;
size_t byteDownLast = 0;
size_t byteUpChange = 0;
size_t byteDownChange = 0;
size_t byteUpChangeMax = 0;
size_t byteDownChangeMax = 0;
std::atomic_size_t byteUpLast = 0;
std::atomic_size_t byteDownLast = 0;
std::atomic_size_t byteUpChange = 0;
std::atomic_size_t byteDownChange = 0;
std::atomic_size_t byteUpChangeMax = 0;
std::atomic_size_t byteDownChangeMax = 0;

std::atomic_size_t connectCount{0};

RuleEnum rule = RuleEnum::inherit;
std::recursive_mutex lastUseUpstreamIndexMtx;
size_t lastUseUpstreamIndex = 0;


Expand Down
Loading

0 comments on commit ed1578f

Please sign in to comment.