From 350f273d724ff81d5bd6312e455940697649c53c Mon Sep 17 00:00:00 2001 From: Santiago Gimeno Date: Tue, 8 Oct 2024 22:13:24 +0200 Subject: [PATCH] agents: add missing text metrics - `thread.name` is added as an Attribute to every thread metric datapoint as we do with `thread.id`. - `user` and `title` from ProcessMetrics are added to the `Resource` as `process.owner` and `process.title` Attributes respectively. --- agents/otlp/src/otlp_agent.cc | 5 ++-- agents/otlp/src/otlp_common.cc | 34 ++++++++++++++++++++------ agents/otlp/src/otlp_common.h | 7 +++--- agents/otlp/src/otlp_metrics.cc | 26 ++++++++++++++------ agents/otlp/src/otlp_metrics.h | 3 --- test/agents/test-otlp-grpc-metrics.mjs | 21 ++++++++++++++-- test/agents/test-otlp-metrics.mjs | 23 ++++++++++++++--- 7 files changed, 90 insertions(+), 29 deletions(-) diff --git a/agents/otlp/src/otlp_agent.cc b/agents/otlp/src/otlp_agent.cc index b853e92eff..8faab01e9a 100644 --- a/agents/otlp/src/otlp_agent.cc +++ b/agents/otlp/src/otlp_agent.cc @@ -30,6 +30,7 @@ namespace trace = OPENTELEMETRY_NAMESPACE::trace; namespace resource = sdk::resource; namespace instrumentationscope = sdk::instrumentationscope; namespace detail = trace::propagation::detail; +using resource::ResourceAttributes; using resource::SemanticConventions::kServiceName; using resource::SemanticConventions::kServiceInstanceId; using resource::SemanticConventions::kServiceVersion; @@ -525,7 +526,7 @@ void OTLPAgent::config_otlp_endpoint(const json& config) { } metrics_exporter_.reset( - new OTLPMetrics(&loop_, *GetResource(), GetScope())); + new OTLPMetrics(&loop_, GetScope())); return; } @@ -549,7 +550,7 @@ void OTLPAgent::config_otlp_endpoint(const json& config) { } metrics_exporter_.reset( - new OTLPMetrics(&loop_, url, "", is_http, *GetResource(), GetScope())); + new OTLPMetrics(&loop_, url, "", is_http, GetScope())); } diff --git a/agents/otlp/src/otlp_common.cc b/agents/otlp/src/otlp_common.cc index 1ed9bf2a1c..4e646d907b 100644 --- 
a/agents/otlp/src/otlp_common.cc +++ b/agents/otlp/src/otlp_common.cc @@ -7,7 +7,6 @@ #include "nlohmann/json.hpp" #include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h" #include "opentelemetry/sdk/logs/recordable.h" -#include "opentelemetry/sdk/resource/resource.h" #include "opentelemetry/sdk/resource/semantic_conventions.h" #include "opentelemetry/sdk/trace/recordable.h" #include "opentelemetry/trace/propagation/detail/hex.h" @@ -48,6 +47,7 @@ using opentelemetry::trace::TraceFlags; using opentelemetry::trace::TraceId; using opentelemetry::trace::propagation::detail::HexToBinary; using opentelemetry::v1::trace::SemanticConventions::kThreadId; +using opentelemetry::v1::trace::SemanticConventions::kThreadName; namespace node { namespace nsolid { @@ -64,6 +64,10 @@ static std::vector discarded_metrics = { "thread_id", "timestamp" }; +static std::unique_ptr resource_g = + std::make_unique(Resource::GetEmpty()); +static bool isResourceInitialized_g = false; + // NOLINTNEXTLINE(runtime/references) static void add_counter(std::vector& metrics, const time_point& start, @@ -134,9 +138,7 @@ InstrumentationScope* GetScope() { } Resource* GetResource() { - static bool isResourceInitialized = false; - static auto resource = std::make_unique(Resource::GetEmpty()); - if (!isResourceInitialized) { + if (!isResourceInitialized_g) { json config = json::parse(nsolid::GetConfig(), nullptr, false); // assert because the runtime should never send me an invalid JSON config ASSERT(!config.is_discarded()); @@ -153,11 +155,26 @@ Resource* GetResource() { } // Directly construct a new Resource in the unique_ptr - resource = std::make_unique(Resource::Create(attrs)); - isResourceInitialized = true; + resource_g = std::make_unique(Resource::Create(attrs)); + isResourceInitialized_g = true; + } + + return resource_g.get(); +} + +Resource* UpdateResource(ResourceAttributes&& attrs) { + // First, get current kServiceName to avoid overwriting it with the default + // value 
"unknown_service". (See Resource::Create() method in the SDK). + auto resource = GetResource(); + auto attributes = resource->GetAttributes(); + if (attributes.find(kServiceName) != attributes.end()) { + attrs.SetAttribute(kServiceName, + std::get(attributes[kServiceName])); } - return resource.get(); + auto new_res = std::make_unique(Resource::Create(attrs)); + resource_g = std::make_unique(resource->Merge(*new_res)); + return resource_g.get(); } // NOLINTNEXTLINE(runtime/references) @@ -226,7 +243,8 @@ void fill_env_metrics(std::vector& metrics, ValueType value; PointAttributes attrs = { - { kThreadId, static_cast(stor.thread_id) } + { kThreadId, static_cast(stor.thread_id) }, + { kThreadName, stor.thread_name }, }; #define V(CType, CName, JSName, MType, Unit) \ diff --git a/agents/otlp/src/otlp_common.h b/agents/otlp/src/otlp_common.h index a4657fba8a..871e8261b8 100644 --- a/agents/otlp/src/otlp_common.h +++ b/agents/otlp/src/otlp_common.h @@ -3,6 +3,7 @@ #include "nsolid.h" #include "opentelemetry/sdk/metrics/data/metric_data.h" +#include "opentelemetry/sdk/resource/resource.h" // Class pre-declaration OPENTELEMETRY_BEGIN_NAMESPACE @@ -28,9 +29,6 @@ class InstrumentationScope; namespace logs { class Recordable; } -namespace resource { -class Resource; -} namespace trace { class Recordable; } @@ -46,6 +44,9 @@ OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope* OPENTELEMETRY_NAMESPACE::sdk::resource::Resource* GetResource(); +OPENTELEMETRY_NAMESPACE::sdk::resource::Resource* UpdateResource( + OPENTELEMETRY_NAMESPACE::sdk::resource::ResourceAttributes&&); + void fill_proc_metrics(std::vector&, const ProcessMetrics::MetricsStor& stor); diff --git a/agents/otlp/src/otlp_metrics.cc b/agents/otlp/src/otlp_metrics.cc index 49f6b2899a..3718d15ea0 100644 --- a/agents/otlp/src/otlp_metrics.cc +++ b/agents/otlp/src/otlp_metrics.cc @@ -39,11 +39,13 @@ using opentelemetry::sdk::metrics::ScopeMetrics; using opentelemetry::sdk::metrics::SumPointData; 
using opentelemetry::sdk::metrics::ValueType; using opentelemetry::sdk::resource::Resource; +using opentelemetry::sdk::resource::ResourceAttributes; using opentelemetry::v1::exporter::otlp::GetOtlpDefaultHttpMetricsProtocol; using opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporter; using opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporterOptions; using opentelemetry::v1::exporter::otlp::OtlpHttpMetricExporter; using opentelemetry::v1::exporter::otlp::OtlpHttpMetricExporterOptions; +using opentelemetry::v1::trace::SemanticConventions::kProcessOwner; using opentelemetry::v1::trace::SemanticConventions::kThreadId; namespace node { @@ -61,9 +63,7 @@ static std::vector discarded_metrics = { }; OTLPMetrics::OTLPMetrics(uv_loop_t* loop, - const Resource& resource, InstrumentationScope* scope): - resource_(resource), scope_(scope) { const std::string prot = GetOtlpDefaultHttpMetricsProtocol(); if (prot == "grpc") { @@ -80,9 +80,7 @@ OTLPMetrics::OTLPMetrics(uv_loop_t* loop, const std::string& url, const std::string& key, bool is_http, - const Resource& resource, InstrumentationScope* scope): - resource_(resource), scope_(scope), key_(key), url_(url) { @@ -107,11 +105,23 @@ OTLPMetrics::~OTLPMetrics() { void OTLPMetrics::got_proc_metrics(const ProcessMetricsStor& stor, const ProcessMetricsStor& prev_stor) { ResourceMetrics data; - data.resource_ = &resource_; + Resource* resource; + // Check if 'user' or 'title' are different from the previous metrics + if (prev_stor.user != stor.user || prev_stor.title != stor.title) { + ResourceAttributes attrs = { + { kProcessOwner, stor.user }, + { "process.title", stor.title }, + }; + + resource = UpdateResource(std::move(attrs)); + } else { + resource = GetResource(); + } + + data.resource_ = resource; std::vector metrics; fill_proc_metrics(metrics, stor); - data.scope_metric_data_ = - std::vector{{scope_, metrics}}; + data.scope_metric_data_ = std::vector{{scope_, metrics}}; auto result = 
otlp_metric_exporter_->Export(data); Debug("# ProcessMetrics Exported. Result: %d\n", static_cast(result)); } @@ -120,7 +130,7 @@ void OTLPMetrics::got_proc_metrics(const ProcessMetricsStor& stor, void OTLPMetrics::got_thr_metrics( const std::vector& thr_metrics) { ResourceMetrics data; - data.resource_ = &resource_; + data.resource_ = GetResource(); std::vector metrics; for (const auto& tm : thr_metrics) { diff --git a/agents/otlp/src/otlp_metrics.h b/agents/otlp/src/otlp_metrics.h index 77cc3fe4d4..9b215a220a 100644 --- a/agents/otlp/src/otlp_metrics.h +++ b/agents/otlp/src/otlp_metrics.h @@ -31,14 +31,12 @@ class OTLPMetrics final: public MetricsExporter { public: explicit OTLPMetrics( uv_loop_t* loop, - const OPENTELEMETRY_NAMESPACE::sdk::resource::Resource&, OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*); explicit OTLPMetrics( uv_loop_t* loop, const std::string& url, const std::string& key, bool is_http, - const OPENTELEMETRY_NAMESPACE::sdk::resource::Resource&, OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*); virtual ~OTLPMetrics(); @@ -52,7 +50,6 @@ class OTLPMetrics final: public MetricsExporter { private: std::unique_ptr otlp_metric_exporter_; - const OPENTELEMETRY_NAMESPACE::sdk::resource::Resource& resource_; OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope* scope_; std::string key_; diff --git a/test/agents/test-otlp-grpc-metrics.mjs b/test/agents/test-otlp-grpc-metrics.mjs index 8308237201..fa853ccce4 100644 --- a/test/agents/test-otlp-grpc-metrics.mjs +++ b/test/agents/test-otlp-grpc-metrics.mjs @@ -28,17 +28,22 @@ if (process.argv[2] === 'child') { tracingEnabled: false }); + nsolid.setThreadName('main-thread'); + const worker = new Worker(__filename, { argv: ['child'] }); process.send({ type: 'workerThreadId', id: worker.threadId }); process.send({ type: 'nsolid', id: nsolid.id, appName: nsolid.appName, + metrics: nsolid.metrics(), }); process.on('message', (message) => { 
assert.strictEqual(message, 'exit'); process.exit(0); }); + } else { + nsolid.setThreadName('worker-thread'); } } else { const expectedProcMetrics = [ @@ -436,19 +441,23 @@ if (process.argv[2] === 'child') { let nsolidId; let nsolidAppName; + let nsolidMetrics; function checkResource(resource) { validateArray(resource.attributes, 'attributes'); - assert.strictEqual(resource.attributes.length, 5); const expectedAttributes = { 'telemetry.sdk.version': '1.16.0', 'telemetry.sdk.language': 'cpp', 'telemetry.sdk.name': 'opentelemetry', 'service.instance.id': nsolidId, - 'service.name': nsolidAppName + 'service.name': nsolidAppName, + 'process.title': nsolidMetrics.title, + 'process.owner': nsolidMetrics.user, }; + assert.strictEqual(resource.attributes.length, Object.keys(expectedAttributes).length); + resource.attributes.forEach((attribute) => { assert.strictEqual(attribute.value.stringValue, expectedAttributes[attribute.key]); delete expectedAttributes[attribute.key]; @@ -484,6 +493,13 @@ if (process.argv[2] === 'child') { const attrIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`); if (attrIndex > -1) { indicesToRemove.push(i); + const nameIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.name'); + assert(nameIndex > -1); + if (context.threadId === 0) { // main-thread + assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'main-thread'); + } else { // worker-thread + assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'worker-thread'); + } } } else { indicesToRemove.push(i); @@ -583,6 +599,7 @@ if (process.argv[2] === 'child') { if (message.type === 'nsolid') { nsolidId = message.id; nsolidAppName = message.appName; + nsolidMetrics = message.metrics; } else if (message.type === 'workerThreadId') { context.threadList.push(message.id); } diff --git a/test/agents/test-otlp-metrics.mjs b/test/agents/test-otlp-metrics.mjs index 039d1a51ef..6f9460cc05 100644 --- 
a/test/agents/test-otlp-metrics.mjs +++ b/test/agents/test-otlp-metrics.mjs @@ -28,17 +28,22 @@ if (process.argv[2] === 'child') { tracingEnabled: false }); + nsolid.setThreadName('main-thread'); + const worker = new Worker(__filename, { argv: ['child'] }); process.send({ type: 'workerThreadId', id: worker.threadId }); process.send({ type: 'nsolid', id: nsolid.id, appName: nsolid.appName, + metrics: nsolid.metrics(), }); process.on('message', (message) => { assert.strictEqual(message, 'exit'); process.exit(0); }); + } else { + nsolid.setThreadName('worker-thread'); } } else { const expectedProcMetrics = [ @@ -436,19 +441,23 @@ if (process.argv[2] === 'child') { let nsolidId; let nsolidAppName; + let nsolidMetrics; function checkResource(resource) { validateArray(resource.attributes, 'attributes'); - assert.strictEqual(resource.attributes.length, 5); const expectedAttributes = { 'telemetry.sdk.version': '1.16.0', 'telemetry.sdk.language': 'cpp', 'telemetry.sdk.name': 'opentelemetry', 'service.instance.id': nsolidId, - 'service.name': nsolidAppName + 'service.name': nsolidAppName, + 'process.title': nsolidMetrics.title, + 'process.owner': nsolidMetrics.user, }; + assert.strictEqual(resource.attributes.length, Object.keys(expectedAttributes).length); + resource.attributes.forEach((attribute) => { assert.strictEqual(attribute.value.stringValue, expectedAttributes[attribute.key]); delete expectedAttributes[attribute.key]; @@ -484,6 +493,13 @@ if (process.argv[2] === 'child') { const attrIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`); if (attrIndex > -1) { indicesToRemove.push(i); + const nameIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.name'); + assert(nameIndex > -1); + if (context.threadId === 0) { // main-thread + assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'main-thread'); + } else { // worker-thread + 
assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'worker-thread'); + } } } else { indicesToRemove.push(i); @@ -561,7 +577,7 @@ if (process.argv[2] === 'child') { metrics: [], expected: [], threadId: null, - threadList: [ threadId ] + threadList: [ threadId ], }; async function runTest(getEnv) { @@ -585,6 +601,7 @@ if (process.argv[2] === 'child') { if (message.type === 'nsolid') { nsolidId = message.id; nsolidAppName = message.appName; + nsolidMetrics = message.metrics; } else if (message.type === 'workerThreadId') { context.threadList.push(message.id); }