Skip to content

Commit

Permalink
Merge pull request #10146 from Icinga/JsonRpcConnection-2.14
Browse files Browse the repository at this point in the history
JsonRpcConnection#Send*(): discard messages ASAP once shutting down
  • Loading branch information
yhabteab committed Sep 16, 2024
2 parents eec1d24 + 96839d8 commit 27a4a25
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 28 deletions.
7 changes: 7 additions & 0 deletions lib/base/defer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class Defer
{
}

Defer() = default;

Defer(const Defer&) = delete;
Defer(Defer&&) = delete;
Defer& operator=(const Defer&) = delete;
Expand All @@ -39,6 +41,11 @@ class Defer
}
}

inline void SetFunc(std::function<void()> func)
{
m_Func = std::move(func);
}

inline
void Cancel()
{
Expand Down
44 changes: 24 additions & 20 deletions lib/remote/apilistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1020,17 +1020,22 @@ void ApiListener::ApiTimerHandler()
maxTs = client->GetTimestamp();
}

Log(LogNotice, "ApiListener")
<< "Setting log position for identity '" << endpoint->GetName() << "': "
<< Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);

for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
if (client->GetTimestamp() == maxTs) {
client->SendMessage(lmessage);
try {
client->SendMessage(lmessage);
} catch (const std::runtime_error& ex) {
Log(LogNotice, "ApiListener")
<< "Error while setting log position for identity '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
}
} else {
client->Disconnect();
}
}

Log(LogNotice, "ApiListener")
<< "Setting log position for identity '" << endpoint->GetName() << "': "
<< Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
}
}

Expand Down Expand Up @@ -1194,7 +1199,12 @@ void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionar
if (client->GetTimestamp() != maxTs)
continue;

client->SendMessage(message);
try {
client->SendMessage(message);
} catch (const std::runtime_error& ex) {
Log(LogNotice, "ApiListener")
<< "Error while sending message to endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
}
}
}
}
Expand Down Expand Up @@ -1434,10 +1444,12 @@ void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
{
Endpoint::Ptr endpoint = client->GetEndpoint();
Defer resetEndpointSyncing ([&endpoint]() {
ObjectLock olock(endpoint);
endpoint->SetSyncing(false);
});

if (endpoint->GetLogDuration() == 0) {
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);
return;
}

Expand All @@ -1454,21 +1466,21 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
Zone::Ptr target_zone = target_endpoint->GetZone();

if (!target_zone) {
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);
return;
}

for (;;) {
std::unique_lock<std::mutex> lock(m_LogLock);

CloseLogFile();
Defer reopenLog;

if (count == -1 || count > 50000) {
OpenLogFile();
lock.unlock();
} else {
last_sync = true;
reopenLog.SetFunc([this]() { OpenLogFile(); });
}

count = 0;
Expand Down Expand Up @@ -1541,8 +1553,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)

Log(LogDebug, "ApiListener")
<< "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);

break;
return;
}

peer_ts = pmessage->Get("timestamp");
Expand Down Expand Up @@ -1575,14 +1586,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
}

if (last_sync) {
{
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);
}

OpenLogFile();

break;
return;
}
}
}
Expand Down
28 changes: 21 additions & 7 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,39 @@ ConnectionRole JsonRpcConnection::GetRole() const

void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{
if (m_ShuttingDown) {
BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!"));
}

Ptr keepAlive (this);

m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); });
}

void JsonRpcConnection::SendRawMessage(const String& message)
{
if (m_ShuttingDown) {
BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!"));
}

Ptr keepAlive (this);

m_IoStrand.post([this, keepAlive, message]() {
if (m_ShuttingDown) {
return;
}

m_OutgoingMessagesQueue.emplace_back(message);
m_OutgoingMessagesQueued.Set();
});
}

void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
{
if (m_ShuttingDown) {
return;
}

m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
m_OutgoingMessagesQueued.Set();
}
Expand All @@ -190,12 +206,10 @@ void JsonRpcConnection::Disconnect()
{
namespace asio = boost::asio;

JsonRpcConnection::Ptr keepAlive (this);

IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
if (!m_ShuttingDown) {
m_ShuttingDown = true;
if (!m_ShuttingDown.exchange(true)) {
JsonRpcConnection::Ptr keepAlive (this);

IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";

Expand Down Expand Up @@ -243,8 +257,8 @@ void JsonRpcConnection::Disconnect()
shutdownTimeout->Cancel();

m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec);
}
});
});
}
}

void JsonRpcConnection::MessageHandler(const String& jsonString)
Expand Down
3 changes: 2 additions & 1 deletion lib/remote/jsonrpcconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "remote/i2-remote.hpp"
#include "remote/endpoint.hpp"
#include "base/atomic.hpp"
#include "base/io-engine.hpp"
#include "base/tlsstream.hpp"
#include "base/timer.hpp"
Expand Down Expand Up @@ -77,7 +78,7 @@ class JsonRpcConnection final : public Object
std::vector<String> m_OutgoingMessagesQueue;
AsioConditionVariable m_OutgoingMessagesQueued;
AsioConditionVariable m_WriterDone;
bool m_ShuttingDown;
Atomic<bool> m_ShuttingDown;
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;

JsonRpcConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);
Expand Down

0 comments on commit 27a4a25

Please sign in to comment.