Skip to content

Commit

Permalink
agents: add metrics transform API to agent
Browse files Browse the repository at this point in the history
Allow users to register a callback that will transform the metrics into
a std::string that will be written to the endpoint.

In order to do this some of the internals needed to be rewritten so that
config() could be called from any thread. The internal usage of config()
has been renamed since it will always be called from the StatsDAgent
thread.
  • Loading branch information
trevnorris committed May 29, 2024
1 parent 0282a00 commit 7df5555
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 2 deletions.
63 changes: 61 additions & 2 deletions agents/statsd/src/statsd_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ using nlohmann::json;
using string_vector = std::vector<std::string>;
using udp_req_data_tup = std::tuple<string_vector*, bool>;

uv_once_t StatsDAgent::list_lock_flag_ = UV_ONCE_INIT;
nsuv::ns_mutex StatsDAgent::agent_list_lock_;
std::list<WeakStatsDAgent> StatsDAgent::agent_list_;


template <typename... Args>
inline void Debug(Args&&... args) {
per_process::Debug(DebugCategory::NSOLID_STATSD_AGENT,
Expand Down Expand Up @@ -455,6 +460,17 @@ SharedStatsDAgent StatsDAgent::Inst() {
return agent;
}

SharedStatsDAgent StatsDAgent::Create() {
SharedStatsDAgent agent(new StatsDAgent(), [](StatsDAgent* agent) {
delete agent;
});
// TODO(trevnorris): For now we'll assume that a StatsDAgent instance will
// be alive for the entire life of the process. So not going to worry about
// cleaning up the hooks that are added for this instance.
agent->start();
return agent;
}

StatsDAgent::StatsDAgent(): hooks_init_(false),
proc_metrics_(),
config_(default_agent_config),
Expand All @@ -468,6 +484,13 @@ StatsDAgent::StatsDAgent(): hooks_init_(false),
ASSERT_EQ(0, bucket_lock_.init(true));
ASSERT_EQ(0, tags_lock_.init(true));
ASSERT_EQ(0, stop_lock_.init(true));
uv_once(&list_lock_flag_, [](void) {
ASSERT_EQ(0, StatsDAgent::agent_list_lock_.init(true));
});
{
nsuv::ns_mutex::scoped_lock lock(StatsDAgent::agent_list_lock_);
StatsDAgent::agent_list_.push_back(weak_from_this());
}
}

StatsDAgent::~StatsDAgent() {
Expand Down Expand Up @@ -499,6 +522,8 @@ int StatsDAgent::start() {
// This method is not thread-safe and it's supposed to be called only from
// the NSolid thread, so it's safe to use this lock after having checked
// the status_ variable as there won't be any concurrent calls.
// TODO(trevnorris): This can't only be run from the NSolid thread anymore.
// Need to be able to call it from whatever thread created the instance.
uv_mutex_lock(&start_lock_);
r = thread_.create(run_, weak_from_this());
if (r == 0) {
Expand All @@ -512,6 +537,8 @@ int StatsDAgent::start() {
return r;
}

// TODO(trevnorris) this assumes it'll only be called by the global instance.
// It needs to change so that it can be called by every instance.
void StatsDAgent::do_start() {
uv_mutex_lock(&start_lock_);

Expand All @@ -535,8 +562,15 @@ void StatsDAgent::do_start() {

status(Initializing);

// TODO(trevnorris): This assumes that the configuration will be happening
// immediately. This will also need to iterate over the SharedEnvInst list
// and create a... Having each StatsDAgent inst add its own hooks seems like
// a bad idea. Have a global set of hooks...
if (hooks_init_ == false) {
ASSERT_EQ(0, OnConfigurationHook(config_agent_cb, weak_from_this()));
// Only add the OnConfigurationHook if this is the global instance.
if (Inst().get() == this) {
ASSERT_EQ(0, OnConfigurationHook(config_agent_cb, weak_from_this()));
}
ASSERT_EQ(0, ThreadAddedHook(env_creation_cb, weak_from_this()));
ASSERT_EQ(0, ThreadRemovedHook(env_deletion_cb, weak_from_this()));
hooks_init_ = true;
Expand Down Expand Up @@ -669,6 +703,11 @@ void StatsDAgent::metrics_msg_cb_(nsuv::ns_async*, WeakStatsDAgent agent_wp) {

ThreadMetrics::MetricsStor stor;
while (agent->metrics_msg_q_.dequeue(stor)) {
if (agent->t_transform_cb_ != nullptr) {
agent->connection_->write(agent->t_transform_cb_(stor));
continue;
}

json body = {
#define V(Type, CName, JSName, MType, Unit) \
{ #JSName, stor.CName },
Expand Down Expand Up @@ -711,7 +750,7 @@ void StatsDAgent::config_msg_cb_(nsuv::ns_async*, WeakStatsDAgent agent_wp) {
json config_msg;

while (agent->config_msg_q_.dequeue(config_msg)) {
r = agent->config(config_msg);
r = agent->config_cb_(config_msg);
if (agent->status_ != Unconfigured) {
ASSERT_EQ(0, agent->update_state_msg_.send());
}
Expand Down Expand Up @@ -757,6 +796,14 @@ int StatsDAgent::send(const std::vector<std::string>& sv, size_t len) {
return send_stats_msg_.send();
}

void StatsDAgent::set_pmetrics_transform_cb(pmetrics_transform_cb cb) {
p_transform_cb_ = cb;
}

void StatsDAgent::set_tmetrics_transform_cb(tmetrics_transform_cb cb) {
t_transform_cb_ = cb;
}

std::string StatsDAgent::status() const {
switch (status_) {
#define X(type, str) \
Expand Down Expand Up @@ -918,6 +965,12 @@ void StatsDAgent::config_tags() {
}

int StatsDAgent::config(const json& config) {
config_msg_q_.enqueue(config);
ASSERT_EQ(0, config_msg_.send());
return 0;
}

int StatsDAgent::config_cb_(const json& config) {
int ret;

json old_config = config_;
Expand Down Expand Up @@ -993,6 +1046,12 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, WeakStatsDAgent agent_wp) {
// Get and send proc metrics
ASSERT_EQ(0, agent->proc_metrics_.Update());
proc_stor = agent->proc_metrics_.Get();

if (agent->p_transform_cb_ != nullptr) {
agent->connection_->write(agent->p_transform_cb_(proc_stor));
return;
}

json body = {
#define V(Type, CName, JSName, MType, Unit) \
{ #JSName, proc_stor.CName },
Expand Down
23 changes: 23 additions & 0 deletions agents/statsd/src/statsd_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ using WeakStatsDAgent = std::weak_ptr<StatsDAgent>;

class StatsDAgent: public std::enable_shared_from_this<StatsDAgent> {
public:
using pmetrics_transform_cb = std::vector<std::string>(*)(
ProcessMetrics::MetricsStor);
using tmetrics_transform_cb = std::vector<std::string>(*)(
ThreadMetrics::MetricsStor);

enum Status {
#define X(type, str) \
type,
Expand All @@ -204,6 +209,10 @@ class StatsDAgent: public std::enable_shared_from_this<StatsDAgent> {

static SharedStatsDAgent Inst();

// Need to run config() to start the metrics timer and receive runtime
// metrics. Problem is do_start() needs to run first...
static SharedStatsDAgent Create();

int setup_metrics_timer(uint64_t period);

// TODO(trevnorris): This is only meant to be used by the global instance,
Expand Down Expand Up @@ -234,6 +243,11 @@ class StatsDAgent: public std::enable_shared_from_this<StatsDAgent> {

int send(const std::vector<std::string>& sv, size_t len);

// Whatever string is returned by metrics_transform_cb will override the
// statsdBucket and statsdTags config.
void set_pmetrics_transform_cb(pmetrics_transform_cb cb);
void set_tmetrics_transform_cb(tmetrics_transform_cb cb);

std::string status() const;

std::string tags() {
Expand Down Expand Up @@ -284,6 +298,8 @@ class StatsDAgent: public std::enable_shared_from_this<StatsDAgent> {

std::string calculate_tags(const std::string& tpl) const;

int config_cb_(const nlohmann::json& message);

void config_bucket();

void config_tags();
Expand Down Expand Up @@ -339,11 +355,18 @@ class StatsDAgent: public std::enable_shared_from_this<StatsDAgent> {
nsuv::ns_async send_stats_msg_;
TSQueue<std::vector<std::string>> send_stats_msg_q_;

static uv_once_t list_lock_flag_;
static nsuv::ns_mutex agent_list_lock_;
static std::list<WeakStatsDAgent> agent_list_;

std::string bucket_;
nsuv::ns_mutex bucket_lock_;
std::string tags_;
nsuv::ns_mutex tags_lock_;

pmetrics_transform_cb p_transform_cb_ = nullptr;
tmetrics_transform_cb t_transform_cb_ = nullptr;

// For status testing
status_cb status_cb_;
};
Expand Down
75 changes: 75 additions & 0 deletions test/addons/nsolid-statsdagent/binding.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#include <node.h>
#include <v8.h>
#include <uv.h>
#include <nsolid.h>
#include <statsd_agent.h>

#include <assert.h>
#include <atomic>

using v8::FunctionCallbackInfo;
using v8::String;
using v8::Value;

using node::nsolid::ProcessMetrics;
using node::nsolid::ThreadMetrics;
using node::nsolid::statsd::SharedStatsDAgent;
using node::nsolid::statsd::StatsDAgent;

SharedStatsDAgent agent;

static std::vector<std::string> pm(ProcessMetrics::MetricsStor stor) {
std::vector<std::string> sv;
std::string ms = "";
#define V(Type, CName, JSName, MType, Unit) \
ms = "proc."; \
ms += #JSName ":"; \
ms += isnan(stor.CName) ? "null" : std::to_string(stor.CName); \
ms += "|g\n"; \
sv.push_back(ms);
NSOLID_PROCESS_METRICS_NUMBERS(V)
#undef V
return sv;
}

static std::vector<std::string> tm(ThreadMetrics::MetricsStor stor) {
std::vector<std::string> sv;
std::string ms = "";
#define V(Type, CName, JSName, MType, Unit) \
ms = "thread."; \
ms += #JSName ":"; \
ms += isnan(stor.CName) ? "null" : std::to_string(stor.CName); \
ms += "|g|#threadId:"; \
ms += std::to_string(stor.thread_id); \
ms += "\n"; \
sv.push_back(ms);
NSOLID_ENV_METRICS_NUMBERS(V)
#undef V
return sv;
}

static void SetAddHooks(const FunctionCallbackInfo<Value>& args) {
assert(args[0]->IsString());
const String::Utf8Value url(args.GetIsolate(), args[0]);
agent = node::nsolid::statsd::StatsDAgent::Create();
agent->config({{"statsd", *url},
{"interval", 3000},
{"pauseMetrics", false}});
agent->set_pmetrics_transform_cb(pm);
agent->set_tmetrics_transform_cb(tm);
}

static void GetStatus(const FunctionCallbackInfo<Value>& args) {
if (!agent)
return args.GetReturnValue().SetNull();
args.GetReturnValue().Set(String::NewFromOneByte(
args.GetIsolate(),
reinterpret_cast<const uint8_t*>(agent->status().c_str()),
v8::NewStringType::kNormal).ToLocalChecked());
}


NODE_MODULE_INIT(/* exports, module, context */) {
NODE_SET_METHOD(exports, "setAddHooks", SetAddHooks);
NODE_SET_METHOD(exports, "getStatus", GetStatus);
}
22 changes: 22 additions & 0 deletions test/addons/nsolid-statsdagent/binding.gyp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
'targets': [{
'target_name': 'binding',
'sources': [ 'binding.cc' ],
'includes': ['../common.gypi'],
'defines': [ 'NODE_WANT_INTERNALS=1' ],
'include_dirs': [
'../../../src/',
'../../../deps/nsuv/include/',
'../../../agents/statsd/src/',
],
'target_defaults': {
'default_configuration': 'Release',
'configurations': {
'Debug': {
'defines': [ 'DEBUG', '_DEBUG' ],
'cflags': [ '-g', '-O0', '-fstandalone-debug' ],
}
},
},
}],
}
Loading

0 comments on commit 7df5555

Please sign in to comment.