Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
diagnostic improvements to HTTP client
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Jan 22, 2024
1 parent dcd13af commit 39f9c17
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 56 deletions.
22 changes: 22 additions & 0 deletions core/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ impl<'v> Value<'v> {
pub fn to_usize(&self) -> Option<usize> {
self.0.to_u64()?.try_into().ok()
}

pub fn to_i64(&self) -> Option<i64> {
self.0.to_i64()
}
}

pub trait Visitor<'v> {
Expand Down Expand Up @@ -238,6 +242,24 @@ impl<'v> FromValue<'v> for f64 {
}
}

impl ToValue for i64 {
fn to_value(&self) -> Value {
Value::from(*self)
}
}

impl<'v> From<i64> for Value<'v> {
fn from(value: i64) -> Self {
Value(value.into())
}
}

impl<'v> FromValue<'v> for i64 {
fn from_value(value: Value<'v>) -> Option<Self> {
value.to_i64()
}
}

#[cfg(feature = "alloc")]
mod alloc_support {
use super::*;
Expand Down
13 changes: 1 addition & 12 deletions macros/src/in_ctxt.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use proc_macro2::TokenStream;
use syn::{
parse::Parse, spanned::Spanned, Block, Expr, ExprAsync, ExprBlock, Item, ItemFn, Signature,
Stmt,
};
use syn::{spanned::Spanned, Block, Expr, ExprAsync, ExprBlock, Item, ItemFn, Signature, Stmt};

use crate::props::Props;

Expand All @@ -11,14 +8,6 @@ pub struct ExpandTokens {
pub input: TokenStream,
}

struct Args {}

impl Parse for Args {
fn parse(_: syn::parse::ParseStream) -> syn::Result<Self> {
Ok(Args {})
}
}

pub fn expand_tokens(opts: ExpandTokens) -> Result<TokenStream, syn::Error> {
let props = syn::parse2::<Props>(opts.input)?;

Expand Down
10 changes: 7 additions & 3 deletions targets/file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl FileSetBuilder {
let _ = receiver.blocking_exec(|mut batch: Buffer| {
use emit_batcher::Channel as _;

emit::debug!(rt: emit::runtime::internal(), "writing file batch of {batch_size: batch.remaining()} events");
let batch_size = batch.remaining();

let (mut file, path) = match active_file.take() {
Some(file) => file,
Expand All @@ -89,8 +89,10 @@ impl FileSetBuilder {

let mut path = PathBuf::from(dir.clone());

if let Err(e) = fs::create_dir_all(&path) {
return Err(emit_batcher::BatchError::retry(e, batch));
if let Err(err) = fs::create_dir_all(&path) {
emit::warn!(rt: emit::runtime::internal(), "failed to create root directory {#[emit::as_debug] path}: {err}");

return Err(emit_batcher::BatchError::retry(err, batch));
}

let file_ts = file_ts(self.roll_by, parts);
Expand Down Expand Up @@ -180,6 +182,8 @@ impl FileSetBuilder {

active_file = Some((file, path));

emit::debug!(rt: emit::runtime::internal(), "wrote file batch of {batch_size} events");

Ok(())
});
});
Expand Down
2 changes: 1 addition & 1 deletion targets/otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.0.0"
edition = "2021"

[features]
default = ["http"]
default = ["http", "grpc"]
http = ["dep:hyper", "dep:hyper-util"]
grpc = ["dep:prost", "dep:serde", "emit/serde"]
decode_responses = ["dep:prost"]
Expand Down
76 changes: 47 additions & 29 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,43 +476,61 @@ impl RawClient {
) -> Result<PreEncoded, BatchError<Vec<PreEncoded>>>,
decode: Option<impl FnOnce(Result<&[u8], &[u8]>)>,
) -> Result<(), BatchError<Vec<PreEncoded>>> {
match self {
RawClient::Http {
ref http,
ref resource,
ref scope,
} => {
emit::debug!(rt: emit::runtime::internal(), "sending OTLP batch of {batch_size: batch.len()} events");

let res = http
.send(encode(resource.as_ref(), scope.as_ref(), &batch)?)
.await
.map_err(|err| {
emit::warn!(rt: emit::runtime::internal(), "failed to send OTLP request: {err}");

BatchError::no_retry(err)
})?;

if let Some(decode) = decode {
let status = res.status();
let body = res
.read_to_vec()
use emit::IdRng as _;

let rt = emit::runtime::internal();

let ctxt = emit::frame::Frame::new(
rt.ctxt(),
emit::props! {
trace_id: rt.gen_trace_id(),
span_id: rt.gen_span_id(),
},
);

ctxt.with_future(async move {
match self {
RawClient::Http {
ref http,
ref resource,
ref scope,
} => {
let batch_size = batch.len();

let timer = emit::clock::Timer::start(rt.clock());

let res = http
.send(encode(resource.as_ref(), scope.as_ref(), &batch)?)
.await
.map_err(|err| {
emit::warn!(rt: emit::runtime::internal(), "failed to read OTLP response: {err}");
emit::warn!(rt, extent: timer, "OTLP batch of {batch_size} failed to send: {err}");

BatchError::no_retry(err)
BatchError::retry(err, batch)
})?;

if status >= 200 && status < 300 {
decode(Ok(&body));
} else {
decode(Err(&body));
emit::debug!(rt, extent: timer, "OTLP batch of {batch_size} responded {status_code: res.status()}");

if let Some(decode) = decode {
let status = res.status();
let body = res
.read_to_vec()
.await
.map_err(|err| {
emit::warn!(rt, "failed to read OTLP response: {err}");

BatchError::no_retry(err)
})?;

if status >= 200 && status < 300 {
decode(Ok(&body));
} else {
decode(Err(&body));
}
}
}
}
}

Ok(())
Ok(())
}).await
}
}
32 changes: 26 additions & 6 deletions targets/otlp/src/client/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ async fn send_request(
uri: &Uri,
body: PreEncoded,
) -> Result<hyper::Response<body::Incoming>, Error> {
let rt = emit::runtime::internal();

let res = sender
.send_request(
Request::builder()
.send_request({
use emit::{Ctxt as _, Props as _};

let req = Request::builder()
.uri(uri)
.method(Method::POST)
.header("host", uri.authority().unwrap().as_str())
Expand All @@ -130,10 +134,26 @@ async fn send_request(
match body {
PreEncoded::Proto(_) => "application/x-protobuf",
},
)
.body(HttpBody(Some(body.into_cursor())))
.map_err(Error::new)?,
)
);

// Propagate traceparent for the batch
let mut trace_id = None;
let mut span_id = None;

rt.ctxt().with_current(|props| {
trace_id = props.pull::<emit::TraceId>();
span_id = props.pull::<emit::SpanId>();
});

let req = if let (Some(trace_id), Some(span_id)) = (trace_id, span_id) {
req.header("traceparent", format!("00-{trace_id}-{span_id}-00"))
} else {
req
};

req.body(HttpBody(Some(body.into_cursor())))
.map_err(Error::new)?
})
.await
.map_err(Error::new)?;

Expand Down
6 changes: 6 additions & 0 deletions targets/otlp/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ impl PreEncoded {
PreEncoded::Proto(buf) => PreEncodedCursor::Proto(buf.into_cursor()),
}
}

pub fn to_vec(&self) -> Vec<u8> {
match self {
PreEncoded::Proto(buf) => buf.to_vec().into_owned(),
}
}
}

pub(crate) enum PreEncodedCursor {
Expand Down
1 change: 0 additions & 1 deletion targets/otlp/src/data/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ pub(crate) mod collector {
}
}

#[cfg(feature = "grpc")]
pub(crate) mod any_value {
use std::fmt;

Expand Down
Loading

0 comments on commit 39f9c17

Please sign in to comment.