diff --git a/batcher/src/internal_metrics.rs b/batcher/src/internal_metrics.rs index a656256..ff97ec5 100644 --- a/batcher/src/internal_metrics.rs +++ b/batcher/src/internal_metrics.rs @@ -27,7 +27,7 @@ impl Counter { } impl InternalMetrics { - pub fn sample(&self) -> impl Iterator> + 'static { + pub fn sample(&self) -> impl Iterator> + 'static { let InternalMetrics { queue_overflow, queue_batch_processed, @@ -38,29 +38,44 @@ impl InternalMetrics { [ emit::metrics::Metric::new( + "emit_batcher", + emit::empty::Empty, stringify!(queue_overflow), emit::well_known::METRIC_AGG_COUNT, queue_overflow.sample(), + emit::empty::Empty, ), emit::metrics::Metric::new( + "emit_batcher", + emit::empty::Empty, stringify!(queue_batch_processed), emit::well_known::METRIC_AGG_COUNT, queue_batch_processed.sample(), + emit::empty::Empty, ), emit::metrics::Metric::new( + "emit_batcher", + emit::empty::Empty, stringify!(queue_batch_failed), emit::well_known::METRIC_AGG_COUNT, queue_batch_failed.sample(), + emit::empty::Empty, ), emit::metrics::Metric::new( + "emit_batcher", + emit::empty::Empty, stringify!(queue_batch_panicked), emit::well_known::METRIC_AGG_COUNT, queue_batch_panicked.sample(), + emit::empty::Empty, ), emit::metrics::Metric::new( + "emit_batcher", + emit::empty::Empty, stringify!(queue_batch_retry), emit::well_known::METRIC_AGG_COUNT, queue_batch_retry.sample(), + emit::empty::Empty, ), ] .into_iter() diff --git a/batcher/src/lib.rs b/batcher/src/lib.rs index fd5812a..cdcdd81 100644 --- a/batcher/src/lib.rs +++ b/batcher/src/lib.rs @@ -149,7 +149,7 @@ impl Sender { } } - pub fn sample_metrics(&self) -> impl Iterator> + 'static { + pub fn sample_metrics(&self) -> impl Iterator> + 'static { self.shared.sample_metrics() } } @@ -350,7 +350,7 @@ impl Receiver { } } - pub fn sample_metrics(&self) -> impl Iterator> + 'static { + pub fn sample_metrics(&self) -> impl Iterator> + 'static { self.shared.sample_metrics() } } @@ -434,13 +434,16 @@ struct Shared { } impl Shared { - fn sample_metrics(&self) -> impl Iterator> + 'static { + fn sample_metrics(&self) -> impl Iterator> + 'static { let queue_length = { self.state.lock().unwrap().next_batch.channel.remaining() }; self.metrics.sample().chain(Some(emit::metrics::Metric::new( + "emit_batcher", + emit::empty::Empty, "queue_length", emit::well_known::METRIC_AGG_LAST, queue_length, + emit::empty::Empty, ))) } } diff --git a/core/src/event.rs b/core/src/event.rs index f1d7da6..99aa70b 100644 --- a/core/src/event.rs +++ b/core/src/event.rs @@ -40,10 +40,20 @@ impl<'a, P> Event<'a, P> { &self.module } + pub fn with_module(mut self, module: impl Into>) -> Self { + self.module = module.into(); + self + } + pub fn extent(&self) -> Option<&Extent> { self.extent.as_ref() } + pub fn with_extent(mut self, extent: impl ToExtent) -> Self { + self.extent = extent.to_extent(); + self + } + pub fn ts(&self) -> Option<&Timestamp> { self.extent.as_ref().map(|extent| extent.as_point()) } @@ -59,9 +69,18 @@ impl<'a, P> Event<'a, P> { &self.tpl } + pub fn with_tpl(mut self, tpl: impl Into>) -> Self { + self.tpl = tpl.into(); + self + } + pub fn props(&self) -> &P { &self.props } + + pub fn with_props(self, props: U) -> Event<'a, U> { + Event { module: self.module, extent: self.extent, tpl: self.tpl, props } + } } impl<'a, P: Props> Event<'a, P> { diff --git a/core/src/filter.rs b/core/src/filter.rs index 5b6b8f3..7362812 100644 --- a/core/src/filter.rs +++ b/core/src/filter.rs @@ -30,7 +30,7 @@ pub trait Filter { } } - fn wrap(self, emitter: E) -> Wrap + fn wrap_emitter(self, emitter: E) -> Wrap where Self: Sized, { @@ -109,7 +109,7 @@ impl Emitter for Wrap { } pub fn wrap(filter: F, emitter: E) -> Wrap { - filter.wrap(emitter) + filter.wrap_emitter(emitter) } pub struct And { diff --git a/core/src/props.rs b/core/src/props.rs index 317ca70..c1e5459 100644 --- a/core/src/props.rs +++ b/core/src/props.rs @@ -29,6 +29,30 @@ pub trait Props { value } + fn pull<'kv, V: FromValue<'kv>, K: ToStr>(&'kv self, key: K) -> Option { + self.get(key).and_then(|v| v.cast()) + } + + fn count(&self) -> usize { + let mut count = 0; + + self.for_each(|_, _| { + count += 1; + + ControlFlow::Continue(()) + }); + + count + } + + fn is_unique(&self) -> bool { + false + } + + fn is_sorted(&self) -> bool { + false + } + fn chain(self, other: U) -> Chain where Self: Sized, @@ -50,8 +74,14 @@ pub trait Props { ByRef(self) } - fn pull<'kv, V: FromValue<'kv>, K: ToStr>(&'kv self, key: K) -> Option { - self.get(key).and_then(|v| v.cast()) + #[cfg(feature = "alloc")] + fn dedup(self) -> Dedup + where + Self: Sized, + { + Dedup { + src: self, + } } } @@ -67,6 +97,10 @@ impl<'a, P: Props + ?Sized> Props for &'a P { (**self).get(key) } + fn count(&self) -> usize { + (**self).count() + } + fn pull<'kv, V: FromValue<'kv>, K: ToStr>(&'kv self, key: K) -> Option { (**self).pull(key) } @@ -101,6 +135,18 @@ impl Props for (K, V) { ) -> ControlFlow<()> { for_each(self.0.to_str(), self.1.to_value()) } + + fn count(&self) -> usize { + 1 + } + + fn is_sorted(&self) -> bool { + true + } + + fn is_unique(&self) -> bool { + true + } } impl Props for [P] { @@ -114,6 +160,10 @@ impl Props for [P] { ControlFlow::Continue(()) } + + fn count(&self) -> usize { + self.iter().map(|props| props.count()).sum() + } } impl Props for [T; N] @@ -126,6 +176,10 @@ where ) -> ControlFlow<()> { (self as &[_]).for_each(for_each) } + + fn count(&self) -> usize { + (self as &[_]).count() + } } #[cfg(feature = "alloc")] @@ -148,6 +202,18 @@ where fn get<'v, Q: ToStr>(&'v self, key: Q) -> Option> { self.get(key.to_str().as_ref()).map(|v| v.to_value()) } + + fn count(&self) -> usize { + self.len() + } + + fn is_unique(&self) -> bool { + true + } + + fn is_sorted(&self) -> bool { + true + } } #[cfg(feature = "std")] @@ -170,6 +236,14 @@ where fn get<'v, Q: ToStr>(&'v self, key: Q) -> Option> { self.get(key.to_str().as_ref()).map(|v| v.to_value()) } + + fn count(&self) -> usize { + self.len() + } + + fn is_unique(&self) -> bool { + true + } } impl Props for Empty { @@ -179,6 +253,10 @@ impl Props for Empty { ) -> ControlFlow<()> { ControlFlow::Continue(()) } + + fn count(&self) -> usize { + 0 + } } pub struct Chain { @@ -201,6 +279,11 @@ impl Props for Chain { &'kv self, mut for_each: F, ) -> ControlFlow<()> { + if self.first.is_sorted() && self.second.is_sorted() { + // Use a joining strategy that yields from `first` until `second` is more recent + todo!() + } + self.first.for_each(&mut for_each)?; self.second.for_each(for_each) } @@ -210,6 +293,14 @@ impl Props for Chain { self.first.get(key).or_else(|| self.second.get(key)) } + + fn count(&self) -> usize { + self.first.count() + self.second.count() + } + + fn is_sorted(&self) -> bool { + self.first.is_sorted() && self.second.is_sorted() + } } pub struct ByRef<'a, T: ?Sized>(&'a T); @@ -221,6 +312,10 @@ impl<'a, P: Props + ?Sized> Props for ByRef<'a, P> { ) -> ControlFlow<()> { self.0.for_each(for_each) } + + fn count(&self) -> usize { + self.0.count() + } } pub struct Filter { @@ -252,6 +347,55 @@ impl bool> Props for Filter { } } +#[cfg(feature = "alloc")] +mod alloc_support { + use super::*; + + pub struct Dedup

{ + pub(super) src: P, + } + + impl Props for Dedup

{ + fn for_each<'kv, F: FnMut(Str<'kv>, Value<'kv>) -> ControlFlow<()>>( + &'kv self, + mut for_each: F, + ) -> ControlFlow<()> { + if self.src.is_unique() { + return self.src.for_each(for_each); + } + + let mut seen = alloc::collections::BTreeMap::new(); + + self.src.for_each(|key, value| { + seen.entry(key).or_insert(value); + + ControlFlow::Continue(()) + }); + + for (key, value) in seen { + for_each(key, value)?; + } + + ControlFlow::Continue(()) + } + + fn get<'v, K: ToStr>(&'v self, key: K) -> Option> { + self.src.get(key) + } + + fn is_unique(&self) -> bool { + true + } + + fn is_sorted(&self) -> bool { + true + } + } +} + +#[cfg(feature = "alloc")] +pub use alloc_support::*; + mod internal { use core::ops::ControlFlow; @@ -264,6 +408,12 @@ mod internal { ) -> ControlFlow<()>; fn dispatch_get(&self, key: Str) -> Option; + + fn dispatch_count(&self) -> usize; + + fn dispatch_is_unique(&self) -> bool; + + fn dispatch_is_sorted(&self) -> bool; } pub trait SealedProps { @@ -292,6 +442,18 @@ impl internal::DispatchProps for P { fn dispatch_get<'v>(&'v self, key: Str) -> Option> { self.get(key) } + + fn dispatch_count(&self) -> usize { + self.count() + } + + fn dispatch_is_sorted(&self) -> bool { + self.is_sorted() + } + + fn dispatch_is_unique(&self) -> bool { + self.is_unique() + } } impl<'a> Props for dyn ErasedProps + 'a { @@ -305,4 +467,16 @@ impl<'a> Props for dyn ErasedProps + 'a { fn get<'v, K: ToStr>(&'v self, key: K) -> Option> { self.erase_props().0.dispatch_get(key.to_str()) } + + fn count(&self) -> usize { + self.erase_props().0.dispatch_count() + } + + fn is_sorted(&self) -> bool { + self.erase_props().0.dispatch_is_sorted() + } + + fn is_unique(&self) -> bool { + self.erase_props().0.dispatch_is_unique() + } } diff --git a/src/lib.rs b/src/lib.rs index cd4f0c7..ea981cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,7 @@ The core event model includes: - **Module (`module`):** The name of the component that triggered the event. - **Extent (`ts_start`..`ts`):** The point or span of time for which the event is relevant. -- **Template (`tpl`):** A human-readable description of the event that its properties may be interpolated into. +- **Template (`tpl` and `msg`):** A human-readable description of the event that its properties may be interpolated into. - **Properties (`props`):** The payload of the event. ## Extensions @@ -37,6 +37,14 @@ Emit doesn't hard-code common observability concepts into events. It instead rel - **Metric aggregation (`metric_agg`):** The aggregation over the data source the metric sample was computed with. - **Metric value (`metric_value`):** The sampled value from the metric source. - **Metric unit (`metric_unit`):** The unit the sampled value is in. + +## Extents + +The extent of an event is the time for which the event is relevant. This may be a single point in time if the event was triggered by something happening, or a span of time if the event was started at some point and completed at another. + +## Templates + +The primary source of information in an event is its _template_. A template is a human-readable description of an event with holes to interpolate values into. Templates are responsible for both capturing ambient state to package with an event, and to format that state into a human-readable representation. Templates are also useful as low-cardinality identifiers for a specific shaped event, independent of the properties captured by any particular instance of it. */ #![cfg_attr(not(feature = "std"), no_std)] diff --git a/src/macro_hooks.rs b/src/macro_hooks.rs index 50446e1..345f763 100644 --- a/src/macro_hooks.rs +++ b/src/macro_hooks.rs @@ -646,4 +646,16 @@ impl<'a> Props for __PrivateMacroProps<'a> { .ok() .and_then(|i| self.0[i].1.as_ref().map(|v| v.by_ref())) } + + fn count(&self) -> usize { + self.0.len() + } + + fn is_sorted(&self) -> bool { + true + } + + fn is_unique(&self) -> bool { + true + } } diff --git a/src/metrics.rs b/src/metrics.rs index 662f413..884696e 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,15 +1,11 @@ use core::ops::ControlFlow; use emit_core::{ - empty::Empty, - extent::{Extent, ToExtent}, - props::Props, - str::{Str, ToStr}, - value::{ToValue, Value}, - well_known::{KEY_METRIC_AGG, KEY_METRIC_NAME, KEY_METRIC_VALUE}, + empty::Empty, extent::{Extent, ToExtent}, path::Path, props::Props, str::{Str, ToStr}, value::{ToValue, Value}, well_known::{KEY_METRIC_AGG, KEY_METRIC_NAME, KEY_METRIC_VALUE} }; -pub struct Metric<'a, P = Empty> { +pub struct Metric<'a, P> { + module: Path<'a>, extent: Option, name: Str<'a>, agg: Str<'a>, @@ -17,23 +13,34 @@ pub struct Metric<'a, P = Empty> { props: P, } -impl<'a> Metric<'a> { +impl<'a, P> Metric<'a, P> { pub fn new( + module: impl Into>, + extent: impl ToExtent, name: impl Into>, agg: impl Into>, value: impl Into>, + props: P, ) -> Self { Metric { - extent: None, + module: module.into(), + extent: extent.to_extent(), name: name.into(), agg: agg.into(), value: value.into(), - props: Empty, + props, } } -} -impl<'a, P> Metric<'a, P> { + pub fn module(&self) -> &Path<'a> { + &self.module + } + + pub fn with_module(mut self, module: impl Into>) -> Self { + self.module = module.into(); + self + } + pub fn name(&self) -> &Str<'a> { &self.name } @@ -76,6 +83,7 @@ impl<'a, P> Metric<'a, P> { pub fn with_props(self, props: U) -> Metric<'a, U> { Metric { + module: self.module, extent: self.extent, name: self.name, agg: self.agg, @@ -85,6 +93,30 @@ impl<'a, P> Metric<'a, P> { } } +impl<'a, P: Props> Metric<'a, P> { + pub fn by_ref<'b>(&'b self) -> Metric<'b, ByRef<'b, P>> { + Metric { + module: self.module.by_ref(), + extent: self.extent.clone(), + name: self.name.by_ref(), + agg: self.agg.by_ref(), + value: self.value.by_ref(), + props: self.props.by_ref(), + } + } + + pub fn erase<'b>(&'b self) -> Metric<'b, &'b dyn ErasedProps> { + Metric { + module: self.module.by_ref(), + extent: self.extent.clone(), + name: self.name.by_ref(), + agg: self.agg.by_ref(), + value: self.value.by_ref(), + props: &self.props, + } + } +} + impl<'a, P> ToExtent for Metric<'a, P> { fn to_extent(&self) -> Option { self.extent.clone() diff --git a/targets/file/src/internal_metrics.rs b/targets/file/src/internal_metrics.rs index e876cfa..52e9517 100644 --- a/targets/file/src/internal_metrics.rs +++ b/targets/file/src/internal_metrics.rs @@ -33,7 +33,7 @@ impl Counter { } impl InternalMetrics { - pub fn sample(&self) -> impl Iterator> + 'static { + pub fn sample(&self) -> impl Iterator> + 'static { let InternalMetrics { file_set_read_failed, file_open_failed, @@ -46,39 +46,60 @@ impl InternalMetrics { [ emit::metrics::Metric::new( + "emit_file", + emit::empty::Empty, stringify!(file_set_read_failed), emit::well_known::METRIC_AGG_COUNT, file_set_read_failed.sample(), + emit::empty::Empty, ), emit::metrics::Metric::new( + "emit_file", + emit::empty::Empty, stringify!(file_open_failed), emit::well_known::METRIC_AGG_COUNT, file_open_failed.sample(), + emit::empty::Empty, ), emit::metrics::Metric::new( + "emit_file", + emit::empty::Empty, stringify!(file_create), emit::well_known::METRIC_AGG_COUNT, file_create.sample(), + emit::empty::Empty, ), emit::metrics::Metric::new( + "emit_file", + emit::empty::Empty, stringify!(file_create_failed), emit::well_known::METRIC_AGG_COUNT, file_create_failed.sample(), + emit::empty::Empty, ), emit::metrics::Metric::new( + "emit_file", + emit::empty::Empty, stringify!(file_write_failed), emit::well_known::METRIC_AGG_COUNT, file_write_failed.sample(), + emit::empty::Empty, ), emit::metrics::Metric::new( + "emit_file", + emit::empty::Empty, stringify!(file_delete), emit::well_known::METRIC_AGG_COUNT, file_delete.sample(), + emit::empty::Empty, ), emit::metrics::Metric::new( + "emit_file", + emit::empty::Empty, stringify!(file_delete_failed), emit::well_known::METRIC_AGG_COUNT, file_delete_failed.sample(), + emit::empty::Empty, ), ] .into_iter() diff --git a/targets/file/src/lib.rs b/targets/file/src/lib.rs index 3759610..f86d5df 100644 --- a/targets/file/src/lib.rs +++ b/targets/file/src/lib.rs @@ -120,14 +120,10 @@ pub struct FileSet { impl FileSet { pub fn sample_metrics<'a>( &'a self, - ) -> impl Iterator> + 'a { + ) -> impl Iterator> + 'a { self.sender .sample_metrics() - .map(|metric| { - let name = format!("file_{}", metric.name()); - - metric.with_name(name) - }) + .map(|metric| metric.with_module("emit_file")) .chain(self.metrics.sample()) } } diff --git a/targets/otlp/src/client.rs b/targets/otlp/src/client.rs index 4b11abd..b78ee1d 100644 --- a/targets/otlp/src/client.rs +++ b/targets/otlp/src/client.rs @@ -23,14 +23,10 @@ pub struct Otlp { impl Otlp { pub fn sample_metrics<'a>( &'a self, - ) -> impl Iterator> + 'a { + ) -> impl Iterator> + 'a { self.sender .sample_metrics() - .map(|metric| { - let name = format!("otlp_{}", metric.name()); - - metric.with_name(name) - }) + .map(|metric| metric.with_module("emit_otlp")) .chain(self.metrics.sample()) } } diff --git a/targets/otlp/src/internal_metrics.rs b/targets/otlp/src/internal_metrics.rs index 902e970..5075e5b 100644 --- a/targets/otlp/src/internal_metrics.rs +++ b/targets/otlp/src/internal_metrics.rs @@ -27,15 +27,18 @@ impl Counter { } impl InternalMetrics { - pub fn sample(&self) -> impl Iterator> + 'static { + pub fn sample(&self) -> impl Iterator> + 'static { let InternalMetrics { otlp_event_discarded, } = self; [emit::metrics::Metric::new( + "emit_otlp", + emit::empty::Empty, stringify!(otlp_event_discarded), emit::well_known::METRIC_AGG_COUNT, otlp_event_discarded.sample(), + emit::empty::Empty, )] .into_iter() } diff --git a/tests/smoke-test/main.rs b/tests/smoke-test/main.rs index 185c957..e20b72d 100644 --- a/tests/smoke-test/main.rs +++ b/tests/smoke-test/main.rs @@ -51,7 +51,7 @@ async fn main() { .unwrap()) .and_to(emit_term::stdout()) .and_to( - emit::level::min_level(emit::Level::Warn).wrap( + emit::level::min_level(emit::Level::Warn).wrap_emitter( emit_file::set("./target/logs/log.txt") .reuse_files(true) .roll_by_minute()