Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial commit #25

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion opentelemetry-appender-log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
if self.enabled(record.metadata()) {
let mut log_record = self.logger.create_log_record();
log_record.set_severity_number(severity_of_level(record.level()));
log_record.set_severity_text(record.level().as_str().into());
log_record.set_severity_text(record.level().as_str());
log_record.set_body(AnyValue::from(record.args().to_string()));
log_record.add_attributes(log_attributes(record.key_values()));
log_record.set_target(record.metadata().target().to_string());
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ where
log_record.set_target(meta.target().to_string());
log_record.set_event_name(meta.name());
log_record.set_severity_number(severity_of_level(meta.level()));
log_record.set_severity_text(meta.level().as_str().into());
log_record.set_severity_text(meta.level().as_str());
let mut visitor = EventVisitor::new(&mut log_record);
#[cfg(feature = "experimental_metadata_attributes")]
visitor.visit_experimental_metadata(meta);
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ fn logging_comparable_to_appender(c: &mut Criterion) {
log_record.set_target("my-target".to_string());
log_record.set_event_name("CheckoutFailed");
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text("WARN".into());
log_record.set_severity_text("WARN");
log_record.add_attribute("book_id", "12345");
log_record.add_attribute("book_title", "Rust Programming Adventures");
log_record.add_attribute("message", "Unable to process checkout.");
Expand Down Expand Up @@ -275,7 +275,7 @@ fn criterion_benchmark(c: &mut Criterion) {
log_record.set_timestamp(now);
log_record.set_observed_timestamp(now);
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text(Severity::Warn.name().into());
log_record.set_severity_text(Severity::Warn.name());
logger.emit(log_record);
});

Expand All @@ -285,7 +285,7 @@ fn criterion_benchmark(c: &mut Criterion) {
log_record.set_timestamp(now);
log_record.set_observed_timestamp(now);
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text(Severity::Warn.name().into());
log_record.set_severity_text(Severity::Warn.name());
log_record.add_attribute("name", "my-event-name");
log_record.add_attribute("event.id", 20);
log_record.add_attribute("user.name", "otel");
Expand All @@ -299,7 +299,7 @@ fn criterion_benchmark(c: &mut Criterion) {
log_record.set_timestamp(now);
log_record.set_observed_timestamp(now);
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text(Severity::Warn.name().into());
log_record.set_severity_text(Severity::Warn.name());
log_record.add_attribute("name", "my-event-name");
log_record.add_attribute("event.id", 20);
log_record.add_attribute("user.name", "otel");
Expand Down Expand Up @@ -338,7 +338,7 @@ fn criterion_benchmark(c: &mut Criterion) {
log_record.set_timestamp(now);
log_record.set_observed_timestamp(now);
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text(Severity::Warn.name().into());
log_record.set_severity_text(Severity::Warn.name());
log_record.add_attributes(attributes.clone());
logger.emit(log_record);
});
Expand Down
26 changes: 13 additions & 13 deletions opentelemetry-sdk/benches/metric_gauge.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
The benchmark results:
criterion = "0.5.1"
OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2)
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
OS: Ubuntu 22.04.4 LTS (5.15.153.1-microsoft-standard-WSL2)
Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16vCPUs,
RAM: 64.0 GB
| Test | Average time|
|--------------------------------|-------------|
| Gauge_Add_4 | 586 ns |
| Gauge_Add | 483.78 ns |
*/

use criterion::{criterion_group, criterion_main, Criterion};
Expand All @@ -26,6 +26,11 @@ thread_local! {
static CURRENT_RNG: RefCell<rngs::SmallRng> = RefCell::new(rngs::SmallRng::from_entropy());
}

static ATTRIBUTE_VALUES: [&str; 10] = [
"value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9",
"value10",
];

// Run this benchmark with:
// cargo bench --bench metric_gauge
fn create_gauge() -> Gauge<u64> {
Expand All @@ -42,13 +47,8 @@ fn criterion_benchmark(c: &mut Criterion) {
}

fn gauge_record(c: &mut Criterion) {
let attribute_values = [
"value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9",
"value10",
];

let gauge = create_gauge();
c.bench_function("Gauge_Add_4", |b| {
c.bench_function("Gauge_Add", |b| {
b.iter(|| {
// 4*4*10*10 = 1600 time series.
let rands = CURRENT_RNG.with(|rng| {
Expand All @@ -67,10 +67,10 @@ fn gauge_record(c: &mut Criterion) {
gauge.record(
1,
&[
KeyValue::new("attribute1", attribute_values[index_first_attribute]),
KeyValue::new("attribute2", attribute_values[index_second_attribute]),
KeyValue::new("attribute3", attribute_values[index_third_attribute]),
KeyValue::new("attribute4", attribute_values[index_fourth_attribute]),
KeyValue::new("attribute1", ATTRIBUTE_VALUES[index_first_attribute]),
KeyValue::new("attribute2", ATTRIBUTE_VALUES[index_second_attribute]),
KeyValue::new("attribute3", ATTRIBUTE_VALUES[index_third_attribute]),
KeyValue::new("attribute4", ATTRIBUTE_VALUES[index_fourth_attribute]),
],
);
});
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ mod tests {
let logger = logger_provider.logger("test-logger");
let mut log_record = logger.create_log_record();
log_record.set_severity_number(Severity::Error);
log_record.set_severity_text("Error".into());
log_record.set_severity_text("Error");

// Adding attributes using a vector with explicitly constructed Key and AnyValue objects.
log_record.add_attributes(vec![
Expand Down
31 changes: 14 additions & 17 deletions opentelemetry-sdk/src/logs/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{borrow::Cow, time::SystemTime};
/// is provided to `LogExporter`s as input.
pub struct LogRecord {
/// Event name. Optional as not all the logging API support it.
pub event_name: Option<Cow<'static, str>>,
pub event_name: Option<&'static str>,

/// Target of the log record
pub target: Option<Cow<'static, str>>,
Expand All @@ -26,7 +26,8 @@ pub struct LogRecord {
pub trace_context: Option<TraceContext>,

/// The original severity string from the source
pub severity_text: Option<Cow<'static, str>>,
pub severity_text: Option<&'static str>,

/// The corresponding severity value, normalized
pub severity_number: Option<Severity>,

Expand All @@ -38,11 +39,8 @@ pub struct LogRecord {
}

impl opentelemetry::logs::LogRecord for LogRecord {
fn set_event_name<T>(&mut self, name: T)
where
T: Into<Cow<'static, str>>,
{
self.event_name = Some(name.into());
fn set_event_name(&mut self, name: &'static str) {
self.event_name = Some(name);
}

// Sets the `target` of a record
Expand All @@ -61,7 +59,7 @@ impl opentelemetry::logs::LogRecord for LogRecord {
self.observed_timestamp = Some(timestamp);
}

fn set_severity_text(&mut self, severity_text: Cow<'static, str>) {
fn set_severity_text(&mut self, severity_text: &'static str) {
self.severity_text = Some(severity_text);
}

Expand Down Expand Up @@ -154,7 +152,7 @@ mod tests {
fn test_set_eventname() {
let mut log_record = LogRecord::default();
log_record.set_event_name("test_event");
assert_eq!(log_record.event_name, Some(Cow::Borrowed("test_event")));
assert_eq!(log_record.event_name, Some("test_event"));
}

#[test]
Expand Down Expand Up @@ -183,9 +181,8 @@ mod tests {
#[test]
fn test_set_severity_text() {
let mut log_record = LogRecord::default();
let severity_text: Cow<'static, str> = "ERROR".into(); // Explicitly typed
log_record.set_severity_text(severity_text);
assert_eq!(log_record.severity_text, Some(Cow::Borrowed("ERROR")));
log_record.set_severity_text("ERROR");
assert_eq!(log_record.severity_text, Some("ERROR"));
}

#[test]
Expand Down Expand Up @@ -247,11 +244,11 @@ mod tests {
#[test]
fn compare_log_record() {
let log_record = LogRecord {
event_name: Some(Cow::Borrowed("test_event")),
event_name: Some("test_event"),
target: Some(Cow::Borrowed("foo::bar")),
timestamp: Some(SystemTime::now()),
observed_timestamp: Some(SystemTime::now()),
severity_text: Some(Cow::Borrowed("ERROR")),
severity_text: Some("ERROR"),
severity_number: Some(Severity::Error),
body: Some(AnyValue::String("Test body".into())),
attributes: Some(vec![(Key::new("key"), AnyValue::String("value".into()))]),
Expand All @@ -267,20 +264,20 @@ mod tests {
assert_eq!(log_record, log_record_cloned);

let mut log_record_different = log_record.clone();
log_record_different.event_name = Some(Cow::Borrowed("different_event"));
log_record_different.event_name = Some("different_event");

assert_ne!(log_record, log_record_different);
}

#[test]
fn compare_log_record_target_borrowed_eq_owned() {
let log_record_borrowed = LogRecord {
event_name: Some(Cow::Borrowed("test_event")),
event_name: Some("test_event"),
..Default::default()
};

let log_record_owned = LogRecord {
event_name: Some(Cow::Owned("test_event".to_string())),
event_name: Some("test_event"),
..Default::default()
};

Expand Down
128 changes: 126 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,136 @@ mod last_value;
mod sum;

use core::fmt;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::{Add, AddAssign, Sub};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use once_cell::sync::Lazy;
use opentelemetry::metrics::MetricsError;
use opentelemetry::{global, KeyValue};

use crate::metrics::AttributeSet;

pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

/// Abstracts the update operation for a measurement.
pub(crate) trait Operation {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T);
}

struct Increment;

impl Operation for Increment {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
tracker.add(value);
}
}

struct Assign;

impl Operation for Assign {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
tracker.store(value);
}
}

/// The storage for sums.
///
/// This structure is parametrized by an `Operation` that indicates how
/// updates to the underlying value trackers should be performed.
pub(crate) struct ValueMap<T: Number<T>, O> {
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
/// Number of different attribute set stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
has_no_attribute_value: AtomicBool,
/// Tracker for values with no attributes attached.
no_attribute_tracker: T::AtomicTracker,
phantom: PhantomData<O>,
}

impl<T: Number<T>, O> Default for ValueMap<T, O> {
fn default() -> Self {
ValueMap::new()
}
}

impl<T: Number<T>, O> ValueMap<T, O> {
fn new() -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: T::new_atomic_tracker(),
count: AtomicUsize::new(0),
phantom: PhantomData,
}
}
}

impl<T: Number<T>, O: Operation> ValueMap<T, O> {
fn measure(&self, measurement: T, attributes: &[KeyValue]) {
if attributes.is_empty() {
O::update_tracker(&self.no_attribute_tracker, measurement);
self.has_no_attribute_value.store(true, Ordering::Release);
return;
}

let Ok(trackers) = self.trackers.read() else {
return;
};

// Try to retrieve and update the tracker with the attributes in the provided order first
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
return;
}

// Try to retrieve and update the tracker with the attributes sorted.
let sorted_attrs = AttributeSet::from(attributes).into_vec();
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
return;
}

// Give up the read lock before acquiring the write lock.
drop(trackers);

let Ok(mut trackers) = self.trackers.write() else {
return;
};

// Recheck both the provided and sorted orders after acquiring the write lock
// in case another thread has pushed an update in the meantime.
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_tracker = Arc::new(T::new_atomic_tracker());
O::update_tracker(&*new_tracker, measurement);

// Insert tracker with the attributes in the provided and sorted orders
trackers.insert(attributes.to_vec(), new_tracker.clone());
trackers.insert(sorted_attrs, new_tracker);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
O::update_tracker(&**overflow_value, measurement);
} else {
let new_tracker = T::new_atomic_tracker();
O::update_tracker(&new_tracker, measurement);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
}
}

/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
Expand Down
Loading
Loading