Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
clean up metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Apr 6, 2024
1 parent db8c4ad commit 6af76a9
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 344 deletions.
112 changes: 47 additions & 65 deletions batcher/src/internal_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,54 @@
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Default)]
pub(crate) struct InternalMetrics {
pub(crate) queue_overflow: Counter,
pub(crate) queue_batch_processed: Counter,
pub(crate) queue_batch_failed: Counter,
pub(crate) queue_batch_panicked: Counter,
pub(crate) queue_batch_retry: Counter,
macro_rules! metrics {
($container:ident {
$(
$name:ident: $ty:ty,
)*
}) => {
#[derive(Default)]
pub(crate) struct $container {
$(
pub(crate) $name: $ty
),*
}

impl $container {
pub fn sample(
&self,
) -> impl Iterator<Item = emit::metric::Metric<'static, emit::empty::Empty>> + 'static {
let $container {
$(
$name
),*
} = self;

[
$(
emit::metric::Metric::new(
env!("CARGO_PKG_NAME"),
emit::empty::Empty,
stringify!($name),
emit::well_known::METRIC_AGG_COUNT,
$name.sample(),
emit::empty::Empty,
)
),*
]
.into_iter()
}
}
};
}

metrics!(InternalMetrics {
queue_overflow: Counter,
queue_batch_processed: Counter,
queue_batch_failed: Counter,
queue_batch_panicked: Counter,
queue_batch_retry: Counter,
});

#[derive(Default)]
pub(crate) struct Counter(AtomicUsize);

Expand All @@ -25,61 +65,3 @@ impl Counter {
self.0.load(Ordering::Relaxed)
}
}

impl InternalMetrics {
pub fn sample(
&self,
) -> impl Iterator<Item = emit::metric::Metric<'static, emit::empty::Empty>> + 'static {
let InternalMetrics {
queue_overflow,
queue_batch_processed,
queue_batch_failed,
queue_batch_panicked,
queue_batch_retry,
} = self;

[
emit::metric::Metric::new(
"emit_batcher",
emit::empty::Empty,
stringify!(queue_overflow),
emit::well_known::METRIC_AGG_COUNT,
queue_overflow.sample(),
emit::empty::Empty,
),
emit::metric::Metric::new(
"emit_batcher",
emit::empty::Empty,
stringify!(queue_batch_processed),
emit::well_known::METRIC_AGG_COUNT,
queue_batch_processed.sample(),
emit::empty::Empty,
),
emit::metric::Metric::new(
"emit_batcher",
emit::empty::Empty,
stringify!(queue_batch_failed),
emit::well_known::METRIC_AGG_COUNT,
queue_batch_failed.sample(),
emit::empty::Empty,
),
emit::metric::Metric::new(
"emit_batcher",
emit::empty::Empty,
stringify!(queue_batch_panicked),
emit::well_known::METRIC_AGG_COUNT,
queue_batch_panicked.sample(),
emit::empty::Empty,
),
emit::metric::Metric::new(
"emit_batcher",
emit::empty::Empty,
stringify!(queue_batch_retry),
emit::well_known::METRIC_AGG_COUNT,
queue_batch_retry.sample(),
emit::empty::Empty,
),
]
.into_iter()
}
}
2 changes: 1 addition & 1 deletion batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ impl<T: Channel> Shared<T> {
let queue_length = { self.state.lock().unwrap().next_batch.channel.remaining() };

self.metrics.sample().chain(Some(emit::metric::Metric::new(
"emit_batcher",
env!("CARGO_PKG_NAME"),
emit::empty::Empty,
"queue_length",
emit::well_known::METRIC_AGG_LAST,
Expand Down
67 changes: 0 additions & 67 deletions core/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,73 +193,6 @@ impl<TEmitter: Emitter, TFilter: Filter, TCtxt: Ctxt, TClock: Clock, TRng: Rng>
}
}

impl<TEmitter: Emitter, TFilter: Filter, TCtxt: Ctxt, TClock: Clock, TRng: Rng> Emitter
for Runtime<TEmitter, TFilter, TCtxt, TClock, TRng>
{
fn emit<P: Props>(&self, evt: &Event<P>) {
self.emit(evt)
}

fn blocking_flush(&self, timeout: core::time::Duration) {
self.emitter.blocking_flush(timeout)
}
}

impl<TEmitter: Emitter, TFilter: Filter, TCtxt: Ctxt, TClock: Clock, TRng: Rng> Filter
for Runtime<TEmitter, TFilter, TCtxt, TClock, TRng>
{
fn matches<P: Props>(&self, evt: &Event<P>) -> bool {
self.filter.matches(evt)
}
}

impl<TEmitter: Emitter, TFilter: Filter, TCtxt: Ctxt, TClock: Clock, TRng: Rng> Ctxt
for Runtime<TEmitter, TFilter, TCtxt, TClock, TRng>
{
type Current = TCtxt::Current;
type Frame = TCtxt::Frame;

fn open_root<P: Props>(&self, props: P) -> Self::Frame {
self.ctxt.open_root(props)
}

fn open_push<P: Props>(&self, props: P) -> Self::Frame {
self.ctxt.open_push(props)
}

fn enter(&self, scope: &mut Self::Frame) {
self.ctxt.enter(scope)
}

fn with_current<R, F: FnOnce(&Self::Current) -> R>(&self, with: F) -> R {
self.ctxt.with_current(with)
}

fn exit(&self, scope: &mut Self::Frame) {
self.ctxt.exit(scope)
}

fn close(&self, span: Self::Frame) {
self.ctxt.close(span)
}
}

impl<TEmitter: Emitter, TFilter: Filter, TCtxt: Ctxt, TClock: Clock, TRng: Rng> Clock
for Runtime<TEmitter, TFilter, TCtxt, TClock, TRng>
{
fn now(&self) -> Option<Timestamp> {
self.clock.now()
}
}

impl<TEmitter: Emitter, TFilter: Filter, TCtxt: Ctxt, TClock: Clock, TRng: Rng> Rng
for Runtime<TEmitter, TFilter, TCtxt, TClock, TRng>
{
fn gen_u64(&self) -> Option<u64> {
self.rng.gen_u64()
}
}

pub struct AssertInternal<T>(pub T);

impl<T: Emitter> Emitter for AssertInternal<T> {
Expand Down
8 changes: 4 additions & 4 deletions src/macro_hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ pub fn __private_emit<'a, E: Emitter, F: Filter, C: Ctxt, T: Clock, R: Rng>(
source.into(),
FirstDefined(when, rt.filter()),
rt.ctxt(),
extent.to_extent().or_else(|| rt.now().to_extent()),
extent.to_extent().or_else(|| rt.clock().now().to_extent()),
tpl,
props,
);
Expand Down Expand Up @@ -561,15 +561,15 @@ pub fn __private_push_span_ctxt<'a, 'b, E: Emitter, F: Filter, C: Ctxt, T: Clock
}
}

let (mut trace_id, span_parent) = rt.with_current(|current| {
let (mut trace_id, span_parent) = rt.ctxt().with_current(|current| {
(
current.pull::<TraceId, _>(KEY_TRACE_ID),
current.pull::<SpanId, _>(KEY_SPAN_ID),
)
});

trace_id = trace_id.or_else(|| TraceId::random(rt));
let span_id = SpanId::random(rt);
trace_id = trace_id.or_else(|| TraceId::random(rt.rng()));
let span_id = SpanId::random(rt.rng());

let trace_ctxt = TraceContext {
trace_id,
Expand Down
20 changes: 20 additions & 0 deletions src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use core::ops::ControlFlow;

use emit_core::{
event::Event,
extent::{Extent, ToExtent},
path::Path,
props::{ByRef, ErasedProps, Props},
str::{Str, ToStr},
template::{self, Template},
value::{ToValue, Value},
well_known::{KEY_METRIC_AGG, KEY_METRIC_NAME, KEY_METRIC_VALUE},
};
Expand Down Expand Up @@ -96,6 +98,24 @@ impl<'a, P> Metric<'a, P> {
props,
}
}

pub fn to_event(&self) -> Event<&Self> {
// "{metric_agg} of {metric_name} is {metric_value}"
const TEMPLATE: &'static [template::Part<'static>] = &[
template::Part::hole("metric_agg"),
template::Part::text(" of "),
template::Part::hole("metric_name"),
template::Part::text(" is "),
template::Part::hole("metric_value"),
];

Event::new(
self.module.by_ref(),
self.extent.clone(),
Template::new(TEMPLATE),
self,
)
}
}

impl<'a, P: Props> Metric<'a, P> {
Expand Down
Loading

0 comments on commit 6af76a9

Please sign in to comment.