Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
only send non-empty batches
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Jan 22, 2024
1 parent 52886f5 commit b447551
Showing 1 changed file with 86 additions and 80 deletions.
166 changes: 86 additions & 80 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,95 +296,101 @@ impl OtlpClientBuilder {

let mut r = Ok(());

if let Some(client) = client.logs {
if let Err(e) = client
.send(logs, logs::encode_request, {
#[cfg(feature = "decode_responses")]
{
if emit::runtime::internal_slot().is_enabled() {
Some(logs::decode_response)
} else {
None
if logs.len() > 0 {
if let Some(client) = client.logs {
if let Err(e) = client
.send(logs, logs::encode_request, {
#[cfg(feature = "decode_responses")]
{
if emit::runtime::internal_slot().is_enabled() {
Some(logs::decode_response)
} else {
None
}
}
}
#[cfg(not(feature = "decode_responses"))]
{
None::<fn(Result<&[u8], &[u8]>)>
}
})
.await
{
r = Err(e.map(|logs| Channel {
logs,
traces: Vec::new(),
metrics: Vec::new(),
}));
#[cfg(not(feature = "decode_responses"))]
{
None::<fn(Result<&[u8], &[u8]>)>
}
})
.await
{
r = Err(e.map(|logs| Channel {
logs,
traces: Vec::new(),
metrics: Vec::new(),
}));
}
}
}

if let Some(client) = client.traces {
if let Err(e) = client
.send(traces, traces::encode_request, {
#[cfg(feature = "decode_responses")]
{
if emit::runtime::internal_slot().is_enabled() {
Some(traces::decode_response)
} else {
None
if traces.len() > 0 {
if let Some(client) = client.traces {
if let Err(e) = client
.send(traces, traces::encode_request, {
#[cfg(feature = "decode_responses")]
{
if emit::runtime::internal_slot().is_enabled() {
Some(traces::decode_response)
} else {
None
}
}
}
#[cfg(not(feature = "decode_responses"))]
{
None::<fn(Result<&[u8], &[u8]>)>
}
})
.await
{
r = if let Err(re) = r {
Err(re.map(|mut channel| {
channel.traces = e.into_retryable();
channel
}))
} else {
Err(e.map(|traces| Channel {
traces,
logs: Vec::new(),
metrics: Vec::new(),
}))
};
#[cfg(not(feature = "decode_responses"))]
{
None::<fn(Result<&[u8], &[u8]>)>
}
})
.await
{
r = if let Err(re) = r {
Err(re.map(|mut channel| {
channel.traces = e.into_retryable();
channel
}))
} else {
Err(e.map(|traces| Channel {
traces,
logs: Vec::new(),
metrics: Vec::new(),
}))
};
}
}
}

if let Some(client) = client.metrics {
if let Err(e) = client
.send(metrics, metrics::encode_request, {
#[cfg(feature = "decode_responses")]
{
if emit::runtime::internal_slot().is_enabled() {
Some(metrics::decode_response)
} else {
None
if metrics.len() > 0 {
if let Some(client) = client.metrics {
if let Err(e) = client
.send(metrics, metrics::encode_request, {
#[cfg(feature = "decode_responses")]
{
if emit::runtime::internal_slot().is_enabled() {
Some(metrics::decode_response)
} else {
None
}
}
}
#[cfg(not(feature = "decode_responses"))]
{
None::<fn(Result<&[u8], &[u8]>)>
}
})
.await
{
r = if let Err(re) = r {
Err(re.map(|mut channel| {
channel.metrics = e.into_retryable();
channel
}))
} else {
Err(e.map(|metrics| Channel {
metrics,
logs: Vec::new(),
traces: Vec::new(),
}))
};
#[cfg(not(feature = "decode_responses"))]
{
None::<fn(Result<&[u8], &[u8]>)>
}
})
.await
{
r = if let Err(re) = r {
Err(re.map(|mut channel| {
channel.metrics = e.into_retryable();
channel
}))
} else {
Err(e.map(|metrics| Channel {
metrics,
logs: Vec::new(),
traces: Vec::new(),
}))
};
}
}
}

Expand Down

0 comments on commit b447551

Please sign in to comment.