From 89fbed2909b2a8b140a4590d72b28c007d484710 Mon Sep 17 00:00:00 2001 From: KodrAus Date: Thu, 28 Mar 2024 16:30:40 +1000 Subject: [PATCH] add attributes to metrics --- targets/otlp/Cargo.toml | 4 ++ targets/otlp/src/data/metrics.rs | 88 +++++++++++++++++++++----------- tests/smoke-test/main.rs | 1 + 3 files changed, 64 insertions(+), 29 deletions(-) diff --git a/targets/otlp/Cargo.toml b/targets/otlp/Cargo.toml index 6f91457..2c0ea1d 100644 --- a/targets/otlp/Cargo.toml +++ b/targets/otlp/Cargo.toml @@ -33,6 +33,10 @@ features = ["std", "flatten"] version = "0.1" features = ["bytes"] +[dependencies.sval_buffer] +version = "2" +features = ["std"] + [dependencies.tokio] version = "1" features = ["rt-multi-thread", "sync"] diff --git a/targets/otlp/src/data/metrics.rs b/targets/otlp/src/data/metrics.rs index 63e5d8e..8eb7621 100644 --- a/targets/otlp/src/data/metrics.rs +++ b/targets/otlp/src/data/metrics.rs @@ -1,12 +1,14 @@ mod export_metrics_service; mod metric; +use std::ops::ControlFlow; + pub use self::{export_metrics_service::*, metric::*}; use emit::{ well_known::{ - KEY_METRIC_AGG, KEY_METRIC_NAME, KEY_METRIC_UNIT, KEY_METRIC_VALUE, METRIC_AGG_COUNT, - METRIC_AGG_SUM, + KEY_METRIC_AGG, KEY_METRIC_NAME, KEY_METRIC_UNIT, KEY_METRIC_VALUE, KEY_SPAN_ID, + KEY_SPAN_PARENT, KEY_TRACE_ID, METRIC_AGG_COUNT, METRIC_AGG_SUM, }, Props, }; @@ -14,7 +16,8 @@ use emit_batcher::BatchError; use sval::Value; use super::{ - EventEncoder, MessageFormatter, MessageRenderer, PreEncoded, RawEncoder, RequestEncoder, + any_value, EventEncoder, KeyValue, MessageFormatter, MessageRenderer, PreEncoded, RawEncoder, + RequestEncoder, }; pub(crate) struct MetricsEventEncoder { @@ -70,7 +73,28 @@ impl EventEncoder for MetricsEventEncoder { evt, }; - let metric_unit = evt.props().get(KEY_METRIC_UNIT); + let mut metric_unit = None; + let mut attributes = Vec::new(); + + evt.props().for_each(|k, v| match k.get() { + KEY_METRIC_UNIT => { + metric_unit = Some(v); + + ControlFlow::Continue(()) + } + KEY_METRIC_NAME | KEY_METRIC_VALUE | KEY_METRIC_AGG | KEY_SPAN_ID + | KEY_SPAN_PARENT | KEY_TRACE_ID => ControlFlow::Continue(()), + _ => { + if let Ok(value) = sval_buffer::stream_to_value_owned(any_value::EmitValue(v)) { + attributes.push(KeyValue { + key: k.to_owned(), + value, + }); + } + + ControlFlow::Continue(()) + } + }); let encoded = match metric_agg.and_then(|kind| kind.to_cow_str()).as_deref() { Some(METRIC_AGG_SUM) => E::encode(Metric::<_, _, _> { @@ -79,7 +103,7 @@ impl EventEncoder for MetricsEventEncoder { data: &MetricData::Sum::<_>(Sum::<_> { aggregation_temporality, is_monotonic: false, - data_points: &SumPoints::new().points_from_value( + data_points: &SumPoints::new(&attributes).points_from_value( start_time_unix_nano, time_unix_nano, metric_value, @@ -92,7 +116,7 @@ impl EventEncoder for MetricsEventEncoder { data: &MetricData::Sum::<_>(Sum::<_> { aggregation_temporality, is_monotonic: true, - data_points: &SumPoints::new().points_from_value( + data_points: &SumPoints::new(&attributes).points_from_value( start_time_unix_nano, time_unix_nano, metric_value, @@ -103,7 +127,7 @@ impl EventEncoder for MetricsEventEncoder { name: &sval::Display::new(metric_name), unit: &metric_unit.map(sval::Display::new), data: &MetricData::Gauge(Gauge::<_> { - data_points: &RawPointSet::new().points_from_value( + data_points: &RawPointSet::new(&attributes).points_from_value( start_time_unix_nano, time_unix_nano, metric_value, @@ -211,12 +235,12 @@ trait DataPointBuilder { fn into_points(self, start_time_unix_nano: u64, time_unix_nano: u64) -> Option; } -struct SumPoints(NumberDataPoint<'static>); +struct SumPoints<'a, A>(NumberDataPoint<'a, A>); -impl SumPoints { - fn new() -> Self { +impl<'a, A> SumPoints<'a, A> { + fn new(attributes: &'a A) -> Self { SumPoints(NumberDataPoint { - attributes: &[], + attributes, start_time_unix_nano: Default::default(), time_unix_nano: Default::default(), value: NumberDataPointValue::AsInt(AsInt(0)), @@ -224,8 +248,8 @@ impl SumPoints { } } -impl DataPointBuilder for SumPoints { - type Points = [NumberDataPoint<'static>; 1]; +impl<'a, A> DataPointBuilder for SumPoints<'a, A> { + type Points = [NumberDataPoint<'a, A>; 1]; fn push_point_i64(&mut self, value: i64) { self.0.value = match self.0.value { @@ -262,20 +286,26 @@ impl DataPointBuilder for SumPoints { } } -struct RawPointSet(Vec>); +struct RawPointSet<'a, A> { + attributes: &'a A, + points: Vec>, +} -impl RawPointSet { - fn new() -> Self { - RawPointSet(Vec::new()) +impl<'a, A> RawPointSet<'a, A> { + fn new(attributes: &'a A) -> Self { + RawPointSet { + attributes, + points: Vec::new(), + } } } -impl DataPointBuilder for RawPointSet { - type Points = Vec>; +impl<'a, A> DataPointBuilder for RawPointSet<'a, A> { + type Points = Vec>; fn push_point_i64(&mut self, value: i64) { - self.0.push(NumberDataPoint { - attributes: &[], + self.points.push(NumberDataPoint { + attributes: self.attributes, start_time_unix_nano: Default::default(), time_unix_nano: Default::default(), value: NumberDataPointValue::AsInt(AsInt(value)), @@ -283,8 +313,8 @@ impl DataPointBuilder for RawPointSet { } fn push_point_f64(&mut self, value: f64) { - self.0.push(NumberDataPoint { - attributes: &[], + self.points.push(NumberDataPoint { + attributes: self.attributes, start_time_unix_nano: Default::default(), time_unix_nano: Default::default(), value: NumberDataPointValue::AsDouble(AsDouble(value)), @@ -296,26 +326,26 @@ impl DataPointBuilder for RawPointSet { start_time_unix_nano: u64, time_unix_nano: u64, ) -> Option { - match self.0.len() as u64 { + match self.points.len() as u64 { 0 => None, 1 => { - self.0[0].start_time_unix_nano = start_time_unix_nano; - self.0[0].time_unix_nano = time_unix_nano; + self.points[0].start_time_unix_nano = start_time_unix_nano; + self.points[0].time_unix_nano = time_unix_nano; - Some(self.0) + Some(self.points) } points => { let point_time_range = time_unix_nano.saturating_sub(start_time_unix_nano); let step = point_time_range / points; let mut point_time = start_time_unix_nano; - for point in &mut self.0 { + for point in &mut self.points { point.start_time_unix_nano = point_time; point_time += step; point.time_unix_nano = point_time; } - Some(self.0) + Some(self.points) } } } diff --git a/tests/smoke-test/main.rs b/tests/smoke-test/main.rs index 30933a8..11e8842 100644 --- a/tests/smoke-test/main.rs +++ b/tests/smoke-test/main.rs @@ -178,6 +178,7 @@ fn sample_metrics() { metric_agg, metric_name, metric_value: metric_value.load(Ordering::Relaxed), + x: "data", ); } }