diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 96e35aeba4..7d41cdf18d 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -590,6 +590,115 @@ mod tests { ); } + // "multi_thread" tokio flavor must be used else flush won't + // be able to make progress! + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + #[ignore = "Known bug: https://github.com/open-telemetry/opentelemetry-rust/issues/1598"] + async fn delta_memory_efficiency_test() { + // Run this test with stdout enabled to see output. + // cargo test delta_memory_efficiency_test --features=metrics,testing -- --nocapture + + // Arrange + let exporter = InMemoryMetricsExporterBuilder::new() + .with_temporality_selector(DeltaTemporalitySelector()) + .build(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Act + let meter = meter_provider.meter("test"); + let counter = meter + .u64_counter("my_counter") + .with_unit(Unit::new("my_unit")) + .init(); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + + counter.add(1, &[KeyValue::new("key1", "value2")]); + counter.add(1, &[KeyValue::new("key1", "value2")]); + counter.add(1, &[KeyValue::new("key1", "value2")]); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_counter"); + assert_eq!(metric.unit.as_str(), "my_unit"); + let sum = metric + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for Counter instruments by default"); + + // Expecting 2 time-series. + assert_eq!(sum.data_points.len(), 2); + assert!(sum.is_monotonic, "Counter should produce monotonic."); + assert_eq!( + sum.temporality, + data::Temporality::Delta, + "Should produce Delta as configured" + ); + + // find and validate key1=value1 datapoint + let mut data_point1 = None; + for datapoint in &sum.data_points { + if datapoint + .attributes + .iter() + .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1") + { + data_point1 = Some(datapoint); + } + } + assert_eq!( + data_point1 + .expect("datapoint with key1=value1 expected") + .value, + 5 + ); + + // find and validate key1=value2 datapoint + let mut data_point1 = None; + for datapoint in &sum.data_points { + if datapoint + .attributes + .iter() + .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2") + { + data_point1 = Some(datapoint); + } + } + assert_eq!( + data_point1 + .expect("datapoint with key1=value2 expected") + .value, + 3 + ); + + // flush again, and validate that nothing is flushed + // as delta temporality. + meter_provider.force_flush().unwrap(); + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + println!("resource_metrics: {:?}", resource_metrics); + assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect."); + } + + struct DeltaTemporalitySelector(); + impl TemporalitySelector for DeltaTemporalitySelector { + fn temporality(&self, _kind: InstrumentKind) -> Temporality { + Temporality::Delta + } + } + struct TestContext { exporter: InMemoryMetricsExporter, meter_provider: SdkMeterProvider,