Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
adjust how responses are read
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Jan 22, 2024
1 parent 40a7ef4 commit 423ff2c
Show file tree
Hide file tree
Showing 11 changed files with 465 additions and 409 deletions.
16 changes: 16 additions & 0 deletions core/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ mod std_support {
AmbientSlot(OnceLock::new())
}

pub fn is_enabled(&self) -> bool {
self.0.get().is_some()
}

pub fn init<TEmitter, TFilter, TCtxt, TClock, TRng>(
&self,
pipeline: Runtime<TEmitter, TFilter, TCtxt, TClock, TRng>,
Expand Down Expand Up @@ -465,6 +469,10 @@ mod std_support {
AmbientInternalSlot(AmbientSlot(OnceLock::new()))
}

pub fn is_enabled(&self) -> bool {
self.0.is_enabled()
}

pub fn init<TEmitter, TFilter, TCtxt, TClock, TRng>(
&self,
pipeline: Runtime<TEmitter, TFilter, TCtxt, TClock, TRng>,
Expand Down Expand Up @@ -502,6 +510,10 @@ mod no_std_support {
AmbientSlot {}
}

pub fn is_enabled(&self) -> bool {
false
}

pub fn get(&self) -> &Runtime {
const EMPTY_AMBIENT_RUNTIME: Runtime = Runtime::new();

Expand All @@ -514,6 +526,10 @@ mod no_std_support {
AmbientInternalSlot(AmbientSlot::new())
}

pub fn is_enabled(&self) -> bool {
false
}

pub fn get(&self) -> &Runtime {
self.0.get()
}
Expand Down
15 changes: 5 additions & 10 deletions targets/file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ impl FileSetBuilder {
});
});

Ok(FileSet { sender, handle })
Ok(FileSet {
sender,
_handle: handle,
})
}
}

Expand Down Expand Up @@ -273,10 +276,6 @@ fn read_file_ts(file_name: &str) -> Option<&str> {
file_name.split('.').skip(1).next()
}

fn read_file_id(file_name: &str) -> Option<&str> {
file_name.split('.').skip(2).next()
}

fn file_name(file_prefix: &str, file_ext: &str, ts: &str, id: &str) -> String {
format!("{}.{}.{}.{}", file_prefix, ts, id, file_ext)
}
Expand Down Expand Up @@ -356,7 +355,7 @@ impl emit_batcher::Channel for Buffer {

pub struct FileSet {
sender: emit_batcher::Sender<Buffer>,
handle: thread::JoinHandle<()>,
_handle: thread::JoinHandle<()>,
}

impl emit::Emitter for FileSet {
Expand Down Expand Up @@ -437,7 +436,3 @@ impl<'a, P: emit::Props> sval::Value for EventValue<'a, P> {
stream.record_end(None, None, None)
}
}

struct FileWriter {
receiver: emit_batcher::Receiver<Buffer>,
}
3 changes: 2 additions & 1 deletion targets/otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ version = "0.0.0"
edition = "2021"

[features]
default = ["http", "grpc"]
default = ["http"]
http = ["dep:hyper", "dep:hyper-util"]
grpc = ["dep:prost", "dep:serde", "emit/serde"]
decode_responses = ["dep:prost"]

[dependencies.emit]
path = "../../"
Expand Down
64 changes: 51 additions & 13 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,20 @@ impl OtlpClientBuilder {

if let Some(client) = client.logs {
if let Err(e) = client
.send(logs, logs::encode_request, logs::decode_response)
.send(logs, logs::encode_request, {
#[cfg(feature = "decode_responses")]
{
if emit::runtime::internal_slot().is_enabled() {
Some(logs::decode_response)
} else {
None
}
}
#[cfg(not(feature = "decode_responses"))]
{
None::<fn(Result<&[u8], &[u8]>)>
}
})
.await
{
r = Err(e.map(|logs| Channel {
Expand All @@ -240,7 +253,20 @@ impl OtlpClientBuilder {

if let Some(client) = client.traces {
if let Err(e) = client
.send(traces, traces::encode_request, traces::decode_response)
.send(traces, traces::encode_request, {
#[cfg(feature = "decode_responses")]
{
if emit::runtime::internal_slot().is_enabled() {
Some(traces::decode_response)
} else {
None
}
}
#[cfg(not(feature = "decode_responses"))]
{
None::<fn(Result<&[u8], &[u8]>)>
}
})
.await
{
r = if let Err(re) = r {
Expand Down Expand Up @@ -325,29 +351,41 @@ impl RawClient {
Option<&PreEncoded>,
&[PreEncoded],
) -> Result<PreEncoded, BatchError<Vec<PreEncoded>>>,
decode: impl FnOnce(Result<&[u8], &[u8]>),
decode: Option<impl FnOnce(Result<&[u8], &[u8]>)>,
) -> Result<(), BatchError<Vec<PreEncoded>>> {
match self {
RawClient::Http {
ref http,
ref resource,
ref scope,
} => {
emit::debug!(rt: emit::runtime::internal(), "sending OTLP batch of {batch_size: batch.len()} events");

let res = http
.send(encode(resource.as_ref(), scope.as_ref(), &batch)?)
.await
.map_err(|e| BatchError::no_retry(e))?;
.map_err(|err| {
emit::warn!(rt: emit::runtime::internal(), "failed to send OTLP request: {err}");

let status = res.status();
let body = res
.read_to_vec()
.await
.map_err(|e| BatchError::no_retry(e))?;
BatchError::no_retry(err)
})?;

if let Some(decode) = decode {
let status = res.status();
let body = res
.read_to_vec()
.await
.map_err(|err| {
emit::warn!(rt: emit::runtime::internal(), "failed to read OTLP response: {err}");

BatchError::no_retry(err)
})?;

if status >= 200 && status < 300 {
decode(Ok(&body));
} else {
decode(Err(&body));
if status >= 200 && status < 300 {
decode(Ok(&body));
} else {
decode(Err(&body));
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion targets/otlp/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod any_value;
mod instrumentation_scope;
mod resource;

#[cfg(feature = "grpc")]
#[cfg(any(feature = "grpc", feature = "decode_responses"))]
pub(crate) mod generated;

pub use self::{any_value::*, instrumentation_scope::*, resource::*};
Expand Down
Loading

0 comments on commit 423ff2c

Please sign in to comment.