Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
add attributes to metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Mar 28, 2024
1 parent 53bdcfb commit 89fbed2
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 29 deletions.
4 changes: 4 additions & 0 deletions targets/otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ features = ["std", "flatten"]
version = "0.1"
features = ["bytes"]

[dependencies.sval_buffer]
version = "2"
features = ["std"]

[dependencies.tokio]
version = "1"
features = ["rt-multi-thread", "sync"]
Expand Down
88 changes: 59 additions & 29 deletions targets/otlp/src/data/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
mod export_metrics_service;
mod metric;

use std::ops::ControlFlow;

pub use self::{export_metrics_service::*, metric::*};

use emit::{
well_known::{
KEY_METRIC_AGG, KEY_METRIC_NAME, KEY_METRIC_UNIT, KEY_METRIC_VALUE, METRIC_AGG_COUNT,
METRIC_AGG_SUM,
KEY_METRIC_AGG, KEY_METRIC_NAME, KEY_METRIC_UNIT, KEY_METRIC_VALUE, KEY_SPAN_ID,
KEY_SPAN_PARENT, KEY_TRACE_ID, METRIC_AGG_COUNT, METRIC_AGG_SUM,
},
Props,
};
use emit_batcher::BatchError;
use sval::Value;

use super::{
EventEncoder, MessageFormatter, MessageRenderer, PreEncoded, RawEncoder, RequestEncoder,
any_value, EventEncoder, KeyValue, MessageFormatter, MessageRenderer, PreEncoded, RawEncoder,
RequestEncoder,
};

pub(crate) struct MetricsEventEncoder {
Expand Down Expand Up @@ -70,7 +73,28 @@ impl EventEncoder for MetricsEventEncoder {
evt,
};

let metric_unit = evt.props().get(KEY_METRIC_UNIT);
let mut metric_unit = None;
let mut attributes = Vec::new();

evt.props().for_each(|k, v| match k.get() {
KEY_METRIC_UNIT => {
metric_unit = Some(v);

ControlFlow::Continue(())
}
KEY_METRIC_NAME | KEY_METRIC_VALUE | KEY_METRIC_AGG | KEY_SPAN_ID
| KEY_SPAN_PARENT | KEY_TRACE_ID => ControlFlow::Continue(()),
_ => {
if let Ok(value) = sval_buffer::stream_to_value_owned(any_value::EmitValue(v)) {
attributes.push(KeyValue {
key: k.to_owned(),
value,
});
}

ControlFlow::Continue(())
}
});

let encoded = match metric_agg.and_then(|kind| kind.to_cow_str()).as_deref() {
Some(METRIC_AGG_SUM) => E::encode(Metric::<_, _, _> {
Expand All @@ -79,7 +103,7 @@ impl EventEncoder for MetricsEventEncoder {
data: &MetricData::Sum::<_>(Sum::<_> {
aggregation_temporality,
is_monotonic: false,
data_points: &SumPoints::new().points_from_value(
data_points: &SumPoints::new(&attributes).points_from_value(
start_time_unix_nano,
time_unix_nano,
metric_value,
Expand All @@ -92,7 +116,7 @@ impl EventEncoder for MetricsEventEncoder {
data: &MetricData::Sum::<_>(Sum::<_> {
aggregation_temporality,
is_monotonic: true,
data_points: &SumPoints::new().points_from_value(
data_points: &SumPoints::new(&attributes).points_from_value(
start_time_unix_nano,
time_unix_nano,
metric_value,
Expand All @@ -103,7 +127,7 @@ impl EventEncoder for MetricsEventEncoder {
name: &sval::Display::new(metric_name),
unit: &metric_unit.map(sval::Display::new),
data: &MetricData::Gauge(Gauge::<_> {
data_points: &RawPointSet::new().points_from_value(
data_points: &RawPointSet::new(&attributes).points_from_value(
start_time_unix_nano,
time_unix_nano,
metric_value,
Expand Down Expand Up @@ -211,21 +235,21 @@ trait DataPointBuilder {
fn into_points(self, start_time_unix_nano: u64, time_unix_nano: u64) -> Option<Self::Points>;
}

struct SumPoints(NumberDataPoint<'static>);
struct SumPoints<'a, A>(NumberDataPoint<'a, A>);

impl SumPoints {
fn new() -> Self {
impl<'a, A> SumPoints<'a, A> {
fn new(attributes: &'a A) -> Self {
SumPoints(NumberDataPoint {
attributes: &[],
attributes,
start_time_unix_nano: Default::default(),
time_unix_nano: Default::default(),
value: NumberDataPointValue::AsInt(AsInt(0)),
})
}
}

impl DataPointBuilder for SumPoints {
type Points = [NumberDataPoint<'static>; 1];
impl<'a, A> DataPointBuilder for SumPoints<'a, A> {
type Points = [NumberDataPoint<'a, A>; 1];

fn push_point_i64(&mut self, value: i64) {
self.0.value = match self.0.value {
Expand Down Expand Up @@ -262,29 +286,35 @@ impl DataPointBuilder for SumPoints {
}
}

struct RawPointSet(Vec<NumberDataPoint<'static>>);
struct RawPointSet<'a, A> {
attributes: &'a A,
points: Vec<NumberDataPoint<'a, A>>,
}

impl RawPointSet {
fn new() -> Self {
RawPointSet(Vec::new())
impl<'a, A> RawPointSet<'a, A> {
fn new(attributes: &'a A) -> Self {
RawPointSet {
attributes,
points: Vec::new(),
}
}
}

impl DataPointBuilder for RawPointSet {
type Points = Vec<NumberDataPoint<'static>>;
impl<'a, A> DataPointBuilder for RawPointSet<'a, A> {
type Points = Vec<NumberDataPoint<'a, A>>;

fn push_point_i64(&mut self, value: i64) {
self.0.push(NumberDataPoint {
attributes: &[],
self.points.push(NumberDataPoint {
attributes: self.attributes,
start_time_unix_nano: Default::default(),
time_unix_nano: Default::default(),
value: NumberDataPointValue::AsInt(AsInt(value)),
});
}

fn push_point_f64(&mut self, value: f64) {
self.0.push(NumberDataPoint {
attributes: &[],
self.points.push(NumberDataPoint {
attributes: self.attributes,
start_time_unix_nano: Default::default(),
time_unix_nano: Default::default(),
value: NumberDataPointValue::AsDouble(AsDouble(value)),
Expand All @@ -296,26 +326,26 @@ impl DataPointBuilder for RawPointSet {
start_time_unix_nano: u64,
time_unix_nano: u64,
) -> Option<Self::Points> {
match self.0.len() as u64 {
match self.points.len() as u64 {
0 => None,
1 => {
self.0[0].start_time_unix_nano = start_time_unix_nano;
self.0[0].time_unix_nano = time_unix_nano;
self.points[0].start_time_unix_nano = start_time_unix_nano;
self.points[0].time_unix_nano = time_unix_nano;

Some(self.0)
Some(self.points)
}
points => {
let point_time_range = time_unix_nano.saturating_sub(start_time_unix_nano);
let step = point_time_range / points;

let mut point_time = start_time_unix_nano;
for point in &mut self.0 {
for point in &mut self.points {
point.start_time_unix_nano = point_time;
point_time += step;
point.time_unix_nano = point_time;
}

Some(self.0)
Some(self.points)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions tests/smoke-test/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ fn sample_metrics() {
metric_agg,
metric_name,
metric_value: metric_value.load(Ordering::Relaxed),
x: "data",
);
}
}
Expand Down

0 comments on commit 89fbed2

Please sign in to comment.