Skip to content

Commit

Permalink
agents: implement SpanCollector helper class
Browse files Browse the repository at this point in the history
SpanCollector is a class that allows to start collecting spans and
report them to a callback function running in a specific uv_loop_t
thread. The collector can be configured with a minimum span count and a
time interval to report the spans. In addition, the collector can be
configured with a transform function that will be applied to the spans
before calling the final callback.
Use this class to dry the code in the `ZmqAgent` and `OTLPAgent`.

PR-URL: #160
Reviewed-By: Trevor Norris <trev.norris@gmail.com>
  • Loading branch information
santigimeno authored and trevnorris committed Aug 21, 2024
1 parent fda0c4e commit 8c53110
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 115 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,8 @@ LINT_CPP_EXCLUDE += src/tracing/trace_event.h src/tracing/trace_event_common.h
LINT_CPP_FILES = $(filter-out $(LINT_CPP_EXCLUDE), $(wildcard \
agents/otlp/src/*.cc \
agents/otlp/src/*.h \
agents/src/*.cc \
agents/src/*.h \
agents/statsd/src/*.cc \
agents/statsd/src/*.h \
agents/zmq/src/*.cc \
Expand Down
73 changes: 33 additions & 40 deletions agents/otlp/src/otlp_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "opentelemetry/exporters/otlp/otlp_http_exporter.h"
#include "opentelemetry/ext/http/client/curl/http_client_curl.h"
#include "opentelemetry/trace/propagation/detail/hex.h"
#include "../../src/span_collector.h"

using ThreadMetricsStor = node::nsolid::ThreadMetrics::MetricsStor;
using nlohmann::json;
Expand All @@ -40,6 +41,9 @@ namespace node {
namespace nsolid {
namespace otlp {

constexpr uint64_t span_timer_interval = 1000;
constexpr size_t span_msg_q_min_size = 200;

template <typename... Args>
inline void Debug(Args&&... args) {
per_process::Debug(DebugCategory::NSOLID_OTLP_AGENT,
Expand Down Expand Up @@ -167,7 +171,7 @@ int OTLPAgent::config(const nlohmann::json& config) {
}

// Configure tracing flags
if (trace_flags_ == 0 ||
if (span_collector_ == nullptr ||
utils::find_any_fields_in_diff(diff, tracing_fields)) {
trace_flags_ = 0;
if (otlp_exporter_ != nullptr) {
Expand Down Expand Up @@ -312,7 +316,6 @@ void OTLPAgent::do_start() {
uv_mutex_lock(&start_lock_);
ASSERT_EQ(0, shutdown_.init(&loop_, shutdown_cb_, this));
ASSERT_EQ(0, env_msg_.init(&loop_, env_msg_cb_, this));
ASSERT_EQ(0, span_msg_.init(&loop_, span_msg_cb_, this));
ASSERT_EQ(0, metrics_msg_.init(&loop_, metrics_msg_cb_, this));
ASSERT_EQ(0, metrics_timer_.init(&loop_));
ASSERT_EQ(0, config_msg_.init(&loop_, config_msg_cb_, this));
Expand All @@ -334,46 +337,17 @@ void OTLPAgent::do_stop() {
ready_ = false;
shutdown_.close();
env_msg_.close();
span_msg_.close();
metrics_msg_.close();
metrics_timer_.close();
config_msg_.close();
metrics_exporter_.reset(nullptr);
span_collector_.reset();
}


void OTLPAgent::trace_hook_(Tracer* tracer,
const Tracer::SpanStor& stor,
OTLPAgent* agent) {
nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_);
if (!is_running_ || !agent->ready_) {
return;
}

agent->span_msg_q_.enqueue(stor);
ASSERT_EQ(0, agent->span_msg_.send());
}


void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
// Don't exit until all the pending spans are sent
nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_);
if (!is_running_) {
return;
}

using std::chrono::duration_cast;
using std::chrono::milliseconds;
using std::chrono::nanoseconds;
std::vector<std::unique_ptr<sdk::trace::Recordable>> recordables;
Tracer::SpanStor s;
while (agent->span_msg_q_.dequeue(s)) {
auto recordable = agent->otlp_exporter_->MakeRecordable();
fill_recordable(recordable.get(), s);
recordables.push_back(std::move(recordable));
}

auto result = agent->otlp_exporter_->Export(recordables);
void OTLPAgent::got_spans(const UniqRecordables& recordables) {
auto result =
otlp_exporter_->Export(const_cast<UniqRecordables&>(recordables));
Debug("# Spans Exported: %ld. Result: %d\n",
recordables.size(),
static_cast<int>(result));
Expand Down Expand Up @@ -615,14 +589,33 @@ int OTLPAgent::setup_metrics_timer(uint64_t period) {
}

void OTLPAgent::update_tracer(uint32_t flags) {
tracer_.reset(nullptr);
if (flags) {
Tracer* tracer = Tracer::CreateInstance(flags, trace_hook_, this);
ASSERT_NOT_NULL(tracer);
tracer_.reset(tracer);
Debug("Updating Tracer with flags: %u\n", flags);
span_collector_.reset();
if (trace_flags_) {
span_collector_ = std::make_shared<SpanCollector>(&loop_,
trace_flags_,
span_msg_q_min_size,
span_timer_interval);
span_collector_->CollectAndTransform(
transf, +[](UniqRecordables& spans, OTLPAgent* agent) {
nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_);
if (!is_running_) {
return;
}

agent->got_spans(spans);
}, this);
}
}

UniqRecordable OTLPAgent::transf(const Tracer::SpanStor& span,
OTLPAgent* agent) {
// Transform the span
fprintf(stderr, "Transforming span\n");
auto recordable = agent->otlp_exporter_->MakeRecordable();
fill_recordable(recordable.get(), span);
return recordable;
}
} // namespace otlp
} // namespace nsolid
} // namespace node
21 changes: 12 additions & 9 deletions agents/otlp/src/otlp_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@ struct OtlpGrpcExporterOptions;
}
namespace sdk {
namespace trace {
class Recordable;
class SpanExporter;
}
}
OPENTELEMETRY_END_NAMESPACE

using UniqRecordable =
std::unique_ptr<OPENTELEMETRY_NAMESPACE::sdk::trace::Recordable>;
using UniqRecordables = std::vector<UniqRecordable>;

namespace node {
namespace nsolid {

class SpanCollector;

namespace otlp {

struct JSThreadMetrics {
Expand Down Expand Up @@ -68,18 +75,14 @@ class OTLPAgent {

static void on_thread_remove_(SharedEnvInst, OTLPAgent* agent);

static void trace_hook_(Tracer* tracer,
const Tracer::SpanStor& stor,
OTLPAgent* agent);

static void span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent);

static void metrics_timer_cb_(nsuv::ns_timer*, OTLPAgent*);

static void metrics_msg_cb_(nsuv::ns_async*, OTLPAgent* agent);

static void thr_metrics_cb_(SharedThreadMetrics, OTLPAgent*);

static UniqRecordable transf(const Tracer::SpanStor& span, OTLPAgent* agent);

void do_start();

void do_stop();
Expand All @@ -99,6 +102,8 @@ class OTLPAgent {

void got_proc_metrics();

void got_spans(const UniqRecordables& spans);

void setup_trace_otlp_exporter( // NOLINTNEXTLINE(runtime/references)
OPENTELEMETRY_NAMESPACE::exporter::otlp::OtlpHttpExporterOptions& opts);

Expand All @@ -124,10 +129,8 @@ class OTLPAgent {
TSQueue<std::tuple<SharedEnvInst, bool>> env_msg_q_;

// For the Tracing API
std::unique_ptr<Tracer> tracer_;
nsuv::ns_async span_msg_;
TSQueue<Tracer::SpanStor> span_msg_q_;
uint32_t trace_flags_;
std::shared_ptr<SpanCollector> span_collector_;

std::unique_ptr<OPENTELEMETRY_NAMESPACE::sdk::trace::SpanExporter>
otlp_exporter_;
Expand Down
6 changes: 3 additions & 3 deletions agents/src/http_client.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef AGENTS_COMMON_SRC_HTTP_CLIENT_H_
#define AGENTS_COMMON_SRC_HTTP_CLIENT_H_
#ifndef AGENTS_SRC_HTTP_CLIENT_H_
#define AGENTS_SRC_HTTP_CLIENT_H_

#include <curl/curl.h>
#include <string>
Expand All @@ -26,7 +26,7 @@ class CurlContext {
int start(int events, poll_cb cb);

private:
friend class HttpClient;
friend class HttpClient;
HttpClient* http_client_;
nsuv::ns_poll* poll_handle_;
curl_socket_t sockfd_;
Expand Down
89 changes: 89 additions & 0 deletions agents/src/span_collector.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#include "span_collector.h"
#include "asserts-cpp/asserts.h"

namespace node {
namespace nsolid {

SpanCollector::SpanCollector(uv_loop_t* loop,
uint32_t trace_flags,
size_t min_span_count,
uint64_t interval):
loop_(loop),
trace_flags_(trace_flags),
min_span_count_(min_span_count),
span_msg_(new nsuv::ns_async()),
interval_(interval),
span_timer_(new nsuv::ns_timer()) {
}

SpanCollector::~SpanCollector() {
span_timer_->close_and_delete();
span_msg_->close_and_delete();
}

void SpanCollector::do_collect() {
// Create the instance with a callback that enqueues spans and sends a message
Tracer* tracer = Tracer::CreateInstance(
trace_flags_,
+[](Tracer*, Tracer::SpanStor span, WeakSpanCollector collector_wp) {
SharedSpanCollector collector = collector_wp.lock();
if (collector == nullptr) {
return;
}

// Only notify the loop_ thread if the queue reaches min_span_count_to
// avoid too many uv_async_t::send() calls.
if (collector->span_msg_q_.enqueue(span) > collector->min_span_count_) {
ASSERT_EQ(0, collector->span_msg_->send());
}
},
weak_from_this());
ASSERT_NOT_NULL(tracer);
tracer_.reset(tracer);

int er = span_msg_->init(
loop_,
+[](nsuv::ns_async*, WeakSpanCollector collector_wp) {
SharedSpanCollector collector = collector_wp.lock();
if (collector == nullptr) {
return;
}

collector->process_spans();
},
weak_from_this());
ASSERT_EQ(0, er);
er = span_timer_->init(loop_);
ASSERT_EQ(0, er);
er = span_timer_->start(+[](nsuv::ns_timer*,
WeakSpanCollector collector_wp) {
SharedSpanCollector collector = collector_wp.lock();
if (collector == nullptr) {
return;
}

collector->process_spans();
}, 0, interval_, weak_from_this());
ASSERT_EQ(0, er);
}

void SpanCollector::process_spans() {
Tracer::SpanStor span;
SpanVector spans;
while (span_msg_q_.dequeue(span)) {
spans.push_back(std::move(span));
}

// After processing, call the stored callback with the spans and additional
// data.
if (spans.size()) {
if (transform_callback_) {
transform_callback_(spans);
} else {
callback_(spans);
}
}
}

} // namespace nsolid
} // namespace node
91 changes: 91 additions & 0 deletions agents/src/span_collector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#ifndef AGENTS_SRC_SPAN_COLLECTOR_H_
#define AGENTS_SRC_SPAN_COLLECTOR_H_

#include <nsolid.h>
#include <nsolid/thread_safe.h>
#include "nsuv-inl.h"
#include <vector>

namespace node {
namespace nsolid {

class SpanCollector;

using SharedSpanCollector = std::shared_ptr<SpanCollector>;
using WeakSpanCollector = std::weak_ptr<SpanCollector>;
using SpanVector = std::vector<Tracer::SpanStor>;

/*
* SpanCollector is a class that allows to start collecting spans and report
* them to a callback function running in a specific uv_loop_t thread. The
* collector can be configured with a minimum span count and a time interval to
* report the spans. In addition, the collector can be configured with a
* transform function that will be applied to the spans before calling the final
* callback.
*/
class SpanCollector: public std::enable_shared_from_this<SpanCollector> {
public:
explicit SpanCollector(uv_loop_t* loop,
uint32_t trace_flags,
size_t min_span_count,
uint64_t interval);
~SpanCollector();

template <typename Cb, typename... Data>
void Collect(Cb&& cb, Data&&... data) {
// Store the callback and data
callback_ = std::bind(std::forward<Cb>(cb),
std::placeholders::_1,
std::forward<Data>(data)...);
this->do_collect();
}

template <typename Transform, typename Cb, typename... Data>
void CollectAndTransform(Transform&& transform, Cb&& cb, Data&&... data) {
// Store the transform function and callback
auto data_tuple = std::make_tuple(std::forward<Data>(data)...);
transform_callback_ = [transform = std::forward<Transform>(transform),
cb = std::forward<Cb>(cb),
data_tuple] (const SpanVector& spans) {
// Create a vector for transformed spans
using TransformedType =
decltype(transform(std::declval<Tracer::SpanStor>(),
std::declval<Data>()...));
std::vector<TransformedType> transformed_spans;
transformed_spans.reserve(spans.size());

// Apply the transform to each span
std::apply([&transform, &spans, &transformed_spans](const Data&... data) {
for (const auto& span : spans) {
transformed_spans.push_back(transform(span, data...));
}
}, data_tuple);

// Call the callback with the transformed spans and additional data
std::apply([&cb, &transformed_spans](const Data&... data) {
cb(transformed_spans, data...);
}, data_tuple);
};
this->do_collect();
}

private:
void do_collect();
void process_spans();

uv_loop_t* loop_;
uint32_t trace_flags_;
size_t min_span_count_;
std::unique_ptr<Tracer> tracer_;
nsuv::ns_async* span_msg_;
TSQueue<Tracer::SpanStor> span_msg_q_;
uint64_t interval_;
nsuv::ns_timer* span_timer_;
std::function<void(const SpanVector&)> callback_ = nullptr;
std::function<void(const SpanVector&)> transform_callback_ = nullptr;
};

} // namespace nsolid
} // namespace node

#endif // AGENTS_SRC_SPAN_COLLECTOR_H_
Loading

0 comments on commit 8c53110

Please sign in to comment.