Skip to content

Commit

Permalink
src: make nsolid::ThreadMetrics safer
Browse files Browse the repository at this point in the history
The current ThreadMetrics API is not easy to use safely because the
`ThreadMetrics` instance must stay alive for as long as an `Update()`
callback is pending. Change the API to use `std::shared_ptr` by
defining a new type, `SharedThreadMetrics`, as
`std::shared_ptr<ThreadMetrics>`. Migrate the agents to the new API
and deprecate the old one.
  • Loading branch information
santigimeno committed Nov 29, 2023
1 parent f16e228 commit efca53f
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 80 deletions.
13 changes: 6 additions & 7 deletions agents/otlp/src/otlp_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
for (auto& item : agent->env_metrics_map_) {
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
item.second.metrics_.Update(thr_metrics_cb_, agent);
item.second.metrics_->Update(thr_metrics_cb_, agent);
}
}

Expand All @@ -418,12 +418,11 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {

std::vector<std::pair<ThreadMetricsStor,
ThreadMetricsStor>> thr_metrics_vector;
ThreadMetrics* m;
while (agent->thr_metrics_msg_q_.dequeue(&m)) {
auto it = agent->env_metrics_map_.find(m->thread_id());
ThreadMetricsStor stor;
while (agent->thr_metrics_msg_q_.dequeue(stor)) {
auto it = agent->env_metrics_map_.find(stor.thread_id);
if (it != agent->env_metrics_map_.end()) {
auto& metrics = it->second;
ThreadMetricsStor stor = m->Get();
thr_metrics_vector.emplace_back(stor, metrics.prev_);
metrics.prev_ = stor;
}
Expand All @@ -435,14 +434,14 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
}


/*static*/void OTLPAgent::thr_metrics_cb_(ThreadMetrics* metrics,
/*static*/void OTLPAgent::thr_metrics_cb_(SharedThreadMetrics metrics,
OTLPAgent* agent) {
nsuv::ns_rwlock::scoped_rdlock lock(agent->exit_lock_);
if (!is_running_) {
return;
}

agent->thr_metrics_msg_q_.enqueue(metrics);
agent->thr_metrics_msg_q_.enqueue(metrics->Get());
ASSERT_EQ(0, uv_async_send(&agent->metrics_msg_));
}

Expand Down
11 changes: 6 additions & 5 deletions agents/otlp/src/otlp_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ namespace nsolid {
namespace otlp {

struct JSThreadMetrics {
ThreadMetrics metrics_;
SharedThreadMetrics metrics_;
ThreadMetrics::MetricsStor prev_;
explicit JSThreadMetrics(SharedEnvInst envinst): metrics_(envinst),
prev_() {}
explicit JSThreadMetrics(SharedEnvInst envinst)
: metrics_(ThreadMetrics::Create(envinst)),
prev_() {}
};

class OTLPAgent {
Expand Down Expand Up @@ -73,7 +74,7 @@ class OTLPAgent {

static void metrics_msg_cb_(nsuv::ns_async*, OTLPAgent* agent);

static void thr_metrics_cb_(ThreadMetrics*, OTLPAgent*);
static void thr_metrics_cb_(SharedThreadMetrics, OTLPAgent*);

void do_start();

Expand Down Expand Up @@ -134,7 +135,7 @@ class OTLPAgent {
ProcessMetrics::MetricsStor proc_prev_stor_;
std::map<uint64_t, JSThreadMetrics> env_metrics_map_;
nsuv::ns_async metrics_msg_;
TSQueue<ThreadMetrics*> thr_metrics_msg_q_;
TSQueue<ThreadMetrics::MetricsStor> thr_metrics_msg_q_;
nsuv::ns_timer metrics_timer_;
std::unique_ptr<MetricsExporter> metrics_exporter_;

Expand Down
21 changes: 9 additions & 12 deletions agents/statsd/src/statsd_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,7 @@ void StatsDAgent::env_msg_cb(nsuv::ns_async*, StatsDAgent* agent) {
bool creation = std::get<1>(tup);
if (creation) {
auto pair = agent->env_metrics_map_.emplace(
std::piecewise_construct,
std::forward_as_tuple(GetThreadId(envinst)),
std::forward_as_tuple(envinst));
GetThreadId(envinst), ThreadMetrics::Create(envinst));
ASSERT(pair.second);
} else {
ASSERT_EQ(1, agent->env_metrics_map_.erase(GetThreadId(envinst)));
Expand All @@ -476,18 +474,16 @@ void StatsDAgent::shutdown_cb_(nsuv::ns_async*, StatsDAgent* agent) {
}

void StatsDAgent::metrics_msg_cb_(nsuv::ns_async*, StatsDAgent* agent) {
ThreadMetrics* m;
while (agent->metrics_msg_q_.dequeue(&m)) {
uint64_t thread_id = m->thread_id();
ThreadMetrics::MetricsStor stor = m->Get();
ThreadMetrics::MetricsStor stor;
while (agent->metrics_msg_q_.dequeue(stor)) {
json body = {
#define V(Type, CName, JSName, MType) \
{ #JSName, stor.CName },
NSOLID_ENV_METRICS_NUMBERS(V)
#undef V
};

agent->send_metrics(body, thread_id, stor.thread_name.c_str());
agent->send_metrics(body, stor.thread_id, stor.thread_name.c_str());
}
}

Expand Down Expand Up @@ -730,10 +726,10 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, StatsDAgent* agent) {
for (auto it = agent->env_metrics_map_.begin();
it != agent->env_metrics_map_.end();
++it) {
ThreadMetrics& e_metrics = std::get<1>(*it);
SharedThreadMetrics& e_metrics = std::get<1>(*it);
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
e_metrics.Update(env_metrics_cb_, agent);
e_metrics->Update(env_metrics_cb_, agent);
}

// Get and send proc metrics
Expand All @@ -749,13 +745,14 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, StatsDAgent* agent) {
agent->send_metrics(body);
}

void StatsDAgent::env_metrics_cb_(ThreadMetrics* metrics, StatsDAgent* agent) {
void StatsDAgent::env_metrics_cb_(SharedThreadMetrics metrics,
StatsDAgent* agent) {
// Check if the agent is already closing
if (agent->metrics_msg_.is_closing()) {
return;
}

agent->metrics_msg_q_.enqueue(metrics);
agent->metrics_msg_q_.enqueue(metrics->Get());
ASSERT_EQ(0, agent->metrics_msg_.send());
}

Expand Down
6 changes: 3 additions & 3 deletions agents/statsd/src/statsd_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class StatsDAgent {

static void send_stats_msg_cb_(nsuv::ns_async*, StatsDAgent*);

static void env_metrics_cb_(ThreadMetrics*, StatsDAgent*);
static void env_metrics_cb_(SharedThreadMetrics, StatsDAgent*);

static void status_command_cb_(SharedEnvInst, StatsDAgent*);

Expand Down Expand Up @@ -258,11 +258,11 @@ class StatsDAgent {
nsuv::ns_timer retry_timer_;

// For the Metrics API
std::map<uint64_t, ThreadMetrics> env_metrics_map_;
std::map<uint64_t, SharedThreadMetrics> env_metrics_map_;
ProcessMetrics proc_metrics_;
uint64_t metrics_period_;
nsuv::ns_async metrics_msg_;
TSQueue<ThreadMetrics*> metrics_msg_q_;
TSQueue<ThreadMetrics::MetricsStor> metrics_msg_q_;
nsuv::ns_timer metrics_timer_;

// For the Configuration API
Expand Down
19 changes: 9 additions & 10 deletions agents/zmq/src/zmq_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -881,18 +881,17 @@ void ZmqAgent::got_trace(Tracer* tracer,
}


void ZmqAgent::got_env_metrics(ThreadMetrics* t_metrics) {
void ZmqAgent::got_env_metrics(const ThreadMetrics::MetricsStor& stor) {
ProcessMetrics::MetricsStor proc_stor;

auto iter = env_metrics_map_.find(t_metrics->thread_id());
auto iter = env_metrics_map_.find(stor.thread_id);
if (iter != env_metrics_map_.end()) {
ASSERT_PTR_EQ(t_metrics, &iter->second.t_metrics);
iter->second.fetching = false;
// Store into the completed_env_metrics_ vector so we have easy access once
// the metrics from all the threads are retrieved. Make a copy of
// ThreadMetrics to make sure the metrics are still valid even if the
// thread is gone.
completed_env_metrics_.push_back(t_metrics->Get());
completed_env_metrics_.push_back(stor);
}

bool done = true;
Expand Down Expand Up @@ -951,9 +950,9 @@ NSOLID_ENV_METRICS_STRINGS(V)


void ZmqAgent::metrics_msg_cb(nsuv::ns_async*, ZmqAgent* agent) {
ThreadMetrics* metrics;
while (agent->metrics_msg_q_.dequeue(&metrics)) {
agent->got_env_metrics(metrics);
ThreadMetrics::MetricsStor stor;
while (agent->metrics_msg_q_.dequeue(stor)) {
agent->got_env_metrics(stor);
}
}

Expand Down Expand Up @@ -1641,19 +1640,19 @@ void ZmqAgent::metrics_timer_cb(nsuv::ns_timer*, ZmqAgent* agent) {
stor.fetching = false;
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
int r = stor.t_metrics.Update(env_metrics_cb, agent);
int r = stor.t_metrics->Update(env_metrics_cb, agent);
if (r == 0)
stor.fetching = true;
}
}

void ZmqAgent::env_metrics_cb(ThreadMetrics* metrics, ZmqAgent* agent) {
void ZmqAgent::env_metrics_cb(SharedThreadMetrics metrics, ZmqAgent* agent) {
// Check if the agent is already deleted or is closing
if (!is_running || agent->metrics_msg_.is_closing()) {
return;
}

agent->metrics_msg_q_.enqueue(metrics);
agent->metrics_msg_q_.enqueue(metrics->Get());
ASSERT_EQ(0, agent->metrics_msg_.send());
}

Expand Down
14 changes: 8 additions & 6 deletions agents/zmq/src/zmq_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,12 @@ class ZmqAgent {
};

struct EnvMetricsStor {
ThreadMetrics t_metrics;
SharedThreadMetrics t_metrics;
bool fetching;
explicit EnvMetricsStor(SharedEnvInst envinst, bool f): t_metrics(envinst),
fetching(f) {}
NSOLID_DELETE_DEFAULT_CONSTRUCTORS(EnvMetricsStor)
explicit EnvMetricsStor(SharedEnvInst envinst, bool f)
: t_metrics(ThreadMetrics::Create(envinst)),
fetching(f) {}
};

struct ZmqCommandError {
Expand Down Expand Up @@ -478,7 +480,7 @@ class ZmqAgent {

static void update_state_msg_cb(nsuv::ns_async*, ZmqAgent*);

static void env_metrics_cb(ThreadMetrics*, ZmqAgent*);
static void env_metrics_cb(SharedThreadMetrics, ZmqAgent*);

static void status_command_cb(SharedEnvInst, ZmqAgent*);

Expand Down Expand Up @@ -555,7 +557,7 @@ class ZmqAgent {
const std::pair<bool, std::string>&,
const std::pair<bool, std::string>&);

void got_env_metrics(ThreadMetrics* t_metrics);
void got_env_metrics(const ThreadMetrics::MetricsStor& stor);

void got_heap_snapshot(int status,
const std::string& snaphost,
Expand Down Expand Up @@ -657,7 +659,7 @@ class ZmqAgent {
ProcessMetrics proc_metrics_;
uint64_t metrics_period_;
nsuv::ns_async metrics_msg_;
TSQueue<ThreadMetrics*> metrics_msg_q_;
TSQueue<ThreadMetrics::MetricsStor> metrics_msg_q_;
nsuv::ns_timer metrics_timer_;
std::string cached_metrics_;
std::set<std::string> pending_metrics_reqs_;
Expand Down
32 changes: 19 additions & 13 deletions src/nsolid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ ThreadMetrics::ThreadMetrics(uint64_t thread_id)
}


SharedThreadMetrics ThreadMetrics::Create(SharedEnvInst envinst) {
return SharedThreadMetrics(new ThreadMetrics(envinst));
}


std::string ThreadMetrics::toJSON() {
MetricsStor dup;
std::string metrics_string;
Expand Down Expand Up @@ -342,23 +347,24 @@ int ThreadMetrics::Update(v8::Isolate* isolate) {

int ThreadMetrics::get_thread_metrics_() {
// Might need to fire myself for using nested lambdas.
void (*cb)(SharedEnvInst, ThreadMetrics*) =
auto cb = [](SharedEnvInst ei, SharedThreadMetrics tm_sp) {
// This runs from the worker thread.
[](SharedEnvInst ei, ThreadMetrics* tm) {
void (*ret_proxy)(ThreadMetrics*) =
[](ThreadMetrics* tm) {
tm->proxy_(tm);
};

uv_mutex_lock(&tm->stor_lock_);
ei->GetThreadMetrics(&tm->stor_);
uv_mutex_unlock(&tm->stor_lock_);

auto ret_proxy = [](SharedThreadMetrics tm_sp) {
// This runs from the NSolid thread.
QueueCallback(ret_proxy, tm);
tm_sp->proxy_(tm_sp);
};

return RunCommand(GetEnvInst(thread_id_), CommandType::Interrupt, cb, this);
uv_mutex_lock(&tm_sp->stor_lock_);
ei->GetThreadMetrics(&tm_sp->stor_);
uv_mutex_unlock(&tm_sp->stor_lock_);

QueueCallback(ret_proxy, tm_sp);
};

return RunCommand(GetEnvInst(thread_id_),
CommandType::Interrupt,
cb,
shared_from_this());
}


Expand Down
30 changes: 16 additions & 14 deletions src/nsolid.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,12 @@ enum NSolidErr {
#undef X
};


class ProcessMetrics;
class ThreadMetrics;
enum class CommandType;


using SharedEnvInst = std::shared_ptr<EnvInst>;
using SharedThreadMetrics = std::shared_ptr<ThreadMetrics>;
using ns_error_tp = std::tuple<std::string, std::string>;

/** @cond DONT_DOCUMENT */
Expand Down Expand Up @@ -601,9 +600,10 @@ class NODE_EXTERN ProcessMetrics {
* @brief Class that allows to retrieve thread-specific metrics from a process.
*
*/
class NODE_EXTERN ThreadMetrics {
class NODE_EXTERN ThreadMetrics:
public std::enable_shared_from_this<ThreadMetrics> {
public:
using thread_metrics_proxy_sig = void(*)(ThreadMetrics*);
using thread_metrics_proxy_sig = void(*)(SharedThreadMetrics);

/**
* @brief struct to store thread metrics data.
Expand All @@ -630,7 +630,9 @@ class NODE_EXTERN ThreadMetrics {
* @param thread_id the id of the JS thread the metrics are going to be
* retrieved from.
*/
explicit ThreadMetrics(SharedEnvInst envinst);
static SharedThreadMetrics Create(SharedEnvInst envinst);
NODE_DEPRECATED("Use ThreadMetrics::Create(envinst)",
explicit ThreadMetrics(SharedEnvInst envinst));
ThreadMetrics() = delete;
ThreadMetrics(const ThreadMetrics&) = delete;
ThreadMetrics& operator=(const ThreadMetrics&) = delete;
Expand Down Expand Up @@ -664,7 +666,7 @@ class NODE_EXTERN ThreadMetrics {
* called when the retrieval has completed.
*
* @param cb callback function with the following signature
* `void(*)(ThreadMetrics*, ...Data)`
* `void(*)(SharedThreadMetrics, ...Data)`
* @param data variable number of arguments to be propagated to the callback.
* @return NSOLID_E_SUCCESS in case of success or a different NSOLID_E_
* error value otherwise.
Expand All @@ -687,7 +689,7 @@ class NODE_EXTERN ThreadMetrics {
int get_thread_metrics_();

template <typename G>
static void thread_metrics_proxy_(ThreadMetrics* tm);
static void thread_metrics_proxy_(SharedThreadMetrics tm_sp);

uint64_t thread_id_ = 0xFFFFFFFFFFFFFFFF;
void* user_data_ = nullptr;
Expand Down Expand Up @@ -988,7 +990,7 @@ int ThreadMetrics::Update(Cb&& cb, Data&&... data) {
return UV_EBUSY;
}

// _1 - ThreadMetrics*
// _1 - SharedThreadMetrics
UserData* user_data = new (std::nothrow) UserData(
std::bind(std::forward<Cb>(cb), _1, std::forward<Data>(data)...));

Expand All @@ -1012,12 +1014,12 @@ int ThreadMetrics::Update(Cb&& cb, Data&&... data) {


template <typename G>
void ThreadMetrics::thread_metrics_proxy_(ThreadMetrics* tm) {
G* g = static_cast<G*>(tm->user_data_);
tm->user_data_ = nullptr;
tm->proxy_ = nullptr;
tm->update_running_ = false;
(*g)(tm);
void ThreadMetrics::thread_metrics_proxy_(SharedThreadMetrics tm_sp) {
G* g = static_cast<G*>(tm_sp->user_data_);
tm_sp->user_data_ = nullptr;
tm_sp->proxy_ = nullptr;
tm_sp->update_running_ = false;
(*g)(tm_sp);
delete g;
}

Expand Down
Loading

0 comments on commit efca53f

Please sign in to comment.