diff --git a/agents/zmq/src/zmq_agent.cc b/agents/zmq/src/zmq_agent.cc index 40fc86be8c..ed930812af 100644 --- a/agents/zmq/src/zmq_agent.cc +++ b/agents/zmq/src/zmq_agent.cc @@ -208,13 +208,16 @@ const char PONG[] = "{\"pong\":true}"; V(kExit, "exit") \ V(kFlags, "flags") \ V(kHeapProfile, "heap_profile") \ + V(kHeapProfileStop, "heap_profile_stop") \ V(kHeapSampling, "heap_sampling") \ + V(kHeapSamplingStop, "heap_sampling_stop") \ V(kInterval, "interval") \ V(kMessage, "message") \ V(kMetadata, "metadata") \ V(kMetrics, "metrics") \ V(kPackages, "packages") \ V(kProfile, "profile") \ + V(kProfileStop, "profile_stop") \ V(kReconfigure, "reconfigure") \ V(kRequestId, "requestId") \ V(kSampleInterval, "sampleInterval") \ @@ -715,8 +718,6 @@ void ZmqAgent::do_start() { ASSERT_EQ(0, metrics_timer_.init(&loop_)); - ASSERT_EQ(0, profile_msg_.init(&loop_, profile_msg_cb, this)); - ASSERT_EQ(0, heap_snapshot_msg_.init(&loop_, heap_snapshot_msg_cb, this)); ASSERT_EQ(0, blocked_loop_msg_.init(&loop_, blocked_loop_msg_cb, this)); @@ -725,6 +726,14 @@ void ZmqAgent::do_start() { http_client_.reset(new zmq::ZmqHttpClient(&loop_)); + profile_collector_ = std::make_shared( + &loop_, + +[](ProfileCollector::ProfileQStor&& stor, ZmqAgent* agent) { + agent->got_profile(std::move(stor)); + }, + this); + profile_collector_->initialize(); + status(Initializing); uv_cond_signal(&start_cond_); uv_mutex_unlock(&start_lock_); @@ -770,6 +779,7 @@ int ZmqAgent::stop(bool profile_stopped) { void ZmqAgent::do_stop() { send_exit(); span_collector_.reset(); + profile_collector_.reset(); http_client_.reset(nullptr); command_handle_.reset(nullptr); data_handle_.reset(nullptr); @@ -782,7 +792,6 @@ void ZmqAgent::do_stop() { env_msg_.close(); shutdown_.close(); metrics_timer_.close(); - profile_msg_.close(); heap_snapshot_msg_.close(); blocked_loop_msg_.close(); custom_command_msg_.close(); @@ -2111,76 +2120,56 @@ void ZmqAgent::check_exit_on_profile() { } } -void ZmqAgent::heap_profile_cb(int status, - std::string profile, - uint64_t thread_id, - ZmqAgent* agent) { - if (!is_running || agent->profile_msg_.is_closing()) { - return; - } - - agent->profile_msg_q_.enqueue( - ProfileQStor { kHeapProf, status, profile, thread_id }); - - ASSERT_EQ(0, agent->profile_msg_.send()); -} - -void ZmqAgent::heap_sampling_cb(int status, - std::string profile, - uint64_t thread_id, - ZmqAgent* agent) { - if (!is_running || agent->profile_msg_.is_closing()) { - return; - } - - agent->profile_msg_q_.enqueue( - ProfileQStor { kHeapSampl, status, profile, thread_id }); - - ASSERT_EQ(0, agent->profile_msg_.send()); -} - - -void ZmqAgent::cpu_profile_cb(int status, - std::string profile, - uint64_t thread_id, - ZmqAgent* agent) { - if (!is_running || agent->profile_msg_.is_closing()) { - return; - } - - agent->profile_msg_q_.enqueue( - ProfileQStor { kCpu, status, profile, thread_id }); - - ASSERT_EQ(0, agent->profile_msg_.send()); -} - -void ZmqAgent::profile_msg_cb(nsuv::ns_async*, ZmqAgent* agent) { - ProfileQStor stor; - while (agent->profile_msg_q_.dequeue(stor)) { - switch (stor.type) { - case kHeapProf: - agent->got_prof(stor); - break; - case kHeapSampl: - agent->got_prof(stor); - break; - case kCpu: - agent->got_prof(stor); - break; - default: - ASSERT(false); +void ZmqAgent::got_profile(const ProfileCollector::ProfileQStor& stor) { + switch (stor.type) { + case kCpu: + { + CPUProfileOptions opts = std::get(stor.options); + do_got_prof(stor.type, + opts.thread_id, + stor.status, + stor.profile, + kProfile, + kProfileStop); + } + break; + case kHeapProf: + { + HeapProfileOptions opts = std::get(stor.options); + do_got_prof(stor.type, + opts.thread_id, + stor.status, + stor.profile, + kHeapProfile, + kHeapProfileStop); } + break; + case kHeapSampl: + { + HeapSamplingOptions opts = std::get(stor.options); + do_got_prof(stor.type, + opts.thread_id, + stor.status, + stor.profile, + kHeapSampling, + kHeapSamplingStop); + } + break; + default: + ASSERT(false); } } -void ZmqAgent::do_got_prof(const ProfileQStor& stor, - ProfileType type, +void ZmqAgent::do_got_prof(ProfileType type, + uint64_t thread_id, + int status, + const std::string& profile, const char* cmd, const char* stop_cmd) { - bool profileStreamComplete = stor.profile.length() == 0; + bool profileStreamComplete = profile.length() == 0; ProfileState& profile_state = profile_state_[type]; // get and remove associated data from pending_profiles_map - auto it = profile_state.pending_profiles_map.find(stor.thread_id); + auto it = profile_state.pending_profiles_map.find(thread_id); ASSERT(it != profile_state.pending_profiles_map.end()); ProfileStor prof_stor = it->second; if (profileStreamComplete) { @@ -2188,15 +2177,15 @@ void ZmqAgent::do_got_prof(const ProfileQStor& stor, profile_state.nr_profiles--; } - if (profile_on_exit_ && stor.thread_id == 0) { + if (profile_on_exit_ && thread_id == 0) { // Store the req_id of the main thread profile profile_state.last_main_profile = prof_stor.req_id; } // get start_ts and metadata from pending_profiles_map - if (stor.status < 0) { + if (status < 0) { // Send error message back - send_error_response(prof_stor.req_id, cmd, stor.status); + send_error_response(prof_stor.req_id, cmd, status); return; } @@ -2214,6 +2203,11 @@ void ZmqAgent::do_got_prof(const ProfileQStor& stor, // send profile chunks thru the bulk channel if (bulk_handle_) { uv_update_time(&loop_); + // Access the metadata field from the options variant + std::string metadata_dump; + std::visit([&metadata_dump](auto&& option) { + metadata_dump = option.metadata.dump(); + }, prof_stor.options); snprintf(msg_buf_, msg_size_, MSG_3, @@ -2221,11 +2215,11 @@ void ZmqAgent::do_got_prof(const ProfileQStor& stor, prof_stor.req_id.c_str(), cmd, uv_now(&loop_) - prof_stor.timestamp, - prof_stor.metadata.dump().c_str(), + metadata_dump.c_str(), profileStreamComplete ? "true" : "false", - stor.thread_id, + thread_id, version_); - bulk_handle_->send(msg_buf_, stor.profile); + bulk_handle_->send(msg_buf_, profile); } // Don't continue with the exit procedure until all profiles have finished. @@ -2234,9 +2228,30 @@ void ZmqAgent::do_got_prof(const ProfileQStor& stor, int ZmqAgent::do_start_prof(const nlohmann::json& message, const std::string& req_id, - ProfileType type, - const char* cmd, - StartProfiling start_profiling) { + ProfileType type) { + const char* cmd = nullptr; + StartProfiling start_profiling = nullptr; + ProfileOptions options; + switch (type) { + case ProfileType::kCpu: + cmd = kProfile; + start_profiling = &ZmqAgent::do_start_cpu_prof; + options = CPUProfileOptions{/* initialize with appropriate values */}; + break; + case ProfileType::kHeapProf: + cmd = kHeapProfile; + start_profiling = &ZmqAgent::do_start_heap_prof; + options = HeapProfileOptions{/* initialize with appropriate values */}; + break; + case ProfileType::kHeapSampl: + cmd = kHeapSampling; + start_profiling = &ZmqAgent::do_start_heap_sampl; + options = HeapSamplingOptions{/* initialize with appropriate values */}; + break; + default: + ASSERT(false); + } + if (!data_handle_) { // Don't send error message back as there's actually no connection return send_error_response(req_id, cmd, UV_ENOTCONN); @@ -2260,7 +2275,7 @@ int ZmqAgent::do_start_prof(const nlohmann::json& message, return send_error_response(req_id, cmd, UV_EINVAL); } - uint64_t duration = *it; + uint64_t duration = *it; it = args.find(kMetadata); @@ -2275,8 +2290,15 @@ int ZmqAgent::do_start_prof(const nlohmann::json& message, return send_error_response(req_id, cmd, UV_EEXIST); } - ProfileStor stor{ req_id, std::move(metadata), uv_now(&loop_) }; - int err = start_profiling(thread_id, duration, args, stor, this); + // Set common fields in options + std::visit([&](auto& opt) { + opt.thread_id = thread_id; + opt.duration = duration; + opt.metadata = std::move(metadata); + }, options); + + ProfileStor stor{ req_id, uv_now(&loop_), std::move(options) }; + int err = (this->*start_profiling)(args, stor); if (err != 0) { return send_error_response(req_id, cmd, err); } @@ -2296,19 +2318,94 @@ int ZmqAgent::do_start_prof(const nlohmann::json& message, return 0; } +int ZmqAgent::do_start_cpu_prof(const nlohmann::json&, + ProfileStor& stor) { + CPUProfileOptions& options = std::get(stor.options); + Debug("Starting CPU profile for thread %lu, duration: %lu\n", + options.thread_id, + options.duration); + return profile_collector_->StartCPUProfile(options); +} + +int ZmqAgent::do_start_heap_prof(const nlohmann::json& args, + ProfileStor& stor) { + auto it = args.find(kTrackAllocations); + if (it == args.end() || !it->is_boolean()) { + return UV_EINVAL; + } + + HeapProfileOptions& options = std::get(stor.options); + options.track_allocations = *it; + auto conf_it = config_.find("redactSnapshots"); + if (conf_it != config_.end()) { + if (conf_it->is_boolean()) { + options.redacted = *conf_it; + } + } + + Debug("Starting Heap profile for thread %lu, duration: %lu\n", + options.thread_id, + options.duration); + + return profile_collector_->StartHeapProfile(options); +} + +int ZmqAgent::do_start_heap_sampl(const nlohmann::json& args, + ProfileStor& stor) { + HeapSamplingOptions& options = std::get(stor.options); + auto it = args.find(kSampleInterval); + if (it != args.end()) { + if (!it->is_number_unsigned()) { + return UV_EINVAL; + } + + options.sample_interval = *it; + } + + it = args.find(kStackDepth); + if (it != args.end()) { + if (!it->is_number_unsigned()) { + return UV_EINVAL; + } + + options.stack_depth = *it; + } + + it = args.find(kFlags); + if (it != args.end()) { + if (!it->is_number_unsigned()) { + return UV_EINVAL; + } + + options.flags = *it; + } + + Debug("Starting Heap sampling for thread %lu, duration: %lu\n", + options.thread_id, + options.duration); + + return profile_collector_->StartHeapSampling(options); +} + int ZmqAgent::start_profiling(const json& message, const std::string& req_id) { - return start_prof(message, req_id); + return do_start_prof(message, + req_id, + ProfileType::kCpu); } int ZmqAgent::start_heap_profiling(const json& message, const std::string& req_id) { - return start_prof(message, req_id); + return do_start_prof(message, + req_id, + ProfileType::kHeapProf); } int ZmqAgent::start_heap_sampling(const json& message, const std::string& req_id) { - return start_prof(message, req_id); + return do_start_prof(message, + req_id, + ProfileType::kHeapSampl); } int ZmqAgent::stop_profiling(uint64_t thread_id) { @@ -2656,88 +2753,6 @@ void ZmqAgent::resize_msg_buffer(int size) { msg_size_ = size + 1; } -int ZmqAgent::CPUProfilePolicy::start_profiling(uint64_t thread_id, - uint64_t duration, - const nlohmann::json& args, - ProfileStor& stor, - ZmqAgent* agent) { - return CpuProfiler::TakeProfile(GetEnvInst(thread_id), - duration, - cpu_profile_cb, - thread_id, - agent); -} - -int ZmqAgent::HeapProfilePolicy::start_profiling(uint64_t thread_id, - uint64_t duration, - const nlohmann::json& args, - ProfileStor& stor, - ZmqAgent* agent) { - auto it = args.find(kTrackAllocations); - if (it == args.end() || !it->is_boolean()) { - return UV_EINVAL; - } - - stor.trackAllocations = *it; - bool redacted = false; - auto conf_it = agent->config_.find("redactSnapshots"); - if (conf_it != agent->config_.end()) { - if (conf_it->is_boolean()) { - redacted = *conf_it; - } - } - - return Snapshot::StartTrackingHeapObjects(GetEnvInst(thread_id), - redacted, - stor.trackAllocations, - duration, - heap_profile_cb, - thread_id, - agent); -} - -int ZmqAgent::HeapSamplingPolicy::start_profiling(uint64_t thread_id, - uint64_t duration, - const nlohmann::json& args, - ProfileStor& stor, - ZmqAgent* agent) { - auto it = args.find(kSampleInterval); - if (it != args.end()) { - if (!it->is_number_unsigned()) { - return UV_EINVAL; - } - - stor.sample_interval = *it; - } - - it = args.find(kStackDepth); - if (it != args.end()) { - if (!it->is_number_unsigned()) { - return UV_EINVAL; - } - - stor.stack_depth = *it; - } - - it = args.find(kFlags); - if (it != args.end()) { - if (!it->is_number_unsigned()) { - return UV_EINVAL; - } - - stor.flags = *it; - } - - return Snapshot::StartSampling(GetEnvInst(thread_id), - stor.sample_interval, - stor.stack_depth, - stor.flags, - duration, - heap_sampling_cb, - thread_id, - agent); -} - ZmqCommandHandleRes ZmqCommandHandle::create(ZmqAgent& agent, const json& config) { std::unique_ptr handle(new ZmqCommandHandle(agent)); diff --git a/agents/zmq/src/zmq_agent.h b/agents/zmq/src/zmq_agent.h index 885c973153..080f9d37f5 100644 --- a/agents/zmq/src/zmq_agent.h +++ b/agents/zmq/src/zmq_agent.h @@ -15,6 +15,7 @@ #include "asserts-cpp/asserts.h" #include "nlohmann/json.hpp" #include "http_client.h" +#include "../../src/profile_collector.h" #define HANDLE_TYPES(X) \ X(Command, "Command", ZMQ_SUB, "inproc://monitor-command", "command", false, \ @@ -349,8 +350,6 @@ class ZmqBulkHandle : public ZmqHandle { // console, dynamically change the configuration of the agent/runtime, etc. class ZmqAgent { - struct ProfileQStor; - public: enum Status { #define X(type, str) \ @@ -449,9 +448,6 @@ class ZmqAgent { const nlohmann::json& message, const std::string& req_id = utils::generate_unique_id()); - template - void got_prof(const ProfileQStor& stor); - bool pending_profiles() const; int stop_profiling(uint64_t thread_id); @@ -469,28 +465,14 @@ class ZmqAgent { bool profile_on_exit() { return profile_on_exit_.load(); } private: - enum ProfileType { - kCpu = 0, - kHeapProf, - kHeapSampl, - kNumberOfProfileTypes - }; - struct ProfileStor { std::string req_id; - json metadata; uint64_t timestamp; - bool trackAllocations = false; - uint64_t sample_interval = 0; - int stack_depth = 0; - v8::HeapProfiler::SamplingFlags flags = v8::HeapProfiler::kSamplingNoFlags; + ProfileOptions options; }; - using StartProfiling = int (*)(uint64_t, - uint64_t, - const nlohmann::json&, - ProfileStor&, - ZmqAgent*); + using StartProfiling = int (ZmqAgent::*)(const nlohmann::json&, + ProfileStor&); using ProfileStorMap = std::map; @@ -500,13 +482,6 @@ class ZmqAgent { std::string last_main_profile; }; - struct ProfileQStor { - ProfileType type; - int status; - std::string profile; - uint64_t thread_id; - }; - ZmqAgent(); ~ZmqAgent(); @@ -544,8 +519,6 @@ class ZmqAgent { uint64_t thread_id, ZmqAgent* agent); - static void profile_msg_cb(nsuv::ns_async*, ZmqAgent*); - static void heap_profile_cb(int status, std::string profile, uint64_t thread_id, @@ -615,19 +588,23 @@ class ZmqAgent { int command_message(const nlohmann::json& message); - template - int start_prof( - const nlohmann::json& message, - const std::string& req_id = utils::generate_unique_id()); - int do_start_prof(const nlohmann::json& message, const std::string& req_id, - ProfileType type, - const char* cmd, - StartProfiling start_profiling); + ProfileType type); + + // NOLINTNEXTLINE(runtime/references) + int do_start_cpu_prof(const nlohmann::json&, ProfileStor& stor); + + // NOLINTNEXTLINE(runtime/references) + int do_start_heap_prof(const nlohmann::json&, ProfileStor& stor); + + // NOLINTNEXTLINE(runtime/references) + int do_start_heap_sampl(const nlohmann::json&, ProfileStor& stor); - void do_got_prof(const ProfileQStor& stor, - ProfileType type, + void do_got_prof(ProfileType type, + uint64_t thread_id, + int status, + const std::string& profile, const char* cmd, const char* stop_cmd); @@ -649,6 +626,8 @@ class ZmqAgent { int got_metrics(const std::string& metrics); + void got_profile(const ProfileCollector::ProfileQStor& stor); + // NOLINTNEXTLINE(runtime/int) void handle_auth_response(CURLcode, long, const std::string&); @@ -755,10 +734,9 @@ class ZmqAgent { nsuv::ns_async update_state_msg_; // Profiling - nsuv::ns_async profile_msg_; - TSQueue profile_msg_q_; ProfileState profile_state_[ProfileType::kNumberOfProfileTypes]; std::atomic profile_on_exit_; + std::shared_ptr profile_collector_; // Heap Snapshot nsuv::ns_async heap_snapshot_msg_; @@ -787,63 +765,8 @@ class ZmqAgent { // For status testing void(*status_cb_)(const std::string&); - - class CPUProfilePolicy { - public: - static int start_profiling(uint64_t thread_id, - uint64_t duration, - const nlohmann::json& metadata, - ProfileStor& stor, // NOLINT(runtime/references) - ZmqAgent* agent); - - static constexpr const char* cmd = "profile"; - static constexpr const char* stop_cmd = "profile_stop"; - static constexpr ProfileType Type = ProfileType::kCpu; - }; - - class HeapProfilePolicy { - public: - static int start_profiling(uint64_t thread_id, - uint64_t duration, - const nlohmann::json& metadata, - ProfileStor& stor, // NOLINT(runtime/references) - ZmqAgent* agent); - - static constexpr const char* cmd = "heap_profile"; - static constexpr const char* stop_cmd = "heap_profile_stop"; - static constexpr ProfileType Type = ProfileType::kHeapProf; - }; - - class HeapSamplingPolicy { - public: - static int start_profiling(uint64_t thread_id, - uint64_t duration, - const nlohmann::json& metadata, - ProfileStor& stor, // NOLINT(runtime/references) - ZmqAgent* agent); - - static constexpr const char* cmd = "heap_sampling"; - static constexpr const char* stop_cmd = "heap_sampling_stop"; - static constexpr ProfileType Type = ProfileType::kHeapSampl; - }; }; - -template -int ZmqAgent::start_prof(const nlohmann::json& message, - const std::string& req_id) { - return do_start_prof(message, - req_id, - Policy::Type, - Policy::cmd, - Policy::start_profiling); -} - -template -void ZmqAgent::got_prof(const ProfileQStor& stor) { - do_got_prof(stor, Policy::Type, Policy::cmd, Policy::stop_cmd); -} - } // namespace nsolid } // namespace node