Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
fix up some encoding issues with tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Nov 30, 2023
1 parent 9500213 commit 28ac382
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 142 deletions.
10 changes: 5 additions & 5 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use emit_batcher::BatchError;
use std::{sync::Arc, time::Duration};

use crate::{
data::{self, PreEncoded},
data::{self, logs, traces, PreEncoded},
Error,
};

Expand All @@ -19,14 +19,14 @@ pub struct OtlpClient {
impl emit_core::emitter::Emitter for OtlpClient {
fn emit<P: emit_core::props::Props>(&self, evt: &emit_core::event::Event<P>) {
if self.emit_traces {
if let Some(encoded) = crate::traces::encode_event(evt) {
if let Some(encoded) = traces::encode_event(evt) {
return self.sender.send(ChannelItem::Span(encoded));
}
}

if self.emit_logs {
self.sender
.send(ChannelItem::LogRecord(crate::logs::encode_event(evt)));
.send(ChannelItem::LogRecord(logs::encode_event(evt)));
}
}

Expand Down Expand Up @@ -182,7 +182,7 @@ impl OtlpClientBuilder {
let mut r = Ok(());

if let Some(client) = client.logs {
if let Err(e) = client.send(logs, crate::logs::encode_request).await {
if let Err(e) = client.send(logs, logs::encode_request).await {
r = Err(e.map(|logs| Channel {
logs,
traces: Vec::new(),
Expand All @@ -191,7 +191,7 @@ impl OtlpClientBuilder {
}

if let Some(client) = client.traces {
if let Err(e) = client.send(traces, crate::traces::encode_request).await {
if let Err(e) = client.send(traces, traces::encode_request).await {
r = if let Err(re) = r {
Err(re.map(|mut channel| {
channel.traces = e.into_retryable();
Expand Down
48 changes: 48 additions & 0 deletions targets/otlp/src/data/logs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,52 @@
mod export_logs_service;
mod log_record;

use emit_batcher::BatchError;

pub use self::{export_logs_service::*, log_record::*};

use super::{AnyValue, PreEncoded};

pub(crate) fn encode_event(
evt: &emit_core::event::Event<impl emit_core::props::Props>,
) -> PreEncoded {
let time_unix_nano = evt
.extent()
.map(|extent| extent.to_point().to_unix_time().as_nanos() as u64)
.unwrap_or_default();

let observed_time_unix_nano = time_unix_nano;

let protobuf = sval_protobuf::stream_to_protobuf(LogRecord {
time_unix_nano,
observed_time_unix_nano,
body: &Some(AnyValue::<_, (), (), ()>::String(&sval::Display::new(
evt.tpl(),
))),
attributes: &PropsLogRecordAttributes(evt.props()),
dropped_attributes_count: 0,
flags: Default::default(),
});

PreEncoded::Proto(protobuf)
}

pub(crate) fn encode_request(
resource: Option<&PreEncoded>,
scope: Option<&PreEncoded>,
log_records: &[PreEncoded],
) -> Result<PreEncoded, BatchError<Vec<PreEncoded>>> {
Ok(PreEncoded::Proto(sval_protobuf::stream_to_protobuf(
ExportLogsServiceRequest {
resource_logs: &[ResourceLogs {
resource: &resource,
scope_logs: &[ScopeLogs {
scope: &scope,
log_records,
schema_url: "",
}],
schema_url: "",
}],
},
)))
}
52 changes: 52 additions & 0 deletions targets/otlp/src/data/traces.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,56 @@
mod export_trace_service;
mod span;

use emit_batcher::BatchError;

pub use self::{export_trace_service::*, span::*};

use super::PreEncoded;

pub(crate) fn encode_event(
evt: &emit_core::event::Event<impl emit_core::props::Props>,
) -> Option<PreEncoded> {
let (start_time_unix_nano, end_time_unix_nano) = evt
.extent()
.and_then(|extent| extent.as_span())
.map(|span| {
(
span.start.to_unix_time().as_nanos() as u64,
span.end.to_unix_time().as_nanos() as u64,
)
})?;

let protobuf = sval_protobuf::stream_to_protobuf(Span {
start_time_unix_nano,
end_time_unix_nano,
name: &sval::Display::new(evt.tpl()),
attributes: &PropsSpanAttributes {
time_unix_nano: end_time_unix_nano,
props: evt.props(),
},
dropped_attributes_count: 0,
kind: SpanKind::Unspecified,
});

Some(PreEncoded::Proto(protobuf))
}

pub(crate) fn encode_request(
resource: Option<&PreEncoded>,
scope: Option<&PreEncoded>,
spans: &[PreEncoded],
) -> Result<PreEncoded, BatchError<Vec<PreEncoded>>> {
Ok(PreEncoded::Proto(sval_protobuf::stream_to_protobuf(
ExportTraceServiceRequest {
resource_spans: &[ResourceSpans {
resource: &resource,
scope_spans: &[ScopeSpans {
scope: &scope,
spans,
schema_url: "",
}],
schema_url: "",
}],
},
)))
}
12 changes: 6 additions & 6 deletions targets/otlp/src/data/traces/export_trace_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,27 @@ use crate::data::{InstrumentationScope, Resource};
use super::Span;

#[derive(Value)]
pub struct ExportTraceServiceRequest<'a, RL: ?Sized = [ResourceSpans<'a>]> {
pub struct ExportTraceServiceRequest<'a, RS: ?Sized = [ResourceSpans<'a>]> {
#[sval(label = "resourceSpans", index = 1)]
pub resource_spans: &'a RL,
pub resource_spans: &'a RS,
}

#[derive(Value)]
pub struct ResourceSpans<'a, R: ?Sized = Resource<'a>, SL: ?Sized = [ScopeSpans<'a>]> {
pub struct ResourceSpans<'a, R: ?Sized = Resource<'a>, SS: ?Sized = [ScopeSpans<'a>]> {
#[sval(label = "resource", index = 1)]
pub resource: &'a R,
#[sval(label = "scopeSpans", index = 2)]
pub scope_spans: &'a SL,
pub scope_spans: &'a SS,
#[sval(label = "schemaUrl", index = 3)]
pub schema_url: &'a str,
}

#[derive(Value)]
pub struct ScopeSpans<'a, IS: ?Sized = InstrumentationScope<'a>, LR: ?Sized = &'a [Span<'a>]> {
pub struct ScopeSpans<'a, IS: ?Sized = InstrumentationScope<'a>, S: ?Sized = &'a [Span<'a>]> {
#[sval(label = "scope", index = 1)]
pub scope: &'a IS,
#[sval(label = "spans", index = 2)]
pub spans: &'a LR,
pub spans: &'a S,
#[sval(label = "schemaUrl", index = 3)]
pub schema_url: &'a str,
}
37 changes: 18 additions & 19 deletions targets/otlp/src/data/traces/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,28 +130,13 @@ impl<P: emit_core::props::Props> sval::Value for PropsSpanAttributes<P> {
}
emit_core::well_known::ERR_KEY => {
has_err = true;
true
false
}
_ => false,
})
},
)?;

let status = Status {
code: match level {
emit_core::level::Level::Error => StatusCode::Error,
_ => StatusCode::Ok,
},
message: sval::Display::new_borrowed(&level),
};

stream_field(
&mut *stream,
&SPAN_STATUS_LABEL,
&SPAN_STATUS_INDEX,
|stream| stream.value_computed(&status),
)?;

if trace_id != [0; 32] {
stream_field(
&mut *stream,
Expand Down Expand Up @@ -192,24 +177,38 @@ impl<P: emit_core::props::Props> sval::Value for PropsSpanAttributes<P> {
}

if has_err {
let err = self.props.get(emit_core::well_known::ERR_KEY).unwrap();

stream_field(
&mut *stream,
&SPAN_EVENTS_LABEL,
&SPAN_EVENTS_INDEX,
|stream| {
let err = self.props.get(emit_core::well_known::ERR_KEY).unwrap();

stream.value_computed(&[Event {
name: "exception",
time_unix_nano: self.time_unix_nano,
dropped_attributes_count: 0,
attributes: &[KeyValue {
key: "exception.message",
value: AnyValue::<_, (), (), ()>::String(&sval::Display::new(err)),
value: AnyValue::<_, (), (), ()>::String(sval::Display::new_borrowed(
&err,
)),
}],
}])
},
)?;

let status = Status {
code: StatusCode::Error,
message: sval::Display::new_borrowed(&err),
};

stream_field(
&mut *stream,
&SPAN_STATUS_LABEL,
&SPAN_STATUS_INDEX,
|stream| stream.value_computed(&status),
)?;
}

stream.record_tuple_end(None, None, None)
Expand Down
2 changes: 0 additions & 2 deletions targets/otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ pub mod data;
mod error;

pub use self::{client::*, error::*};
mod logs;
mod traces;

pub fn proto() -> OtlpClientBuilder {
OtlpClientBuilder::proto()
Expand Down
53 changes: 0 additions & 53 deletions targets/otlp/src/logs.rs

This file was deleted.

57 changes: 0 additions & 57 deletions targets/otlp/src/traces.rs

This file was deleted.

0 comments on commit 28ac382

Please sign in to comment.