From e1e82f9b301920cd383d39786a7a046dac879cfa Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Sat, 2 Nov 2024 15:12:04 +0200 Subject: [PATCH] Use ValueMap for ExpoHistogram --- .../metrics/internal/exponential_histogram.rs | 331 ++++++++---------- opentelemetry-sdk/src/metrics/mod.rs | 7 +- 2 files changed, 155 insertions(+), 183 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index 13f4200112..49a8e19f06 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -1,14 +1,11 @@ -use std::{collections::HashMap, f64::consts::LOG2_E, sync::Mutex, time::SystemTime}; +use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; use once_cell::sync::Lazy; use opentelemetry::{otel_debug, KeyValue}; -use crate::{ - metrics::data::{self, Aggregation, Temporality}, - metrics::AttributeSet, -}; +use crate::metrics::data::{self, Aggregation, Temporality}; -use super::Number; +use super::{Aggregator, Number, ValueMap}; pub(crate) const EXPO_MAX_SCALE: i8 = 20; pub(crate) const EXPO_MIN_SCALE: i8 = -10; @@ -16,33 +13,26 @@ pub(crate) const EXPO_MIN_SCALE: i8 = -10; /// A single data point in an exponential histogram. #[derive(Debug, PartialEq)] struct ExpoHistogramDataPoint { + max_size: i32, count: usize, min: T, max: T, sum: T, - - max_size: i32, - record_min_max: bool, - record_sum: bool, - scale: i8, - pos_buckets: ExpoBuckets, neg_buckets: ExpoBuckets, zero_count: u64, } impl ExpoHistogramDataPoint { - fn new(max_size: i32, max_scale: i8, record_min_max: bool, record_sum: bool) -> Self { + fn new(config: &BucketConfig) -> Self { ExpoHistogramDataPoint { + max_size: config.max_size, count: 0, min: T::max(), max: T::min(), sum: T::default(), - max_size, - record_min_max, - record_sum, - scale: max_scale, + scale: config.max_scale, pos_buckets: ExpoBuckets::default(), neg_buckets: ExpoBuckets::default(), zero_count: 0, @@ -57,17 +47,13 @@ impl ExpoHistogramDataPoint { fn record(&mut self, v: T) { self.count += 1; - if self.record_min_max { - if v < self.min { - self.min = v; - } - if v > self.max { - self.max = v; - } + if v < self.min { + self.min = v; } - if self.record_sum { - self.sum += v; + if v > self.max { + self.max = v; } + self.sum += v; let abs_v = v.into_float().abs(); @@ -315,20 +301,49 @@ impl ExpoBuckets { } } +impl Aggregator for Mutex> +where + T: Number, +{ + type InitConfig = BucketConfig; + + type PreComputedValue = T; + + fn create(init: &BucketConfig) -> Self { + Mutex::new(ExpoHistogramDataPoint::new(init)) + } + + fn update(&self, value: T) { + let mut this = match self.lock() { + Ok(guard) => guard, + Err(_) => return, + }; + this.record(value); + } + + fn clone_and_reset(&self, init: &BucketConfig) -> Self { + let mut current = self.lock().unwrap_or_else(|err| err.into_inner()); + let cloned = replace(current.deref_mut(), ExpoHistogramDataPoint::new(init)); + Mutex::new(cloned) + } +} + +#[derive(Debug, Clone, Copy, PartialEq)] +struct BucketConfig { + max_size: i32, + max_scale: i8, +} + /// An aggregator that summarizes a set of measurements as an exponential /// histogram. /// /// Each histogram is scoped by attributes and the aggregation cycle the /// measurements were made in. -pub(crate) struct ExpoHistogram { +pub(crate) struct ExpoHistogram { + value_map: ValueMap>>, + start: Mutex, record_sum: bool, record_min_max: bool, - max_size: i32, - max_scale: i8, - - values: Mutex>>, - - start: Mutex, } impl ExpoHistogram { @@ -340,11 +355,12 @@ impl ExpoHistogram { record_sum: bool, ) -> Self { ExpoHistogram { + value_map: ValueMap::new(BucketConfig { + max_size: max_size as i32, + max_scale, + }), record_sum, record_min_max, - max_size: max_size as i32, - max_scale, - values: Mutex::new(HashMap::default()), start: Mutex::new(SystemTime::now()), } } @@ -353,22 +369,11 @@ impl ExpoHistogram { let f_value = value.into_float(); // Ignore NaN and infinity. // Only makes sense if T is f64, maybe this could be no-op for other cases? - if f_value.is_infinite() || f_value.is_nan() { + if !f_value.is_finite() { return; } - let attrs: AttributeSet = attrs.into(); - if let Ok(mut values) = self.values.lock() { - let v = values.entry(attrs).or_insert_with(|| { - ExpoHistogramDataPoint::new( - self.max_size, - self.max_scale, - self.record_min_max, - self.record_sum, - ) - }); - v.record(value) - } + self.value_map.measure(value, attrs); } pub(crate) fn delta( @@ -376,11 +381,6 @@ impl ExpoHistogram { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let start = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { @@ -393,59 +393,48 @@ impl ExpoHistogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Delta; - h.data_points.clear(); - - let mut values = match self.values.lock() { - Ok(g) => g, - Err(_) => return (0, None), - }; - - let n = values.len(); - if n > h.data_points.capacity() { - h.data_points.reserve_exact(n - h.data_points.capacity()); - } - for (a, b) in values.drain() { - h.data_points.push(data::ExponentialHistogramDataPoint { - attributes: a - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) - .collect(), - start_time: start, - time: t, - count: b.count, - min: if self.record_min_max { - Some(b.min) - } else { - None - }, - max: if self.record_min_max { - Some(b.max) - } else { - None - }, - sum: if self.record_sum { b.sum } else { T::default() }, - scale: b.scale, - zero_count: b.zero_count, - positive_bucket: data::ExponentialBucket { - offset: b.pos_buckets.start_bin, - counts: b.pos_buckets.counts.clone(), - }, - negative_bucket: data::ExponentialBucket { - offset: b.neg_buckets.start_bin, - counts: b.neg_buckets.counts.clone(), - }, - zero_threshold: 0.0, - exemplars: vec![], + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + + self.value_map + .collect_and_reset(&mut h.data_points, |attributes, attr| { + let b = attr.into_inner().unwrap_or_else(|err| err.into_inner()); + data::ExponentialHistogramDataPoint { + attributes, + start_time: prev_start, + time: t, + count: b.count, + min: if self.record_min_max { + Some(b.min) + } else { + None + }, + max: if self.record_min_max { + Some(b.max) + } else { + None + }, + sum: if self.record_sum { b.sum } else { T::default() }, + scale: b.scale, + zero_count: b.zero_count, + positive_bucket: data::ExponentialBucket { + offset: b.pos_buckets.start_bin, + counts: b.pos_buckets.counts, + }, + negative_bucket: data::ExponentialBucket { + offset: b.neg_buckets.start_bin, + counts: b.neg_buckets.counts, + }, + zero_threshold: 0.0, + exemplars: vec![], + } }); - } - // The delta collection cycle resets. - if let Ok(mut start) = self.start.lock() { - *start = t; - } - - (n, new_agg.map(|a| Box::new(a) as Box<_>)) + (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } pub(crate) fn cumulative( @@ -453,11 +442,6 @@ impl ExpoHistogram { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let start = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { @@ -471,57 +455,47 @@ impl ExpoHistogram { let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Cumulative; - let values = match self.values.lock() { - Ok(g) => g, - Err(_) => return (0, None), - }; - h.data_points.clear(); - - let n = values.len(); - if n > h.data_points.capacity() { - h.data_points.reserve_exact(n - h.data_points.capacity()); - } + let prev_start = self + .start + .lock() + .map(|s| *s) + .unwrap_or_else(|_| SystemTime::now()); - // TODO: This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. - for (a, b) in values.iter() { - h.data_points.push(data::ExponentialHistogramDataPoint { - attributes: a - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) - .collect(), - start_time: start, - time: t, - count: b.count, - min: if self.record_min_max { - Some(b.min) - } else { - None - }, - max: if self.record_min_max { - Some(b.max) - } else { - None - }, - sum: if self.record_sum { b.sum } else { T::default() }, - scale: b.scale, - zero_count: b.zero_count, - positive_bucket: data::ExponentialBucket { - offset: b.pos_buckets.start_bin, - counts: b.pos_buckets.counts.clone(), - }, - negative_bucket: data::ExponentialBucket { - offset: b.neg_buckets.start_bin, - counts: b.neg_buckets.counts.clone(), - }, - zero_threshold: 0.0, - exemplars: vec![], + self.value_map + .collect_readonly(&mut h.data_points, |attributes, attr| { + let b = attr.lock().unwrap_or_else(|err| err.into_inner()); + data::ExponentialHistogramDataPoint { + attributes, + start_time: prev_start, + time: t, + count: b.count, + min: if self.record_min_max { + Some(b.min) + } else { + None + }, + max: if self.record_min_max { + Some(b.max) + } else { + None + }, + sum: if self.record_sum { b.sum } else { T::default() }, + scale: b.scale, + zero_count: b.zero_count, + positive_bucket: data::ExponentialBucket { + offset: b.pos_buckets.start_bin, + counts: b.pos_buckets.counts.clone(), + }, + negative_bucket: data::ExponentialBucket { + offset: b.neg_buckets.start_bin, + counts: b.neg_buckets.counts.clone(), + }, + zero_threshold: 0.0, + exemplars: vec![], + } }); - } - (n, new_agg.map(|a| Box::new(a) as Box<_>)) + (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } } @@ -529,8 +503,6 @@ impl ExpoHistogram { mod tests { use std::ops::Neg; - use opentelemetry::KeyValue; - use crate::metrics::internal::{self, AggregateBuilder}; use super::*; @@ -631,7 +603,10 @@ mod tests { ]; for test in test_cases { - let mut dp = ExpoHistogramDataPoint::::new(test.max_size, 20, true, true); + let mut dp = ExpoHistogramDataPoint::::new(&BucketConfig { + max_size: test.max_size, + max_scale: 20, + }); for v in test.values { dp.record(v); dp.record(-v); @@ -644,7 +619,6 @@ mod tests { } fn run_min_max_sum_f64() { - let alice = &[KeyValue::new("user", "alice")][..]; struct Expected { min: f64, max: f64, @@ -692,11 +666,9 @@ mod tests { for test in test_cases { let h = ExpoHistogram::new(4, 20, true, true); for v in test.values { - h.measure(v, alice); + h.measure(v, &[]); } - let values = h.values.lock().unwrap(); - let alice: AttributeSet = alice.into(); - let dp = values.get(&alice).unwrap(); + let dp = h.value_map.no_attribute_tracker.lock().unwrap(); assert_eq!(test.expected.max, dp.max); assert_eq!(test.expected.min, dp.min); @@ -706,7 +678,6 @@ mod tests { } fn run_min_max_sum>() { - let alice = &[KeyValue::new("user", "alice")][..]; struct Expected { min: T, max: T, @@ -744,11 +715,9 @@ mod tests { for test in test_cases { let h = ExpoHistogram::new(4, 20, true, true); for v in test.values { - h.measure(v, alice); + h.measure(v, &[]); } - let values = h.values.lock().unwrap(); - let alice: AttributeSet = alice.into(); - let dp = values.get(&alice).unwrap(); + let dp = h.value_map.no_attribute_tracker.lock().unwrap(); assert_eq!(test.expected.max, dp.max); assert_eq!(test.expected.min, dp.min); @@ -831,7 +800,10 @@ mod tests { }, ]; for test in test_cases { - let mut dp = ExpoHistogramDataPoint::new(test.max_size, 20, true, true); + let mut dp = ExpoHistogramDataPoint::new(&BucketConfig { + max_size: test.max_size, + max_scale: 20, + }); for v in test.values { dp.record(v); dp.record(-v); @@ -848,7 +820,11 @@ mod tests { // These bins are calculated from the following formula: // floor( log2( value) * 2^20 ) using an arbitrary precision calculator. - let mut fdp = ExpoHistogramDataPoint::new(4, 20, true, true); + let cfg = BucketConfig { + max_size: 4, + max_scale: 20, + }; + let mut fdp = ExpoHistogramDataPoint::new(&cfg); fdp.record(f64::MAX); assert_eq!( @@ -856,7 +832,7 @@ mod tests { "start bin does not match for large f64 values", ); - let mut fdp = ExpoHistogramDataPoint::new(4, 20, true, true); + let mut fdp = ExpoHistogramDataPoint::new(&cfg); fdp.record(f64::MIN_POSITIVE); assert_eq!( @@ -864,7 +840,7 @@ mod tests { "start bin does not match for small positive values", ); - let mut idp = ExpoHistogramDataPoint::new(4, 20, true, true); + let mut idp = ExpoHistogramDataPoint::new(&cfg); idp.record(i64::MAX); assert_eq!( @@ -1210,12 +1186,13 @@ mod tests { start_bin: 0, counts: vec![], }, - record_min_max: true, - record_sum: true, zero_count: 0, }; - let mut ehdp = ExpoHistogramDataPoint::new(4, 20, true, true); + let mut ehdp = ExpoHistogramDataPoint::new(&BucketConfig { + max_size: 4, + max_scale: 20, + }); ehdp.record(f64::MIN_POSITIVE); ehdp.record(f64::MIN_POSITIVE); ehdp.record(f64::MIN_POSITIVE); diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index f8c440050d..ff9e9b8744 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -67,7 +67,7 @@ use std::collections::hash_map::DefaultHasher; use std::collections::HashSet; use std::hash::{Hash, Hasher}; -use opentelemetry::{Key, KeyValue, Value}; +use opentelemetry::KeyValue; /// A unique set of attributes that can be used as instrument identifiers. /// @@ -111,11 +111,6 @@ impl AttributeSet { AttributeSet(values, hash) } - /// Iterate over key value pairs in the set - pub(crate) fn iter(&self) -> impl Iterator { - self.0.iter().map(|kv| (&kv.key, &kv.value)) - } - /// Returns the underlying Vec of KeyValue pairs pub(crate) fn into_vec(self) -> Vec { self.0