Skip to content

Commit

Permalink
added cleanup and stats
Browse files Browse the repository at this point in the history
  • Loading branch information
GloryOfNight committed Mar 2, 2024
1 parent cd1e160 commit 4aa2262
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 21 deletions.
80 changes: 62 additions & 18 deletions src/relay/relay.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ bool relay::init()
LOG(Error, "Error with setting send buffer size");
LOG(Verbose, "Send buffer size, wanted: {0}, actual: {1}", wantedBufferSize, actualBufferSize);


if (!m_socket->setRecvBufferSize(wantedBufferSize, actualBufferSize))
LOG(Error, "Error with setting recv buffer size");
LOG(Verbose, "Receive buffer size, wanted: {0}, actual: {1}", wantedBufferSize, actualBufferSize);
Expand All @@ -50,6 +49,39 @@ channel& relay::createChannel(const guid& inGuid)
return *m_channels.emplace_after(m_channels.before_begin(), newChannel);
}

bool relay::conditionalCleanup(bool force)
{
const auto now = std::chrono::steady_clock::now();
if (force || std::chrono::duration_cast<std::chrono::seconds>(now - m_lastCleanupTime).count() > 60)
{
for (auto it = m_channels.begin(); it != m_channels.end();)
{
if (std::chrono::duration_cast<std::chrono::seconds>(now - it->m_lastUpdated).count() > 30)
{
LOG(Display, "Channel \"{0}\" has been inactive for 30 seconds, removing.", it->m_guid.toString());

const auto stats = it->m_stats;
LOG(Display, "Channel \"{0}\" Stats. Recv: {0} ({1}); Sent: {2} ({3}); Dropped: {4} ({5}) ", stats.m_packetsReceived, stats.m_bytesReceived, stats.m_packetsSent, stats.m_bytesReceived, stats.m_packetsDropped, stats.m_bytesDropped);

m_addressMappedChannels.erase(it->m_peerA);
m_addressMappedChannels.erase(it->m_peerB);
m_guidMappedChannels.erase(it->m_guid);

it = m_channels.erase_after(m_channels.before_begin());
}
else
{
++it;
}
}

m_lastCleanupTime = now;
return true;
}

return false;
}

bool relay::run()
{
if (!init())
Expand All @@ -58,12 +90,16 @@ bool relay::run()
LOG(Display, "Relay initialized and running on port {0}", args::port);

std::array<uint8_t, 1024> buffer{};

m_lastCleanupTime = std::chrono::steady_clock::now();
m_running = true;
while (m_running)
{
if (!m_socket->waitForRead(1000))
if (!m_socket->waitForRead(5))
continue;

const auto m_tickStartTime = std::chrono::steady_clock::now();

std::shared_ptr<internetaddr> recvAddr = udpsocketFactory::createInternetAddr();
const int32_t bytesRead = m_socket->recvFrom(buffer.data(), buffer.size(), recvAddr.get());
if (bytesRead > 0)
Expand All @@ -72,26 +108,30 @@ bool relay::run()
// if recvAddr has a channel mapped to it, as well as two valid peers, relay packet to the other peer
if (findRes != m_addressMappedChannels.end() && findRes->second.m_peerA && findRes->second.m_peerB)
{
auto& currentChannel = findRes->second;

currentChannel.m_lastUpdated = m_tickStartTime;

currentChannel.m_stats.m_packetsReceived++;
currentChannel.m_stats.m_bytesReceived += bytesRead;

const auto& sendAddr = *findRes->second.m_peerA != *recvAddr ? findRes->second.m_peerA : findRes->second.m_peerB;
int32_t bytesSent = m_socket->sendTo(buffer.data(), bytesRead, sendAddr.get());
if (bytesSent == -1 && errno == SE_WOULDBLOCK)
{
if (m_socket->waitForWrite(5))
bytesSent = m_socket->sendTo(buffer.data(), bytesRead, sendAddr.get());
}

if (bytesSent == -1)
if (bytesSent > 0)
{
if (errno == SE_WOULDBLOCK)
{
if (m_socket->waitForWrite(5))
{
m_socket->sendTo(buffer.data(), bytesRead, sendAddr.get());
}
else
{
LOG(Error, "Packet dropped, socket not ready for write {0}.", sendAddr->toString());
}
}
else
{
LOG(Error, "Failed to send data to {0}. Error code: {1}", sendAddr->toString(), errno);
}
currentChannel.m_stats.m_packetsSent++;
currentChannel.m_stats.m_bytesSent += bytesSent;
}
else
{
currentChannel.m_stats.m_packetsDropped++;
currentChannel.m_stats.m_bytesDropped += bytesRead;
}

continue;
Expand All @@ -113,6 +153,7 @@ bool relay::run()
{
channel& newChannel = createChannel(htnGuid);
newChannel.m_peerA = std::move(recvAddr);
newChannel.m_lastUpdated = m_tickStartTime;

m_guidMappedChannels.emplace(newChannel.m_guid, newChannel);

Expand All @@ -121,6 +162,7 @@ bool relay::run()
else if (*guidChannel->second.m_peerA != *recvAddr && guidChannel->second.m_peerB == nullptr)
{
guidChannel->second.m_peerB = std::move(recvAddr);
guidChannel->second.m_lastUpdated = m_tickStartTime;

m_addressMappedChannels.emplace(guidChannel->second.m_peerA, guidChannel->second);
m_addressMappedChannels.emplace(guidChannel->second.m_peerB, guidChannel->second);
Expand All @@ -130,6 +172,8 @@ bool relay::run()
}
}
}

conditionalCleanup(false);
}

return true;
Expand Down
25 changes: 22 additions & 3 deletions src/relay/relay.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,32 @@

#include "types.hxx"

#include <chrono>
#include <cstdint>
#include <forward_list>
#include <map>
#include <memory>
#include <unordered_map>

struct channel_stats
{
uint32_t m_bytesReceived{};
uint32_t m_packetsReceived{};

uint32_t m_bytesSent{};
uint32_t m_packetsSent{};

uint32_t m_bytesDropped{};
uint32_t m_packetsDropped{};
};

struct channel
{
channel_stats m_stats{};
guid m_guid{};
std::shared_ptr<internetaddr> m_peerA{};
std::shared_ptr<internetaddr> m_peerB{};
std::chrono::time_point<std::chrono::steady_clock> m_lastUpdated{};
};

#ifdef _MSC_VER
Expand All @@ -40,13 +55,17 @@ private:

channel& createChannel(const guid& inGuid);

std::unique_ptr<udpsocket> m_socket{};
bool conditionalCleanup(bool force);

std::unordered_map<guid, channel&> m_guidMappedChannels{};

std::unordered_map<std::shared_ptr<internetaddr>, channel&> m_addressMappedChannels{};

std::forward_list<channel> m_channels{};

std::unordered_map<guid, channel&> m_guidMappedChannels{};
std::unique_ptr<udpsocket> m_socket{};

std::unordered_map<std::shared_ptr<internetaddr>, const channel&> m_addressMappedChannels{};
std::chrono::time_point<std::chrono::steady_clock> m_lastCleanupTime{};

bool m_running{false};
};

0 comments on commit 4aa2262

Please sign in to comment.