From ce96b43f8690df3172ee5eac9ddeaa54237dc254 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Fri, 9 Aug 2024 12:47:34 -0700 Subject: [PATCH 1/8] Add test coverage for observable gauge (#2007) Co-authored-by: Cijo Thomas --- opentelemetry-sdk/src/metrics/mod.rs | 89 ++++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 3225e660ac..aeb79ac32a 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -267,6 +267,18 @@ mod tests { gauge_aggregation_helper(Temporality::Cumulative); } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_gauge_aggregation() { + // Run this test with stdout enabled to see output. + // cargo test observable_gauge_aggregation --features=testing -- --nocapture + + // Gauge should use last value aggregation regardless of the aggregation temporality used. + observable_gauge_aggregation_helper(Temporality::Delta, false); + observable_gauge_aggregation_helper(Temporality::Delta, true); + observable_gauge_aggregation_helper(Temporality::Cumulative, false); + observable_gauge_aggregation_helper(Temporality::Cumulative, true); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn observable_counter_aggregation_cumulative_non_zero_increment() { // Run this test with stdout enabled to see output. @@ -1445,17 +1457,80 @@ mod tests { test_context.flush_metrics(); - let sum = test_context.get_aggregation::>("my_gauge", None); - assert_eq!(sum.data_points.len(), 2); - let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") + let gauge = test_context.get_aggregation::>("my_gauge", None); + assert_eq!(gauge.data_points.len(), 2); + let data_point1 = find_datapoint_with_key_value(&gauge.data_points, "key1", "value1") .expect("datapoint with key1=value1 expected"); assert_eq!(data_point1.value, 41); - let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2") + let data_point1 = find_datapoint_with_key_value(&gauge.data_points, "key1", "value2") .expect("datapoint with key1=value2 expected"); assert_eq!(data_point1.value, 54); } + fn observable_gauge_aggregation_helper(temporality: Temporality, use_empty_attributes: bool) { + // Arrange + let mut test_context = TestContext::new(temporality); + let _observable_gauge = test_context + .meter() + .i64_observable_gauge("test_observable_gauge") + .with_callback(move |observer| { + if use_empty_attributes { + observer.observe(1, &[]); + } + observer.observe(4, &[KeyValue::new("key1", "value1")]); + observer.observe(5, &[KeyValue::new("key2", "value2")]); + }) + .init(); + + test_context.flush_metrics(); + + // Assert + let gauge = test_context.get_aggregation::>("test_observable_gauge", None); + // Expecting 2 time-series. + let expected_time_series_count = if use_empty_attributes { 3 } else { 2 }; + assert_eq!(gauge.data_points.len(), expected_time_series_count); + + if use_empty_attributes { + // find and validate zero attribute datapoint + let zero_attribute_datapoint = find_datapoint_with_no_attributes(&gauge.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 1); + } + + // find and validate key1=value1 datapoint + let data_point1 = find_datapoint_with_key_value(&gauge.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 4); + + // find and validate key2=value2 datapoint + let data_point2 = find_datapoint_with_key_value(&gauge.data_points, "key2", "value2") + .expect("datapoint with key2=value2 expected"); + assert_eq!(data_point2.value, 5); + + // Reset and report more measurements + test_context.reset_metrics(); + + test_context.flush_metrics(); + + let gauge = test_context.get_aggregation::>("test_observable_gauge", None); + assert_eq!(gauge.data_points.len(), expected_time_series_count); + + if use_empty_attributes { + let zero_attribute_datapoint = find_datapoint_with_no_attributes(&gauge.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 1); + } + + let data_point1 = find_datapoint_with_key_value(&gauge.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 4); + + let data_point2 = find_datapoint_with_key_value(&gauge.data_points, "key2", "value2") + .expect("datapoint with key2=value2 expected"); + assert_eq!(data_point2.value, 5); + } + fn counter_aggregation_helper(temporality: Temporality) { // Arrange let mut test_context = TestContext::new(temporality); @@ -1622,6 +1697,12 @@ mod tests { }) } + fn find_datapoint_with_no_attributes(data_points: &[DataPoint]) -> Option<&DataPoint> { + data_points + .iter() + .find(|&datapoint| datapoint.attributes.is_empty()) + } + fn find_histogram_datapoint_with_key_value<'a, T>( data_points: &'a [HistogramDataPoint], key: &str, From 6c3cffb739af7fd51536b2757d9ca0493a7b973a Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Fri, 9 Aug 2024 15:08:20 -0700 Subject: [PATCH 2/8] Nit update to stress test (#2009) --- stress/src/logs.rs | 10 +++++++++- stress/src/metrics_histogram.rs | 3 +++ stress/src/traces.rs | 3 +++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/stress/src/logs.rs b/stress/src/logs.rs index e13c88f85c..d091dccc6b 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -4,6 +4,9 @@ Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16vCPUs, RAM: 64.0 GB ~31 M/sec + + Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, + ~38 M /sec */ use opentelemetry_appender_tracing::layer; @@ -50,5 +53,10 @@ fn main() { } fn test_log() { - error!(target: "my-system", event_id = 20, event_name = "my-event_name", user_name = "otel", user_email = "otel@opentelemetry.io"); + error!( + name = "CheckoutFailed", + book_id = "12345", + book_title = "Rust Programming Adventures", + message = "Unable to process checkout." + ); } diff --git a/stress/src/metrics_histogram.rs b/stress/src/metrics_histogram.rs index d65160af22..b5a9e63d3b 100644 --- a/stress/src/metrics_histogram.rs +++ b/stress/src/metrics_histogram.rs @@ -4,6 +4,9 @@ Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16vCPUs, RAM: 64.0 GB ~1.8 M/sec + + Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, + ~2.2 M /sec */ use lazy_static::lazy_static; diff --git a/stress/src/traces.rs b/stress/src/traces.rs index 14a5c756c0..62598b10ad 100644 --- a/stress/src/traces.rs +++ b/stress/src/traces.rs @@ -4,6 +4,9 @@ Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16vCPUs, RAM: 64.0 GB ~6.5 M/sec + + Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, + ~10.6 M /sec */ use lazy_static::lazy_static; From ed82d78eb8134a3931a3b745ee59a1e321eb9e05 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Mon, 12 Aug 2024 12:48:56 -0700 Subject: [PATCH 3/8] Add a special test for synchronous instruments (cumulative aggregation) (#2010) --- opentelemetry-sdk/src/metrics/mod.rs | 142 +++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index aeb79ac32a..df29d6647d 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -1251,6 +1251,140 @@ mod tests { counter_multithreaded_aggregation_helper(Temporality::Cumulative); } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn synchronous_instruments_cumulative_with_gap_in_measurements() { + // Run this test with stdout enabled to see output. + // cargo test synchronous_instruments_cumulative_with_gap_in_measurements --features=testing -- --nocapture + + synchronous_instruments_cumulative_with_gap_in_measurements_helper("counter"); + synchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter"); + synchronous_instruments_cumulative_with_gap_in_measurements_helper("histogram"); + + /* Synchronous Gauge has an aggregation bug. Uncomment the code below to run the test for gauge + once this issue is fixed: https://github.com/open-telemetry/opentelemetry-rust/issues/1975 + */ + + // synchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge"); + } + + fn synchronous_instruments_cumulative_with_gap_in_measurements_helper( + instrument_name: &'static str, + ) { + let mut test_context = TestContext::new(Temporality::Cumulative); + let attributes = &[KeyValue::new("key1", "value1")]; + + // Create instrument and emit measurements + match instrument_name { + "counter" => { + let counter = test_context.meter().u64_counter("test_counter").init(); + counter.add(5, &[]); + counter.add(10, attributes); + } + "updown_counter" => { + let updown_counter = test_context + .meter() + .i64_up_down_counter("test_updowncounter") + .init(); + updown_counter.add(15, &[]); + updown_counter.add(20, attributes); + } + "histogram" => { + let histogram = test_context.meter().u64_histogram("test_histogram").init(); + histogram.record(25, &[]); + histogram.record(30, attributes); + } + "gauge" => { + let gauge = test_context.meter().u64_gauge("test_gauge").init(); + gauge.record(35, &[]); + gauge.record(40, attributes); + } + _ => panic!("Incorrect instrument kind provided"), + }; + + test_context.flush_metrics(); + + // Test the first export + assert_correct_export(&mut test_context, instrument_name); + + // Reset and export again without making any measurements + test_context.reset_metrics(); + + test_context.flush_metrics(); + + // Test that latest export has the same data as the previous one + assert_correct_export(&mut test_context, instrument_name); + + fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) { + match instrument_name { + "counter" => { + let counter_data = + test_context.get_aggregation::>("test_counter", None); + assert_eq!(counter_data.data_points.len(), 2); + let zero_attribute_datapoint = + find_datapoint_with_no_attributes(&counter_data.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 5); + let data_point1 = + find_datapoint_with_key_value(&counter_data.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 10); + } + "updown_counter" => { + let updown_counter_data = + test_context.get_aggregation::>("test_updowncounter", None); + assert_eq!(updown_counter_data.data_points.len(), 2); + let zero_attribute_datapoint = + find_datapoint_with_no_attributes(&updown_counter_data.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 15); + let data_point1 = find_datapoint_with_key_value( + &updown_counter_data.data_points, + "key1", + "value1", + ) + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 20); + } + "histogram" => { + let histogram_data = test_context + .get_aggregation::>("test_histogram", None); + assert_eq!(histogram_data.data_points.len(), 2); + let zero_attribute_datapoint = + find_histogram_datapoint_with_no_attributes(&histogram_data.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.count, 1); + assert_eq!(zero_attribute_datapoint.sum, 25); + assert_eq!(zero_attribute_datapoint.min, Some(25)); + assert_eq!(zero_attribute_datapoint.max, Some(25)); + let data_point1 = find_histogram_datapoint_with_key_value( + &histogram_data.data_points, + "key1", + "value1", + ) + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.count, 1); + assert_eq!(data_point1.sum, 30); + assert_eq!(data_point1.min, Some(30)); + assert_eq!(data_point1.max, Some(30)); + } + "gauge" => { + let gauge_data = + test_context.get_aggregation::>("test_gauge", None); + assert_eq!(gauge_data.data_points.len(), 2); + let zero_attribute_datapoint = + find_datapoint_with_no_attributes(&gauge_data.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 35); + let data_point1 = + find_datapoint_with_key_value(&gauge_data.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 40); + } + _ => panic!("Incorrect instrument kind provided"), + } + } + } + fn counter_multithreaded_aggregation_helper(temporality: Temporality) { // Arrange let mut test_context = TestContext::new(temporality); @@ -1716,6 +1850,14 @@ mod tests { }) } + fn find_histogram_datapoint_with_no_attributes( + data_points: &[HistogramDataPoint], + ) -> Option<&HistogramDataPoint> { + data_points + .iter() + .find(|&datapoint| datapoint.attributes.is_empty()) + } + fn find_scope_metric<'a>( metrics: &'a [ScopeMetrics], name: &'a str, From d583695d30681ee1bd910156de27d91be3711822 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Mon, 12 Aug 2024 15:45:15 -0700 Subject: [PATCH 4/8] Move ValueMap to mod file to allow for code reuse (#2012) --- opentelemetry-sdk/src/metrics/internal/mod.rs | 128 ++++++++++++++++- opentelemetry-sdk/src/metrics/internal/sum.rs | 131 +----------------- 2 files changed, 130 insertions(+), 129 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index cf0edeb47c..84d0735053 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -5,12 +5,136 @@ mod last_value; mod sum; use core::fmt; +use std::collections::HashMap; +use std::marker::PhantomData; use std::ops::{Add, AddAssign, Sub}; -use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; -use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; +use aggregate::is_under_cardinality_limit; pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; +use once_cell::sync::Lazy; +use opentelemetry::metrics::MetricsError; +use opentelemetry::{global, KeyValue}; + +use crate::metrics::AttributeSet; + +pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = + Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); + +/// Abstracts the update operation for a measurement. +pub(crate) trait Operation { + fn update_tracker>(tracker: &AT, value: T); +} + +struct Increment; + +impl Operation for Increment { + fn update_tracker>(tracker: &AT, value: T) { + tracker.add(value); + } +} + +struct Assign; + +impl Operation for Assign { + fn update_tracker>(tracker: &AT, value: T) { + tracker.store(value); + } +} + +/// The storage for sums. +/// +/// This structure is parametrized by an `Operation` that indicates how +/// updates to the underlying value trackers should be performed. +pub(crate) struct ValueMap, O> { + /// Trackers store the values associated with different attribute sets. + trackers: RwLock, Arc>>, + /// Number of different attribute set stored in the `trackers` map. + count: AtomicUsize, + /// Indicates whether a value with no attributes has been stored. + has_no_attribute_value: AtomicBool, + /// Tracker for values with no attributes attached. + no_attribute_tracker: T::AtomicTracker, + phantom: PhantomData, +} + +impl, O> Default for ValueMap { + fn default() -> Self { + ValueMap::new() + } +} + +impl, O> ValueMap { + fn new() -> Self { + ValueMap { + trackers: RwLock::new(HashMap::new()), + has_no_attribute_value: AtomicBool::new(false), + no_attribute_tracker: T::new_atomic_tracker(), + count: AtomicUsize::new(0), + phantom: PhantomData, + } + } +} + +impl, O: Operation> ValueMap { + fn measure(&self, measurement: T, attributes: &[KeyValue]) { + if attributes.is_empty() { + O::update_tracker(&self.no_attribute_tracker, measurement); + self.has_no_attribute_value.store(true, Ordering::Release); + return; + } + + let Ok(trackers) = self.trackers.read() else { + return; + }; + + // Try to retrieve and update the tracker with the attributes in the provided order first + if let Some(tracker) = trackers.get(attributes) { + O::update_tracker(&**tracker, measurement); + return; + } + + // Try to retrieve and update the tracker with the attributes sorted. + let sorted_attrs = AttributeSet::from(attributes).into_vec(); + if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + O::update_tracker(&**tracker, measurement); + return; + } + + // Give up the read lock before acquiring the write lock. + drop(trackers); + + let Ok(mut trackers) = self.trackers.write() else { + return; + }; + + // Recheck both the provided and sorted orders after acquiring the write lock + // in case another thread has pushed an update in the meantime. + if let Some(tracker) = trackers.get(attributes) { + O::update_tracker(&**tracker, measurement); + } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + O::update_tracker(&**tracker, measurement); + } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { + let new_tracker = Arc::new(T::new_atomic_tracker()); + O::update_tracker(&*new_tracker, measurement); + + // Insert tracker with the attributes in the provided and sorted orders + trackers.insert(attributes.to_vec(), new_tracker.clone()); + trackers.insert(sorted_attrs, new_tracker); + + self.count.fetch_add(1, Ordering::SeqCst); + } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { + O::update_tracker(&**overflow_value, measurement); + } else { + let new_tracker = T::new_atomic_tracker(); + O::update_tracker(&new_tracker, measurement); + trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker)); + global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); + } + } +} /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 1ed76fdae9..36108c86dc 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,137 +1,14 @@ use std::collections::HashSet; -use std::marker::PhantomData; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::vec; -use std::{ - collections::HashMap, - sync::{Mutex, RwLock}, - time::SystemTime, -}; +use std::{collections::HashMap, sync::Mutex, time::SystemTime}; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; -use crate::metrics::AttributeSet; -use once_cell::sync::Lazy; use opentelemetry::KeyValue; -use opentelemetry::{global, metrics::MetricsError}; -use super::{aggregate::is_under_cardinality_limit, AtomicTracker, Number}; - -pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = - Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); - -/// Abstracts the update operation for a measurement. -trait Operation { - fn update_tracker>(tracker: &AT, value: T); -} - -struct Increment; - -impl Operation for Increment { - fn update_tracker>(tracker: &AT, value: T) { - tracker.add(value); - } -} - -struct Assign; - -impl Operation for Assign { - fn update_tracker>(tracker: &AT, value: T) { - tracker.store(value); - } -} - -/// The storage for sums. -/// -/// This structure is parametrized by an `Operation` that indicates how -/// updates to the underlying value trackers should be performed. -struct ValueMap, O> { - /// Trackers store the values associated with different attribute sets. - trackers: RwLock, Arc>>, - /// Number of different attribute set stored in the `trackers` map. - count: AtomicUsize, - /// Indicates whether a value with no attributes has been stored. - has_no_attribute_value: AtomicBool, - /// Tracker for values with no attributes attached. - no_attribute_tracker: T::AtomicTracker, - phantom: PhantomData, -} - -impl, O> Default for ValueMap { - fn default() -> Self { - ValueMap::new() - } -} - -impl, O> ValueMap { - fn new() -> Self { - ValueMap { - trackers: RwLock::new(HashMap::new()), - has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: T::new_atomic_tracker(), - count: AtomicUsize::new(0), - phantom: PhantomData, - } - } -} - -impl, O: Operation> ValueMap { - fn measure(&self, measurement: T, attributes: &[KeyValue]) { - if attributes.is_empty() { - O::update_tracker(&self.no_attribute_tracker, measurement); - self.has_no_attribute_value.store(true, Ordering::Release); - return; - } - - let Ok(trackers) = self.trackers.read() else { - return; - }; - - // Try to retrieve and update the tracker with the attributes in the provided order first - if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement); - return; - } - - // Try to retrieve and update the tracker with the attributes sorted. - let sorted_attrs = AttributeSet::from(attributes).into_vec(); - if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement); - return; - } - - // Give up the read lock before acquiring the write lock. - drop(trackers); - - let Ok(mut trackers) = self.trackers.write() else { - return; - }; - - // Recheck both the provided and sorted orders after acquiring the write lock - // in case another thread has pushed an update in the meantime. - if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement); - } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement); - } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { - let new_tracker = Arc::new(T::new_atomic_tracker()); - O::update_tracker(&*new_tracker, measurement); - - // Insert tracker with the attributes in the provided and sorted orders - trackers.insert(attributes.to_vec(), new_tracker.clone()); - trackers.insert(sorted_attrs, new_tracker); - - self.count.fetch_add(1, Ordering::SeqCst); - } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { - O::update_tracker(&**overflow_value, measurement); - } else { - let new_tracker = T::new_atomic_tracker(); - O::update_tracker(&new_tracker, measurement); - trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker)); - global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); - } - } -} +use super::{Assign, Increment, ValueMap}; +use super::{AtomicTracker, Number}; /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum> { From 5781f501153a9a1dec217cc84128549a29d450ea Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 12 Aug 2024 16:58:59 -0700 Subject: [PATCH 5/8] initial commit --- opentelemetry-sdk/src/logs/record.rs | 19 ++++++++----------- opentelemetry/src/logs/noop.rs | 6 +----- opentelemetry/src/logs/record.rs | 6 +----- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/opentelemetry-sdk/src/logs/record.rs b/opentelemetry-sdk/src/logs/record.rs index 856cb7d64e..89722a55ad 100644 --- a/opentelemetry-sdk/src/logs/record.rs +++ b/opentelemetry-sdk/src/logs/record.rs @@ -11,7 +11,7 @@ use std::{borrow::Cow, time::SystemTime}; /// is provided to `LogExporter`s as input. pub struct LogRecord { /// Event name. Optional as not all the logging API support it. - pub event_name: Option>, + pub event_name: Option<&'static str>, /// Target of the log record pub target: Option>, @@ -38,11 +38,8 @@ pub struct LogRecord { } impl opentelemetry::logs::LogRecord for LogRecord { - fn set_event_name(&mut self, name: T) - where - T: Into>, - { - self.event_name = Some(name.into()); + fn set_event_name(&mut self, name: &'static str) { + self.event_name = Some(name); } // Sets the `target` of a record @@ -154,7 +151,7 @@ mod tests { fn test_set_eventname() { let mut log_record = LogRecord::default(); log_record.set_event_name("test_event"); - assert_eq!(log_record.event_name, Some(Cow::Borrowed("test_event"))); + assert_eq!(log_record.event_name, Some("test_event")); } #[test] @@ -247,7 +244,7 @@ mod tests { #[test] fn compare_log_record() { let log_record = LogRecord { - event_name: Some(Cow::Borrowed("test_event")), + event_name: Some("test_event"), target: Some(Cow::Borrowed("foo::bar")), timestamp: Some(SystemTime::now()), observed_timestamp: Some(SystemTime::now()), @@ -267,7 +264,7 @@ mod tests { assert_eq!(log_record, log_record_cloned); let mut log_record_different = log_record.clone(); - log_record_different.event_name = Some(Cow::Borrowed("different_event")); + log_record_different.event_name = Some("different_event"); assert_ne!(log_record, log_record_different); } @@ -275,12 +272,12 @@ mod tests { #[test] fn compare_log_record_target_borrowed_eq_owned() { let log_record_borrowed = LogRecord { - event_name: Some(Cow::Borrowed("test_event")), + event_name: Some("test_event"), ..Default::default() }; let log_record_owned = LogRecord { - event_name: Some(Cow::Owned("test_event".to_string())), + event_name: Some("test_event"), ..Default::default() }; diff --git a/opentelemetry/src/logs/noop.rs b/opentelemetry/src/logs/noop.rs index 0264a76843..ea67a79313 100644 --- a/opentelemetry/src/logs/noop.rs +++ b/opentelemetry/src/logs/noop.rs @@ -41,11 +41,7 @@ pub struct NoopLogRecord; impl LogRecord for NoopLogRecord { // Implement the LogRecord trait methods with empty bodies. #[inline] - fn set_event_name(&mut self, _name: T) - where - T: Into>, - { - } + fn set_event_name(&mut self, _name: &'static str) {} #[inline] fn set_timestamp(&mut self, _timestamp: SystemTime) {} #[inline] diff --git a/opentelemetry/src/logs/record.rs b/opentelemetry/src/logs/record.rs index a77f25c072..223445b551 100644 --- a/opentelemetry/src/logs/record.rs +++ b/opentelemetry/src/logs/record.rs @@ -4,11 +4,7 @@ use std::{borrow::Cow, collections::HashMap, time::SystemTime}; /// SDK implemented trait for managing log records pub trait LogRecord { /// Sets the `event_name` of a record - fn set_event_name(&mut self, _name: T) - where - T: Into>, - { - } + fn set_event_name(&mut self, _name: &'static str); /// Sets the `target` of a record. /// Currently, both `opentelemetry-appender-tracing` and `opentelemetry-appender-log` create a single logger From 6ee6e31b7da314a5f93d921353507ef447dd88e1 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 12 Aug 2024 17:10:24 -0700 Subject: [PATCH 6/8] add severity text --- opentelemetry-sdk/src/logs/record.rs | 12 ++++++------ opentelemetry-stdout/src/logs/transform.rs | 2 +- opentelemetry/src/logs/noop.rs | 2 +- opentelemetry/src/logs/record.rs | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/opentelemetry-sdk/src/logs/record.rs b/opentelemetry-sdk/src/logs/record.rs index 89722a55ad..8e152c1059 100644 --- a/opentelemetry-sdk/src/logs/record.rs +++ b/opentelemetry-sdk/src/logs/record.rs @@ -26,7 +26,8 @@ pub struct LogRecord { pub trace_context: Option, /// The original severity string from the source - pub severity_text: Option>, + pub severity_text: Option<&'static str>, + /// The corresponding severity value, normalized pub severity_number: Option, @@ -58,7 +59,7 @@ impl opentelemetry::logs::LogRecord for LogRecord { self.observed_timestamp = Some(timestamp); } - fn set_severity_text(&mut self, severity_text: Cow<'static, str>) { + fn set_severity_text(&mut self, severity_text: &'static str) { self.severity_text = Some(severity_text); } @@ -180,9 +181,8 @@ mod tests { #[test] fn test_set_severity_text() { let mut log_record = LogRecord::default(); - let severity_text: Cow<'static, str> = "ERROR".into(); // Explicitly typed - log_record.set_severity_text(severity_text); - assert_eq!(log_record.severity_text, Some(Cow::Borrowed("ERROR"))); + log_record.set_severity_text("ERROR"); + assert_eq!(log_record.severity_text, Some("ERROR")); } #[test] @@ -248,7 +248,7 @@ mod tests { target: Some(Cow::Borrowed("foo::bar")), timestamp: Some(SystemTime::now()), observed_timestamp: Some(SystemTime::now()), - severity_text: Some(Cow::Borrowed("ERROR")), + severity_text: Some("ERROR"), severity_number: Some(Severity::Error), body: Some(AnyValue::String("Test body".into())), attributes: Some(vec![(Key::new("key"), AnyValue::String("value".into()))]), diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 2f3199bd25..4e1b918137 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -91,7 +91,7 @@ struct LogRecord { observed_time: SystemTime, severity_number: u32, #[serde(skip_serializing_if = "Option::is_none")] - severity_text: Option>, + severity_text: Option<&'static str>, #[serde(skip_serializing_if = "Option::is_none")] body: Option, attributes: Vec, diff --git a/opentelemetry/src/logs/noop.rs b/opentelemetry/src/logs/noop.rs index ea67a79313..8c31328e5d 100644 --- a/opentelemetry/src/logs/noop.rs +++ b/opentelemetry/src/logs/noop.rs @@ -47,7 +47,7 @@ impl LogRecord for NoopLogRecord { #[inline] fn set_observed_timestamp(&mut self, _timestamp: SystemTime) {} #[inline] - fn set_severity_text(&mut self, _text: Cow<'static, str>) {} + fn set_severity_text(&mut self, _text: &'static str) {} #[inline] fn set_severity_number(&mut self, _number: Severity) {} #[inline] diff --git a/opentelemetry/src/logs/record.rs b/opentelemetry/src/logs/record.rs index 223445b551..2e171ef0a1 100644 --- a/opentelemetry/src/logs/record.rs +++ b/opentelemetry/src/logs/record.rs @@ -4,7 +4,7 @@ use std::{borrow::Cow, collections::HashMap, time::SystemTime}; /// SDK implemented trait for managing log records pub trait LogRecord { /// Sets the `event_name` of a record - fn set_event_name(&mut self, _name: &'static str); + fn set_event_name(&mut self, name: &'static str); /// Sets the `target` of a record. /// Currently, both `opentelemetry-appender-tracing` and `opentelemetry-appender-log` create a single logger @@ -21,7 +21,7 @@ pub trait LogRecord { fn set_observed_timestamp(&mut self, timestamp: SystemTime); /// Sets severity as text. - fn set_severity_text(&mut self, text: Cow<'static, str>); + fn set_severity_text(&mut self, text: &'static str); /// Sets severity as a numeric value. fn set_severity_number(&mut self, number: Severity); From 72809362fb21125b0fbfe8b6b8fce23f37c822ed Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 12 Aug 2024 17:29:15 -0700 Subject: [PATCH 7/8] fix lint --- opentelemetry-appender-log/src/lib.rs | 2 +- opentelemetry-appender-tracing/src/layer.rs | 2 +- opentelemetry-sdk/benches/log.rs | 10 +++++----- opentelemetry-sdk/src/logs/mod.rs | 2 +- opentelemetry-stdout/src/logs/transform.rs | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/opentelemetry-appender-log/src/lib.rs b/opentelemetry-appender-log/src/lib.rs index 0b38968393..291f888488 100644 --- a/opentelemetry-appender-log/src/lib.rs +++ b/opentelemetry-appender-log/src/lib.rs @@ -128,7 +128,7 @@ where if self.enabled(record.metadata()) { let mut log_record = self.logger.create_log_record(); log_record.set_severity_number(severity_of_level(record.level())); - log_record.set_severity_text(record.level().as_str().into()); + log_record.set_severity_text(record.level().as_str()); log_record.set_body(AnyValue::from(record.args().to_string())); log_record.add_attributes(log_attributes(record.key_values())); log_record.set_target(record.metadata().target().to_string()); diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 29eed63e8c..776dd27502 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -176,7 +176,7 @@ where log_record.set_target(meta.target().to_string()); log_record.set_event_name(meta.name()); log_record.set_severity_number(severity_of_level(meta.level())); - log_record.set_severity_text(meta.level().as_str().into()); + log_record.set_severity_text(meta.level().as_str()); let mut visitor = EventVisitor::new(&mut log_record); #[cfg(feature = "experimental_metadata_attributes")] visitor.visit_experimental_metadata(meta); diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index ba0a94cd17..198be8cf2f 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -131,7 +131,7 @@ fn logging_comparable_to_appender(c: &mut Criterion) { log_record.set_target("my-target".to_string()); log_record.set_event_name("CheckoutFailed"); log_record.set_severity_number(Severity::Warn); - log_record.set_severity_text("WARN".into()); + log_record.set_severity_text("WARN"); log_record.add_attribute("book_id", "12345"); log_record.add_attribute("book_title", "Rust Programming Adventures"); log_record.add_attribute("message", "Unable to process checkout."); @@ -275,7 +275,7 @@ fn criterion_benchmark(c: &mut Criterion) { log_record.set_timestamp(now); log_record.set_observed_timestamp(now); log_record.set_severity_number(Severity::Warn); - log_record.set_severity_text(Severity::Warn.name().into()); + log_record.set_severity_text(Severity::Warn.name()); logger.emit(log_record); }); @@ -285,7 +285,7 @@ fn criterion_benchmark(c: &mut Criterion) { log_record.set_timestamp(now); log_record.set_observed_timestamp(now); log_record.set_severity_number(Severity::Warn); - log_record.set_severity_text(Severity::Warn.name().into()); + log_record.set_severity_text(Severity::Warn.name()); log_record.add_attribute("name", "my-event-name"); log_record.add_attribute("event.id", 20); log_record.add_attribute("user.name", "otel"); @@ -299,7 +299,7 @@ fn criterion_benchmark(c: &mut Criterion) { log_record.set_timestamp(now); log_record.set_observed_timestamp(now); log_record.set_severity_number(Severity::Warn); - log_record.set_severity_text(Severity::Warn.name().into()); + log_record.set_severity_text(Severity::Warn.name()); log_record.add_attribute("name", "my-event-name"); log_record.add_attribute("event.id", 20); log_record.add_attribute("user.name", "otel"); @@ -338,7 +338,7 @@ fn criterion_benchmark(c: &mut Criterion) { log_record.set_timestamp(now); log_record.set_observed_timestamp(now); log_record.set_severity_number(Severity::Warn); - log_record.set_severity_text(Severity::Warn.name().into()); + log_record.set_severity_text(Severity::Warn.name()); log_record.add_attributes(attributes.clone()); logger.emit(log_record); }); diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 207da4255c..5d2e72719b 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -41,7 +41,7 @@ mod tests { let logger = logger_provider.logger("test-logger"); let mut log_record = logger.create_log_record(); log_record.set_severity_number(Severity::Error); - log_record.set_severity_text("Error".into()); + log_record.set_severity_text("Error"); // Adding attributes using a vector with explicitly constructed Key and AnyValue objects. log_record.add_attributes(vec![ diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 4e1b918137..0560e0c064 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -115,11 +115,11 @@ impl From for LogRecord { .collect::>(); // Collect into a Vecs #[cfg(feature = "populate-logs-event-name")] - if let Some(event_name) = &value.record.event_name { + if let Some(event_name) = value.record.event_name { let mut attributes_with_name = attributes; attributes_with_name.push(KeyValue::from(( "name".into(), - opentelemetry::Value::String(event_name.clone().into()), + opentelemetry::Value::String(event_name.into()), ))); attributes_with_name } else { From 5d89aee9db638abb548e393edf58ccd87580238c Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Mon, 12 Aug 2024 17:44:27 -0700 Subject: [PATCH 8/8] Add stress test for Synchronous Gauge (#2013) Co-authored-by: Cijo Thomas --- opentelemetry-sdk/benches/metric_gauge.rs | 26 ++++----- stress/Cargo.toml | 5 ++ stress/src/metrics_gauge.rs | 66 +++++++++++++++++++++++ 3 files changed, 84 insertions(+), 13 deletions(-) create mode 100644 stress/src/metrics_gauge.rs diff --git a/opentelemetry-sdk/benches/metric_gauge.rs b/opentelemetry-sdk/benches/metric_gauge.rs index e455c5a577..4fe2a4ead7 100644 --- a/opentelemetry-sdk/benches/metric_gauge.rs +++ b/opentelemetry-sdk/benches/metric_gauge.rs @@ -1,12 +1,12 @@ /* The benchmark results: criterion = "0.5.1" - OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2) - Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, + OS: Ubuntu 22.04.4 LTS (5.15.153.1-microsoft-standard-WSL2) + Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16vCPUs, RAM: 64.0 GB | Test | Average time| |--------------------------------|-------------| - | Gauge_Add_4 | 586 ns | + | Gauge_Add | 483.78 ns | */ use criterion::{criterion_group, criterion_main, Criterion}; @@ -26,6 +26,11 @@ thread_local! { static CURRENT_RNG: RefCell = RefCell::new(rngs::SmallRng::from_entropy()); } +static ATTRIBUTE_VALUES: [&str; 10] = [ + "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", + "value10", +]; + // Run this benchmark with: // cargo bench --bench metric_gauge fn create_gauge() -> Gauge { @@ -42,13 +47,8 @@ fn criterion_benchmark(c: &mut Criterion) { } fn gauge_record(c: &mut Criterion) { - let attribute_values = [ - "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", - "value10", - ]; - let gauge = create_gauge(); - c.bench_function("Gauge_Add_4", |b| { + c.bench_function("Gauge_Add", |b| { b.iter(|| { // 4*4*10*10 = 1600 time series. let rands = CURRENT_RNG.with(|rng| { @@ -67,10 +67,10 @@ fn gauge_record(c: &mut Criterion) { gauge.record( 1, &[ - KeyValue::new("attribute1", attribute_values[index_first_attribute]), - KeyValue::new("attribute2", attribute_values[index_second_attribute]), - KeyValue::new("attribute3", attribute_values[index_third_attribute]), - KeyValue::new("attribute4", attribute_values[index_fourth_attribute]), + KeyValue::new("attribute1", ATTRIBUTE_VALUES[index_first_attribute]), + KeyValue::new("attribute2", ATTRIBUTE_VALUES[index_second_attribute]), + KeyValue::new("attribute3", ATTRIBUTE_VALUES[index_third_attribute]), + KeyValue::new("attribute4", ATTRIBUTE_VALUES[index_fourth_attribute]), ], ); }); diff --git a/stress/Cargo.toml b/stress/Cargo.toml index d7e2c7a1c4..0591cde7eb 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -9,6 +9,11 @@ name = "metrics" path = "src/metrics_counter.rs" doc = false +[[bin]] # Bin to run the metrics stress tests for Gauge +name = "metrics_gauge" +path = "src/metrics_gauge.rs" +doc = false + [[bin]] # Bin to run the metrics stress tests for Histogram name = "metrics_histogram" path = "src/metrics_histogram.rs" diff --git a/stress/src/metrics_gauge.rs b/stress/src/metrics_gauge.rs new file mode 100644 index 0000000000..0ecdea5d00 --- /dev/null +++ b/stress/src/metrics_gauge.rs @@ -0,0 +1,66 @@ +/* + Stress test results: + OS: Ubuntu 22.04.4 LTS (5.15.153.1-microsoft-standard-WSL2) + Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16vCPUs, + RAM: 64.0 GB + ~1.5 M/sec +*/ + +use lazy_static::lazy_static; +use opentelemetry::{ + metrics::{Gauge, MeterProvider as _}, + KeyValue, +}; +use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider}; +use rand::{ + rngs::{self}, + Rng, SeedableRng, +}; +use std::cell::RefCell; + +mod throughput; + +lazy_static! { + static ref PROVIDER: SdkMeterProvider = SdkMeterProvider::builder() + .with_reader(ManualReader::builder().build()) + .build(); + static ref ATTRIBUTE_VALUES: [&'static str; 10] = [ + "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", + "value10" + ]; + static ref GAUGE: Gauge = PROVIDER.meter("test").u64_gauge("test_gauge").init(); +} + +thread_local! { + /// Store random number generator for each thread + static CURRENT_RNG: RefCell = RefCell::new(rngs::SmallRng::from_entropy()); +} + +fn main() { + throughput::test_throughput(test_gauge); +} + +fn test_gauge() { + let len = ATTRIBUTE_VALUES.len(); + let rands = CURRENT_RNG.with(|rng| { + let mut rng = rng.borrow_mut(); + [ + rng.gen_range(0..len), + rng.gen_range(0..len), + rng.gen_range(0..len), + ] + }); + let index_first_attribute = rands[0]; + let index_second_attribute = rands[1]; + let index_third_attribute = rands[2]; + + // each attribute has 10 possible values, so there are 1000 possible combinations (time-series) + GAUGE.record( + 1, + &[ + KeyValue::new("attribute1", ATTRIBUTE_VALUES[index_first_attribute]), + KeyValue::new("attribute2", ATTRIBUTE_VALUES[index_second_attribute]), + KeyValue::new("attribute3", ATTRIBUTE_VALUES[index_third_attribute]), + ], + ); +}