Skip to content

Commit

Permalink
Metrics collect stress test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mindaugas Vinkelis committed Oct 25, 2024
1 parent 2f65b54 commit 02be567
Show file tree
Hide file tree
Showing 19 changed files with 410 additions and 357 deletions.
39 changes: 7 additions & 32 deletions opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use rand::Rng;
use std::sync::{Arc, Weak};

use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use opentelemetry::{
metrics::{Counter, Histogram, MeterProvider as _, Result},
metrics::{Counter, Histogram, MeterProvider},
Key, KeyValue,
};
use opentelemetry_sdk::{
Expand All @@ -14,34 +13,10 @@ use opentelemetry_sdk::{
Aggregation, Instrument, InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream,
View,
},
testing::metrics::metric_reader::SharedReader,
Resource,
};

/// Cloneable handle to a reference-counted [`MetricReader`].
///
/// Every trait method simply forwards to the wrapped reader.
#[derive(Clone, Debug)]
struct SharedReader(Arc<dyn MetricReader>);

impl MetricReader for SharedReader {
    /// Forwards the pipeline registration to the wrapped reader.
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        (*self.0).register_pipeline(pipeline)
    }

    /// Forwards collection into `rm` to the wrapped reader.
    fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
        (*self.0).collect(rm)
    }

    /// Forwards the flush request to the wrapped reader.
    fn force_flush(&self) -> Result<()> {
        (*self.0).force_flush()
    }

    /// Forwards the shutdown request to the wrapped reader.
    fn shutdown(&self) -> Result<()> {
        (*self.0).shutdown()
    }

    /// Reports the temporality the wrapped reader uses for `kind`.
    fn temporality(&self, kind: InstrumentKind) -> Temporality {
        (*self.0).temporality(kind)
    }
}

// * Summary *

// rustc 1.68.0 (2c8cc3432 2023-03-06)
Expand Down Expand Up @@ -112,13 +87,13 @@ impl MetricReader for SharedReader {
// time: [726.87 ns 736.52 ns 747.09 ns]
fn bench_counter(view: Option<Box<dyn View>>, temporality: &str) -> (SharedReader, Counter<u64>) {
let rdr = if temporality == "cumulative" {
SharedReader(Arc::new(ManualReader::builder().build()))
SharedReader::new(ManualReader::builder().build())
} else {
SharedReader(Arc::new(
SharedReader::new(
ManualReader::builder()
.with_temporality(Temporality::Delta)
.build(),
))
)
};
let mut builder = SdkMeterProvider::builder().with_reader(rdr.clone());
if let Some(view) = view {
Expand Down Expand Up @@ -336,7 +311,7 @@ fn bench_histogram(bound_count: usize) -> (SharedReader, Histogram<u64>) {
.unwrap(),
);

let r = SharedReader(Arc::new(ManualReader::default()));
let r = SharedReader::new(ManualReader::default());
let mut builder = SdkMeterProvider::builder().with_reader(r.clone());
if let Some(view) = view {
builder = builder.with_view(view);
Expand Down Expand Up @@ -377,7 +352,7 @@ fn histograms(c: &mut Criterion) {
}

fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
let r = SharedReader(Arc::new(ManualReader::default()));
let r = SharedReader::new(ManualReader::default());
let mtr = SdkMeterProvider::builder()
.with_reader(r.clone())
.build()
Expand Down
35 changes: 35 additions & 0 deletions opentelemetry-sdk/src/testing/metrics/metric_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,38 @@ impl MetricReader for TestMetricReader {
Temporality::default()
}
}

/// Cloneable wrapper around a shared [`MetricReader`].
///
/// All clones point at the same underlying reader through an [`Arc`], so one
/// clone can be registered with a meter provider while another is kept to
/// trigger collection directly.
#[derive(Clone, Debug)]
pub struct SharedReader(Arc<dyn MetricReader>);

impl SharedReader {
    /// Wraps `reader` in an [`Arc`] so that the returned handle can be cloned.
    pub fn new<R: MetricReader>(reader: R) -> Self {
        let shared: Arc<dyn MetricReader> = Arc::new(reader);
        Self(shared)
    }
}

impl MetricReader for SharedReader {
    /// Forwards the pipeline registration to the wrapped reader.
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        let Self(inner) = self;
        inner.register_pipeline(pipeline)
    }

    /// Forwards collection into `rm` to the wrapped reader.
    fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
        let Self(inner) = self;
        inner.collect(rm)
    }

    /// Forwards the flush request to the wrapped reader.
    fn force_flush(&self) -> Result<()> {
        let Self(inner) = self;
        inner.force_flush()
    }

    /// Forwards the shutdown request to the wrapped reader.
    fn shutdown(&self) -> Result<()> {
        let Self(inner) = self;
        inner.shutdown()
    }

    /// Reports the temporality the wrapped reader uses for `kind`.
    fn temporality(&self, kind: InstrumentKind) -> Temporality {
        let Self(inner) = self;
        inner.temporality(kind)
    }
}
1 change: 0 additions & 1 deletion opentelemetry-sdk/src/testing/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ pub use in_memory_exporter::{InMemoryMetricsExporter, InMemoryMetricsExporterBui

#[doc(hidden)]
pub mod metric_reader;
pub use metric_reader::TestMetricReader;
40 changes: 3 additions & 37 deletions stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,19 @@ version = "0.1.0"
edition = "2021"
publish = false

[[bin]] # Bin to run the metrics stress tests for Counter
name = "metrics"
path = "src/metrics_counter.rs"
doc = false

[[bin]] # Bin to run the metrics stress tests for Gauge
name = "metrics_gauge"
path = "src/metrics_gauge.rs"
doc = false

[[bin]] # Bin to run the metrics stress tests for Histogram
name = "metrics_histogram"
path = "src/metrics_histogram.rs"
doc = false

[[bin]] # Bin to run the metrics overflow stress tests
name = "metrics_overflow"
path = "src/metrics_overflow.rs"
doc = false

[[bin]] # Bin to run the logs stress tests
name = "logs"
path = "src/logs.rs"
doc = false

[[bin]] # Bin to run the traces stress tests
name = "traces"
path = "src/traces.rs"
doc = false

[[bin]] # Bin to run the stress tests to show the cost of random number generation
name = "random"
path = "src/random.rs"
doc = false

[dependencies]
ctrlc = "3.2.5"
lazy_static = "1.4.0"
num_cpus = "1.15.0"
opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace", "logs_level_enabled"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "logs_level_enabled"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "logs_level_enabled", "testing"] }
opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"}
rand = { version = "0.8.4", features = ["small_rng"] }
tracing = { workspace = true, features = ["std"]}
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
num-format = "0.4.4"
sysinfo = { version = "0.30.12", optional = true }
clap = { version = "4.5.20", features = ["derive"] }

[features]
stats = ["sysinfo"]
stats = ["sysinfo"]
13 changes: 13 additions & 0 deletions stress/src/attributes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use opentelemetry::KeyValue;
use rand::{rngs::SmallRng, Rng};

/// Builds three [`KeyValue`]s keyed `attribute1`, `attribute2` and
/// `attribute3`, each carrying a value drawn at random from `values`.
///
/// The three draws are taken from `rng` in key order.
pub fn random_attribute_set3(rng: &mut SmallRng, values: &[&'static str]) -> [KeyValue; 3] {
    let len = values.len();
    let mut pick = || {
        let idx = rng.gen_range(0..len);
        // SAFETY: `idx` was drawn from `0..len`, so it is a valid index into
        // `values`. (With rand 0.8 an empty range makes `gen_range` panic,
        // so this line is not reached for an empty slice.)
        unsafe { *values.get_unchecked(idx) }
    };
    [
        KeyValue::new("attribute1", pick()),
        KeyValue::new("attribute2", pick()),
        KeyValue::new("attribute3", pick()),
    ]
}
2 changes: 1 addition & 1 deletion stress/src/logs.rs → stress/src/bin/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider};
use tracing::error;
use tracing_subscriber::prelude::*;

mod throughput;
use stress::throughput;

#[derive(Debug)]
pub struct NoOpLogProcessor;
Expand Down
176 changes: 176 additions & 0 deletions stress/src/bin/metrics_collect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use std::{
ops::DerefMut,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Barrier,
},
time::{Duration, Instant},
};

use opentelemetry::metrics::{Histogram, MeterProvider};
use opentelemetry_sdk::{
metrics::{
data::{ResourceMetrics, Temporality},
reader::MetricReader,
ManualReader, SdkMeterProvider,
},
testing::metrics::metric_reader::SharedReader,
Resource,
};

use stress::{
attributes::random_attribute_set3,
globals::{ATTRIBUTE_VALUES, CURRENT_RNG},
};

use clap::{Parser, ValueEnum};

// Temporality choice accepted on the command line; `main` maps each variant
// onto the SDK `Temporality` value of the same name.
// (Plain `//` comments are used on purpose: doc comments on a `ValueEnum`
// could end up in the generated help output.)
#[derive(Debug, Clone, Copy, ValueEnum)]
enum CliTemporality {
// Mapped to `Temporality::Cumulative`.
Cumulative,
// Mapped to `Temporality::Delta`.
Delta,
}

/// Command-line arguments for the metrics collection stress test.
#[derive(Parser, Debug)]
#[command(
    version,
    about = "Measure metrics performance while collecting",
    long_about = "The purpose of this test is to see how collecting interferes with measurements.\n\
                  Most of the tests measure how fast the collecting phase is, but more important is\n\
                  that it doesn't \"stop-the-world\" while the collection phase is running."
)]
struct Cli {
    /// Select collection phase temporality
    temporality: CliTemporality,
}

fn main() {
let cli = Cli::parse();
let temporality = match cli.temporality {
CliTemporality::Cumulative => Temporality::Cumulative,
CliTemporality::Delta => Temporality::Delta,
};
let reader = SharedReader::new(
ManualReader::builder()
.with_temporality(temporality)
.build(),
);
let provider = SdkMeterProvider::builder()
.with_reader(reader.clone())
.build();
// use histogram, as it is a bit more complicated during
let histogram = provider.meter("test").u64_histogram("hello").build();

calculate_measurements_during_collection(histogram, reader).print_results();
}

/// Records a single value of `1` on `histogram` with three randomly chosen
/// attribute values, using the RNG obtained through `CURRENT_RNG.with`
/// (presumably a thread-local - confirm in `stress::globals`).
fn record_random_measurement(histogram: &Histogram<u64>) {
    CURRENT_RNG.with(|rng| {
        histogram.record(
            1,
            &random_attribute_set3(rng.borrow_mut().deref_mut(), ATTRIBUTE_VALUES.as_ref()),
        );
    });
}

/// Repeatedly measures how many histogram recordings succeed while
/// `reader.collect` is running, for roughly three seconds of wall-clock time.
///
/// Each iteration:
/// 1. pre-populates the histogram from `num_threads` threads (1000 recordings
///    each) so the collection is not empty;
/// 2. releases `num_threads - 1` recording threads and one collecting thread
///    at the same time through a barrier; the recording threads keep recording
///    until the collecting thread clears the `is_collecting` flag.
///
/// The returned [`MeasurementResults`] holds the number of recordings made
/// while collecting, the per-thread elapsed time in microseconds (summed over
/// all recording threads and iterations) and the number of iterations.
///
/// # Panics
///
/// Panics if a spawned thread panics or if `reader.collect` returns an error.
fn calculate_measurements_during_collection(
    histogram: Histogram<u64>,
    reader: SharedReader,
) -> MeasurementResults {
    // we don't need to use every single CPU, better leave other CPUs for operating system work,
    // so our running threads could be much more stable in performance.
    // just for the record, this has a HUGE effect on my machine (laptop intel i7-1355u)
    //
    // At least 2 threads are required: with fewer, `num_threads - 1` below would
    // either underflow (0) or spawn no recording thread at all (1), leaving
    // nothing to measure and a zero divisor when the results are printed.
    let num_threads = (num_cpus::get() / 2).max(2);

    let mut res = MeasurementResults {
        total_measurements_count: 0,
        total_time_collecting: 0,
        num_iterations: 0,
    };
    let start = Instant::now();
    while start.elapsed() < Duration::from_secs(3) {
        res.num_iterations += 1;
        let is_collecting = AtomicBool::new(false);
        let measurements_while_collecting = AtomicUsize::new(0);
        let time_while_collecting = AtomicUsize::new(0);
        // Participants: `num_threads - 1` recording threads, the collecting
        // thread, and the thread running the scope body.
        let barrier = Barrier::new(num_threads + 1);
        std::thread::scope(|s| {
            // first create a bunch of measurements,
            // so that the collection phase wouldn't be "empty"
            let mut handles = Vec::new();
            for _t in 0..num_threads {
                handles.push(s.spawn(|| {
                    for _i in 0..1000 {
                        record_random_measurement(&histogram);
                    }
                }));
            }
            for handle in handles {
                handle.join().unwrap();
            }

            // simultaneously start collecting and creating more measurements
            for _ in 0..num_threads - 1 {
                s.spawn(|| {
                    barrier.wait();
                    let now = Instant::now();
                    let mut count = 0;
                    // The flag is set before the collecting thread reaches the
                    // barrier, so it is already `true` when this loop starts.
                    while is_collecting.load(Ordering::Acquire) {
                        record_random_measurement(&histogram);
                        count += 1;
                    }
                    measurements_while_collecting.fetch_add(count, Ordering::AcqRel);
                    // Microseconds for a single collection fit comfortably in `usize`.
                    time_while_collecting
                        .fetch_add(now.elapsed().as_micros() as usize, Ordering::AcqRel);
                });
            }

            let collect_handle = s.spawn(|| {
                let mut rm = ResourceMetrics {
                    resource: Resource::empty(),
                    scope_metrics: Vec::new(),
                };
                is_collecting.store(true, Ordering::Release);
                barrier.wait();
                reader.collect(&mut rm).unwrap();
                is_collecting.store(false, Ordering::Release);
            });
            barrier.wait();
            collect_handle.join().unwrap();
        });
        res.total_measurements_count += measurements_while_collecting.load(Ordering::Acquire);
        res.total_time_collecting += time_while_collecting.load(Ordering::Acquire);
    }
    res
}

/// Totals accumulated by `calculate_measurements_during_collection` over all
/// of its iterations.
struct MeasurementResults {
/// Number of histogram recordings made while a collection was running,
/// summed over all recording threads.
total_measurements_count: usize,
/// Elapsed time in microseconds measured by each recording thread while a
/// collection was running, summed over all recording threads.
total_time_collecting: usize,
/// Number of measure-while-collecting iterations that were run.
num_iterations: usize,
}

impl MeasurementResults {
    /// Prints three summary lines to stdout: recordings per millisecond,
    /// recordings per iteration and microseconds per iteration.
    ///
    /// All ratios are computed in `f32` from the accumulated totals.
    fn print_results(&self) {
        let measurements = self.total_measurements_count as f32;
        let elapsed_micros = self.total_time_collecting as f32;
        let iterations = self.num_iterations as f32;

        // Microseconds are converted to milliseconds before dividing.
        let per_millisecond = measurements / (elapsed_micros / 1000.0f32);
        let per_iteration = measurements / iterations;
        let micros_per_iteration = elapsed_micros / iterations;

        println!("{:>10.2} measurements/ms", per_millisecond);
        println!("{:>10.2} measurements/it", per_iteration,);
        println!("{:>10.2} μs/it", micros_per_iteration,);
    }
}
Loading

0 comments on commit 02be567

Please sign in to comment.