diff --git a/targets/otlp/src/client.rs b/targets/otlp/src/client.rs index e93eca3..d544ca8 100644 --- a/targets/otlp/src/client.rs +++ b/targets/otlp/src/client.rs @@ -2,7 +2,7 @@ use emit_batcher::BatchError; use std::{sync::Arc, time::Duration}; use crate::{ - data::{self, PreEncoded}, + data::{self, logs, traces, PreEncoded}, Error, }; @@ -19,14 +19,14 @@ pub struct OtlpClient { impl emit_core::emitter::Emitter for OtlpClient { fn emit(&self, evt: &emit_core::event::Event

) { if self.emit_traces { - if let Some(encoded) = crate::traces::encode_event(evt) { + if let Some(encoded) = traces::encode_event(evt) { return self.sender.send(ChannelItem::Span(encoded)); } } if self.emit_logs { self.sender - .send(ChannelItem::LogRecord(crate::logs::encode_event(evt))); + .send(ChannelItem::LogRecord(logs::encode_event(evt))); } } @@ -182,7 +182,7 @@ impl OtlpClientBuilder { let mut r = Ok(()); if let Some(client) = client.logs { - if let Err(e) = client.send(logs, crate::logs::encode_request).await { + if let Err(e) = client.send(logs, logs::encode_request).await { r = Err(e.map(|logs| Channel { logs, traces: Vec::new(), @@ -191,7 +191,7 @@ impl OtlpClientBuilder { } if let Some(client) = client.traces { - if let Err(e) = client.send(traces, crate::traces::encode_request).await { + if let Err(e) = client.send(traces, traces::encode_request).await { r = if let Err(re) = r { Err(re.map(|mut channel| { channel.traces = e.into_retryable(); diff --git a/targets/otlp/src/data/logs.rs b/targets/otlp/src/data/logs.rs index 76fedfa..292dd25 100644 --- a/targets/otlp/src/data/logs.rs +++ b/targets/otlp/src/data/logs.rs @@ -1,4 +1,52 @@ mod export_logs_service; mod log_record; +use emit_batcher::BatchError; + pub use self::{export_logs_service::*, log_record::*}; + +use super::{AnyValue, PreEncoded}; + +pub(crate) fn encode_event( + evt: &emit_core::event::Event, +) -> PreEncoded { + let time_unix_nano = evt + .extent() + .map(|extent| extent.to_point().to_unix_time().as_nanos() as u64) + .unwrap_or_default(); + + let observed_time_unix_nano = time_unix_nano; + + let protobuf = sval_protobuf::stream_to_protobuf(LogRecord { + time_unix_nano, + observed_time_unix_nano, + body: &Some(AnyValue::<_, (), (), ()>::String(&sval::Display::new( + evt.tpl(), + ))), + attributes: &PropsLogRecordAttributes(evt.props()), + dropped_attributes_count: 0, + flags: Default::default(), + }); + + PreEncoded::Proto(protobuf) +} + +pub(crate) fn encode_request( + resource: Option<&PreEncoded>, + scope: Option<&PreEncoded>, + log_records: &[PreEncoded], +) -> Result>> { + Ok(PreEncoded::Proto(sval_protobuf::stream_to_protobuf( + ExportLogsServiceRequest { + resource_logs: &[ResourceLogs { + resource: &resource, + scope_logs: &[ScopeLogs { + scope: &scope, + log_records, + schema_url: "", + }], + schema_url: "", + }], + }, + ))) +} diff --git a/targets/otlp/src/data/traces.rs b/targets/otlp/src/data/traces.rs index 922f641..3e918de 100644 --- a/targets/otlp/src/data/traces.rs +++ b/targets/otlp/src/data/traces.rs @@ -1,4 +1,56 @@ mod export_trace_service; mod span; +use emit_batcher::BatchError; + pub use self::{export_trace_service::*, span::*}; + +use super::PreEncoded; + +pub(crate) fn encode_event( + evt: &emit_core::event::Event, +) -> Option { + let (start_time_unix_nano, end_time_unix_nano) = evt + .extent() + .and_then(|extent| extent.as_span()) + .map(|span| { + ( + span.start.to_unix_time().as_nanos() as u64, + span.end.to_unix_time().as_nanos() as u64, + ) + })?; + + let protobuf = sval_protobuf::stream_to_protobuf(Span { + start_time_unix_nano, + end_time_unix_nano, + name: &sval::Display::new(evt.tpl()), + attributes: &PropsSpanAttributes { + time_unix_nano: end_time_unix_nano, + props: evt.props(), + }, + dropped_attributes_count: 0, + kind: SpanKind::Unspecified, + }); + + Some(PreEncoded::Proto(protobuf)) +} + +pub(crate) fn encode_request( + resource: Option<&PreEncoded>, + scope: Option<&PreEncoded>, + spans: &[PreEncoded], +) -> Result>> { + Ok(PreEncoded::Proto(sval_protobuf::stream_to_protobuf( + ExportTraceServiceRequest { + resource_spans: &[ResourceSpans { + resource: &resource, + scope_spans: &[ScopeSpans { + scope: &scope, + spans, + schema_url: "", + }], + schema_url: "", + }], + }, + ))) +} diff --git a/targets/otlp/src/data/traces/export_trace_service.rs b/targets/otlp/src/data/traces/export_trace_service.rs index c1140f4..bf77770 100644 --- a/targets/otlp/src/data/traces/export_trace_service.rs +++ b/targets/otlp/src/data/traces/export_trace_service.rs @@ -5,27 +5,27 @@ use crate::data::{InstrumentationScope, Resource}; use super::Span; #[derive(Value)] -pub struct ExportTraceServiceRequest<'a, RL: ?Sized = [ResourceSpans<'a>]> { +pub struct ExportTraceServiceRequest<'a, RS: ?Sized = [ResourceSpans<'a>]> { #[sval(label = "resourceSpans", index = 1)] - pub resource_spans: &'a RL, + pub resource_spans: &'a RS, } #[derive(Value)] -pub struct ResourceSpans<'a, R: ?Sized = Resource<'a>, SL: ?Sized = [ScopeSpans<'a>]> { +pub struct ResourceSpans<'a, R: ?Sized = Resource<'a>, SS: ?Sized = [ScopeSpans<'a>]> { #[sval(label = "resource", index = 1)] pub resource: &'a R, #[sval(label = "scopeSpans", index = 2)] - pub scope_spans: &'a SL, + pub scope_spans: &'a SS, #[sval(label = "schemaUrl", index = 3)] pub schema_url: &'a str, } #[derive(Value)] -pub struct ScopeSpans<'a, IS: ?Sized = InstrumentationScope<'a>, LR: ?Sized = &'a [Span<'a>]> { +pub struct ScopeSpans<'a, IS: ?Sized = InstrumentationScope<'a>, S: ?Sized = &'a [Span<'a>]> { #[sval(label = "scope", index = 1)] pub scope: &'a IS, #[sval(label = "spans", index = 2)] - pub spans: &'a LR, + pub spans: &'a S, #[sval(label = "schemaUrl", index = 3)] pub schema_url: &'a str, } diff --git a/targets/otlp/src/data/traces/span.rs b/targets/otlp/src/data/traces/span.rs index 5385cce..22239ba 100644 --- a/targets/otlp/src/data/traces/span.rs +++ b/targets/otlp/src/data/traces/span.rs @@ -130,28 +130,13 @@ impl sval::Value for PropsSpanAttributes

{ } emit_core::well_known::ERR_KEY => { has_err = true; - true + false } _ => false, }) }, )?; - let status = Status { - code: match level { - emit_core::level::Level::Error => StatusCode::Error, - _ => StatusCode::Ok, - }, - message: sval::Display::new_borrowed(&level), - }; - - stream_field( - &mut *stream, - &SPAN_STATUS_LABEL, - &SPAN_STATUS_INDEX, - |stream| stream.value_computed(&status), - )?; - if trace_id != [0; 32] { stream_field( &mut *stream, @@ -192,24 +177,38 @@ impl sval::Value for PropsSpanAttributes

{ } if has_err { + let err = self.props.get(emit_core::well_known::ERR_KEY).unwrap(); + stream_field( &mut *stream, &SPAN_EVENTS_LABEL, &SPAN_EVENTS_INDEX, |stream| { - let err = self.props.get(emit_core::well_known::ERR_KEY).unwrap(); - stream.value_computed(&[Event { name: "exception", time_unix_nano: self.time_unix_nano, dropped_attributes_count: 0, attributes: &[KeyValue { key: "exception.message", - value: AnyValue::<_, (), (), ()>::String(&sval::Display::new(err)), + value: AnyValue::<_, (), (), ()>::String(sval::Display::new_borrowed( + &err, + )), }], }]) }, )?; + + let status = Status { + code: StatusCode::Error, + message: sval::Display::new_borrowed(&err), + }; + + stream_field( + &mut *stream, + &SPAN_STATUS_LABEL, + &SPAN_STATUS_INDEX, + |stream| stream.value_computed(&status), + )?; } stream.record_tuple_end(None, None, None) diff --git a/targets/otlp/src/lib.rs b/targets/otlp/src/lib.rs index 5b87b30..3853b49 100644 --- a/targets/otlp/src/lib.rs +++ b/targets/otlp/src/lib.rs @@ -3,8 +3,6 @@ pub mod data; mod error; pub use self::{client::*, error::*}; -mod logs; -mod traces; pub fn proto() -> OtlpClientBuilder { OtlpClientBuilder::proto() diff --git a/targets/otlp/src/logs.rs b/targets/otlp/src/logs.rs deleted file mode 100644 index 4c193b6..0000000 --- a/targets/otlp/src/logs.rs +++ /dev/null @@ -1,53 +0,0 @@ -use emit_batcher::BatchError; - -use crate::data::{ - self, - logs::{ - ExportLogsServiceRequest, LogRecord, PropsLogRecordAttributes, ResourceLogs, ScopeLogs, - }, - PreEncoded, -}; - -pub(crate) fn encode_event( - evt: &emit_core::event::Event, -) -> PreEncoded { - let time_unix_nano = evt - .extent() - .map(|extent| extent.to_point().to_unix_time().as_nanos() as u64) - .unwrap_or_default(); - - let observed_time_unix_nano = time_unix_nano; - - let protobuf = sval_protobuf::stream_to_protobuf(LogRecord { - time_unix_nano, - observed_time_unix_nano, - body: &Some(data::AnyValue::<_, (), (), ()>::String( - &sval::Display::new(evt.tpl()), - )), - attributes: &PropsLogRecordAttributes(evt.props()), - dropped_attributes_count: 0, - flags: Default::default(), - }); - - PreEncoded::Proto(protobuf) -} - -pub(crate) fn encode_request( - resource: Option<&PreEncoded>, - scope: Option<&PreEncoded>, - log_records: &[PreEncoded], -) -> Result>> { - Ok(PreEncoded::Proto(sval_protobuf::stream_to_protobuf( - ExportLogsServiceRequest { - resource_logs: &[ResourceLogs { - resource: &resource, - scope_logs: &[ScopeLogs { - scope: &scope, - log_records, - schema_url: "", - }], - schema_url: "", - }], - }, - ))) -} diff --git a/targets/otlp/src/traces.rs b/targets/otlp/src/traces.rs deleted file mode 100644 index ae0fab3..0000000 --- a/targets/otlp/src/traces.rs +++ /dev/null @@ -1,57 +0,0 @@ -use emit_batcher::BatchError; - -use crate::data::{ - self, - traces::{ExportTraceServiceRequest, PropsSpanAttributes, ResourceSpans, ScopeSpans, Span}, - PreEncoded, -}; - -pub(crate) fn encode_event( - evt: &emit_core::event::Event, -) -> Option { - let (start_time_unix_nano, end_time_unix_nano) = evt - .extent() - .and_then(|extent| extent.as_span()) - .map(|span| { - ( - span.start.to_unix_time().as_nanos() as u64, - span.end.to_unix_time().as_nanos() as u64, - ) - })?; - - let protobuf = sval_protobuf::stream_to_protobuf(Span { - start_time_unix_nano, - end_time_unix_nano, - name: &Some(data::AnyValue::<_, (), (), ()>::String( - &sval::Display::new(evt.tpl()), - )), - attributes: &PropsSpanAttributes { - time_unix_nano: end_time_unix_nano, - props: evt.props(), - }, - dropped_attributes_count: 0, - kind: data::traces::SpanKind::Unspecified, - }); - - Some(PreEncoded::Proto(protobuf)) -} - -pub(crate) fn encode_request( - resource: Option<&PreEncoded>, - scope: Option<&PreEncoded>, - spans: &[PreEncoded], -) -> Result>> { - Ok(PreEncoded::Proto(sval_protobuf::stream_to_protobuf( - ExportTraceServiceRequest { - resource_spans: &[ResourceSpans { - resource: &resource, - scope_spans: &[ScopeSpans { - scope: &scope, - spans, - schema_url: "", - }], - schema_url: "", - }], - }, - ))) -}