Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agents: implement SpanCollector helper class #160

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,8 @@ LINT_CPP_EXCLUDE += src/tracing/trace_event.h src/tracing/trace_event_common.h
LINT_CPP_FILES = $(filter-out $(LINT_CPP_EXCLUDE), $(wildcard \
agents/otlp/src/*.cc \
agents/otlp/src/*.h \
agents/src/*.cc \
agents/src/*.h \
agents/statsd/src/*.cc \
agents/statsd/src/*.h \
agents/zmq/src/*.cc \
Expand Down
73 changes: 33 additions & 40 deletions agents/otlp/src/otlp_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "opentelemetry/exporters/otlp/otlp_http_exporter.h"
#include "opentelemetry/ext/http/client/curl/http_client_curl.h"
#include "opentelemetry/trace/propagation/detail/hex.h"
#include "../../src/span_collector.h"

using ThreadMetricsStor = node::nsolid::ThreadMetrics::MetricsStor;
using nlohmann::json;
Expand All @@ -40,6 +41,9 @@ namespace node {
namespace nsolid {
namespace otlp {

constexpr uint64_t span_timer_interval = 1000;
constexpr size_t span_msg_q_min_size = 200;

template <typename... Args>
inline void Debug(Args&&... args) {
per_process::Debug(DebugCategory::NSOLID_OTLP_AGENT,
Expand Down Expand Up @@ -167,7 +171,7 @@ int OTLPAgent::config(const nlohmann::json& config) {
}

// Configure tracing flags
if (trace_flags_ == 0 ||
if (span_collector_ == nullptr ||
utils::find_any_fields_in_diff(diff, tracing_fields)) {
trace_flags_ = 0;
if (otlp_exporter_ != nullptr) {
Expand Down Expand Up @@ -312,7 +316,6 @@ void OTLPAgent::do_start() {
uv_mutex_lock(&start_lock_);
ASSERT_EQ(0, shutdown_.init(&loop_, shutdown_cb_, this));
ASSERT_EQ(0, env_msg_.init(&loop_, env_msg_cb_, this));
ASSERT_EQ(0, span_msg_.init(&loop_, span_msg_cb_, this));
ASSERT_EQ(0, metrics_msg_.init(&loop_, metrics_msg_cb_, this));
ASSERT_EQ(0, metrics_timer_.init(&loop_));
ASSERT_EQ(0, config_msg_.init(&loop_, config_msg_cb_, this));
Expand All @@ -334,46 +337,17 @@ void OTLPAgent::do_stop() {
ready_ = false;
shutdown_.close();
env_msg_.close();
span_msg_.close();
metrics_msg_.close();
metrics_timer_.close();
config_msg_.close();
metrics_exporter_.reset(nullptr);
span_collector_.reset();
}


void OTLPAgent::trace_hook_(Tracer* tracer,
const Tracer::SpanStor& stor,
OTLPAgent* agent) {
nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_);
if (!is_running_ || !agent->ready_) {
return;
}

agent->span_msg_q_.enqueue(stor);
ASSERT_EQ(0, agent->span_msg_.send());
}


void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
// Don't exit until all the pending spans are sent
nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_);
if (!is_running_) {
return;
}

using std::chrono::duration_cast;
using std::chrono::milliseconds;
using std::chrono::nanoseconds;
std::vector<std::unique_ptr<sdk::trace::Recordable>> recordables;
Tracer::SpanStor s;
while (agent->span_msg_q_.dequeue(s)) {
auto recordable = agent->otlp_exporter_->MakeRecordable();
fill_recordable(recordable.get(), s);
recordables.push_back(std::move(recordable));
}

auto result = agent->otlp_exporter_->Export(recordables);
void OTLPAgent::got_spans(const UniqRecordables& recordables) {
auto result =
otlp_exporter_->Export(const_cast<UniqRecordables&>(recordables));
Debug("# Spans Exported: %ld. Result: %d\n",
recordables.size(),
static_cast<int>(result));
Expand Down Expand Up @@ -615,14 +589,33 @@ int OTLPAgent::setup_metrics_timer(uint64_t period) {
}

void OTLPAgent::update_tracer(uint32_t flags) {
tracer_.reset(nullptr);
if (flags) {
Tracer* tracer = Tracer::CreateInstance(flags, trace_hook_, this);
ASSERT_NOT_NULL(tracer);
tracer_.reset(tracer);
Debug("Updating Tracer with flags: %u\n", flags);
span_collector_.reset();
if (trace_flags_) {
span_collector_ = std::make_shared<SpanCollector>(&loop_,
trace_flags_,
span_msg_q_min_size,
span_timer_interval);
span_collector_->CollectAndTransform(
transf, +[](UniqRecordables& spans, OTLPAgent* agent) {
nsuv::ns_rwlock::scoped_rdlock lock(exit_lock_);
if (!is_running_) {
return;
}

agent->got_spans(spans);
}, this);
}
}

UniqRecordable OTLPAgent::transf(const Tracer::SpanStor& span,
OTLPAgent* agent) {
// Transform the span
fprintf(stderr, "Transforming span\n");
auto recordable = agent->otlp_exporter_->MakeRecordable();
fill_recordable(recordable.get(), span);
return recordable;
}
} // namespace otlp
} // namespace nsolid
} // namespace node
21 changes: 12 additions & 9 deletions agents/otlp/src/otlp_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@ struct OtlpGrpcExporterOptions;
}
namespace sdk {
namespace trace {
class Recordable;
class SpanExporter;
}
}
OPENTELEMETRY_END_NAMESPACE

using UniqRecordable =
std::unique_ptr<OPENTELEMETRY_NAMESPACE::sdk::trace::Recordable>;
using UniqRecordables = std::vector<UniqRecordable>;

namespace node {
namespace nsolid {

class SpanCollector;

namespace otlp {

struct JSThreadMetrics {
Expand Down Expand Up @@ -68,18 +75,14 @@ class OTLPAgent {

static void on_thread_remove_(SharedEnvInst, OTLPAgent* agent);

static void trace_hook_(Tracer* tracer,
const Tracer::SpanStor& stor,
OTLPAgent* agent);

static void span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent);

static void metrics_timer_cb_(nsuv::ns_timer*, OTLPAgent*);

static void metrics_msg_cb_(nsuv::ns_async*, OTLPAgent* agent);

static void thr_metrics_cb_(SharedThreadMetrics, OTLPAgent*);

static UniqRecordable transf(const Tracer::SpanStor& span, OTLPAgent* agent);

void do_start();

void do_stop();
Expand All @@ -99,6 +102,8 @@ class OTLPAgent {

void got_proc_metrics();

void got_spans(const UniqRecordables& spans);

void setup_trace_otlp_exporter( // NOLINTNEXTLINE(runtime/references)
OPENTELEMETRY_NAMESPACE::exporter::otlp::OtlpHttpExporterOptions& opts);

Expand All @@ -124,10 +129,8 @@ class OTLPAgent {
TSQueue<std::tuple<SharedEnvInst, bool>> env_msg_q_;

// For the Tracing API
std::unique_ptr<Tracer> tracer_;
nsuv::ns_async span_msg_;
TSQueue<Tracer::SpanStor> span_msg_q_;
uint32_t trace_flags_;
std::shared_ptr<SpanCollector> span_collector_;

std::unique_ptr<OPENTELEMETRY_NAMESPACE::sdk::trace::SpanExporter>
otlp_exporter_;
Expand Down
6 changes: 3 additions & 3 deletions agents/src/http_client.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef AGENTS_COMMON_SRC_HTTP_CLIENT_H_
#define AGENTS_COMMON_SRC_HTTP_CLIENT_H_
#ifndef AGENTS_SRC_HTTP_CLIENT_H_
#define AGENTS_SRC_HTTP_CLIENT_H_

#include <curl/curl.h>
#include <string>
Expand All @@ -26,7 +26,7 @@ class CurlContext {
int start(int events, poll_cb cb);

private:
friend class HttpClient;
friend class HttpClient;
HttpClient* http_client_;
nsuv::ns_poll* poll_handle_;
curl_socket_t sockfd_;
Expand Down
89 changes: 89 additions & 0 deletions agents/src/span_collector.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#include "span_collector.h"
#include "asserts-cpp/asserts.h"

namespace node {
namespace nsolid {

SpanCollector::SpanCollector(uv_loop_t* loop,
uint32_t trace_flags,
size_t min_span_count,
uint64_t interval):
loop_(loop),
trace_flags_(trace_flags),
min_span_count_(min_span_count),
span_msg_(new nsuv::ns_async()),
interval_(interval),
span_timer_(new nsuv::ns_timer()) {
}

SpanCollector::~SpanCollector() {
span_timer_->close_and_delete();
span_msg_->close_and_delete();
}

void SpanCollector::do_collect() {
// Create the instance with a callback that enqueues spans and sends a message
Tracer* tracer = Tracer::CreateInstance(
trace_flags_,
+[](Tracer*, Tracer::SpanStor span, WeakSpanCollector collector_wp) {
SharedSpanCollector collector = collector_wp.lock();
if (collector == nullptr) {
return;
}

// Only notify the loop_ thread if the queue reaches min_span_count_to
// avoid too many uv_async_t::send() calls.
if (collector->span_msg_q_.enqueue(span) > collector->min_span_count_) {
ASSERT_EQ(0, collector->span_msg_->send());
}
},
weak_from_this());
ASSERT_NOT_NULL(tracer);
tracer_.reset(tracer);

int er = span_msg_->init(
loop_,
+[](nsuv::ns_async*, WeakSpanCollector collector_wp) {
SharedSpanCollector collector = collector_wp.lock();
if (collector == nullptr) {
return;
}

collector->process_spans();
},
weak_from_this());
ASSERT_EQ(0, er);
er = span_timer_->init(loop_);
ASSERT_EQ(0, er);
er = span_timer_->start(+[](nsuv::ns_timer*,
WeakSpanCollector collector_wp) {
SharedSpanCollector collector = collector_wp.lock();
if (collector == nullptr) {
return;
}

collector->process_spans();
}, 0, interval_, weak_from_this());
ASSERT_EQ(0, er);
}

void SpanCollector::process_spans() {
Tracer::SpanStor span;
SpanVector spans;
while (span_msg_q_.dequeue(span)) {
spans.push_back(std::move(span));
}

// After processing, call the stored callback with the spans and additional
// data.
if (spans.size()) {
if (transform_callback_) {
transform_callback_(spans);
} else {
callback_(spans);
}
}
}

} // namespace nsolid
} // namespace node
91 changes: 91 additions & 0 deletions agents/src/span_collector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#ifndef AGENTS_SRC_SPAN_COLLECTOR_H_
#define AGENTS_SRC_SPAN_COLLECTOR_H_

#include <nsolid.h>
#include <nsolid/thread_safe.h>
#include "nsuv-inl.h"
#include <vector>

namespace node {
namespace nsolid {

class SpanCollector;

using SharedSpanCollector = std::shared_ptr<SpanCollector>;
using WeakSpanCollector = std::weak_ptr<SpanCollector>;
using SpanVector = std::vector<Tracer::SpanStor>;

/*
* SpanCollector is a class that allows to start collecting spans and report
* them to a callback function running in a specific uv_loop_t thread. The
* collector can be configured with a minimum span count and a time interval to
* report the spans. In addition, the collector can be configured with a
* transform function that will be applied to the spans before calling the final
* callback.
*/
class SpanCollector: public std::enable_shared_from_this<SpanCollector> {
public:
explicit SpanCollector(uv_loop_t* loop,
uint32_t trace_flags,
size_t min_span_count,
uint64_t interval);
~SpanCollector();

template <typename Cb, typename... Data>
void Collect(Cb&& cb, Data&&... data) {
// Store the callback and data
callback_ = std::bind(std::forward<Cb>(cb),
std::placeholders::_1,
std::forward<Data>(data)...);
this->do_collect();
}

template <typename Transform, typename Cb, typename... Data>
void CollectAndTransform(Transform&& transform, Cb&& cb, Data&&... data) {
// Store the transform function and callback
auto data_tuple = std::make_tuple(std::forward<Data>(data)...);
transform_callback_ = [transform = std::forward<Transform>(transform),
cb = std::forward<Cb>(cb),
data_tuple] (const SpanVector& spans) {
// Create a vector for transformed spans
using TransformedType =
decltype(transform(std::declval<Tracer::SpanStor>(),
std::declval<Data>()...));
std::vector<TransformedType> transformed_spans;
transformed_spans.reserve(spans.size());

// Apply the transform to each span
std::apply([&transform, &spans, &transformed_spans](const Data&... data) {
for (const auto& span : spans) {
transformed_spans.push_back(transform(span, data...));
}
}, data_tuple);

// Call the callback with the transformed spans and additional data
std::apply([&cb, &transformed_spans](const Data&... data) {
cb(transformed_spans, data...);
}, data_tuple);
};
this->do_collect();
}

private:
void do_collect();
void process_spans();

uv_loop_t* loop_;
uint32_t trace_flags_;
size_t min_span_count_;
std::unique_ptr<Tracer> tracer_;
nsuv::ns_async* span_msg_;
TSQueue<Tracer::SpanStor> span_msg_q_;
uint64_t interval_;
nsuv::ns_timer* span_timer_;
std::function<void(const SpanVector&)> callback_ = nullptr;
std::function<void(const SpanVector&)> transform_callback_ = nullptr;
};

} // namespace nsolid
} // namespace node

#endif // AGENTS_SRC_SPAN_COLLECTOR_H_
Loading
Loading