Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agents: add missing text metrics #193

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions agents/otlp/src/otlp_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace trace = OPENTELEMETRY_NAMESPACE::trace;
namespace resource = sdk::resource;
namespace instrumentationscope = sdk::instrumentationscope;
namespace detail = trace::propagation::detail;
using resource::ResourceAttributes;
using resource::SemanticConventions::kServiceName;
using resource::SemanticConventions::kServiceInstanceId;
using resource::SemanticConventions::kServiceVersion;
Expand Down Expand Up @@ -525,7 +526,7 @@ void OTLPAgent::config_otlp_endpoint(const json& config) {
}

metrics_exporter_.reset(
new OTLPMetrics(&loop_, *GetResource(), GetScope()));
new OTLPMetrics(&loop_, GetScope()));
return;
}

Expand All @@ -549,7 +550,7 @@ void OTLPAgent::config_otlp_endpoint(const json& config) {
}

metrics_exporter_.reset(
new OTLPMetrics(&loop_, url, "", is_http, *GetResource(), GetScope()));
new OTLPMetrics(&loop_, url, "", is_http, GetScope()));
}


Expand Down
34 changes: 26 additions & 8 deletions agents/otlp/src/otlp_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "nlohmann/json.hpp"
#include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"
#include "opentelemetry/sdk/logs/recordable.h"
#include "opentelemetry/sdk/resource/resource.h"
#include "opentelemetry/sdk/resource/semantic_conventions.h"
#include "opentelemetry/sdk/trace/recordable.h"
#include "opentelemetry/trace/propagation/detail/hex.h"
Expand Down Expand Up @@ -48,6 +47,7 @@ using opentelemetry::trace::TraceFlags;
using opentelemetry::trace::TraceId;
using opentelemetry::trace::propagation::detail::HexToBinary;
using opentelemetry::v1::trace::SemanticConventions::kThreadId;
using opentelemetry::v1::trace::SemanticConventions::kThreadName;

namespace node {
namespace nsolid {
Expand All @@ -64,6 +64,10 @@ static std::vector<std::string> discarded_metrics = {
"thread_id", "timestamp"
};

static std::unique_ptr<Resource> resource_g =
std::make_unique<Resource>(Resource::GetEmpty());
static bool isResourceInitialized_g = false;

// NOLINTNEXTLINE(runtime/references)
static void add_counter(std::vector<MetricData>& metrics,
const time_point& start,
Expand Down Expand Up @@ -134,9 +138,7 @@ InstrumentationScope* GetScope() {
}

Resource* GetResource() {
static bool isResourceInitialized = false;
static auto resource = std::make_unique<Resource>(Resource::GetEmpty());
if (!isResourceInitialized) {
if (!isResourceInitialized_g) {
json config = json::parse(nsolid::GetConfig(), nullptr, false);
// assert because the runtime should never send me an invalid JSON config
ASSERT(!config.is_discarded());
Expand All @@ -153,11 +155,26 @@ Resource* GetResource() {
}

// Directly construct a new Resource in the unique_ptr
resource = std::make_unique<Resource>(Resource::Create(attrs));
isResourceInitialized = true;
resource_g = std::make_unique<Resource>(Resource::Create(attrs));
isResourceInitialized_g = true;
}

return resource_g.get();
}

Resource* UpdateResource(ResourceAttributes&& attrs) {
// First, get current kServiceName to avoid overwriting it with the default
// value "unknown_service". (See Resource::Create() method in the SDK).
auto resource = GetResource();
auto attributes = resource->GetAttributes();
if (attributes.find(kServiceName) != attributes.end()) {
attrs.SetAttribute(kServiceName,
std::get<std::string>(attributes[kServiceName]));
}

return resource.get();
auto new_res = std::make_unique<Resource>(Resource::Create(attrs));
resource_g = std::make_unique<Resource>(resource->Merge(*new_res));
return resource_g.get();
}

// NOLINTNEXTLINE(runtime/references)
Expand Down Expand Up @@ -226,7 +243,8 @@ void fill_env_metrics(std::vector<MetricData>& metrics,
ValueType value;

PointAttributes attrs = {
{ kThreadId, static_cast<int64_t>(stor.thread_id) }
{ kThreadId, static_cast<int64_t>(stor.thread_id) },
{ kThreadName, stor.thread_name },
};

#define V(CType, CName, JSName, MType, Unit) \
Expand Down
7 changes: 4 additions & 3 deletions agents/otlp/src/otlp_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "nsolid.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/resource/resource.h"

// Class pre-declaration
OPENTELEMETRY_BEGIN_NAMESPACE
Expand All @@ -28,9 +29,6 @@ class InstrumentationScope;
namespace logs {
class Recordable;
}
namespace resource {
class Resource;
}
namespace trace {
class Recordable;
}
Expand All @@ -46,6 +44,9 @@ OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*

OPENTELEMETRY_NAMESPACE::sdk::resource::Resource* GetResource();

OPENTELEMETRY_NAMESPACE::sdk::resource::Resource* UpdateResource(
OPENTELEMETRY_NAMESPACE::sdk::resource::ResourceAttributes&&);

void fill_proc_metrics(std::vector<opentelemetry::sdk::metrics::MetricData>&,
const ProcessMetrics::MetricsStor& stor);

Expand Down
26 changes: 18 additions & 8 deletions agents/otlp/src/otlp_metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ using opentelemetry::sdk::metrics::ScopeMetrics;
using opentelemetry::sdk::metrics::SumPointData;
using opentelemetry::sdk::metrics::ValueType;
using opentelemetry::sdk::resource::Resource;
using opentelemetry::sdk::resource::ResourceAttributes;
using opentelemetry::v1::exporter::otlp::GetOtlpDefaultHttpMetricsProtocol;
using opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporter;
using opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporterOptions;
using opentelemetry::v1::exporter::otlp::OtlpHttpMetricExporter;
using opentelemetry::v1::exporter::otlp::OtlpHttpMetricExporterOptions;
using opentelemetry::v1::trace::SemanticConventions::kProcessOwner;
using opentelemetry::v1::trace::SemanticConventions::kThreadId;

namespace node {
Expand All @@ -61,9 +63,7 @@ static std::vector<std::string> discarded_metrics = {
};

OTLPMetrics::OTLPMetrics(uv_loop_t* loop,
const Resource& resource,
InstrumentationScope* scope):
resource_(resource),
scope_(scope) {
const std::string prot = GetOtlpDefaultHttpMetricsProtocol();
if (prot == "grpc") {
Expand All @@ -80,9 +80,7 @@ OTLPMetrics::OTLPMetrics(uv_loop_t* loop,
const std::string& url,
const std::string& key,
bool is_http,
const Resource& resource,
InstrumentationScope* scope):
resource_(resource),
scope_(scope),
key_(key),
url_(url) {
Expand All @@ -107,11 +105,23 @@ OTLPMetrics::~OTLPMetrics() {
void OTLPMetrics::got_proc_metrics(const ProcessMetricsStor& stor,
const ProcessMetricsStor& prev_stor) {
ResourceMetrics data;
data.resource_ = &resource_;
Resource* resource;
// Check if 'user' or 'title' are different from the previous metrics
if (prev_stor.user != stor.user || prev_stor.title != stor.title) {
ResourceAttributes attrs = {
{ kProcessOwner, stor.user },
{ "process.title", stor.title },
};

resource = UpdateResource(std::move(attrs));
} else {
resource = GetResource();
}

data.resource_ = resource;
std::vector<MetricData> metrics;
fill_proc_metrics(metrics, stor);
data.scope_metric_data_ =
std::vector<ScopeMetrics>{{scope_, metrics}};
data.scope_metric_data_ = std::vector<ScopeMetrics>{{scope_, metrics}};
auto result = otlp_metric_exporter_->Export(data);
Debug("# ProcessMetrics Exported. Result: %d\n", static_cast<int>(result));
}
Expand All @@ -120,7 +130,7 @@ void OTLPMetrics::got_proc_metrics(const ProcessMetricsStor& stor,
void OTLPMetrics::got_thr_metrics(
const std::vector<MetricsExporter::ThrMetricsStor>& thr_metrics) {
ResourceMetrics data;
data.resource_ = &resource_;
data.resource_ = GetResource();
std::vector<MetricData> metrics;

for (const auto& tm : thr_metrics) {
Expand Down
3 changes: 0 additions & 3 deletions agents/otlp/src/otlp_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@ class OTLPMetrics final: public MetricsExporter {
public:
explicit OTLPMetrics(
uv_loop_t* loop,
const OPENTELEMETRY_NAMESPACE::sdk::resource::Resource&,
OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*);
explicit OTLPMetrics(
uv_loop_t* loop,
const std::string& url,
const std::string& key,
bool is_http,
const OPENTELEMETRY_NAMESPACE::sdk::resource::Resource&,
OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*);

virtual ~OTLPMetrics();
Expand All @@ -52,7 +50,6 @@ class OTLPMetrics final: public MetricsExporter {
private:
std::unique_ptr<OPENTELEMETRY_NAMESPACE::sdk::metrics::PushMetricExporter>
otlp_metric_exporter_;
const OPENTELEMETRY_NAMESPACE::sdk::resource::Resource& resource_;
OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope*
scope_;
std::string key_;
Expand Down
21 changes: 19 additions & 2 deletions test/agents/test-otlp-grpc-metrics.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@ if (process.argv[2] === 'child') {
tracingEnabled: false
});

nsolid.setThreadName('main-thread');

const worker = new Worker(__filename, { argv: ['child'] });
process.send({ type: 'workerThreadId', id: worker.threadId });
process.send({
type: 'nsolid',
id: nsolid.id,
appName: nsolid.appName,
metrics: nsolid.metrics(),
});
process.on('message', (message) => {
assert.strictEqual(message, 'exit');
process.exit(0);
});
} else {
nsolid.setThreadName('worker-thread');
}
} else {
const expectedProcMetrics = [
Expand Down Expand Up @@ -436,19 +441,23 @@ if (process.argv[2] === 'child') {

let nsolidId;
let nsolidAppName;
let nsolidMetrics;

function checkResource(resource) {
validateArray(resource.attributes, 'attributes');
assert.strictEqual(resource.attributes.length, 5);

const expectedAttributes = {
'telemetry.sdk.version': '1.16.0',
'telemetry.sdk.language': 'cpp',
'telemetry.sdk.name': 'opentelemetry',
'service.instance.id': nsolidId,
'service.name': nsolidAppName
'service.name': nsolidAppName,
'process.title': nsolidMetrics.title,
'process.owner': nsolidMetrics.user,
};

assert.strictEqual(resource.attributes.length, Object.keys(expectedAttributes).length);

resource.attributes.forEach((attribute) => {
assert.strictEqual(attribute.value.stringValue, expectedAttributes[attribute.key]);
delete expectedAttributes[attribute.key];
Expand Down Expand Up @@ -484,6 +493,13 @@ if (process.argv[2] === 'child') {
const attrIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`);
if (attrIndex > -1) {
indicesToRemove.push(i);
const nameIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.name');
assert(nameIndex > -1);
if (context.threadId === 0) { // main-thread
assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'main-thread');
} else { // worker-thread
assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'worker-thread');
}
}
} else {
indicesToRemove.push(i);
Expand Down Expand Up @@ -583,6 +599,7 @@ if (process.argv[2] === 'child') {
if (message.type === 'nsolid') {
nsolidId = message.id;
nsolidAppName = message.appName;
nsolidMetrics = message.metrics;
} else if (message.type === 'workerThreadId') {
context.threadList.push(message.id);
}
Expand Down
23 changes: 20 additions & 3 deletions test/agents/test-otlp-metrics.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@ if (process.argv[2] === 'child') {
tracingEnabled: false
});

nsolid.setThreadName('main-thread');

const worker = new Worker(__filename, { argv: ['child'] });
process.send({ type: 'workerThreadId', id: worker.threadId });
process.send({
type: 'nsolid',
id: nsolid.id,
appName: nsolid.appName,
metrics: nsolid.metrics(),
});
process.on('message', (message) => {
assert.strictEqual(message, 'exit');
process.exit(0);
});
} else {
nsolid.setThreadName('worker-thread');
}
} else {
const expectedProcMetrics = [
Expand Down Expand Up @@ -436,19 +441,23 @@ if (process.argv[2] === 'child') {

let nsolidId;
let nsolidAppName;
let nsolidMetrics;

function checkResource(resource) {
validateArray(resource.attributes, 'attributes');
assert.strictEqual(resource.attributes.length, 5);

const expectedAttributes = {
'telemetry.sdk.version': '1.16.0',
'telemetry.sdk.language': 'cpp',
'telemetry.sdk.name': 'opentelemetry',
'service.instance.id': nsolidId,
'service.name': nsolidAppName
'service.name': nsolidAppName,
'process.title': nsolidMetrics.title,
'process.owner': nsolidMetrics.user,
};

assert.strictEqual(resource.attributes.length, Object.keys(expectedAttributes).length);

resource.attributes.forEach((attribute) => {
assert.strictEqual(attribute.value.stringValue, expectedAttributes[attribute.key]);
delete expectedAttributes[attribute.key];
Expand Down Expand Up @@ -484,6 +493,13 @@ if (process.argv[2] === 'child') {
const attrIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.id' && a.value.intValue === `${context.threadId}`);
if (attrIndex > -1) {
indicesToRemove.push(i);
const nameIndex = dataPoint.attributes.findIndex((a) => a.key === 'thread.name');
assert(nameIndex > -1);
if (context.threadId === 0) { // main-thread
assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'main-thread');
} else { // worker-thread
assert.strictEqual(dataPoint.attributes[nameIndex].value.stringValue, 'worker-thread');
}
}
} else {
indicesToRemove.push(i);
Expand Down Expand Up @@ -561,7 +577,7 @@ if (process.argv[2] === 'child') {
metrics: [],
expected: [],
threadId: null,
threadList: [ threadId ]
threadList: [ threadId ],
};

async function runTest(getEnv) {
Expand All @@ -585,6 +601,7 @@ if (process.argv[2] === 'child') {
if (message.type === 'nsolid') {
nsolidId = message.id;
nsolidAppName = message.appName;
nsolidMetrics = message.metrics;
} else if (message.type === 'workerThreadId') {
context.threadList.push(message.id);
}
Expand Down
Loading