Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
start stubbing out some docs for extents and templates
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Mar 15, 2024
1 parent 21055e1 commit 0b23c37
Show file tree
Hide file tree
Showing 13 changed files with 315 additions and 36 deletions.
17 changes: 16 additions & 1 deletion batcher/src/internal_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Counter {
}

impl InternalMetrics {
pub fn sample(&self) -> impl Iterator<Item = emit::metrics::Metric<'static>> + 'static {
pub fn sample(&self) -> impl Iterator<Item = emit::metrics::Metric<'static, emit::empty::Empty>> + 'static {
let InternalMetrics {
queue_overflow,
queue_batch_processed,
Expand All @@ -38,29 +38,44 @@ impl InternalMetrics {

[
emit::metrics::Metric::new(
"emit_batcher",
emit::empty::Empty,
stringify!(queue_overflow),
emit::well_known::METRIC_AGG_COUNT,
queue_overflow.sample(),
emit::empty::Empty,
),
emit::metrics::Metric::new(
"emit_batcher",
emit::empty::Empty,
stringify!(queue_batch_processed),
emit::well_known::METRIC_AGG_COUNT,
queue_batch_processed.sample(),
emit::empty::Empty,
),
emit::metrics::Metric::new(
"emit_batcher",
emit::empty::Empty,
stringify!(queue_batch_failed),
emit::well_known::METRIC_AGG_COUNT,
queue_batch_failed.sample(),
emit::empty::Empty,
),
emit::metrics::Metric::new(
"emit_batcher",
emit::empty::Empty,
stringify!(queue_batch_panicked),
emit::well_known::METRIC_AGG_COUNT,
queue_batch_panicked.sample(),
emit::empty::Empty,
),
emit::metrics::Metric::new(
"emit_batcher",
emit::empty::Empty,
stringify!(queue_batch_retry),
emit::well_known::METRIC_AGG_COUNT,
queue_batch_retry.sample(),
emit::empty::Empty,
),
]
.into_iter()
Expand Down
9 changes: 6 additions & 3 deletions batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl<T: Channel> Sender<T> {
}
}

pub fn sample_metrics(&self) -> impl Iterator<Item = emit::metrics::Metric<'static>> + 'static {
pub fn sample_metrics(&self) -> impl Iterator<Item = emit::metrics::Metric<'static, emit::empty::Empty>> + 'static {
self.shared.sample_metrics()
}
}
Expand Down Expand Up @@ -350,7 +350,7 @@ impl<T: Channel> Receiver<T> {
}
}

pub fn sample_metrics(&self) -> impl Iterator<Item = emit::metrics::Metric<'static>> + 'static {
pub fn sample_metrics(&self) -> impl Iterator<Item = emit::metrics::Metric<'static, emit::empty::Empty>> + 'static {
self.shared.sample_metrics()
}
}
Expand Down Expand Up @@ -434,13 +434,16 @@ struct Shared<T> {
}

impl<T: Channel> Shared<T> {
fn sample_metrics(&self) -> impl Iterator<Item = emit::metrics::Metric<'static>> + 'static {
fn sample_metrics(&self) -> impl Iterator<Item = emit::metrics::Metric<'static, emit::empty::Empty>> + 'static {
let queue_length = { self.state.lock().unwrap().next_batch.channel.remaining() };

self.metrics.sample().chain(Some(emit::metrics::Metric::new(
"emit_batcher",
emit::empty::Empty,
"queue_length",
emit::well_known::METRIC_AGG_LAST,
queue_length,
emit::empty::Empty,
)))
}
}
Expand Down
19 changes: 19 additions & 0 deletions core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,20 @@ impl<'a, P> Event<'a, P> {
&self.module
}

pub fn with_module(mut self, module: impl Into<Path<'a>>) -> Self {
self.module = module.into();
self
}

pub fn extent(&self) -> Option<&Extent> {
self.extent.as_ref()
}

pub fn with_extent(mut self, extent: impl ToExtent) -> Self {
self.extent = extent.to_extent();
self
}

pub fn ts(&self) -> Option<&Timestamp> {
self.extent.as_ref().map(|extent| extent.as_point())
}
Expand All @@ -59,9 +69,18 @@ impl<'a, P> Event<'a, P> {
&self.tpl
}

pub fn with_tpl(mut self, tpl: impl Into<Template<'a>>) -> Self {
self.tpl = tpl.into();
self
}

pub fn props(&self) -> &P {
&self.props
}

pub fn with_props<U>(self, props: U) -> Event<'a, U> {
Event { module: self.module, extent: self.extent, tpl: self.tpl, props }
}
}

impl<'a, P: Props> Event<'a, P> {
Expand Down
4 changes: 2 additions & 2 deletions core/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub trait Filter {
}
}

fn wrap<E>(self, emitter: E) -> Wrap<Self, E>
fn wrap_emitter<E>(self, emitter: E) -> Wrap<Self, E>
where
Self: Sized,
{
Expand Down Expand Up @@ -109,7 +109,7 @@ impl<F: Filter, E: Emitter> Emitter for Wrap<F, E> {
}

pub fn wrap<F: Filter, E: Emitter>(filter: F, emitter: E) -> Wrap<F, E> {
filter.wrap(emitter)
filter.wrap_emitter(emitter)
}

pub struct And<T, U> {
Expand Down
Loading

0 comments on commit 0b23c37

Please sign in to comment.