diff --git a/Makefile b/Makefile index a762ba79ef..d9f85d4a65 100644 --- a/Makefile +++ b/Makefile @@ -1424,6 +1424,8 @@ LINT_CPP_EXCLUDE += src/tracing/trace_event.h src/tracing/trace_event_common.h LINT_CPP_FILES = $(filter-out $(LINT_CPP_EXCLUDE), $(wildcard \ agents/otlp/src/*.cc \ agents/otlp/src/*.h \ + agents/src/*.cc \ + agents/src/*.h \ agents/statsd/src/*.cc \ agents/statsd/src/*.h \ agents/zmq/src/*.cc \ diff --git a/agents/otlp/src/otlp_agent.cc b/agents/otlp/src/otlp_agent.cc index 79032dc317..b853e92eff 100644 --- a/agents/otlp/src/otlp_agent.cc +++ b/agents/otlp/src/otlp_agent.cc @@ -18,6 +18,7 @@ #include "opentelemetry/exporters/otlp/otlp_http_exporter.h" #include "opentelemetry/ext/http/client/curl/http_client_curl.h" #include "opentelemetry/trace/propagation/detail/hex.h" +#include "../../src/span_collector.h" using ThreadMetricsStor = node::nsolid::ThreadMetrics::MetricsStor; using nlohmann::json; @@ -40,6 +41,9 @@ namespace node { namespace nsolid { namespace otlp { +constexpr uint64_t span_timer_interval = 1000; +constexpr size_t span_msg_q_min_size = 200; + template inline void Debug(Args&&... args) { per_process::Debug(DebugCategory::NSOLID_OTLP_AGENT, @@ -167,7 +171,7 @@ int OTLPAgent::config(const nlohmann::json& config) { } // Configure tracing flags - if (trace_flags_ == 0 || + if (span_collector_ == nullptr || utils::find_any_fields_in_diff(diff, tracing_fields)) { trace_flags_ = 0; if (otlp_exporter_ != nullptr) { @@ -312,7 +316,6 @@ void OTLPAgent::do_start() { uv_mutex_lock(&start_lock_); ASSERT_EQ(0, shutdown_.init(&loop_, shutdown_cb_, this)); ASSERT_EQ(0, env_msg_.init(&loop_, env_msg_cb_, this)); - ASSERT_EQ(0, span_msg_.init(&loop_, span_msg_cb_, this)); ASSERT_EQ(0, metrics_msg_.init(&loop_, metrics_msg_cb_, this)); ASSERT_EQ(0, metrics_timer_.init(&loop_)); ASSERT_EQ(0, config_msg_.init(&loop_, config_msg_cb_, this)); @@ -334,46 +337,17 @@ void OTLPAgent::do_stop() { ready_ = false; shutdown_.close(); env_msg_.close(); - span_msg_.close(); metrics_msg_.close(); metrics_timer_.close(); config_msg_.close(); metrics_exporter_.reset(nullptr); + span_collector_.reset(); } -void OTLPAgent::trace_hook_(Tracer* tracer, - const Tracer::SpanStor& stor, - OTLPAgent* agent) { - nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_); - if (!is_running_ || !agent->ready_) { - return; - } - - agent->span_msg_q_.enqueue(stor); - ASSERT_EQ(0, agent->span_msg_.send()); -} - - -void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) { - // Don't exit until all the pending spans are sent - nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_); - if (!is_running_) { - return; - } - - using std::chrono::duration_cast; - using std::chrono::milliseconds; - using std::chrono::nanoseconds; - std::vector> recordables; - Tracer::SpanStor s; - while (agent->span_msg_q_.dequeue(s)) { - auto recordable = agent->otlp_exporter_->MakeRecordable(); - fill_recordable(recordable.get(), s); - recordables.push_back(std::move(recordable)); - } - - auto result = agent->otlp_exporter_->Export(recordables); +void OTLPAgent::got_spans(const UniqRecordables& recordables) { + auto result = + otlp_exporter_->Export(const_cast(recordables)); Debug("# Spans Exported: %ld. Result: %d\n", recordables.size(), static_cast(result)); @@ -615,14 +589,33 @@ int OTLPAgent::setup_metrics_timer(uint64_t period) { } void OTLPAgent::update_tracer(uint32_t flags) { - tracer_.reset(nullptr); - if (flags) { - Tracer* tracer = Tracer::CreateInstance(flags, trace_hook_, this); - ASSERT_NOT_NULL(tracer); - tracer_.reset(tracer); + Debug("Updating Tracer with flags: %u\n", flags); + span_collector_.reset(); + if (trace_flags_) { + span_collector_ = std::make_shared(&loop_, + trace_flags_, + span_msg_q_min_size, + span_timer_interval); + span_collector_->CollectAndTransform( + transf, +[](UniqRecordables& spans, OTLPAgent* agent) { + nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_); + if (!is_running_) { + return; + } + + agent->got_spans(spans); + }, this); } } +UniqRecordable OTLPAgent::transf(const Tracer::SpanStor& span, + OTLPAgent* agent) { + // Transform the span + fprintf(stderr, "Transforming span\n"); + auto recordable = agent->otlp_exporter_->MakeRecordable(); + fill_recordable(recordable.get(), span); + return recordable; +} } // namespace otlp } // namespace nsolid } // namespace node diff --git a/agents/otlp/src/otlp_agent.h b/agents/otlp/src/otlp_agent.h index 0e2329e272..b70aa28a1b 100644 --- a/agents/otlp/src/otlp_agent.h +++ b/agents/otlp/src/otlp_agent.h @@ -21,14 +21,21 @@ struct OtlpGrpcExporterOptions; } namespace sdk { namespace trace { +class Recordable; class SpanExporter; } } OPENTELEMETRY_END_NAMESPACE +using UniqRecordable = + std::unique_ptr; +using UniqRecordables = std::vector; + namespace node { namespace nsolid { +class SpanCollector; + namespace otlp { struct JSThreadMetrics { @@ -68,18 +75,14 @@ class OTLPAgent { static void on_thread_remove_(SharedEnvInst, OTLPAgent* agent); - static void trace_hook_(Tracer* tracer, - const Tracer::SpanStor& stor, - OTLPAgent* agent); - - static void span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent); - static void metrics_timer_cb_(nsuv::ns_timer*, OTLPAgent*); static void metrics_msg_cb_(nsuv::ns_async*, OTLPAgent* agent); static void thr_metrics_cb_(SharedThreadMetrics, OTLPAgent*); + static UniqRecordable transf(const Tracer::SpanStor& span, OTLPAgent* agent); + void do_start(); void do_stop(); @@ -99,6 +102,8 @@ class OTLPAgent { void got_proc_metrics(); + void got_spans(const UniqRecordables& spans); + void setup_trace_otlp_exporter( // NOLINTNEXTLINE(runtime/references) OPENTELEMETRY_NAMESPACE::exporter::otlp::OtlpHttpExporterOptions& opts); @@ -124,10 +129,8 @@ class OTLPAgent { TSQueue> env_msg_q_; // For the Tracing API - std::unique_ptr tracer_; - nsuv::ns_async span_msg_; - TSQueue span_msg_q_; uint32_t trace_flags_; + std::shared_ptr span_collector_; std::unique_ptr otlp_exporter_; diff --git a/agents/src/http_client.h b/agents/src/http_client.h index 6a70af1d18..6b32220fa8 100644 --- a/agents/src/http_client.h +++ b/agents/src/http_client.h @@ -1,5 +1,5 @@ -#ifndef AGENTS_COMMON_SRC_HTTP_CLIENT_H_ -#define AGENTS_COMMON_SRC_HTTP_CLIENT_H_ +#ifndef AGENTS_SRC_HTTP_CLIENT_H_ +#define AGENTS_SRC_HTTP_CLIENT_H_ #include #include @@ -26,7 +26,7 @@ class CurlContext { int start(int events, poll_cb cb); private: - friend class HttpClient; + friend class HttpClient; HttpClient* http_client_; nsuv::ns_poll* poll_handle_; curl_socket_t sockfd_; diff --git a/agents/src/span_collector.cc b/agents/src/span_collector.cc new file mode 100644 index 0000000000..3e429c65f8 --- /dev/null +++ b/agents/src/span_collector.cc @@ -0,0 +1,89 @@ +#include "span_collector.h" +#include "asserts-cpp/asserts.h" + +namespace node { +namespace nsolid { + +SpanCollector::SpanCollector(uv_loop_t* loop, + uint32_t trace_flags, + size_t min_span_count, + uint64_t interval): + loop_(loop), + trace_flags_(trace_flags), + min_span_count_(min_span_count), + span_msg_(new nsuv::ns_async()), + interval_(interval), + span_timer_(new nsuv::ns_timer()) { +} + +SpanCollector::~SpanCollector() { + span_timer_->close_and_delete(); + span_msg_->close_and_delete(); +} + +void SpanCollector::do_collect() { + // Create the instance with a callback that enqueues spans and sends a message + Tracer* tracer = Tracer::CreateInstance( + trace_flags_, + +[](Tracer*, Tracer::SpanStor span, WeakSpanCollector collector_wp) { + SharedSpanCollector collector = collector_wp.lock(); + if (collector == nullptr) { + return; + } + + // Only notify the loop_ thread if the queue reaches min_span_count_to + // avoid too many uv_async_t::send() calls. + if (collector->span_msg_q_.enqueue(span) > collector->min_span_count_) { + ASSERT_EQ(0, collector->span_msg_->send()); + } + }, + weak_from_this()); + ASSERT_NOT_NULL(tracer); + tracer_.reset(tracer); + + int er = span_msg_->init( + loop_, + +[](nsuv::ns_async*, WeakSpanCollector collector_wp) { + SharedSpanCollector collector = collector_wp.lock(); + if (collector == nullptr) { + return; + } + + collector->process_spans(); + }, + weak_from_this()); + ASSERT_EQ(0, er); + er = span_timer_->init(loop_); + ASSERT_EQ(0, er); + er = span_timer_->start(+[](nsuv::ns_timer*, + WeakSpanCollector collector_wp) { + SharedSpanCollector collector = collector_wp.lock(); + if (collector == nullptr) { + return; + } + + collector->process_spans(); + }, 0, interval_, weak_from_this()); + ASSERT_EQ(0, er); +} + +void SpanCollector::process_spans() { + Tracer::SpanStor span; + SpanVector spans; + while (span_msg_q_.dequeue(span)) { + spans.push_back(std::move(span)); + } + + // After processing, call the stored callback with the spans and additional + // data. + if (spans.size()) { + if (transform_callback_) { + transform_callback_(spans); + } else { + callback_(spans); + } + } +} + +} // namespace nsolid +} // namespace node diff --git a/agents/src/span_collector.h b/agents/src/span_collector.h new file mode 100644 index 0000000000..71e23197bd --- /dev/null +++ b/agents/src/span_collector.h @@ -0,0 +1,91 @@ +#ifndef AGENTS_SRC_SPAN_COLLECTOR_H_ +#define AGENTS_SRC_SPAN_COLLECTOR_H_ + +#include +#include +#include "nsuv-inl.h" +#include + +namespace node { +namespace nsolid { + +class SpanCollector; + +using SharedSpanCollector = std::shared_ptr; +using WeakSpanCollector = std::weak_ptr; +using SpanVector = std::vector; + +/* + * SpanCollector is a class that allows to start collecting spans and report + * them to a callback function running in a specific uv_loop_t thread. The + * collector can be configured with a minimum span count and a time interval to + * report the spans. In addition, the collector can be configured with a + * transform function that will be applied to the spans before calling the final + * callback. + */ +class SpanCollector: public std::enable_shared_from_this { + public: + explicit SpanCollector(uv_loop_t* loop, + uint32_t trace_flags, + size_t min_span_count, + uint64_t interval); + ~SpanCollector(); + + template + void Collect(Cb&& cb, Data&&... data) { + // Store the callback and data + callback_ = std::bind(std::forward(cb), + std::placeholders::_1, + std::forward(data)...); + this->do_collect(); + } + + template + void CollectAndTransform(Transform&& transform, Cb&& cb, Data&&... data) { + // Store the transform function and callback + auto data_tuple = std::make_tuple(std::forward(data)...); + transform_callback_ = [transform = std::forward(transform), + cb = std::forward(cb), + data_tuple] (const SpanVector& spans) { + // Create a vector for transformed spans + using TransformedType = + decltype(transform(std::declval(), + std::declval()...)); + std::vector transformed_spans; + transformed_spans.reserve(spans.size()); + + // Apply the transform to each span + std::apply([&transform, &spans, &transformed_spans](const Data&... data) { + for (const auto& span : spans) { + transformed_spans.push_back(transform(span, data...)); + } + }, data_tuple); + + // Call the callback with the transformed spans and additional data + std::apply([&cb, &transformed_spans](const Data&... data) { + cb(transformed_spans, data...); + }, data_tuple); + }; + this->do_collect(); + } + + private: + void do_collect(); + void process_spans(); + + uv_loop_t* loop_; + uint32_t trace_flags_; + size_t min_span_count_; + std::unique_ptr tracer_; + nsuv::ns_async* span_msg_; + TSQueue span_msg_q_; + uint64_t interval_; + nsuv::ns_timer* span_timer_; + std::function callback_ = nullptr; + std::function transform_callback_ = nullptr; +}; + +} // namespace nsolid +} // namespace node + +#endif // AGENTS_SRC_SPAN_COLLECTOR_H_ diff --git a/agents/zmq/src/zmq_agent.cc b/agents/zmq/src/zmq_agent.cc index 038dfac5cf..40fc86be8c 100644 --- a/agents/zmq/src/zmq_agent.cc +++ b/agents/zmq/src/zmq_agent.cc @@ -9,6 +9,7 @@ #include "zmq_endpoint.h" #include "zmq_errors.h" #include "nsolid/nsolid_heap_snapshot.h" +#include "../../src/span_collector.h" namespace node { @@ -73,7 +74,7 @@ const uint64_t NSEC_BEFORE_WARN = 10000000000; const uint64_t auth_timer_interval = 500; const uint64_t invalid_key_timer_interval = 500; constexpr uint64_t span_timer_interval = 100; -constexpr size_t span_msg_q_max_size = 200; +constexpr size_t span_msg_q_min_size = 200; const char MSG_1[] = "{" "\"agentId\":\"%s\"" @@ -722,10 +723,6 @@ void ZmqAgent::do_start() { ASSERT_EQ(0, custom_command_msg_.init(&loop_, custom_command_msg_cb, this)); - ASSERT_EQ(0, span_msg_.init(&loop_, span_msg_cb, this)); - - ASSERT_EQ(0, span_timer_.init(&loop_)); - http_client_.reset(new zmq::ZmqHttpClient(&loop_)); status(Initializing); @@ -772,6 +769,7 @@ int ZmqAgent::stop(bool profile_stopped) { void ZmqAgent::do_stop() { send_exit(); + span_collector_.reset(); http_client_.reset(nullptr); command_handle_.reset(nullptr); data_handle_.reset(nullptr); @@ -788,8 +786,6 @@ void ZmqAgent::do_stop() { heap_snapshot_msg_.close(); blocked_loop_msg_.close(); custom_command_msg_.close(); - span_msg_.close(); - span_timer_.close(); config_.clear(); saas_.clear(); @@ -860,21 +856,6 @@ void ZmqAgent::env_msg_cb(nsuv::ns_async*, ZmqAgent* agent) { } -void ZmqAgent::got_trace(Tracer* tracer, - const Tracer::SpanStor& stor, - ZmqAgent* agent) { - // Check if the agent is already delete or if it's closing - if (!is_running || agent->span_msg_.is_closing()) { - return; - } - - size_t size = agent->span_msg_q_.enqueue(stor); - if (size > span_msg_q_max_size) { - ASSERT_EQ(0, agent->span_msg_.send()); - } -} - - void ZmqAgent::got_env_metrics(const ThreadMetrics::MetricsStor& stor) { ProcessMetrics::MetricsStor proc_stor; @@ -1456,7 +1437,7 @@ int ZmqAgent::config(const json& config) { } // Configure tracing flags - if (tracer_ == nullptr || + if (span_collector_ == nullptr || utils::find_any_fields_in_diff(diff, { "/tracingEnabled", "/tracingModulesBlacklist" })) { auto it = config_.find("tracingEnabled"); @@ -1479,23 +1460,18 @@ int ZmqAgent::config(const json& config) { trace_flags_ = 0; } - tracer_.reset(nullptr); - if (trace_flags_) { - Tracer* tracer = Tracer::CreateInstance(trace_flags_, got_trace, this); - ASSERT_NOT_NULL(tracer); - tracer_.reset(tracer); - } - + span_collector_.reset(); if (trace_flags_) { - ret = span_timer_.start(span_timer_cb, 0, span_timer_interval, this); - if (ret != 0) { - return ret; - } - } else { - ret = span_timer_.stop(); - if (ret != 0) { - return ret; - } + span_collector_ = std::make_shared(&loop_, + trace_flags_, + span_msg_q_min_size, + span_timer_interval); + span_collector_->Collect(+[](const std::vector& spans, + ZmqAgent* agent) { + if (is_running) { + agent->got_spans(spans); + } + }, this); } } @@ -2041,22 +2017,6 @@ void ZmqAgent::custom_command_cb(std::string req_id, ASSERT_EQ(0, agent->custom_command_msg_.send()); } -void ZmqAgent::span_msg_cb(nsuv::ns_async*, ZmqAgent* agent) { - std::vector spans; - Tracer::SpanStor stor; - while (agent->span_msg_q_.dequeue(stor)) { - spans.push_back(std::move(stor)); - } - - if (spans.size() > 0) { - agent->got_spans(spans); - } -} - -void ZmqAgent::span_timer_cb(nsuv::ns_timer*, ZmqAgent* agent) { - agent->span_msg_cb(nullptr, agent); -} - void ZmqAgent::got_spans(const std::vector& spans) { if (spans.empty()) { return; diff --git a/agents/zmq/src/zmq_agent.h b/agents/zmq/src/zmq_agent.h index 3e2020c7d1..885c973153 100644 --- a/agents/zmq/src/zmq_agent.h +++ b/agents/zmq/src/zmq_agent.h @@ -51,6 +51,7 @@ static unsigned int monitor_suffix = 0; namespace node { namespace nsolid { +class SpanCollector; class ZmqAgent; class ZmqHandle; class ZmqCommandHandle; @@ -524,10 +525,6 @@ class ZmqAgent { static void env_msg_cb(nsuv::ns_async*, ZmqAgent*); - static void got_trace(Tracer* tracer, - const Tracer::SpanStor& stor, - ZmqAgent* agent); - static void shutdown_cb(nsuv::ns_async*, ZmqAgent*); static void metrics_msg_cb(nsuv::ns_async*, ZmqAgent*); @@ -779,11 +776,8 @@ class ZmqAgent { TSQueue custom_command_msg_q_; // Tracing - std::unique_ptr tracer_; - nsuv::ns_async span_msg_; - TSQueue span_msg_q_; - nsuv::ns_timer span_timer_; uint32_t trace_flags_; + std::shared_ptr span_collector_; // ZMQ message buffers char msg_slab_[65536]; diff --git a/node.gyp b/node.gyp index 21b4478272..849515c0c4 100644 --- a/node.gyp +++ b/node.gyp @@ -390,6 +390,8 @@ 'nsolid_sources': [ 'agents/src/http_client.cc', 'agents/src/http_client.h', + 'agents/src/span_collector.cc', + 'agents/src/span_collector.h', 'agents/otlp/src/datadog_metrics.cc', 'agents/otlp/src/dynatrace_metrics.cc', 'agents/otlp/src/http_client.cc',