diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp index b9a8ab814d..8b3a547858 100644 --- a/lib/remote/httpserverconnection.cpp +++ b/lib/remote/httpserverconnection.cpp @@ -428,13 +428,17 @@ bool ProcessRequest( boost::beast::http::response& response, HttpServerConnection& server, bool& hasStartedStreaming, + std::chrono::steady_clock::duration& cpuBoundWorkTime, boost::asio::yield_context& yc ) { namespace http = boost::beast::http; try { + // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. + auto start (std::chrono::steady_clock::now()); CpuBoundWork handlingRequest (yc); + cpuBoundWorkTime = std::chrono::steady_clock::now() - start; HttpHandler::ProcessRequest(stream, authenticatedUser, request, response, yc, server); } catch (const std::exception& ex) { @@ -525,9 +529,14 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc) << ", user: " << (authenticatedUser ? authenticatedUser->GetName() : "") << ", agent: " << request[http::field::user_agent]; //operator[] - Returns the value for a field, or "" if it does not exist. - Defer addRespCode ([&response, start, &logMsg]() { - logMsg << ", status: " << response.result() << ") took " - << ch::duration_cast(ch::steady_clock::now() - start).count() << "ms."; + ch::steady_clock::duration cpuBoundWorkTime(0); + Defer addRespCode ([&response, start, &logMsg, &cpuBoundWorkTime]() { + logMsg << ", status: " << response.result() << ")"; + if (cpuBoundWorkTime >= ch::seconds(1)) { + logMsg << " waited " << ch::duration_cast(cpuBoundWorkTime).count() << "ms on semaphore and"; + } + + logMsg << " took total " << ch::duration_cast(ch::steady_clock::now() - start).count() << "ms."; }); if (!HandleAccessControl(*m_Stream, request, response, yc)) { @@ -548,7 +557,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc) m_Seen = std::numeric_limits::max(); - if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, *this, m_HasStartedStreaming, yc)) { + if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, *this, m_HasStartedStreaming, cpuBoundWorkTime, yc)) { break; } diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index cd684af6ea..f298bb0753 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -60,13 +60,19 @@ void JsonRpcConnection::Start() void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) { + namespace ch = std::chrono; + + auto toMilliseconds ([](ch::steady_clock::duration d) { + return ch::duration_cast(d).count(); + }); + m_Stream->next_layer().SetSeen(&m_Seen); while (!m_ShuttingDown) { - String message; + String jsonString; try { - message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); + jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); } catch (const std::exception& ex) { Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection") << "Error while reading JSON-RPC message for identity '" << m_Identity @@ -76,17 +82,50 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) } m_Seen = Utility::GetTime(); + if (m_Endpoint) { + m_Endpoint->AddMessageReceived(jsonString.GetLength()); + } + + String rpcMethod("UNKNOWN"); + ch::steady_clock::duration cpuBoundDuration(0); + auto start (ch::steady_clock::now()); try { CpuBoundWork handleMessage (yc); + // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. + cpuBoundDuration = ch::steady_clock::now() - start; + + Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); + if (String method = message->Get("method"); !method.IsEmpty()) { + rpcMethod = std::move(method); + } + MessageHandler(message); l_TaskStats.InsertValue(Utility::GetTime(), 1); + + auto total = ch::steady_clock::now() - start; + + Log msg(total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection"); + msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity + << "' (took total " << toMilliseconds(total) << "ms"; + + if (cpuBoundDuration >= ch::seconds(1)) { + msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore"; + } + msg << ")."; } catch (const std::exception& ex) { - Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection") - << "Error while processing JSON-RPC message for identity '" << m_Identity - << "': " << DiagnosticInformation(ex); + auto total = ch::steady_clock::now() - start; + + Log msg(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection"); + msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '" + << m_Identity << "' (took total " << toMilliseconds(total) << "ms"; + + if (cpuBoundDuration >= ch::seconds(1)) { + msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore"; + } + msg << "): " << DiagnosticInformation(ex); break; } @@ -259,10 +298,19 @@ void JsonRpcConnection::Disconnect() } } -void JsonRpcConnection::MessageHandler(const String& jsonString) +/** + * Route the provided message to its corresponding handler (if any). + * + * This will first verify the timestamp of that RPC message (if any) and subsequently, rejects any message whose + * timestamp is less than the remote log position of the client Endpoint; otherwise, the endpoint's remote log + * position is updated to that timestamp. It is not expected to happen, but any message lacking an RPC method or + * referring to a non-existent one is also discarded. Afterward, the RPC handler is then called for that message + * and sends it's result back to the sender if the message contains an ID. + * + * @param message The RPC message you want to process. +*/ +void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message) { - Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); - if (m_Endpoint && message->Contains("ts")) { double ts = message->Get("ts"); @@ -281,8 +329,6 @@ void JsonRpcConnection::MessageHandler(const String& jsonString) origin->FromZone = m_Endpoint->GetZone(); else origin->FromZone = Zone::GetByName(message->Get("originZone")); - - m_Endpoint->AddMessageReceived(jsonString.GetLength()); } Value vmethod; diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index bbe8588a18..ef83dce1b4 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -88,7 +88,8 @@ class JsonRpcConnection final : public Object void CheckLiveness(boost::asio::yield_context yc); bool ProcessMessage(); - void MessageHandler(const String& jsonString); + + void MessageHandler(const Dictionary::Ptr& message); void CertificateRequestResponseHandler(const Dictionary::Ptr& message);