Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
use regular macros in targets
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Feb 12, 2024
1 parent dcf2360 commit 14843b6
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 142 deletions.
2 changes: 1 addition & 1 deletion core/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub trait Filter {
}
}

fn or_filter<U>(self, other: U) -> Or<Self, U>
fn or_when<U>(self, other: U) -> Or<Self, U>
where
Self: Sized,
{
Expand Down
11 changes: 10 additions & 1 deletion macros/src/emit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct ExpandTokens {
}

struct Args {
rt: TokenStream,
extent: TokenStream,
to: TokenStream,
when: TokenStream,
Expand All @@ -25,6 +26,11 @@ impl Parse for Args {

Ok(quote!(#expr))
});
let mut rt = Arg::token_stream("rt", |fv| {
let expr = &fv.expr;

Ok(quote!(#expr))
});
let mut to = Arg::token_stream("to", |fv| {
let expr = &fv.expr;

Expand All @@ -38,11 +44,12 @@ impl Parse for Args {

args::set_from_field_values(
input.parse_terminated(FieldValue::parse, Token![,])?.iter(),
[&mut extent, &mut to, &mut when],
[&mut extent, &mut rt, &mut to, &mut when],
)?;

Ok(Args {
extent: extent.take().unwrap_or_else(|| quote!(emit::empty::Empty)),
rt: rt.take().unwrap_or_else(|| quote!(emit::runtime::shared())),
to: to.take().unwrap_or_else(|| quote!(emit::empty::Empty)),
when: when.take().unwrap_or_else(|| quote!(emit::empty::Empty)),
})
Expand All @@ -59,6 +66,7 @@ pub fn expand_tokens(opts: ExpandTokens) -> Result<TokenStream, syn::Error> {
let props_tokens = props.match_bound_tokens();

let extent_tokens = args.extent;
let rt_tokens = args.rt;
let to_tokens = args.to;
let when_tokens = args.when;

Expand All @@ -68,6 +76,7 @@ pub fn expand_tokens(opts: ExpandTokens) -> Result<TokenStream, syn::Error> {
match (#(#props_match_input_tokens),*) {
(#(#props_match_binding_tokens),*) => {
emit::__private::__private_emit(
#rt_tokens,
#to_tokens,
#when_tokens,
#extent_tokens,
Expand Down
21 changes: 18 additions & 3 deletions macros/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ pub struct ExpandTokens {
}

struct Args {
rt: TokenStream,
to: TokenStream,
when: TokenStream,
arg: Option<Ident>,
}

impl Parse for Args {
fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> {
let mut rt = Arg::token_stream("rt", |fv| {
let expr = &fv.expr;

Ok(quote!(#expr))
});
let mut to = Arg::token_stream("to", |fv| {
let expr = &fv.expr;

Expand All @@ -39,10 +45,11 @@ impl Parse for Args {

args::set_from_field_values(
input.parse_terminated(FieldValue::parse, Token![,])?.iter(),
[&mut arg, &mut to, &mut when],
[&mut arg, &mut rt, &mut to, &mut when],
)?;

Ok(Args {
rt: rt.take().unwrap_or_else(|| quote!(emit::runtime::shared())),
to: to.take().unwrap_or_else(|| quote!(emit::empty::Empty)),
when: when.take().unwrap_or_else(|| quote!(emit::empty::Empty)),
arg: arg.take(),
Expand Down Expand Up @@ -71,6 +78,7 @@ pub fn expand_tokens(opts: ExpandTokens) -> Result<TokenStream, syn::Error> {
..
})) => {
**block = syn::parse2::<Block>(inject_sync(
&args.rt,
&args.to,
&args.when,
&template,
Expand All @@ -83,6 +91,7 @@ pub fn expand_tokens(opts: ExpandTokens) -> Result<TokenStream, syn::Error> {
// A synchronous block
Stmt::Expr(Expr::Block(ExprBlock { block, .. }), _) => {
*block = syn::parse2::<Block>(inject_sync(
&args.rt,
&args.to,
&args.when,
&template,
Expand All @@ -101,6 +110,7 @@ pub fn expand_tokens(opts: ExpandTokens) -> Result<TokenStream, syn::Error> {
..
})) => {
**block = syn::parse2::<Block>(inject_async(
&args.rt,
&args.to,
&args.when,
&template,
Expand All @@ -113,6 +123,7 @@ pub fn expand_tokens(opts: ExpandTokens) -> Result<TokenStream, syn::Error> {
// An asynchronous block
Stmt::Expr(Expr::Async(ExprAsync { block, .. }), _) => {
*block = syn::parse2::<Block>(inject_async(
&args.rt,
&args.to,
&args.when,
&template,
Expand All @@ -129,6 +140,7 @@ pub fn expand_tokens(opts: ExpandTokens) -> Result<TokenStream, syn::Error> {
}

fn inject_sync(
rt_tokens: &TokenStream,
to_tokens: &TokenStream,
when_tokens: &TokenStream,
template: &Template,
Expand All @@ -142,11 +154,12 @@ fn inject_sync(
let template_tokens = template.template_tokens();

quote!({
let (mut __ctxt, __timer) = emit::__private::__private_push_span_ctxt(#when_tokens, #template_tokens, #ctxt_props_tokens, #evt_props_tokens);
let (mut __ctxt, __timer) = emit::__private::__private_push_span_ctxt(#rt_tokens, #when_tokens, #template_tokens, #ctxt_props_tokens, #evt_props_tokens);
let __ctxt_guard = __ctxt.enter();

let #span_arg = emit::__private::__private_begin_span(__timer, |extent| {
emit::__private::__private_emit(
#rt_tokens,
#to_tokens,
emit::empty::Empty,
extent,
Expand All @@ -160,6 +173,7 @@ fn inject_sync(
}

fn inject_async(
rt_tokens: &TokenStream,
to_tokens: &TokenStream,
when_tokens: &TokenStream,
template: &Template,
Expand All @@ -173,11 +187,12 @@ fn inject_async(
let template_tokens = template.template_tokens();

quote!({
let (__ctxt, __timer) = emit::__private::__private_push_span_ctxt(#when_tokens, #template_tokens, #ctxt_props_tokens, #evt_props_tokens);
let (__ctxt, __timer) = emit::__private::__private_push_span_ctxt(#rt_tokens, #when_tokens, #template_tokens, #ctxt_props_tokens, #evt_props_tokens);

__ctxt.with_future(async {
let #span_arg = emit::__private::__private_begin_span(__timer, |extent| {
emit::__private::__private_emit(
#rt_tokens,
#to_tokens,
emit::empty::Empty,
extent,
Expand Down
36 changes: 8 additions & 28 deletions src/macro_hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use emit_core::{
extent::{Extent, ToExtent},
filter::Filter,
props::Props,
rng::Rng,
runtime::Runtime,
str::ToStr,
template::Template,
value::{ToValue, Value},
Expand All @@ -16,7 +18,6 @@ use emit_core::{
ctxt::Ctxt,
empty::Empty,
event::Event,
runtime::{AmbientClock, AmbientCtxt},
well_known::{SPAN_ID_KEY, SPAN_PARENT_KEY, TRACE_ID_KEY},
};

Expand Down Expand Up @@ -451,15 +452,14 @@ pub fn __private_format(tpl: Template, props: impl Props) -> alloc::string::Stri
}

#[track_caller]
pub fn __private_emit(
pub fn __private_emit<'a, E: Emitter, F: Filter, C: Ctxt, T: Clock, R: Rng>(
rt: &'a Runtime<E, F, C, T, R>,
to: impl Emitter,
when: impl Filter,
extent: impl ToExtent,
tpl: Template,
props: impl Props,
) {
let rt = crate::runtime::shared();

base_emit(
rt.emitter().and_to(to),
rt.filter().and_when(when),
Expand All @@ -472,31 +472,13 @@ pub fn __private_emit(

#[track_caller]
#[cfg(feature = "alloc")]
pub fn __private_push_ctxt(props: impl Props) -> Frame<AmbientCtxt<'static>> {
let rt = crate::runtime::shared();

Frame::new_push(rt.ctxt(), props)
}

#[track_caller]
#[cfg(feature = "alloc")]
pub fn __private_root_ctxt(props: impl Props) -> Frame<AmbientCtxt<'static>> {
let rt = crate::runtime::shared();

Frame::new_root(rt.ctxt(), props)
}

#[track_caller]
#[cfg(feature = "alloc")]
pub fn __private_push_span_ctxt(
pub fn __private_push_span_ctxt<'a, E: Emitter, F: Filter, C: Ctxt, T: Clock, R: Rng>(
rt: &'a Runtime<E, F, C, T, R>,
when: impl Filter,
tpl: Template,
ctxt_props: impl Props,
evt_props: impl Props,
) -> (
Frame<Option<AmbientCtxt<'static>>>,
Option<Timer<AmbientClock<'static>>>,
) {
) -> (Frame<Option<&'a C>>, Option<Timer<&'a T>>) {
struct TraceContext {
trace_id: Option<TraceId>,
span_parent: Option<SpanId>,
Expand Down Expand Up @@ -524,8 +506,6 @@ pub fn __private_push_span_ctxt(
}
}

let rt = crate::runtime::shared();

let mut trace_id = None;
let mut span_parent = None;

Expand All @@ -543,7 +523,7 @@ pub fn __private_push_span_ctxt(
span_id,
};

let timer = Timer::start(*rt.clock());
let timer = Timer::start(rt.clock());

if when.matches(&Event::new(
timer.extent().map(|extent| *extent.as_point()),
Expand Down
74 changes: 43 additions & 31 deletions targets/file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,8 @@ impl Worker {
}
}

#[emit::span(rt: emit::runtime::internal(), arg: span, "on_batch")]
fn on_batch(&mut self, mut batch: EventBatch) -> Result<(), BatchError<EventBatch>> {
let rt = emit::runtime::internal();

let timer = emit::timer::Timer::start(rt);

let now = std::time::UNIX_EPOCH.elapsed().unwrap();
let ts = emit::Timestamp::new(now).unwrap();
let parts = ts.to_parts();
Expand All @@ -257,25 +254,30 @@ impl Worker {

if file.is_none() {
if let Err(err) = fs::create_dir_all(&self.dir) {
rt.emit(&emit::warn_event!(
"failed to create root directory {path}: {err}",
#[emit::as_debug]
path: &self.dir,
err,
));
span.complete(|extent| {
emit::warn!(
rt: emit::runtime::internal(),
extent,
"failed to create root directory {path}: {err}",
#[emit::as_debug]
path: &self.dir,
err,
)
});

return Err(emit_batcher::BatchError::retry(err, batch));
}

let _ = file_set
.read(&self.file_prefix, &self.file_ext)
.map_err(|err| {
rt.emit(&emit::warn_event!(
emit::warn!(
rt: emit::runtime::internal(),
"failed to files in read {path}: {err}",
#[emit::as_debug]
path: &file_set.dir,
err,
));
);

err
});
Expand All @@ -287,12 +289,13 @@ impl Worker {

file = ActiveFile::try_open_reuse(&path)
.map_err(|err| {
rt.emit(&emit::warn_event!(
emit::warn!(
rt: emit::runtime::internal(),
"failed to open {path}: {err}",
#[emit::as_debug]
path,
err,
));
);

err
})
Expand Down Expand Up @@ -325,21 +328,23 @@ impl Worker {

match ActiveFile::try_open_create(&path) {
Ok(file) => {
rt.emit(&emit::warn_event!(
emit::debug!(
rt: emit::runtime::internal(),
"created {path}",
#[emit::as_debug]
path: file.file_path,
));
);

file
}
Err(err) => {
rt.emit(&emit::warn_event!(
emit::warn!(
rt: emit::runtime::internal(),
"failed to create {path}: {err}",
#[emit::as_debug]
path,
err,
));
);

return Err(emit_batcher::BatchError::retry(err, batch));
}
Expand All @@ -350,12 +355,16 @@ impl Worker {

while let Some(buf) = batch.current() {
if let Err(err) = file.write_event(buf.as_bytes()) {
rt.emit(&emit::warn_event!(
"failed to write event to {path}: {err}",
#[emit::as_debug]
path: file.file_path,
err,
));
span.complete(|extent| {
emit::warn!(
rt: emit::runtime::internal(),
extent,
"failed to write event to {path}: {err}",
#[emit::as_debug]
path: file.file_path,
err,
)
});

return Err(emit_batcher::BatchError::retry(err, batch));
}
Expand All @@ -370,13 +379,16 @@ impl Worker {
.sync_all()
.map_err(|e| emit_batcher::BatchError::no_retry(e))?;

rt.emit(&emit::debug_event!(
extent: timer,
"wrote {written_bytes} bytes to {path}",
written_bytes,
#[emit::as_debug]
path: file.file_path,
));
span.complete(|extent| {
emit::debug!(
rt: emit::runtime::internal(),
extent,
"wrote {written_bytes} bytes to {path}",
written_bytes,
#[emit::as_debug]
path: file.file_path,
)
});

// Set the active file so the next batch can attempt to use it
// At this point the file is expected to be valid
Expand Down
Loading

0 comments on commit 14843b6

Please sign in to comment.