From efca53f6a9751678eec69edbd59c14eb58d27a8a Mon Sep 17 00:00:00 2001 From: Santiago Gimeno Date: Wed, 29 Nov 2023 17:51:50 +0100 Subject: [PATCH] src: make nsolid::ThreadMetrics safer The current ThreadMetrics API is not easy to use safely because the `ThreadMetrics` instance needs to stay alive while an `Update()` callback is still pending. Change the API to use shared_ptr by defining a new type `SharedThreadMetrics` as `std::shared_ptr<ThreadMetrics>`. Migrate the agents to use the new API while deprecating the current one. --- agents/otlp/src/otlp_agent.cc | 13 +++++---- agents/otlp/src/otlp_agent.h | 11 ++++---- agents/statsd/src/statsd_agent.cc | 21 +++++++-------- agents/statsd/src/statsd_agent.h | 6 ++--- agents/zmq/src/zmq_agent.cc | 19 +++++++------- agents/zmq/src/zmq_agent.h | 14 +++++----- src/nsolid.cc | 32 ++++++++++++++--------- src/nsolid.h | 30 +++++++++++---------- test/addons/nsolid-env-metrics/binding.cc | 11 ++++---- test/addons/nsolid-metrics/binding.cc | 9 ++++--- 10 files changed, 86 insertions(+), 80 deletions(-) diff --git a/agents/otlp/src/otlp_agent.cc b/agents/otlp/src/otlp_agent.cc index a0ac78bdf8e..d696d7cf8d0 100644 --- a/agents/otlp/src/otlp_agent.cc +++ b/agents/otlp/src/otlp_agent.cc @@ -405,7 +405,7 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) { for (auto& item : agent->env_metrics_map_) { // Retrieve metrics from the Metrics API. Ignore any return error since // there's nothing to be done. 
- item.second.metrics_.Update(thr_metrics_cb_, agent); + item.second.metrics_->Update(thr_metrics_cb_, agent); } } @@ -418,12 +418,11 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) { std::vector> thr_metrics_vector; - ThreadMetrics* m; - while (agent->thr_metrics_msg_q_.dequeue(&m)) { - auto it = agent->env_metrics_map_.find(m->thread_id()); + ThreadMetricsStor stor; + while (agent->thr_metrics_msg_q_.dequeue(stor)) { + auto it = agent->env_metrics_map_.find(stor.thread_id); if (it != agent->env_metrics_map_.end()) { auto& metrics = it->second; - ThreadMetricsStor stor = m->Get(); thr_metrics_vector.emplace_back(stor, metrics.prev_); metrics.prev_ = stor; } @@ -435,14 +434,14 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) { } -/*static*/void OTLPAgent::thr_metrics_cb_(ThreadMetrics* metrics, +/*static*/void OTLPAgent::thr_metrics_cb_(SharedThreadMetrics metrics, OTLPAgent* agent) { nsuv::ns_rwlock::scoped_rdlock lock(agent->exit_lock_); if (!is_running_) { return; } - agent->thr_metrics_msg_q_.enqueue(metrics); + agent->thr_metrics_msg_q_.enqueue(metrics->Get()); ASSERT_EQ(0, uv_async_send(&agent->metrics_msg_)); } diff --git a/agents/otlp/src/otlp_agent.h b/agents/otlp/src/otlp_agent.h index fc45e5a1e83..fc6e14732f8 100644 --- a/agents/otlp/src/otlp_agent.h +++ b/agents/otlp/src/otlp_agent.h @@ -28,10 +28,11 @@ namespace nsolid { namespace otlp { struct JSThreadMetrics { - ThreadMetrics metrics_; + SharedThreadMetrics metrics_; ThreadMetrics::MetricsStor prev_; - explicit JSThreadMetrics(SharedEnvInst envinst): metrics_(envinst), - prev_() {} + explicit JSThreadMetrics(SharedEnvInst envinst) + : metrics_(ThreadMetrics::Create(envinst)), + prev_() {} }; class OTLPAgent { @@ -73,7 +74,7 @@ class OTLPAgent { static void metrics_msg_cb_(nsuv::ns_async*, OTLPAgent* agent); - static void thr_metrics_cb_(ThreadMetrics*, OTLPAgent*); + static void thr_metrics_cb_(SharedThreadMetrics, OTLPAgent*); void do_start(); @@ -134,7 +135,7 @@ 
class OTLPAgent { ProcessMetrics::MetricsStor proc_prev_stor_; std::map env_metrics_map_; nsuv::ns_async metrics_msg_; - TSQueue thr_metrics_msg_q_; + TSQueue thr_metrics_msg_q_; nsuv::ns_timer metrics_timer_; std::unique_ptr metrics_exporter_; diff --git a/agents/statsd/src/statsd_agent.cc b/agents/statsd/src/statsd_agent.cc index 9428796b615..0324dc47deb 100644 --- a/agents/statsd/src/statsd_agent.cc +++ b/agents/statsd/src/statsd_agent.cc @@ -461,9 +461,7 @@ void StatsDAgent::env_msg_cb(nsuv::ns_async*, StatsDAgent* agent) { bool creation = std::get<1>(tup); if (creation) { auto pair = agent->env_metrics_map_.emplace( - std::piecewise_construct, - std::forward_as_tuple(GetThreadId(envinst)), - std::forward_as_tuple(envinst)); + GetThreadId(envinst), ThreadMetrics::Create(envinst)); ASSERT(pair.second); } else { ASSERT_EQ(1, agent->env_metrics_map_.erase(GetThreadId(envinst))); @@ -476,10 +474,8 @@ void StatsDAgent::shutdown_cb_(nsuv::ns_async*, StatsDAgent* agent) { } void StatsDAgent::metrics_msg_cb_(nsuv::ns_async*, StatsDAgent* agent) { - ThreadMetrics* m; - while (agent->metrics_msg_q_.dequeue(&m)) { - uint64_t thread_id = m->thread_id(); - ThreadMetrics::MetricsStor stor = m->Get(); + ThreadMetrics::MetricsStor stor; + while (agent->metrics_msg_q_.dequeue(stor)) { json body = { #define V(Type, CName, JSName, MType) \ { #JSName, stor.CName }, @@ -487,7 +483,7 @@ void StatsDAgent::metrics_msg_cb_(nsuv::ns_async*, StatsDAgent* agent) { #undef V }; - agent->send_metrics(body, thread_id, stor.thread_name.c_str()); + agent->send_metrics(body, stor.thread_id, stor.thread_name.c_str()); } } @@ -730,10 +726,10 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, StatsDAgent* agent) { for (auto it = agent->env_metrics_map_.begin(); it != agent->env_metrics_map_.end(); ++it) { - ThreadMetrics& e_metrics = std::get<1>(*it); + SharedThreadMetrics& e_metrics = std::get<1>(*it); // Retrieve metrics from the Metrics API. 
Ignore any return error since // there's nothing to be done. - e_metrics.Update(env_metrics_cb_, agent); + e_metrics->Update(env_metrics_cb_, agent); } // Get and send proc metrics @@ -749,13 +745,14 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, StatsDAgent* agent) { agent->send_metrics(body); } -void StatsDAgent::env_metrics_cb_(ThreadMetrics* metrics, StatsDAgent* agent) { +void StatsDAgent::env_metrics_cb_(SharedThreadMetrics metrics, + StatsDAgent* agent) { // Check if the agent is already closing if (agent->metrics_msg_.is_closing()) { return; } - agent->metrics_msg_q_.enqueue(metrics); + agent->metrics_msg_q_.enqueue(metrics->Get()); ASSERT_EQ(0, agent->metrics_msg_.send()); } diff --git a/agents/statsd/src/statsd_agent.h b/agents/statsd/src/statsd_agent.h index 5a314ad32ea..70d5fe385ea 100644 --- a/agents/statsd/src/statsd_agent.h +++ b/agents/statsd/src/statsd_agent.h @@ -204,7 +204,7 @@ class StatsDAgent { static void send_stats_msg_cb_(nsuv::ns_async*, StatsDAgent*); - static void env_metrics_cb_(ThreadMetrics*, StatsDAgent*); + static void env_metrics_cb_(SharedThreadMetrics, StatsDAgent*); static void status_command_cb_(SharedEnvInst, StatsDAgent*); @@ -258,11 +258,11 @@ class StatsDAgent { nsuv::ns_timer retry_timer_; // For the Metrics API - std::map env_metrics_map_; + std::map env_metrics_map_; ProcessMetrics proc_metrics_; uint64_t metrics_period_; nsuv::ns_async metrics_msg_; - TSQueue metrics_msg_q_; + TSQueue metrics_msg_q_; nsuv::ns_timer metrics_timer_; // For the Configuration API diff --git a/agents/zmq/src/zmq_agent.cc b/agents/zmq/src/zmq_agent.cc index 8989115448c..1da0502cbc4 100644 --- a/agents/zmq/src/zmq_agent.cc +++ b/agents/zmq/src/zmq_agent.cc @@ -881,18 +881,17 @@ void ZmqAgent::got_trace(Tracer* tracer, } -void ZmqAgent::got_env_metrics(ThreadMetrics* t_metrics) { +void ZmqAgent::got_env_metrics(const ThreadMetrics::MetricsStor& stor) { ProcessMetrics::MetricsStor proc_stor; - auto iter = 
env_metrics_map_.find(t_metrics->thread_id()); + auto iter = env_metrics_map_.find(stor.thread_id); if (iter != env_metrics_map_.end()) { - ASSERT_PTR_EQ(t_metrics, &iter->second.t_metrics); iter->second.fetching = false; // Store into the completed_env_metrics_ vector so we have easy access once // the metrics from all the threads are retrieved. Make a copy of // ThreadMetrics to make sure the metrics are still valid even if the // thread is gone. - completed_env_metrics_.push_back(t_metrics->Get()); + completed_env_metrics_.push_back(stor); } bool done = true; @@ -951,9 +950,9 @@ NSOLID_ENV_METRICS_STRINGS(V) void ZmqAgent::metrics_msg_cb(nsuv::ns_async*, ZmqAgent* agent) { - ThreadMetrics* metrics; - while (agent->metrics_msg_q_.dequeue(&metrics)) { - agent->got_env_metrics(metrics); + ThreadMetrics::MetricsStor stor; + while (agent->metrics_msg_q_.dequeue(stor)) { + agent->got_env_metrics(stor); } } @@ -1641,19 +1640,19 @@ void ZmqAgent::metrics_timer_cb(nsuv::ns_timer*, ZmqAgent* agent) { stor.fetching = false; // Retrieve metrics from the Metrics API. Ignore any return error since // there's nothing to be done. 
- int r = stor.t_metrics.Update(env_metrics_cb, agent); + int r = stor.t_metrics->Update(env_metrics_cb, agent); if (r == 0) stor.fetching = true; } } -void ZmqAgent::env_metrics_cb(ThreadMetrics* metrics, ZmqAgent* agent) { +void ZmqAgent::env_metrics_cb(SharedThreadMetrics metrics, ZmqAgent* agent) { // Check if the agent is already delete or it's closing if (!is_running || agent->metrics_msg_.is_closing()) { return; } - agent->metrics_msg_q_.enqueue(metrics); + agent->metrics_msg_q_.enqueue(metrics->Get()); ASSERT_EQ(0, agent->metrics_msg_.send()); } diff --git a/agents/zmq/src/zmq_agent.h b/agents/zmq/src/zmq_agent.h index ff219807ba2..2ece452254e 100644 --- a/agents/zmq/src/zmq_agent.h +++ b/agents/zmq/src/zmq_agent.h @@ -356,10 +356,12 @@ class ZmqAgent { }; struct EnvMetricsStor { - ThreadMetrics t_metrics; + SharedThreadMetrics t_metrics; bool fetching; - explicit EnvMetricsStor(SharedEnvInst envinst, bool f): t_metrics(envinst), - fetching(f) {} + NSOLID_DELETE_DEFAULT_CONSTRUCTORS(EnvMetricsStor) + explicit EnvMetricsStor(SharedEnvInst envinst, bool f) + : t_metrics(ThreadMetrics::Create(envinst)), + fetching(f) {} }; struct ZmqCommandError { @@ -478,7 +480,7 @@ class ZmqAgent { static void update_state_msg_cb(nsuv::ns_async*, ZmqAgent*); - static void env_metrics_cb(ThreadMetrics*, ZmqAgent*); + static void env_metrics_cb(SharedThreadMetrics, ZmqAgent*); static void status_command_cb(SharedEnvInst, ZmqAgent*); @@ -555,7 +557,7 @@ class ZmqAgent { const std::pair&, const std::pair&); - void got_env_metrics(ThreadMetrics* t_metrics); + void got_env_metrics(const ThreadMetrics::MetricsStor& stor); void got_heap_snapshot(int status, const std::string& snaphost, @@ -657,7 +659,7 @@ class ZmqAgent { ProcessMetrics proc_metrics_; uint64_t metrics_period_; nsuv::ns_async metrics_msg_; - TSQueue metrics_msg_q_; + TSQueue metrics_msg_q_; nsuv::ns_timer metrics_timer_; std::string cached_metrics_; std::set pending_metrics_reqs_; diff --git a/src/nsolid.cc 
b/src/nsolid.cc index 1ad216b8efa..451edbb6962 100644 --- a/src/nsolid.cc +++ b/src/nsolid.cc @@ -283,6 +283,11 @@ ThreadMetrics::ThreadMetrics(uint64_t thread_id) } +SharedThreadMetrics ThreadMetrics::Create(SharedEnvInst envinst) { + return SharedThreadMetrics(new ThreadMetrics(envinst)); +} + + std::string ThreadMetrics::toJSON() { MetricsStor dup; std::string metrics_string; @@ -342,23 +347,24 @@ int ThreadMetrics::Update(v8::Isolate* isolate) { int ThreadMetrics::get_thread_metrics_() { // Might need to fire myself for using nested lambdas. - void (*cb)(SharedEnvInst, ThreadMetrics*) = + auto cb = [](SharedEnvInst ei, SharedThreadMetrics tm_sp) { // This runs from the worker thread. - [](SharedEnvInst ei, ThreadMetrics* tm) { - void (*ret_proxy)(ThreadMetrics*) = - [](ThreadMetrics* tm) { - tm->proxy_(tm); - }; - - uv_mutex_lock(&tm->stor_lock_); - ei->GetThreadMetrics(&tm->stor_); - uv_mutex_unlock(&tm->stor_lock_); - + auto ret_proxy = [](SharedThreadMetrics tm_sp) { // This runs from the NSolid thread. - QueueCallback(ret_proxy, tm); + tm_sp->proxy_(tm_sp); }; - return RunCommand(GetEnvInst(thread_id_), CommandType::Interrupt, cb, this); + uv_mutex_lock(&tm_sp->stor_lock_); + ei->GetThreadMetrics(&tm_sp->stor_); + uv_mutex_unlock(&tm_sp->stor_lock_); + + QueueCallback(ret_proxy, tm_sp); + }; + + return RunCommand(GetEnvInst(thread_id_), + CommandType::Interrupt, + cb, + shared_from_this()); } diff --git a/src/nsolid.h b/src/nsolid.h index 25411b66f18..efb257047b5 100644 --- a/src/nsolid.h +++ b/src/nsolid.h @@ -180,13 +180,12 @@ enum NSolidErr { #undef X }; - -class ProcessMetrics; class ThreadMetrics; enum class CommandType; using SharedEnvInst = std::shared_ptr; +using SharedThreadMetrics = std::shared_ptr; using ns_error_tp = std::tuple; /** @cond DONT_DOCUMENT */ @@ -601,9 +600,10 @@ class NODE_EXTERN ProcessMetrics { * @brief Class that allows to retrieve thread-specific metrics from a process. 
* */ -class NODE_EXTERN ThreadMetrics { +class NODE_EXTERN ThreadMetrics: + public std::enable_shared_from_this { public: - using thread_metrics_proxy_sig = void(*)(ThreadMetrics*); + using thread_metrics_proxy_sig = void(*)(SharedThreadMetrics); /** * @brief struct to store thread metrics data. @@ -630,7 +630,9 @@ class NODE_EXTERN ThreadMetrics { * @param thread_id the id of the JS thread the metrics are going to be * retrieved from. */ - explicit ThreadMetrics(SharedEnvInst envinst); + static SharedThreadMetrics Create(SharedEnvInst envinst); + NODE_DEPRECATED("Use ThreadMetrics::Create(envinst)", + explicit ThreadMetrics(SharedEnvInst envinst)); ThreadMetrics() = delete; ThreadMetrics(const ThreadMetrics&) = delete; ThreadMetrics& operator=(const ThreadMetrics&) = delete; @@ -664,7 +666,7 @@ class NODE_EXTERN ThreadMetrics { * called when the retrieval has completed. * * @param cb callback function with the following signature - * `void(*)(ThreadMetrics*, ...Data)` + * `void(*)(SharedThreadMetrics, ...Data)` * @param data variable number of arguments to be propagated to the callback. * @return NSOLID_E_SUCCESS in case of success or a different NSOLID_E_ * error value otherwise. @@ -687,7 +689,7 @@ class NODE_EXTERN ThreadMetrics { int get_thread_metrics_(); template - static void thread_metrics_proxy_(ThreadMetrics* tm); + static void thread_metrics_proxy_(SharedThreadMetrics tm_sp); uint64_t thread_id_ = 0xFFFFFFFFFFFFFFFF; void* user_data_ = nullptr; @@ -988,7 +990,7 @@ int ThreadMetrics::Update(Cb&& cb, Data&&... data) { return UV_EBUSY; } - // _1 - ThreadMetrics* + // _1 - SharedThreadMetrics UserData* user_data = new (std::nothrow) UserData( std::bind(std::forward(cb), _1, std::forward(data)...)); @@ -1012,12 +1014,12 @@ int ThreadMetrics::Update(Cb&& cb, Data&&... 
data) { template -void ThreadMetrics::thread_metrics_proxy_(ThreadMetrics* tm) { - G* g = static_cast(tm->user_data_); - tm->user_data_ = nullptr; - tm->proxy_ = nullptr; - tm->update_running_ = false; - (*g)(tm); +void ThreadMetrics::thread_metrics_proxy_(SharedThreadMetrics tm_sp) { + G* g = static_cast(tm_sp->user_data_); + tm_sp->user_data_ = nullptr; + tm_sp->proxy_ = nullptr; + tm_sp->update_running_ = false; + (*g)(tm_sp); delete g; } diff --git a/test/addons/nsolid-env-metrics/binding.cc b/test/addons/nsolid-env-metrics/binding.cc index 99c911e8540..b7680a0f58f 100644 --- a/test/addons/nsolid-env-metrics/binding.cc +++ b/test/addons/nsolid-env-metrics/binding.cc @@ -8,10 +8,10 @@ std::atomic cb_cntr = { 0 }; using node::nsolid::ThreadMetrics; +using node::nsolid::SharedThreadMetrics; -static void got_env_metrics(ThreadMetrics* tm) { +static void got_env_metrics(SharedThreadMetrics tm_sp) { cb_cntr++; - delete tm; } static void GetMetrics(const v8::FunctionCallbackInfo& args) { @@ -27,11 +27,10 @@ static void GetMetrics(const v8::FunctionCallbackInfo& args) { assert(envinst != nullptr); } - auto* tm = new node::nsolid::ThreadMetrics(envinst); + SharedThreadMetrics tm_sp = ThreadMetrics::Create(envinst); - int er = tm->Update(got_env_metrics); - if (er) - delete tm; + int er = tm_sp->Update(got_env_metrics); + args.GetReturnValue().Set(er); } NODE_MODULE_INIT(/* exports, module, context */) { diff --git a/test/addons/nsolid-metrics/binding.cc b/test/addons/nsolid-metrics/binding.cc index b16ea5f4fc5..cc740dbe084 100644 --- a/test/addons/nsolid-metrics/binding.cc +++ b/test/addons/nsolid-metrics/binding.cc @@ -7,6 +7,7 @@ using node::nsolid::ProcessMetrics; using node::nsolid::ThreadMetrics; +using node::nsolid::SharedThreadMetrics; using v8::FunctionCallbackInfo; using v8::String; @@ -15,18 +16,18 @@ using v8::Value; std::atomic cb_cntr = { 0 }; -static void metrics_cb(ThreadMetrics* tm, void*, ThreadMetrics*) { +static void metrics_cb(SharedThreadMetrics 
tm_sp, void*, ThreadMetrics*) { cb_cntr++; - delete tm; } static void GetEnvMetrics(const FunctionCallbackInfo& args) { assert(args[0]->IsNumber()); uint64_t thread_id = args[0].As()->Value(); - auto* tm = new ThreadMetrics(node::nsolid::GetEnvInst(thread_id)); + SharedThreadMetrics tm_sp = + ThreadMetrics::Create(node::nsolid::GetEnvInst(thread_id)); - args.GetReturnValue().Set(tm->Update(metrics_cb, nullptr, tm)); + args.GetReturnValue().Set(tm_sp->Update(metrics_cb, nullptr, tm_sp.get())); } static void GetProcMetrics(const FunctionCallbackInfo& args) {