Skip to content

Commit

Permalink
agents: implement ProfileCollector class
Browse files Browse the repository at this point in the history
ProfileCollector is a class that allows to start collecting profiles and
report them to a callback function running in a specific uv_loop_t thread.
  • Loading branch information
santigimeno committed Aug 12, 2024
1 parent 3ea993c commit ac2dcd9
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 0 deletions.
83 changes: 83 additions & 0 deletions agents/src/profile_collector.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#include "profile_collector.h"
#include "asserts-cpp/asserts.h"

namespace node {
namespace nsolid {

ProfileCollector::~ProfileCollector() {
profile_msg_->close_and_delete();
}

int ProfileCollector::StartCPUProfile(const CPUProfileOptions& options) {
return CpuProfiler::TakeProfile(GetEnvInst(options.thread_id),
options.duration,
profile_cb,
kCpu,
options,
weak_from_this());
}

int ProfileCollector::StartHeapProfile(const HeapProfileOptions& options) {
return Snapshot::StartTrackingHeapObjects(GetEnvInst(options.thread_id),
options.redacted,
options.track_allocations,
options.duration,
profile_cb,
kHeapProf,
options,
weak_from_this());
}

int ProfileCollector::StartHeapSampling(const HeapSamplingOptions& options) {
return Snapshot::StartSampling(GetEnvInst(options.thread_id),
options.sample_interval,
options.stack_depth,
options.flags,
options.duration,
profile_cb,
kHeapSampl,
options,
weak_from_this());
}

/*static*/
void ProfileCollector::profile_cb(int status,
std::string profile,
ProfileType type,
ProfileOptions options,
WeakProfileCollector collector_wp) {
SharedProfileCollector collector = collector_wp.lock();
if (collector == nullptr) {
return;
}

ProfileQStor qstor = {status, profile, type, std::move(options)};
if (collector->profile_msg_q_.enqueue(std::move(qstor)) == 1) {
ASSERT_EQ(0, collector->profile_msg_->send());
}
}

void ProfileCollector::initialize() {
int er = profile_msg_->init(
loop_,
+[](nsuv::ns_async*, WeakProfileCollector collector_wp) {
SharedProfileCollector collector = collector_wp.lock();
if (collector == nullptr) {
return;
}

collector->process_profiles();
},
weak_from_this());
ASSERT_EQ(0, er);
}

void ProfileCollector::process_profiles() {
ProfileQStor qstor;
while (profile_msg_q_.dequeue(qstor)) {
callback_(std::move(qstor));
}
}

} // namespace nsolid
} // namespace node
98 changes: 98 additions & 0 deletions agents/src/profile_collector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#ifndef AGENTS_SRC_PROFILE_COLLECTOR_H_
#define AGENTS_SRC_PROFILE_COLLECTOR_H_

#include <nsolid.h>
#include <nsolid/thread_safe.h>
#include "nlohmann/json.hpp"
#include "nsuv-inl.h"
#include <variant>
#include <vector>

namespace node {
namespace nsolid {

class ProfileCollector;

using SharedProfileCollector = std::shared_ptr<ProfileCollector>;
using WeakProfileCollector = std::weak_ptr<ProfileCollector>;

enum ProfileType {
kCpu = 0,
kHeapProf,
kHeapSampl,
kNumberOfProfileTypes
};

struct ProfileOptionsBase {
uint64_t thread_id;
uint64_t duration;
nlohmann::json metadata;
};

using CPUProfileOptions = ProfileOptionsBase;

struct HeapProfileOptions: public ProfileOptionsBase {
bool track_allocations = false;
bool redacted = false;
};

struct HeapSamplingOptions: public ProfileOptionsBase {
uint64_t sample_interval = 0;
int stack_depth = 0;
v8::HeapProfiler::SamplingFlags flags = v8::HeapProfiler::kSamplingNoFlags;
};

using ProfileOptions = std::variant<CPUProfileOptions,
HeapProfileOptions,
HeapSamplingOptions>;


/*
* ProfileCollector is a class that allows to start collecting profiles and
* report them to a callback function running in a specific uv_loop_t thread.
*/
class ProfileCollector: public std::enable_shared_from_this<ProfileCollector> {
public:
struct ProfileQStor {
int status;
std::string profile;
ProfileType type;
ProfileOptions options;
};

template <typename Cb, typename... Data>
explicit ProfileCollector(uv_loop_t* loop, Cb&& cb, Data&&... data):
loop_(loop),
profile_msg_(new nsuv::ns_async()) {
// Store the callback and data
callback_ = std::bind(std::forward<Cb>(cb),
std::placeholders::_1,
std::forward<Data>(data)...);
}
~ProfileCollector();

void initialize();

int StartCPUProfile(const CPUProfileOptions& options);
int StartHeapProfile(const HeapProfileOptions& options);
int StartHeapSampling(const HeapSamplingOptions& options);

private:
static void profile_cb(int status,
std::string profile,
ProfileType type,
ProfileOptions options,
WeakProfileCollector collector_wp);
void do_setup();
void process_profiles();

uv_loop_t* loop_;
nsuv::ns_async* profile_msg_;
TSQueue<ProfileQStor> profile_msg_q_;
std::function<void(ProfileQStor&&)> callback_ = nullptr;
};

} // namespace nsolid
} // namespace node

#endif // AGENTS_SRC_PROFILE_COLLECTOR_H_
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@
'nsolid_sources': [
'agents/src/http_client.cc',
'agents/src/http_client.h',
'agents/src/profile_collector.cc',
'agents/src/profile_collector.h',
'agents/src/span_collector.cc',
'agents/src/span_collector.h',
'agents/otlp/src/datadog_metrics.cc',
Expand Down

0 comments on commit ac2dcd9

Please sign in to comment.