Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
ensure generated futures code is Send
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Sep 30, 2023
1 parent f7a0fda commit 5a05b33
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 45 deletions.
2 changes: 1 addition & 1 deletion macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub fn with(
) -> proc_macro::TokenStream {
with::expand_tokens(with::ExpandTokens {
sync_receiver: quote!(__private::__private_with),
async_receiver: quote!(__private::__private_with_future),
async_receiver: quote!(__private::__private_with),
input: TokenStream::from(args),
item: TokenStream::from(item),
})
Expand Down
3 changes: 2 additions & 1 deletion macros/src/with.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ fn inject_async(props: &Props, receiver_tokens: TokenStream, body: TokenStream)
let props_tokens = props.props_tokens();

quote!({
emit::#receiver_tokens(#props_tokens, async #body).await
let __ctxt = emit::#receiver_tokens(#props_tokens);
__ctxt.into_future(async #body).await
})
}

Expand Down
13 changes: 1 addition & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
#[cfg(feature = "alloc")]
extern crate alloc;

use core::future::Future;

use emit_core::extent::ToExtent;

use crate::local_frame::{LocalFrame, LocalFrameFuture};
use crate::local_frame::LocalFrame;

#[doc(inline)]
pub use emit_macros::*;
Expand Down Expand Up @@ -68,15 +66,6 @@ fn base_with<C: Ctxt>(ctxt: C, props: impl Props) -> LocalFrame<C> {
LocalFrame::new(ctxt, props)
}

#[track_caller]
fn base_with_future<C: Ctxt, F: Future>(
ctxt: C,
props: impl Props,
future: F,
) -> LocalFrameFuture<C, F> {
LocalFrameFuture::new(ctxt, props, future)
}

#[track_caller]
pub fn emit(evt: &Event<impl Props>) {
let ambient = emit_core::ambient::get();
Expand Down
18 changes: 8 additions & 10 deletions src/local_frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ impl<C: Ctxt> LocalFrame<C> {
_marker: PhantomData,
}
}

#[track_caller]
pub fn into_future<F>(self, future: F) -> LocalFrameFuture<C, F> {
LocalFrameFuture {
frame: self,
future,
}
}
}

pub struct EnterGuard<'a, C: Ctxt> {
Expand All @@ -54,16 +62,6 @@ pub struct LocalFrameFuture<C: Ctxt, F> {
future: F,
}

impl<C: Ctxt, F> LocalFrameFuture<C, F> {
#[track_caller]
pub fn new(scope: C, props: impl Props, future: F) -> Self {
LocalFrameFuture {
frame: LocalFrame::new(scope, props),
future,
}
}
}

impl<C: Ctxt, F: Future> Future for LocalFrameFuture<C, F> {
type Output = F::Output;

Expand Down
19 changes: 4 additions & 15 deletions src/macro_hooks.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use core::{any::Any, fmt, future::Future, ops::ControlFlow, panic::Location};
use core::{any::Any, fmt, ops::ControlFlow, panic::Location};

use emit_core::{
ambient,
clock::Clock,
ctxt::Ctxt,
emitter::Emitter,
filter::Filter,
id::{SpanId, TraceId},
Expand All @@ -19,8 +18,8 @@ use emit_core::extent::ToExtent;
use std::error::Error;

use crate::{
base_emit, base_with, base_with_future,
local_frame::{LocalFrame, LocalFrameFuture},
base_emit, base_with,
local_frame::LocalFrame,
template::{Formatter, Part},
Key,
};
Expand Down Expand Up @@ -427,22 +426,12 @@ pub fn __private_emit(
}

#[track_caller]
pub fn __private_with(props: impl Props) -> LocalFrame<impl Ctxt + Send + Sync + 'static> {
pub fn __private_with(props: impl Props) -> LocalFrame<emit_core::ambient::Get> {
let ambient = ambient::get();

base_with(ambient, props)
}

#[track_caller]
pub fn __private_with_future<F: Future>(
props: impl Props,
future: F,
) -> LocalFrameFuture<impl Ctxt + Send + Sync + 'static, F> {
let ambient = ambient::get();

base_with_future(ambient, props, future)
}

#[track_caller]
pub fn caller() -> &'static Location<'static> {
Location::caller()
Expand Down
20 changes: 14 additions & 6 deletions tests/smoke-test/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ struct Work {
#[tokio::main]
async fn main() {
let emitter = emit::setup()
.to(emit_term::stdout())
.and_to(
.to(
emit_otlp::logs::http_proto("http://localhost:5341/ingest/otlp/v1/logs")
.resource(emit::props! {
#[emit::key("service.name")]
Expand All @@ -25,8 +24,19 @@ async fn main() {
)
.init();

let _ = in_ctxt(78).await;
let _ = in_ctxt(71).await;
let mut handles = Vec::new();

for i in 0..10 {
handles.push(tokio::spawn(async move {
for n in i * 5_000..i * 5_000 + 5_000 {
let _ = in_ctxt(n).await;
}
}));
}

for handle in handles {
let _ = handle.await;
}

emitter.blocking_flush(Duration::from_secs(5));
}
Expand All @@ -45,8 +55,6 @@ async fn in_ctxt(a: i32) -> Result<(), io::Error> {

emit::info!("working on {#[emit::as_serde] work}");

tokio::time::sleep(Duration::from_secs(1)).await;

if a % 2 == 0 {
Ok(())
} else {
Expand Down

0 comments on commit 5a05b33

Please sign in to comment.