From f08389e928759587f31904b7c364ef0a11bebe48 Mon Sep 17 00:00:00 2001 From: Trevor Norris Date: Tue, 23 Apr 2024 14:08:17 -0600 Subject: [PATCH] agents: add metrics transform API to agent Allow users to register a callback that will transform the metrics into a std::string that will be written to the endpoint. In order to do this some of the internals needed to be rewritten so that config() could be called from any thread. The internal usage of config() has been renamed since it will always be called from the StatsDAgent thread. --- agents/statsd/src/statsd_agent.cc | 55 ++++++- agents/statsd/src/statsd_agent.h | 23 +++ test/addons/nsolid-statsdagent/binding.cc | 75 ++++++++++ test/addons/nsolid-statsdagent/binding.gyp | 22 +++ .../nsolid-statsdagent/nsolid-statsdagent.js | 138 ++++++++++++++++++ 5 files changed, 311 insertions(+), 2 deletions(-) create mode 100644 test/addons/nsolid-statsdagent/binding.cc create mode 100644 test/addons/nsolid-statsdagent/binding.gyp create mode 100644 test/addons/nsolid-statsdagent/nsolid-statsdagent.js diff --git a/agents/statsd/src/statsd_agent.cc b/agents/statsd/src/statsd_agent.cc index 8f61391df1..94b548717c 100644 --- a/agents/statsd/src/statsd_agent.cc +++ b/agents/statsd/src/statsd_agent.cc @@ -22,6 +22,11 @@ using nlohmann::json; using string_vector = std::vector; using udp_req_data_tup = std::tuple; +uv_once_t StatsDAgent::list_lock_flag_ = UV_ONCE_INIT; +nsuv::ns_mutex StatsDAgent::agent_list_lock_; +std::list StatsDAgent::agent_list_; + + template inline void Debug(Args&&... args) { per_process::Debug(DebugCategory::NSOLID_STATSD_AGENT, @@ -455,6 +460,17 @@ SharedStatsDAgent StatsDAgent::Inst() { return agent; } +SharedStatsDAgent StatsDAgent::Create() { + SharedStatsDAgent agent(new StatsDAgent(), [](StatsDAgent* agent) { + delete agent; + }); + // TODO(trevnorris): For now we'll assume that a StatsDAgent instance will + // be alive for the entire life of the process. So not going to worry about + // cleaning up the hooks that are added for this instance. + agent->start(); + return agent; +} + StatsDAgent::StatsDAgent(): hooks_init_(false), proc_metrics_(), config_(default_agent_config), @@ -468,6 +484,13 @@ StatsDAgent::StatsDAgent(): hooks_init_(false), ASSERT_EQ(0, bucket_lock_.init(true)); ASSERT_EQ(0, tags_lock_.init(true)); ASSERT_EQ(0, stop_lock_.init(true)); + uv_once(&list_lock_flag_, [](void) { + ASSERT_EQ(0, StatsDAgent::agent_list_lock_.init(true)); + }); + { + nsuv::ns_mutex::scoped_lock lock(StatsDAgent::agent_list_lock_); + StatsDAgent::agent_list_.push_back(weak_from_this()); + } } StatsDAgent::~StatsDAgent() { @@ -536,7 +559,10 @@ void StatsDAgent::do_start() { status(Initializing); if (hooks_init_ == false) { - ASSERT_EQ(0, OnConfigurationHook(config_agent_cb, weak_from_this())); + // Only add the OnConfigurationHook if this is the global instance. + if (Inst().get() == this) { + ASSERT_EQ(0, OnConfigurationHook(config_agent_cb, weak_from_this())); + } ASSERT_EQ(0, ThreadAddedHook(env_creation_cb, weak_from_this())); ASSERT_EQ(0, ThreadRemovedHook(env_deletion_cb, weak_from_this())); hooks_init_ = true; @@ -669,6 +695,11 @@ void StatsDAgent::metrics_msg_cb_(nsuv::ns_async*, WeakStatsDAgent agent_wp) { ThreadMetrics::MetricsStor stor; while (agent->metrics_msg_q_.dequeue(stor)) { + if (agent->t_transform_cb_ != nullptr) { + agent->connection_->write(agent->t_transform_cb_(stor)); + continue; + } + json body = { #define V(Type, CName, JSName, MType, Unit) \ { #JSName, stor.CName }, @@ -711,7 +742,7 @@ void StatsDAgent::config_msg_cb_(nsuv::ns_async*, WeakStatsDAgent agent_wp) { json config_msg; while (agent->config_msg_q_.dequeue(config_msg)) { - r = agent->config(config_msg); + r = agent->config_cb_(config_msg); if (agent->status_ != Unconfigured) { ASSERT_EQ(0, agent->update_state_msg_.send()); } @@ -757,6 +788,14 @@ int StatsDAgent::send(const std::vector& sv, size_t len) { return send_stats_msg_.send(); } +void StatsDAgent::set_pmetrics_transform_cb(pmetrics_transform_cb cb) { + p_transform_cb_ = cb; +} + +void StatsDAgent::set_tmetrics_transform_cb(tmetrics_transform_cb cb) { + t_transform_cb_ = cb; +} + std::string StatsDAgent::status() const { switch (status_) { #define X(type, str) \ @@ -918,6 +957,12 @@ void StatsDAgent::config_tags() { } int StatsDAgent::config(const json& config) { + config_msg_q_.enqueue(config); + ASSERT_EQ(0, config_msg_.send()); + return 0; +} + +int StatsDAgent::config_cb_(const json& config) { int ret; json old_config = config_; @@ -993,6 +1038,12 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, WeakStatsDAgent agent_wp) { // Get and send proc metrics ASSERT_EQ(0, agent->proc_metrics_.Update()); proc_stor = agent->proc_metrics_.Get(); + + if (agent->p_transform_cb_ != nullptr) { + agent->connection_->write(agent->p_transform_cb_(proc_stor)); + return; + } + json body = { #define V(Type, CName, JSName, MType, Unit) \ { #JSName, proc_stor.CName }, diff --git a/agents/statsd/src/statsd_agent.h b/agents/statsd/src/statsd_agent.h index 27ce26a9b7..224929b84d 100644 --- a/agents/statsd/src/statsd_agent.h +++ b/agents/statsd/src/statsd_agent.h @@ -195,6 +195,11 @@ using WeakStatsDAgent = std::weak_ptr; class StatsDAgent: public std::enable_shared_from_this { public: + using pmetrics_transform_cb = std::vector(*)( + ProcessMetrics::MetricsStor); + using tmetrics_transform_cb = std::vector(*)( + ThreadMetrics::MetricsStor); + enum Status { #define X(type, str) \ type, @@ -204,6 +209,10 @@ class StatsDAgent: public std::enable_shared_from_this { static SharedStatsDAgent Inst(); + // Need to run config() to start the metrics timer and receive runtime + // metrics. Problem is do_start() needs to run first... + static SharedStatsDAgent Create(); + int setup_metrics_timer(uint64_t period); // TODO(trevnorris): This is only meant to be used by the global instance, @@ -234,6 +243,11 @@ class StatsDAgent: public std::enable_shared_from_this { int send(const std::vector& sv, size_t len); + // Whatever string is returned by metrics_transform_cb will override the + // statsdBucket and statsdTags config. + void set_pmetrics_transform_cb(pmetrics_transform_cb cb); + void set_tmetrics_transform_cb(tmetrics_transform_cb cb); + std::string status() const; std::string tags() { @@ -284,6 +298,8 @@ class StatsDAgent: public std::enable_shared_from_this { std::string calculate_tags(const std::string& tpl) const; + int config_cb_(const nlohmann::json& message); + void config_bucket(); void config_tags(); @@ -339,11 +355,18 @@ class StatsDAgent: public std::enable_shared_from_this { nsuv::ns_async send_stats_msg_; TSQueue> send_stats_msg_q_; + static uv_once_t list_lock_flag_; + static nsuv::ns_mutex agent_list_lock_; + static std::list agent_list_; + std::string bucket_; nsuv::ns_mutex bucket_lock_; std::string tags_; nsuv::ns_mutex tags_lock_; + pmetrics_transform_cb p_transform_cb_ = nullptr; + tmetrics_transform_cb t_transform_cb_ = nullptr; + // For status testing status_cb status_cb_; }; diff --git a/test/addons/nsolid-statsdagent/binding.cc b/test/addons/nsolid-statsdagent/binding.cc new file mode 100644 index 0000000000..c224d4d8a0 --- /dev/null +++ b/test/addons/nsolid-statsdagent/binding.cc @@ -0,0 +1,75 @@ +#include +#include +#include +#include +#include + +#include +#include + +using v8::FunctionCallbackInfo; +using v8::String; +using v8::Value; + +using node::nsolid::ProcessMetrics; +using node::nsolid::ThreadMetrics; +using node::nsolid::statsd::SharedStatsDAgent; +using node::nsolid::statsd::StatsDAgent; + +SharedStatsDAgent agent; + +static std::vector pm(ProcessMetrics::MetricsStor stor) { + std::vector sv; + std::string ms = ""; +#define V(Type, CName, JSName, MType, Unit) \ + ms = "proc."; \ + ms += #JSName ":"; \ + ms += isnan(stor.CName) ? "null" : std::to_string(stor.CName); \ + ms += "|g\n"; \ + sv.push_back(ms); + NSOLID_PROCESS_METRICS_NUMBERS(V) +#undef V + return sv; +} + +static std::vector tm(ThreadMetrics::MetricsStor stor) { + std::vector sv; + std::string ms = ""; +#define V(Type, CName, JSName, MType, Unit) \ + ms = "thread."; \ + ms += #JSName ":"; \ + ms += isnan(stor.CName) ? "null" : std::to_string(stor.CName); \ + ms += "|g|#threadId:"; \ + ms += std::to_string(stor.thread_id); \ + ms += "\n"; \ + sv.push_back(ms); + NSOLID_ENV_METRICS_NUMBERS(V) +#undef V + return sv; +} + +static void SetAddHooks(const FunctionCallbackInfo& args) { + assert(args[0]->IsString()); + const String::Utf8Value url(args.GetIsolate(), args[0]); + agent = node::nsolid::statsd::StatsDAgent::Create(); + agent->config({{"statsd", *url}, + {"interval", 3000}, + {"pauseMetrics", false}}); + agent->set_pmetrics_transform_cb(pm); + agent->set_tmetrics_transform_cb(tm); +} + +static void GetStatus(const FunctionCallbackInfo& args) { + if (!agent) + return args.GetReturnValue().SetNull(); + args.GetReturnValue().Set(String::NewFromOneByte( + args.GetIsolate(), + reinterpret_cast(agent->status().c_str()), + v8::NewStringType::kNormal).ToLocalChecked()); +} + + +NODE_MODULE_INIT(/* exports, module, context */) { + NODE_SET_METHOD(exports, "setAddHooks", SetAddHooks); + NODE_SET_METHOD(exports, "getStatus", GetStatus); +} diff --git a/test/addons/nsolid-statsdagent/binding.gyp b/test/addons/nsolid-statsdagent/binding.gyp new file mode 100644 index 0000000000..9b637a7cf8 --- /dev/null +++ b/test/addons/nsolid-statsdagent/binding.gyp @@ -0,0 +1,22 @@ +{ + 'targets': [{ + 'target_name': 'binding', + 'sources': [ 'binding.cc' ], + 'includes': ['../common.gypi'], + 'defines': [ 'NODE_WANT_INTERNALS=1' ], + 'include_dirs': [ + '../../../src/', + '../../../deps/nsuv/include/', + '../../../agents/statsd/src/', + ], + 'target_defaults': { + 'default_configuration': 'Release', + 'configurations': { + 'Debug': { + 'defines': [ 'DEBUG', '_DEBUG' ], + 'cflags': [ '-g', '-O0', '-fstandalone-debug' ], + } + }, + }, + }], +} diff --git a/test/addons/nsolid-statsdagent/nsolid-statsdagent.js b/test/addons/nsolid-statsdagent/nsolid-statsdagent.js new file mode 100644 index 0000000000..d85e4db690 --- /dev/null +++ b/test/addons/nsolid-statsdagent/nsolid-statsdagent.js @@ -0,0 +1,138 @@ +'use strict'; + +const { buildType, mustCall, mustCallAtLeast, skip } = require('../../common'); +const assert = require('assert'); +const bindingPath = require.resolve(`./build/${buildType}/binding`); +const binding = require(bindingPath); +const net = require('net'); +const { Worker, isMainThread } = require('worker_threads'); + +let expectedProcMetrics = [ + 'timestamp', + 'uptime', + 'systemUptime', + 'freeMem', + 'blockInputOpCount', + 'blockOutputOpCount', + 'ctxSwitchInvoluntaryCount', + 'ctxSwitchVoluntaryCount', + 'ipcReceivedCount', + 'ipcSentCount', + 'pageFaultHardCount', + 'pageFaultSoftCount', + 'signalCount', + 'swapCount', + 'rss', + 'load1m', + 'load5m', + 'load15m', + 'cpuUserPercent', + 'cpuSystemPercent', + 'cpuPercent', +]; + +let expectedThreadMetrics = [ + 'threadId', + 'timestamp', + 'activeHandles', + 'activeRequests', + 'heapTotal', + 'totalHeapSizeExecutable', + 'totalPhysicalSize', + 'totalAvailableSize', + 'heapUsed', + 'heapSizeLimit', + 'mallocedMemory', + 'externalMem', + 'peakMallocedMemory', + 'numberOfNativeContexts', + 'numberOfDetachedContexts', + 'gcCount', + 'gcForcedCount', + 'gcFullCount', + 'gcMajorCount', + 'dnsCount', + 'httpClientAbortCount', + 'httpClientCount', + 'httpServerAbortCount', + 'httpServerCount', + 'loopIdleTime', + 'loopIterations', + 'loopIterWithEvents', + 'eventsProcessed', + 'eventsWaiting', + 'providerDelay', + 'processingDelay', + 'loopTotalCount', + 'pipeServerCreatedCount', + 'pipeServerDestroyedCount', + 'pipeSocketCreatedCount', + 'pipeSocketDestroyedCount', + 'tcpServerCreatedCount', + 'tcpServerDestroyedCount', + 'tcpSocketCreatedCount', + 'tcpSocketDestroyedCount', + 'udpSocketCreatedCount', + 'udpSocketDestroyedCount', + 'promiseCreatedCount', + 'promiseResolvedCount', + 'fsHandlesOpenedCount', + 'fsHandlesClosedCount', + 'gcDurUs99Ptile', + 'gcDurUsMedian', + 'dns99Ptile', + 'dnsMedian', + 'httpClient99Ptile', + 'httpClientMedian', + 'httpServer99Ptile', + 'httpServerMedian', + 'loopUtilization', + 'res5s', + 'res1m', + 'res5m', + 'res15m', + 'loopAvgTasks', + 'loopEstimatedLag', + 'loopIdlePercent', +]; + +if (process.env.NSOLID_COMMAND) + skip('required to run without the Console'); + +if (!isMainThread) + skip('Test must be run as the main thread'); + +const server = net.createServer(mustCall((socket) => { + assert.strictEqual(binding.getStatus(), 'ready'); + socket.on('data', mustCallAtLeast((d) => { + const lines = d.toString() + .split('\n') + .filter(e => e.length > 0) + .map(e => e.split('|')); + for (let e of lines) { + const s = e[0].split('.'); + const m = s[1].split(':')[0]; + if (s[0] === 'proc') { + assert.ok(expectedProcMetrics.includes(m), m); + expectedProcMetrics = expectedProcMetrics.filter((x) => x !== m); + } else if (s[0] === 'thread') { + assert.ok(expectedThreadMetrics.includes(m), m); + expectedThreadMetrics = expectedThreadMetrics.filter((x) => x !== m); + } else { + assert.fail('unreachable'); + } + } + // All metrics have been found. Test is complete. + if (expectedThreadMetrics.length === 0 && + expectedProcMetrics.length === 0) { + process.exit(); + } + }, 2)); +})); + +server.listen(0, '127.0.0.1', mustCall(() => { + binding.setAddHooks(`tcp://127.0.0.1:${server.address().port}`); + assert.strictEqual(binding.getStatus(), 'initializing'); +})); + +assert.strictEqual(binding.getStatus(), null);