Skip to content

Commit

Permalink
multithread
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyoko-Jeremie committed Aug 29, 2023
1 parent 9425a54 commit a9653eb
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 24 deletions.
59 changes: 49 additions & 10 deletions src/TcpRelayStatisticsInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ TcpRelayStatisticsInfo::SessionInfo::SessionInfo(const std::shared_ptr<TcpRelayS
updateTargetInfo(s);
}

TcpRelayStatisticsInfo::SessionInfo::SessionInfo(const std::weak_ptr<TcpRelaySession> &s) : SessionInfo(s.lock()) {
}
TcpRelayStatisticsInfo::SessionInfo::SessionInfo(const std::weak_ptr<TcpRelaySession> &s)
: SessionInfo(s.lock()) {}

void TcpRelayStatisticsInfo::SessionInfo::updateTargetInfo(const std::shared_ptr<TcpRelaySession> &s) {
if (const auto &p = s) {
Expand All @@ -109,6 +109,7 @@ void TcpRelayStatisticsInfo::SessionInfo::updateTargetInfo(const std::shared_ptr

void TcpRelayStatisticsInfo::addSession(size_t index, const std::shared_ptr<TcpRelaySession> &s) {
BOOST_ASSERT(s);
std::lock_guard lgG{mtx};
if (auto ptr = s) {
if (upstreamIndex.find(index) == upstreamIndex.end()) {
upstreamIndex.try_emplace(index, std::make_shared<Info>());
Expand Down Expand Up @@ -136,6 +137,7 @@ void TcpRelayStatisticsInfo::addSession(size_t index, const std::shared_ptr<TcpR

void TcpRelayStatisticsInfo::addSessionClient(const std::shared_ptr<TcpRelaySession> &s) {
BOOST_ASSERT(s);
std::lock_guard lgG{mtx};
if (auto ptr = s) {
const std::string &addr = ptr->getClientEndpointAddrString();
if (clientIndex.find(addr) == clientIndex.end()) {
Expand Down Expand Up @@ -164,6 +166,7 @@ void TcpRelayStatisticsInfo::addSessionClient(const std::shared_ptr<TcpRelaySess

void TcpRelayStatisticsInfo::addSessionListen(const std::shared_ptr<TcpRelaySession> &s) {
BOOST_ASSERT(s);
std::lock_guard lgG{mtx};
if (auto ptr = s) {
const std::string &addr = ptr->getListenEndpointAddrString();
if (listenIndex.find(addr) == listenIndex.end()) {
Expand Down Expand Up @@ -192,6 +195,7 @@ void TcpRelayStatisticsInfo::addSessionListen(const std::shared_ptr<TcpRelaySess

void TcpRelayStatisticsInfo::addSessionAuthUser(const std::shared_ptr<TcpRelaySession> &s) {
BOOST_ASSERT(s);
std::lock_guard lgG{mtx};
if (auto ptr = s) {
BOOST_ASSERT(ptr->authUser);
size_t id = ptr->authUser->id;
Expand Down Expand Up @@ -223,6 +227,7 @@ void TcpRelayStatisticsInfo::addSessionAuthUser(const std::shared_ptr<TcpRelaySe

void TcpRelayStatisticsInfo::updateSessionInfo(std::shared_ptr<TcpRelaySession> s) {
BOOST_ASSERT(s);
std::lock_guard lgG{mtx};
if (s) {
{
auto ui = upstreamIndex.find(s->getNowServer()->index);
Expand Down Expand Up @@ -276,6 +281,7 @@ void TcpRelayStatisticsInfo::updateSessionInfo(std::shared_ptr<TcpRelaySession>
}

std::shared_ptr<TcpRelayStatisticsInfo::Info> TcpRelayStatisticsInfo::getInfo(size_t index) {
std::lock_guard lg{mtx};
auto n = upstreamIndex.find(index);
if (n != upstreamIndex.end()) {
return n->second;
Expand All @@ -285,6 +291,7 @@ std::shared_ptr<TcpRelayStatisticsInfo::Info> TcpRelayStatisticsInfo::getInfo(si
}

std::shared_ptr<TcpRelayStatisticsInfo::Info> TcpRelayStatisticsInfo::getInfoClient(const std::string &addr) {
std::lock_guard lg{mtx};
auto n = clientIndex.find(addr);
if (n != clientIndex.end()) {
return n->second;
Expand All @@ -294,6 +301,7 @@ std::shared_ptr<TcpRelayStatisticsInfo::Info> TcpRelayStatisticsInfo::getInfoCli
}

std::shared_ptr<TcpRelayStatisticsInfo::Info> TcpRelayStatisticsInfo::getInfoListen(const std::string &addr) {
std::lock_guard lg{mtx};
auto n = listenIndex.find(addr);
if (n != listenIndex.end()) {
return n->second;
Expand All @@ -303,6 +311,7 @@ std::shared_ptr<TcpRelayStatisticsInfo::Info> TcpRelayStatisticsInfo::getInfoLis
}

std::shared_ptr<TcpRelayStatisticsInfo::Info> TcpRelayStatisticsInfo::getInfoAuthUser(size_t id) {
std::lock_guard lg{mtx};
auto n = authUserIndex.find(id);
BOOST_ASSERT(n != authUserIndex.end());
if (n != authUserIndex.end()) {
Expand All @@ -313,90 +322,103 @@ std::shared_ptr<TcpRelayStatisticsInfo::Info> TcpRelayStatisticsInfo::getInfoAut
}

void TcpRelayStatisticsInfo::removeExpiredSession(size_t index) {
std::lock_guard lg{mtx};
auto p = getInfo(index);
if (p) {
p->removeExpiredSession();
}
}

void TcpRelayStatisticsInfo::removeExpiredSessionClient(const std::string &addr) {
std::lock_guard lg{mtx};
auto p = getInfoClient(addr);
if (p) {
p->removeExpiredSession();
}
}

void TcpRelayStatisticsInfo::removeExpiredSessionListen(const std::string &addr) {
std::lock_guard lg{mtx};
auto p = getInfoListen(addr);
if (p) {
p->removeExpiredSession();
}
}

void TcpRelayStatisticsInfo::removeExpiredSessionAuthUser(size_t id) {
std::lock_guard lg{mtx};
auto p = getInfoAuthUser(id);
if (p) {
p->removeExpiredSession();
}
}

void TcpRelayStatisticsInfo::addByteUp(size_t index, size_t b) {
std::lock_guard lg{mtx};
auto p = getInfo(index);
if (p) {
p->byteUp += b;
}
}

void TcpRelayStatisticsInfo::addByteUpClient(const std::string &addr, size_t b) {
std::lock_guard lg{mtx};
auto p = getInfoClient(addr);
if (p) {
p->byteUp += b;
}
}

void TcpRelayStatisticsInfo::addByteUpListen(const std::string &addr, size_t b) {
std::lock_guard lg{mtx};
auto p = getInfoListen(addr);
if (p) {
p->byteUp += b;
}
}

void TcpRelayStatisticsInfo::addByteUpAuthUser(size_t id, size_t b) {
std::lock_guard lg{mtx};
auto p = getInfoAuthUser(id);
if (p) {
p->byteUp += b;
}
}

void TcpRelayStatisticsInfo::addByteDown(size_t index, size_t b) {
std::lock_guard lg{mtx};
auto p = getInfo(index);
if (p) {
p->byteDown += b;
}
}

void TcpRelayStatisticsInfo::addByteDownClient(const std::string &addr, size_t b) {
std::lock_guard lg{mtx};
auto p = getInfoClient(addr);
if (p) {
p->byteDown += b;
}
}

void TcpRelayStatisticsInfo::addByteDownListen(const std::string &addr, size_t b) {
std::lock_guard lg{mtx};
auto p = getInfoListen(addr);
if (p) {
p->byteDown += b;
}
}

void TcpRelayStatisticsInfo::addByteDownAuthUser(size_t id, size_t b) {
std::lock_guard lg{mtx};
auto p = getInfoAuthUser(id);
if (p) {
p->byteDown += b;
}
}

void TcpRelayStatisticsInfo::calcByteAll() {
std::lock_guard lg{mtx};
for (auto &a: upstreamIndex) {
a.second->calcByte();
}
Expand All @@ -412,6 +434,7 @@ void TcpRelayStatisticsInfo::calcByteAll() {
}

void TcpRelayStatisticsInfo::removeExpiredSessionAll() {
std::lock_guard lg{mtx};
for (auto &a: upstreamIndex) {
a.second->removeExpiredSession();
}
Expand All @@ -427,101 +450,117 @@ void TcpRelayStatisticsInfo::removeExpiredSessionAll() {
}

void TcpRelayStatisticsInfo::closeAllSession(size_t index) {
std::lock_guard lg{mtx};
auto p = getInfo(index);
if (p) {
p->closeAllSession();
}
}

void TcpRelayStatisticsInfo::closeAllSessionClient(const std::string &addr) {
std::lock_guard lg{mtx};
auto p = getInfoClient(addr);
if (p) {
p->closeAllSession();
}
}

void TcpRelayStatisticsInfo::closeAllSessionListen(const std::string &addr) {
std::lock_guard lg{mtx};
auto p = getInfoListen(addr);
if (p) {
p->closeAllSession();
}
}

void TcpRelayStatisticsInfo::closeAllSessionAuthUser(size_t id) {
std::lock_guard lg{mtx};
auto p = getInfoAuthUser(id);
if (p) {
p->closeAllSession();
}
}

void TcpRelayStatisticsInfo::connectCountAdd(size_t index) {
std::lock_guard lg{mtx};
auto p = getInfo(index);
if (p) {
p->connectCountAdd();
}
}

void TcpRelayStatisticsInfo::connectCountAddClient(const std::string &addr) {
std::lock_guard lg{mtx};
auto p = getInfoClient(addr);
if (p) {
p->connectCountAdd();
}
}

void TcpRelayStatisticsInfo::connectCountAddListen(const std::string &addr) {
std::lock_guard lg{mtx};
auto p = getInfoListen(addr);
if (p) {
p->connectCountAdd();
}
}

void TcpRelayStatisticsInfo::connectCountAddAuthUser(size_t id) {
std::lock_guard lg{mtx};
auto p = getInfoAuthUser(id);
if (p) {
p->connectCountAdd();
}
}

void TcpRelayStatisticsInfo::connectCountSub(size_t index) {
std::lock_guard lg{mtx};
auto p = getInfo(index);
if (p) {
p->connectCountSub();
}
}

void TcpRelayStatisticsInfo::connectCountSubClient(const std::string &addr) {
std::lock_guard lg{mtx};
auto p = getInfoClient(addr);
if (p) {
p->connectCountSub();
}
}

void TcpRelayStatisticsInfo::connectCountSubListen(const std::string &addr) {
std::lock_guard lg{mtx};
auto p = getInfoListen(addr);
if (p) {
p->connectCountSub();
}
}

void TcpRelayStatisticsInfo::connectCountSubAuthUser(size_t id) {
std::lock_guard lg{mtx};
auto p = getInfoAuthUser(id);
if (p) {
p->connectCountSub();
}
}

std::map<size_t, std::shared_ptr<TcpRelayStatisticsInfo::Info>> &TcpRelayStatisticsInfo::getUpstreamIndex() {
return upstreamIndex;
std::map<size_t, std::shared_ptr<TcpRelayStatisticsInfo::Info>> TcpRelayStatisticsInfo::getUpstreamIndex() {
std::lock_guard lg{mtx};
return decltype(upstreamIndex){upstreamIndex.begin(), upstreamIndex.end()};
}

std::map<std::string, std::shared_ptr<TcpRelayStatisticsInfo::Info>> &TcpRelayStatisticsInfo::getClientIndex() {
return clientIndex;
std::map<std::string, std::shared_ptr<TcpRelayStatisticsInfo::Info>> TcpRelayStatisticsInfo::getClientIndex() {
std::lock_guard lg{mtx};
return decltype(clientIndex){clientIndex.begin(), clientIndex.end()};
}

std::map<std::string, std::shared_ptr<TcpRelayStatisticsInfo::Info>> &TcpRelayStatisticsInfo::getListenIndex() {
return listenIndex;
std::map<std::string, std::shared_ptr<TcpRelayStatisticsInfo::Info>> TcpRelayStatisticsInfo::getListenIndex() {
std::lock_guard lg{mtx};
return decltype(listenIndex){listenIndex.begin(), listenIndex.end()};
}

std::map<size_t, std::shared_ptr<TcpRelayStatisticsInfo::Info>> &TcpRelayStatisticsInfo::getAuthUserIndex() {
return authUserIndex;
std::map<size_t, std::shared_ptr<TcpRelayStatisticsInfo::Info>> TcpRelayStatisticsInfo::getAuthUserIndex() {
std::lock_guard lg{mtx};
return decltype(authUserIndex){authUserIndex.begin(), authUserIndex.end()};
}
23 changes: 17 additions & 6 deletions src/TcpRelayStatisticsInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class TcpRelayStatisticsInfo : public std::enable_shared_from_this<TcpRelayStati

size_t authUserId{0};

protected:
friend TcpRelayStatisticsInfo;

void updateTargetInfo(const std::shared_ptr<TcpRelaySession> &s);
};

Expand Down Expand Up @@ -156,20 +159,28 @@ class TcpRelayStatisticsInfo : public std::enable_shared_from_this<TcpRelayStati
RuleEnum rule = RuleEnum::inherit;
size_t lastUseUpstreamIndex = 0;

void removeExpiredSession();

void closeAllSession();
protected:
friend TcpRelayStatisticsInfo;

void removeExpiredSession();

void calcByte();

void connectCountAdd();

void connectCountSub();

public:

void closeAllSession();

size_t calcSessionsNumber();
};

private:
std::recursive_mutex mtx;

// upstreamIndex
std::map<size_t, std::shared_ptr<Info>> upstreamIndex;
// clientEndpointAddrString "ip"
Expand All @@ -184,15 +195,15 @@ class TcpRelayStatisticsInfo : public std::enable_shared_from_this<TcpRelayStati

public:
// nowServer->index
std::map<size_t, std::shared_ptr<Info>> &getUpstreamIndex();
std::map<size_t, std::shared_ptr<Info>> getUpstreamIndex();

// ClientEndpointAddrString : (127.0.0.1)
std::map<std::string, std::shared_ptr<Info>> &getClientIndex();
std::map<std::string, std::shared_ptr<Info>> getClientIndex();

// ListenEndpointAddrString : (127.0.0.1:661133)
std::map<std::string, std::shared_ptr<Info>> &getListenIndex();
std::map<std::string, std::shared_ptr<Info>> getListenIndex();

std::map<size_t, std::shared_ptr<Info>> &getAuthUserIndex();
std::map<size_t, std::shared_ptr<Info>> getAuthUserIndex();

public:
void addSession(size_t index, const std::shared_ptr<TcpRelaySession> &s);
Expand Down
Loading

0 comments on commit a9653eb

Please sign in to comment.