Skip to content

Commit

Permalink
agents: add missing text metrics
Browse files Browse the repository at this point in the history
- `thread.name` is added as an Attribute to every thread metric
  datapoing as we do with `thread.id`.
- `user` and `title` from ProcessMetrics are added to the `Resource` as
  `process.owner` and `process.title` Attributes respectively.
  • Loading branch information
santigimeno committed Oct 14, 2024
1 parent fb29b0d commit 350f273
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 29 deletions.
5 changes: 3 additions & 2 deletions agents/otlp/src/otlp_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace trace = OPENTELEMETRY_NAMESPACE::trace;
namespace resource = sdk::resource;
namespace instrumentationscope = sdk::instrumentationscope;
namespace detail = trace::propagation::detail;
using resource::ResourceAttributes;
using resource::SemanticConventions::kServiceName;
using resource::SemanticConventions::kServiceInstanceId;
using resource::SemanticConventions::kServiceVersion;
Expand Down Expand Up @@ -525,7 +526,7 @@ void OTLPAgent::config_otlp_endpoint(const json& config) {
}

metrics_exporter_.reset(
new OTLPMetrics(&loop_, *GetResource(), GetScope()));
new OTLPMetrics(&loop_, GetScope()));
return;
}

Expand All @@ -549,7 +550,7 @@ void OTLPAgent::config_otlp_endpoint(const json& config) {
}

metrics_exporter_.reset(
new OTLPMetrics(&loop_, url, "", is_http, *GetResource(), GetScope()));
new OTLPMetrics(&loop_, url, "", is_http, GetScope()));
}


Expand Down
34 changes: 26 additions & 8 deletions agents/otlp/src/otlp_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "nlohmann/json.hpp"
#include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"
#include "opentelemetry/sdk/logs/recordable.h"
#include "opentelemetry/sdk/resource/resource.h"
#include "opentelemetry/sdk/resource/semantic_conventions.h"
#include "opentelemetry/sdk/trace/recordable.h"
#include "opentelemetry/trace/propagation/detail/hex.h"
Expand Down Expand Up @@ -48,6 +47,7 @@ using opentelemetry::trace::TraceFlags;
using opentelemetry::trace::TraceId;
using opentelemetry::trace::propagation::detail::HexToBinary;
using opentelemetry::v1::trace::SemanticConventions::kThreadId;
using opentelemetry::v1::trace::SemanticConventions::kThreadName;

namespace node {
namespace nsolid {
Expand All @@ -64,6 +64,10 @@ static std::vector<std::string> discarded_metrics = {
"thread_id", "timestamp"
};

static std::unique_ptr<Resource> resource_g =
std::make_unique<Resource>(Resource::GetEmpty());
static bool isResourceInitialized_g = false;

// NOLINTNEXTLINE(runtime/references)
static void add_counter(std::vector<MetricData>& metrics,
const time_point& start,
Expand Down Expand Up @@ -134,9 +138,7 @@ InstrumentationScope* GetScope() {
}

Resource* GetResource() {
static bool isResourceInitialized = false;
static auto resource = std::make_unique<Resource>(Resource::GetEmpty());
if (!isResourceInitialized) {
if (!isResourceInitialized_g) {
json config = json::parse(nsolid::GetConfig(), nullptr, false);
// assert because the runtime should never send me an invalid JSON config
ASSERT(!config.is_discarded());
Expand All @@ -153,11 +155,26 @@ Resource* GetResource() {
}

// Directly construct a new Resource in the unique_ptr
resource = std::make_unique<Resource>(Resource::Create(attrs));
isResourceInitialized = true;
resource_g = std::make_unique<Resource>(Resource::Create(attrs));
isResourceInitialized_g = true;
}

return resource_g.get();
}

Resource* UpdateResource(ResourceAttributes&& attrs) {
// First, get current kServiceName to avoid overwriting it with the default
// value "unknown_service". (See Resource::Create() method in the SDK).
auto resource = GetResource();
auto attributes = resource->GetAttributes();
if (attributes.find(kServiceName) != attributes.end()) {
attrs.SetAttribute(kServiceName,
std::get<std::string>(attributes[kServiceName]));
}

return resource.get();
auto new_res = std::make_unique<Resource>(Resource::Create(attrs));
resource_g = std::make_unique<Resource>(resource->Merge(*new_res));
return resource_g.get();
}

// NOLINTNEXTLINE(runtime/references)
Expand Down Expand Up @@ -226,7 +243,8 @@ void fill_env_metrics(std::vector<MetricData>& metrics,
ValueType value;

PointAttributes attrs = {
{ kThreadId, static_cast<int64_t>(stor.thread_id) }
{ kThreadId, static_cast<int64_t>(stor.thread_id) },
{ kThreadName, stor.thread_name },
};

#define V(CType, CName, JSName, MType, Unit) \
Expand Down
7 changes: 4 additions & 3 deletions agents/otlp/src/otlp_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "nsolid.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/resource/resource.h"

// Class pre-declaration
OPENTELEMETRY_BEGIN_NAMESPACE
Expand All @@ -28,9 +29,6 @@ class InstrumentationScope;
namespace logs {
class Recordable;
}
namespace resource {
class Resource;
}
namespace trace {
class Recordable;
}
Expand All @@ -46,6 +44,9 @@ OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*

OPENTELEMETRY_NAMESPACE::sdk::resource::Resource* GetResource();

OPENTELEMETRY_NAMESPACE::sdk::resource::Resource* UpdateResource(
OPENTELEMETRY_NAMESPACE::sdk::resource::ResourceAttributes&&);

void fill_proc_metrics(std::vector<opentelemetry::sdk::metrics::MetricData>&,
const ProcessMetrics::MetricsStor& stor);

Expand Down
26 changes: 18 additions & 8 deletions agents/otlp/src/otlp_metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ using opentelemetry::sdk::metrics::ScopeMetrics;
using opentelemetry::sdk::metrics::SumPointData;
using opentelemetry::sdk::metrics::ValueType;
using opentelemetry::sdk::resource::Resource;
using opentelemetry::sdk::resource::ResourceAttributes;
using opentelemetry::v1::exporter::otlp::GetOtlpDefaultHttpMetricsProtocol;
using opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporter;
using opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporterOptions;
using opentelemetry::v1::exporter::otlp::OtlpHttpMetricExporter;
using opentelemetry::v1::exporter::otlp::OtlpHttpMetricExporterOptions;
using opentelemetry::v1::trace::SemanticConventions::kProcessOwner;
using opentelemetry::v1::trace::SemanticConventions::kThreadId;

namespace node {
Expand All @@ -61,9 +63,7 @@ static std::vector<std::string> discarded_metrics = {
};

OTLPMetrics::OTLPMetrics(uv_loop_t* loop,
const Resource& resource,
InstrumentationScope* scope):
resource_(resource),
scope_(scope) {
const std::string prot = GetOtlpDefaultHttpMetricsProtocol();
if (prot == "grpc") {
Expand All @@ -80,9 +80,7 @@ OTLPMetrics::OTLPMetrics(uv_loop_t* loop,
const std::string& url,
const std::string& key,
bool is_http,
const Resource& resource,
InstrumentationScope* scope):
resource_(resource),
scope_(scope),
key_(key),
url_(url) {
Expand All @@ -107,11 +105,23 @@ OTLPMetrics::~OTLPMetrics() {
void OTLPMetrics::got_proc_metrics(const ProcessMetricsStor& stor,
const ProcessMetricsStor& prev_stor) {
ResourceMetrics data;
data.resource_ = &resource_;
Resource* resource;
// Check if 'user' or 'title' are different from the previous metrics
if (prev_stor.user != stor.user || prev_stor.title != stor.title) {
ResourceAttributes attrs = {
{ kProcessOwner, stor.user },
{ "process.title", stor.title },
};

resource = UpdateResource(std::move(attrs));
} else {
resource = GetResource();
}

data.resource_ = resource;
std::vector<MetricData> metrics;
fill_proc_metrics(metrics, stor);
data.scope_metric_data_ =
std::vector<ScopeMetrics>{{scope_, metrics}};
data.scope_metric_data_ = std::vector<ScopeMetrics>{{scope_, metrics}};
auto result = otlp_metric_exporter_->Export(data);
Debug("# ProcessMetrics Exported. Result: %d\n", static_cast<int>(result));
}
Expand All @@ -120,7 +130,7 @@ void OTLPMetrics::got_proc_metrics(const ProcessMetricsStor& stor,
void OTLPMetrics::got_thr_metrics(
const std::vector<MetricsExporter::ThrMetricsStor>& thr_metrics) {
ResourceMetrics data;
data.resource_ = &resource_;
data.resource_ = GetResource();
std::vector<MetricData> metrics;

for (const auto& tm : thr_metrics) {
Expand Down
3 changes: 0 additions & 3 deletions agents/otlp/src/otlp_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@ class OTLPMetrics final: public MetricsExporter {
public:
explicit OTLPMetrics(
uv_loop_t* loop,
const OPENTELEMETRY_NAMESPACE::sdk::resource::Resource&,
OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*);
explicit OTLPMetrics(
uv_loop_t* loop,
const std::string& url,
const std::string& key,
bool is_http,
const OPENTELEMETRY_NAMESPACE::sdk::resource::Resource&,
OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*);

virtual ~OTLPMetrics();
Expand All @@ -52,7 +50,6 @@ class OTLPMetrics final: public MetricsExporter {
private:
std::unique_ptr<OPENTELEMETRY_NAMESPACE::sdk::metrics::PushMetricExporter>
otlp_metric_exporter_;
const OPENTELEMETRY_NAMESPACE::sdk::resource::Resource& resource_;
OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*
scope_;
std::string key_;
Expand Down
21 changes: 19 additions & 2 deletions test/agents/test-otlp-grpc-metrics.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@ if (process.argv[2] === 'child') {
tracingEnabled: false
});

nsolid.setThreadName('main-thread');

const worker = new Worker(__filename, { argv: ['child'] });
process.send({ type: 'workerThreadId', id: worker.threadId });
process.send({
type: 'nsolid',
id: nsolid.id,
appName: nsolid.appName,
metrics: nsolid.metrics(),
});
process.on('message', (message) => {
assert.strictEqual(message, 'exit');
process.exit(0);
});
} else {
nsolid.setThreadName('worker-thread');
}
} else {
const expectedProcMetrics = [
Expand Down Expand Up @@ -436,19 +441,23 @@ if (process.argv[2] === 'child') {

let nsolidId;
let nsolidAppName;
let nsolidMetrics;

function checkResource(resource) {
validateArray(resource.attributes, 'attributes');
assert.strictEqual(resource.attributes.length, 5);

const expectedAttributes = {
'telemetry.sdk.version': '1.16.0',
'telemetry.sdk.language': 'cpp',
'telemetry.sdk.name': 'opentelemetry',
'service.instance.id': nsolidId,
'service.name': nsolidAppName
'service.name': nsolidAppName,
'process.title': nsolidMetrics.title,
'process.owner': nsolidMetrics.user,
};

assert.strictEqual(resource.attributes.length, Object.keys(expectedAttributes).length);

resource.attributes.forEach((attribute) => {
assert.strictEqual(attribute.value.stringValue, expectedAttributes[attribute.key]);
delete expectedAttributes[attribute.key];
Expand Down Expand Up @@ -484,6 +493,13 @@ if (process.argv[2] === 'child') {
const attrIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`);
if (attrIndex > -1) {
indicesToRemove.push(i);
const nameIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.name');
assert(nameIndex > -1);
if (context.threadId === 0) { // main-thread
assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'main-thread');
} else { // worker-thread
assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'worker-thread');
}
}
} else {
indicesToRemove.push(i);
Expand Down Expand Up @@ -583,6 +599,7 @@ if (process.argv[2] === 'child') {
if (message.type === 'nsolid') {
nsolidId = message.id;
nsolidAppName = message.appName;
nsolidMetrics = message.metrics;
} else if (message.type === 'workerThreadId') {
context.threadList.push(message.id);
}
Expand Down
23 changes: 20 additions & 3 deletions test/agents/test-otlp-metrics.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@ if (process.argv[2] === 'child') {
tracingEnabled: false
});

nsolid.setThreadName('main-thread');

const worker = new Worker(__filename, { argv: ['child'] });
process.send({ type: 'workerThreadId', id: worker.threadId });
process.send({
type: 'nsolid',
id: nsolid.id,
appName: nsolid.appName,
metrics: nsolid.metrics(),
});
process.on('message', (message) => {
assert.strictEqual(message, 'exit');
process.exit(0);
});
} else {
nsolid.setThreadName('worker-thread');
}
} else {
const expectedProcMetrics = [
Expand Down Expand Up @@ -436,19 +441,23 @@ if (process.argv[2] === 'child') {

let nsolidId;
let nsolidAppName;
let nsolidMetrics;

function checkResource(resource) {
validateArray(resource.attributes, 'attributes');
assert.strictEqual(resource.attributes.length, 5);

const expectedAttributes = {
'telemetry.sdk.version': '1.16.0',
'telemetry.sdk.language': 'cpp',
'telemetry.sdk.name': 'opentelemetry',
'service.instance.id': nsolidId,
'service.name': nsolidAppName
'service.name': nsolidAppName,
'process.title': nsolidMetrics.title,
'process.owner': nsolidMetrics.user,
};

assert.strictEqual(resource.attributes.length, Object.keys(expectedAttributes).length);

resource.attributes.forEach((attribute) => {
assert.strictEqual(attribute.value.stringValue, expectedAttributes[attribute.key]);
delete expectedAttributes[attribute.key];
Expand Down Expand Up @@ -484,6 +493,13 @@ if (process.argv[2] === 'child') {
const attrIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`);
if (attrIndex > -1) {
indicesToRemove.push(i);
const nameIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.name');
assert(nameIndex > -1);
if (context.threadId === 0) { // main-thread
assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'main-thread');
} else { // worker-thread
assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'worker-thread');
}
}
} else {
indicesToRemove.push(i);
Expand Down Expand Up @@ -561,7 +577,7 @@ if (process.argv[2] === 'child') {
metrics: [],
expected: [],
threadId: null,
threadList: [ threadId ]
threadList: [ threadId ],
};

async function runTest(getEnv) {
Expand All @@ -585,6 +601,7 @@ if (process.argv[2] === 'child') {
if (message.type === 'nsolid') {
nsolidId = message.id;
nsolidAppName = message.appName;
nsolidMetrics = message.metrics;
} else if (message.type === 'workerThreadId') {
context.threadList.push(message.id);
}
Expand Down

0 comments on commit 350f273

Please sign in to comment.