Skip to content

Commit

Permalink
src: make nsolid::ThreadMetrics safer
Browse files Browse the repository at this point in the history
The current ThreadMetrics API is not easy to use safely because the
`ThreadMetrics` instance must stay alive for as long as there is a pending
`Update()` callback. Change the API to use shared_ptr by defining a new
type, `SharedThreadMetrics`, as `std::shared_ptr<ThreadMetrics>`.
Migrate the agents to the new API and deprecate the current one.
  • Loading branch information
santigimeno committed Dec 5, 2023
1 parent 21459cb commit 8f5c66a
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 89 deletions.
13 changes: 6 additions & 7 deletions agents/otlp/src/otlp_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
for (auto& item : agent->env_metrics_map_) {
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
item.second.metrics_.Update(thr_metrics_cb_, agent);
item.second.metrics_->Update(thr_metrics_cb_, agent);
}
}

Expand All @@ -435,12 +435,11 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {

std::vector<std::pair<ThreadMetricsStor,
ThreadMetricsStor>> thr_metrics_vector;
ThreadMetrics* m;
while (agent->thr_metrics_msg_q_.dequeue(&m)) {
auto it = agent->env_metrics_map_.find(m->thread_id());
ThreadMetricsStor stor;
while (agent->thr_metrics_msg_q_.dequeue(stor)) {
auto it = agent->env_metrics_map_.find(stor.thread_id);
if (it != agent->env_metrics_map_.end()) {
auto& metrics = it->second;
ThreadMetricsStor stor = m->Get();
thr_metrics_vector.emplace_back(stor, metrics.prev_);
metrics.prev_ = stor;
}
Expand All @@ -452,14 +451,14 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
}


/*static*/void OTLPAgent::thr_metrics_cb_(ThreadMetrics* metrics,
/*static*/void OTLPAgent::thr_metrics_cb_(SharedThreadMetrics metrics,
OTLPAgent* agent) {
nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_);
if (!is_running_) {
return;
}

agent->thr_metrics_msg_q_.enqueue(metrics);
agent->thr_metrics_msg_q_.enqueue(metrics->Get());
ASSERT_EQ(0, uv_async_send(&agent->metrics_msg_));
}

Expand Down
11 changes: 6 additions & 5 deletions agents/otlp/src/otlp_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ namespace nsolid {
namespace otlp {

struct JSThreadMetrics {
ThreadMetrics metrics_;
SharedThreadMetrics metrics_;
ThreadMetrics::MetricsStor prev_;
explicit JSThreadMetrics(SharedEnvInst envinst): metrics_(envinst),
prev_() {}
explicit JSThreadMetrics(SharedEnvInst envinst)
: metrics_(ThreadMetrics::Create(envinst)),
prev_() {}
};

class OTLPAgent {
Expand Down Expand Up @@ -73,7 +74,7 @@ class OTLPAgent {

static void metrics_msg_cb_(nsuv::ns_async*, OTLPAgent* agent);

static void thr_metrics_cb_(ThreadMetrics*, OTLPAgent*);
static void thr_metrics_cb_(SharedThreadMetrics, OTLPAgent*);

void do_start();

Expand Down Expand Up @@ -132,7 +133,7 @@ class OTLPAgent {
ProcessMetrics::MetricsStor proc_prev_stor_;
std::map<uint64_t, JSThreadMetrics> env_metrics_map_;
nsuv::ns_async metrics_msg_;
TSQueue<ThreadMetrics*> thr_metrics_msg_q_;
TSQueue<ThreadMetrics::MetricsStor> thr_metrics_msg_q_;
nsuv::ns_timer metrics_timer_;
std::unique_ptr<MetricsExporter> metrics_exporter_;

Expand Down
21 changes: 9 additions & 12 deletions agents/statsd/src/statsd_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,7 @@ void StatsDAgent::env_msg_cb(nsuv::ns_async*, StatsDAgent* agent) {
bool creation = std::get<1>(tup);
if (creation) {
auto pair = agent->env_metrics_map_.emplace(
std::piecewise_construct,
std::forward_as_tuple(GetThreadId(envinst)),
std::forward_as_tuple(envinst));
GetThreadId(envinst), ThreadMetrics::Create(envinst));
ASSERT(pair.second);
} else {
ASSERT_EQ(1, agent->env_metrics_map_.erase(GetThreadId(envinst)));
Expand All @@ -480,18 +478,16 @@ void StatsDAgent::shutdown_cb_(nsuv::ns_async*, StatsDAgent* agent) {
}

void StatsDAgent::metrics_msg_cb_(nsuv::ns_async*, StatsDAgent* agent) {
ThreadMetrics* m;
while (agent->metrics_msg_q_.dequeue(&m)) {
uint64_t thread_id = m->thread_id();
ThreadMetrics::MetricsStor stor = m->Get();
ThreadMetrics::MetricsStor stor;
while (agent->metrics_msg_q_.dequeue(stor)) {
json body = {
#define V(Type, CName, JSName, MType) \
{ #JSName, stor.CName },
NSOLID_ENV_METRICS_NUMBERS(V)
#undef V
};

agent->send_metrics(body, thread_id, stor.thread_name.c_str());
agent->send_metrics(body, stor.thread_id, stor.thread_name.c_str());
}
}

Expand Down Expand Up @@ -734,10 +730,10 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, StatsDAgent* agent) {
for (auto it = agent->env_metrics_map_.begin();
it != agent->env_metrics_map_.end();
++it) {
ThreadMetrics& e_metrics = std::get<1>(*it);
SharedThreadMetrics& e_metrics = std::get<1>(*it);
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
e_metrics.Update(env_metrics_cb_, agent);
e_metrics->Update(env_metrics_cb_, agent);
}

// Get and send proc metrics
Expand All @@ -753,13 +749,14 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, StatsDAgent* agent) {
agent->send_metrics(body);
}

void StatsDAgent::env_metrics_cb_(ThreadMetrics* metrics, StatsDAgent* agent) {
void StatsDAgent::env_metrics_cb_(SharedThreadMetrics metrics,
StatsDAgent* agent) {
// Check if the agent is already closing
if (agent->metrics_msg_.is_closing()) {
return;
}

agent->metrics_msg_q_.enqueue(metrics);
agent->metrics_msg_q_.enqueue(metrics->Get());
ASSERT_EQ(0, agent->metrics_msg_.send());
}

Expand Down
6 changes: 3 additions & 3 deletions agents/statsd/src/statsd_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class StatsDAgent {

static void send_stats_msg_cb_(nsuv::ns_async*, StatsDAgent*);

static void env_metrics_cb_(ThreadMetrics*, StatsDAgent*);
static void env_metrics_cb_(SharedThreadMetrics, StatsDAgent*);

static void status_command_cb_(SharedEnvInst, StatsDAgent*);

Expand Down Expand Up @@ -258,11 +258,11 @@ class StatsDAgent {
nsuv::ns_timer retry_timer_;

// For the Metrics API
std::map<uint64_t, ThreadMetrics> env_metrics_map_;
std::map<uint64_t, SharedThreadMetrics> env_metrics_map_;
ProcessMetrics proc_metrics_;
uint64_t metrics_period_;
nsuv::ns_async metrics_msg_;
TSQueue<ThreadMetrics*> metrics_msg_q_;
TSQueue<ThreadMetrics::MetricsStor> metrics_msg_q_;
nsuv::ns_timer metrics_timer_;

// For the Configuration API
Expand Down
19 changes: 9 additions & 10 deletions agents/zmq/src/zmq_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -881,18 +881,17 @@ void ZmqAgent::got_trace(Tracer* tracer,
}


void ZmqAgent::got_env_metrics(ThreadMetrics* t_metrics) {
void ZmqAgent::got_env_metrics(const ThreadMetrics::MetricsStor& stor) {
ProcessMetrics::MetricsStor proc_stor;

auto iter = env_metrics_map_.find(t_metrics->thread_id());
auto iter = env_metrics_map_.find(stor.thread_id);
if (iter != env_metrics_map_.end()) {
ASSERT_PTR_EQ(t_metrics, &iter->second.t_metrics);
iter->second.fetching = false;
// Store into the completed_env_metrics_ vector so we have easy access once
// the metrics from all the threads are retrieved. Make a copy of
// ThreadMetrics to make sure the metrics are still valid even if the
// thread is gone.
completed_env_metrics_.push_back(t_metrics->Get());
completed_env_metrics_.push_back(stor);
}

bool done = true;
Expand Down Expand Up @@ -951,9 +950,9 @@ NSOLID_ENV_METRICS_STRINGS(V)


void ZmqAgent::metrics_msg_cb(nsuv::ns_async*, ZmqAgent* agent) {
ThreadMetrics* metrics;
while (agent->metrics_msg_q_.dequeue(&metrics)) {
agent->got_env_metrics(metrics);
ThreadMetrics::MetricsStor stor;
while (agent->metrics_msg_q_.dequeue(stor)) {
agent->got_env_metrics(stor);
}
}

Expand Down Expand Up @@ -1641,19 +1640,19 @@ void ZmqAgent::metrics_timer_cb(nsuv::ns_timer*, ZmqAgent* agent) {
stor.fetching = false;
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
int r = stor.t_metrics.Update(env_metrics_cb, agent);
int r = stor.t_metrics->Update(env_metrics_cb, agent);
if (r == 0)
stor.fetching = true;
}
}

void ZmqAgent::env_metrics_cb(ThreadMetrics* metrics, ZmqAgent* agent) {
void ZmqAgent::env_metrics_cb(SharedThreadMetrics metrics, ZmqAgent* agent) {
// Check if the agent is already deleted or is closing
if (!is_running || agent->metrics_msg_.is_closing()) {
return;
}

agent->metrics_msg_q_.enqueue(metrics);
agent->metrics_msg_q_.enqueue(metrics->Get());
ASSERT_EQ(0, agent->metrics_msg_.send());
}

Expand Down
14 changes: 8 additions & 6 deletions agents/zmq/src/zmq_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,12 @@ class ZmqAgent {
};

struct EnvMetricsStor {
ThreadMetrics t_metrics;
SharedThreadMetrics t_metrics;
bool fetching;
explicit EnvMetricsStor(SharedEnvInst envinst, bool f): t_metrics(envinst),
fetching(f) {}
NSOLID_DELETE_DEFAULT_CONSTRUCTORS(EnvMetricsStor)
explicit EnvMetricsStor(SharedEnvInst envinst, bool f)
: t_metrics(ThreadMetrics::Create(envinst)),
fetching(f) {}
};

struct ZmqCommandError {
Expand Down Expand Up @@ -478,7 +480,7 @@ class ZmqAgent {

static void update_state_msg_cb(nsuv::ns_async*, ZmqAgent*);

static void env_metrics_cb(ThreadMetrics*, ZmqAgent*);
static void env_metrics_cb(SharedThreadMetrics, ZmqAgent*);

static void status_command_cb(SharedEnvInst, ZmqAgent*);

Expand Down Expand Up @@ -555,7 +557,7 @@ class ZmqAgent {
const std::pair<bool, std::string>&,
const std::pair<bool, std::string>&);

void got_env_metrics(ThreadMetrics* t_metrics);
void got_env_metrics(const ThreadMetrics::MetricsStor& stor);

void got_heap_snapshot(int status,
const std::string& snaphost,
Expand Down Expand Up @@ -657,7 +659,7 @@ class ZmqAgent {
ProcessMetrics proc_metrics_;
uint64_t metrics_period_;
nsuv::ns_async metrics_msg_;
TSQueue<ThreadMetrics*> metrics_msg_q_;
TSQueue<ThreadMetrics::MetricsStor> metrics_msg_q_;
nsuv::ns_timer metrics_timer_;
std::string cached_metrics_;
std::set<std::string> pending_metrics_reqs_;
Expand Down
86 changes: 72 additions & 14 deletions src/nsolid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ int ProcessMetrics::Update() {


ThreadMetrics::ThreadMetrics(SharedEnvInst envinst)
: thread_id_(envinst->thread_id()) {
: thread_id_(envinst->thread_id()),
user_data_(nullptr, nullptr) {
CHECK_NOT_NULL(envinst.get());
CHECK_EQ(uv_mutex_init(&stor_lock_), 0);
stor_.thread_id = thread_id_;
Expand All @@ -275,14 +276,20 @@ ThreadMetrics::~ThreadMetrics() {


ThreadMetrics::ThreadMetrics(uint64_t thread_id)
: thread_id_(thread_id) {
: thread_id_(thread_id),
user_data_(nullptr, nullptr) {
CHECK_EQ(uv_mutex_init(&stor_lock_), 0);
stor_.thread_id = thread_id_;
stor_.prev_call_time_ = uv_hrtime();
stor_.current_hrtime_ = stor_.prev_call_time_;
}


// Factory for the shared_ptr-based API: constructs a ThreadMetrics for
// `envinst` and hands ownership of it to the returned SharedThreadMetrics.
// `envinst` must be non-null: the ThreadMetrics(SharedEnvInst) constructor
// dereferences it to read the thread id.
SharedThreadMetrics ThreadMetrics::Create(SharedEnvInst envinst) {
return SharedThreadMetrics(new ThreadMetrics(envinst));
}


std::string ThreadMetrics::toJSON() {
MetricsStor dup;
std::string metrics_string;
Expand Down Expand Up @@ -340,25 +347,76 @@ int ThreadMetrics::Update(v8::Isolate* isolate) {
}


int ThreadMetrics::get_thread_metrics_() {
int ThreadMetrics::get_thread_metrics() {
// Might need to fire myself for using nested lambdas.
void (*cb)(SharedEnvInst, ThreadMetrics*) =
auto cb = [](SharedEnvInst ei, ThreadMetrics* tm) {
// This runs from the worker thread.
[](SharedEnvInst ei, ThreadMetrics* tm) {
void (*ret_proxy)(ThreadMetrics*) =
[](ThreadMetrics* tm) {
tm->proxy_(tm);
};
auto ret_proxy = [](ThreadMetrics* tm) {
// This runs from the NSolid thread.
tm->proxy_(tm);
};

uv_mutex_lock(&tm->stor_lock_);
ei->GetThreadMetrics(&tm->stor_);
uv_mutex_unlock(&tm->stor_lock_);

QueueCallback(ret_proxy, tm);
};

std::weak_ptr<ThreadMetrics> wp = weak_from_this();
if (!wp.expired()) {
return UV_EINVAL;
}

return RunCommand(GetEnvInst(thread_id_),
CommandType::Interrupt,
cb,
this);
}

uv_mutex_lock(&tm->stor_lock_);
ei->GetThreadMetrics(&tm->stor_);
uv_mutex_unlock(&tm->stor_lock_);

int ThreadMetrics::get_shared_thread_metrics() {
// Might need to fire myself for using nested lambdas.
auto cb = [](SharedEnvInst ei, std::weak_ptr<ThreadMetrics> wp) {
// This runs from the worker thread.
auto ret_proxy = [](std::weak_ptr<ThreadMetrics> wp) {
// This runs from the NSolid thread.
QueueCallback(ret_proxy, tm);
SharedThreadMetrics tm_sp = wp.lock();
if (tm_sp == nullptr) {
return;
}

tm_sp->shared_proxy_(tm_sp);
};

return RunCommand(GetEnvInst(thread_id_), CommandType::Interrupt, cb, this);
SharedThreadMetrics tm_sp = wp.lock();
if (tm_sp == nullptr) {
return;
}

uv_mutex_lock(&tm_sp->stor_lock_);
ei->GetThreadMetrics(&tm_sp->stor_);
uv_mutex_unlock(&tm_sp->stor_lock_);

QueueCallback(ret_proxy, tm_sp);
};

std::weak_ptr<ThreadMetrics> wp = weak_from_this();
if (wp.expired()) {
return UV_EINVAL;
}

return RunCommand(GetEnvInst(thread_id_),
CommandType::Interrupt,
cb,
wp);
}


// Clears the stored callback proxies for both the raw-pointer path (`proxy_`)
// and the shared-pointer path (`shared_proxy_`), and sets `update_running_`
// back to false.
void ThreadMetrics::reset() {
proxy_ = nullptr;
shared_proxy_ = nullptr;
update_running_ = false;
}


Expand Down
Loading

0 comments on commit 8f5c66a

Please sign in to comment.