Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
use Event::module as the instrumentation scope
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed May 27, 2024
1 parent c8c70d3 commit 9655494
Show file tree
Hide file tree
Showing 16 changed files with 468 additions and 351 deletions.
78 changes: 45 additions & 33 deletions batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,25 +171,30 @@ impl<T> Drop for Receiver<T> {
}

pub struct BatchError<T> {
retryable: T,
retryable: Option<T>,
}

impl<T: Channel> BatchError<T> {
pub fn retry(_: impl std::error::Error + Send + Sync + 'static, retryable: T) -> Self {
BatchError { retryable }
impl<T> BatchError<T> {
pub fn no_retry(_: impl std::error::Error + Send + Sync + 'static) -> Self {
BatchError { retryable: None }
}

pub fn no_retry(_: impl std::error::Error + Send + Sync + 'static) -> Self {
pub fn retry(_: impl std::error::Error + Send + Sync + 'static, retryable: T) -> Self {
BatchError {
retryable: T::new(),
retryable: Some(retryable),
}
}

pub fn into_retryable(self) -> T {
pub fn into_retryable(self) -> Option<T> {
self.retryable
}

pub fn map<U>(self, f: impl FnOnce(T) -> U) -> BatchError<U> {
/**
Map the retryable batch.
If the batch is already retryable, the input to `f` will be `Some`. The resulting batch is retryable if `f` returns `Some`.
*/
pub fn map_retryable<U>(self, f: impl FnOnce(Option<T>) -> Option<U>) -> BatchError<U> {
BatchError {
retryable: f(self.retryable),
}
Expand Down Expand Up @@ -299,37 +304,44 @@ impl<T: Channel> Receiver<T> {
loop {
match panic::catch_unwind(AssertUnwindSafe(|| on_batch(current_batch.channel)))
{
Ok(on_batch) => match CatchUnwind(AssertUnwindSafe(on_batch)).await {
Ok(Ok(())) => {
self.shared.metrics.queue_batch_processed.increment();
}
Ok(Err(BatchError { retryable })) => {
self.shared.metrics.queue_batch_failed.increment();

if retryable.remaining() > 0 && self.retry.next() {
// Delay a bit before trying again; this gives the external service
// a chance to get itself together
wait(self.retry_delay.next()).await;

current_batch = Batch {
channel: retryable,
watchers: current_batch.watchers,
};

self.shared.metrics.queue_batch_retry.increment();
continue;
Ok(on_batch_future) => {
match CatchUnwind(AssertUnwindSafe(on_batch_future)).await {
Ok(Ok(())) => {
self.shared.metrics.queue_batch_processed.increment();
break;
}
Ok(Err(BatchError { retryable })) => {
self.shared.metrics.queue_batch_failed.increment();

if let Some(retryable) = retryable {
if retryable.remaining() > 0 && self.retry.next() {
// Delay a bit before trying again; this gives the external service
// a chance to get itself together
wait(self.retry_delay.next()).await;

current_batch = Batch {
channel: retryable,
watchers: current_batch.watchers,
};

self.shared.metrics.queue_batch_retry.increment();
continue;
}
}

break;
}
Err(_) => {
self.shared.metrics.queue_batch_panicked.increment();
break;
}
}
Err(_) => {
self.shared.metrics.queue_batch_panicked.increment();
}
},
}
Err(_) => {
self.shared.metrics.queue_batch_panicked.increment();
break;
}
}

break;
}

// After the batch has been emitted, notify any watchers
Expand Down
19 changes: 12 additions & 7 deletions batcher/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,33 @@ impl Trigger {
pub fn wait_timeout(&self, mut timeout: Duration) -> bool {
let mut flushed_slot = (self.0).0.lock().unwrap();
loop {
// If we flushed then return
// This condition may already be set before we start waiting
if *flushed_slot {
return true;
}

let now = Instant::now();
match (self.0).1.wait_timeout(flushed_slot, timeout).unwrap() {
(flushed, r) if !r.timed_out() => {
// If we flushed then return
if *flushed {
return true;
}

flushed_slot = flushed;

// Reduce the remaining timeout just in case we didn't time out,
// but woke up spuriously for some reason
timeout = match timeout.checked_sub(now.elapsed()) {
Some(timeout) => timeout,
// We didn't time out, but got close enough that we should now anyways
None => return false,
None => {
return *flushed_slot;
}
};

continue;
}
// Timed out
_ => return false,
(flushed, _) => {
return *flushed;
}
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,15 @@ fn main() {
extern crate alloc;
extern crate core;

pub use core::module_path as module;
/**
Get a [`Path`] of the executing module for use in [`Event::module`].
*/
#[macro_export]
macro_rules! module {
() => {
$crate::Path::new($crate::__private::core::module_path!())
};
}

#[doc(inline)]
pub use emit_macros::*;
Expand Down
2 changes: 2 additions & 0 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ where
This method initializes [`crate::runtime::shared`].
*/
#[must_use = "call `blocking_flush` at the end of `main` to ensure events are flushed."]
#[cfg(feature = "implicit_rt")]
pub fn init(self) -> Init<&'static TEmitter, &'static TCtxt> {
self.init_slot(emit_core::runtime::shared_slot())
}
Expand Down Expand Up @@ -275,6 +276,7 @@ where
This method initializes [`crate::runtime::internal`].
*/
#[must_use = "call `blocking_flush` at the end of `main` (after flushing the main runtime) to ensure events are flushed."]
#[cfg(feature = "implicit_rt")]
pub fn init_internal(self) -> Init<&'static TEmitter, &'static TCtxt> {
let ambient = emit_core::runtime::internal_slot()
.init(
Expand Down
Loading

0 comments on commit 9655494

Please sign in to comment.