diff --git a/Cargo.toml b/Cargo.toml index bce5a48..5d686cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,12 @@ exclude = [ "tests/smoke-test" ] +[patch.crates-io.sval] +git = "https://github.com/sval-rs/sval" + +[patch.crates-io.sval_flatten] +git = "https://github.com/sval-rs/sval" + [package] name = "emit" version = "0.0.0" diff --git a/targets/otlp/Cargo.toml b/targets/otlp/Cargo.toml index 1bbe86f..c15de7e 100644 --- a/targets/otlp/Cargo.toml +++ b/targets/otlp/Cargo.toml @@ -28,6 +28,10 @@ features = ["std", "flatten"] [dependencies.sval_protobuf] git = "https://github.com/KodrAus/sval_protobuf" +[dependencies.sval_json] +version = "2.9" +features = ["std"] + [dependencies.tokio] version = "1" features = ["rt-multi-thread", "sync"] diff --git a/targets/otlp/src/client.rs b/targets/otlp/src/client.rs index a8ddbfd..caee721 100644 --- a/targets/otlp/src/client.rs +++ b/targets/otlp/src/client.rs @@ -1,7 +1,8 @@ use emit_batcher::BatchError; -use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration}; +use sval_protobuf::buf::ProtoBuf; +use std::{future::Future, sync::Arc, time::Duration}; -use crate::data::PreEncoded; +use crate::data::{PreEncoded, self}; pub(super) struct OtlpClient { sender: emit_batcher::Sender, @@ -13,8 +14,8 @@ pub(super) struct OtlpClientBuilder { enum Destination { HttpProto { - resource: Option, - scope: Option, + resource: Option, + scope: Option, url: String, }, } @@ -40,24 +41,15 @@ impl OtlpClientBuilder { } } - pub fn resource(mut self, resource: impl emit_core::props::Props) -> Self { - /* - let mut attributes = Vec::new(); - - resource.for_each(|k, v| { - let key = k.to_string(); - let value = value::to_value(v); - - attributes.push(KeyValue { key, value }); - - ControlFlow::Continue(()) - }); - - self.resource = Some(Resource { - attributes, - dropped_attributes_count: 0, - }); - */ + pub fn resource(mut self, attributes: impl emit_core::props::Props) -> Self { + match self.dst { + Destination::HttpProto { ref mut resource, .. } => { + *resource = Some(sval_protobuf::stream_to_protobuf(data::Resource { + attributes: data::PropsResourceAttributes(attributes), + dropped_attribute_count: 0, + })); + } + } self } @@ -79,8 +71,8 @@ impl OtlpClientBuilder { scope, } => RawClient::HttpProto { url, - resource, - scope, + resource: resource.map(PreEncoded::Proto), + scope: scope.map(PreEncoded::Proto), client: reqwest::Client::new(), }, }), diff --git a/targets/otlp/src/data.rs b/targets/otlp/src/data.rs index 45ff356..1faec50 100644 --- a/targets/otlp/src/data.rs +++ b/targets/otlp/src/data.rs @@ -25,13 +25,54 @@ pub struct ResourceLogs<'a, R, SL> { } #[derive(Value)] -pub struct Resource<'a> { - #[sval(index = 1)] - pub attributes: &'a [KeyValue<'a>], +pub struct Resource { + #[sval(flatten)] + pub attributes: A, #[sval(index = 2)] pub dropped_attribute_count: u32, } +#[derive(Value)] +pub struct InlineResourceAttributes<'a> { + #[sval(index = 1)] + pub attributes: &'a [KeyValue<'a>], +} + +pub struct PropsResourceAttributes

(pub P); + +impl sval::Value for PropsResourceAttributes

{ + fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(&'sval self, stream: &mut S) -> sval::Result { + stream.record_tuple_begin(None, None, None, None)?; + + stream.record_tuple_value_begin(None, &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), &sval::Index::new(1))?; + stream.seq_begin(None)?; + + let mut seen = HashSet::new(); + self.0.for_each(|k, v| { + if seen.insert(k.to_owned()) { + stream + .seq_value_begin() + .map(|_| ControlFlow::Continue(())) + .unwrap_or(ControlFlow::Break(()))?; + sval_ref::stream_ref(&mut *stream, EmitValue(v)) + .map(|_| ControlFlow::Continue(())) + .unwrap_or(ControlFlow::Break(()))?; + stream + .seq_value_end() + .map(|_| ControlFlow::Continue(())) + .unwrap_or(ControlFlow::Break(()))?; + } + + ControlFlow::Continue(()) + }); + + stream.seq_end()?; + stream.record_tuple_value_end(None, &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), &sval::Index::new(1))?; + + stream.record_tuple_end(None, None, None) + } +} + #[derive(Value)] pub struct ScopeLogs<'a, IS, LR> { #[sval(index = 1)] @@ -43,17 +84,23 @@ pub struct ScopeLogs<'a, IS, LR> { } #[derive(Value)] -pub struct InstrumentationScope<'a> { +pub struct InstrumentationScope<'a, A> { #[sval(index = 1)] pub name: &'a str, #[sval(index = 2)] pub version: &'a str, - #[sval(index = 3)] - pub attributes: &'a [KeyValue<'a>], + #[sval(flatten)] + pub attributes: A, #[sval(index = 4)] pub dropped_attribute_count: u32, } +#[derive(Value)] +pub struct InlineInstrumentationScopeAttributes<'a> { + #[sval(index = 1)] + pub attributes: &'a [KeyValue<'a>], +} + #[derive(Value)] #[repr(i32)] pub enum SeverityNumber { @@ -92,19 +139,19 @@ const ANY_VALUE_BYTES_INDEX: sval::Index = sval::Index::new(7); // TODO: Use the consts here #[derive(Value)] pub enum AnyValue<'a> { - #[sval(index = 1)] + #[sval(label = "stringValue", index = 1)] String(&'a str), - #[sval(index = 2)] + #[sval(label = "boolValue", index = 2)] Bool(bool), - #[sval(index = 3)] + #[sval(label = "intValue", index = 3)] Int(i64), - #[sval(index = 4)] + #[sval(label = "doubleValue", index = 4)] Double(f64), - #[sval(index = 5)] + #[sval(label = "arrayValue", index = 5)] Array(ArrayValue<'a>), - #[sval(index = 6)] + #[sval(label = "kvlistValue", index = 6)] Kvlist(KvList<'a>), - #[sval(index = 7)] + #[sval(label = "bytesValue", index = 7)] Bytes(&'a sval::BinarySlice), } @@ -145,7 +192,7 @@ pub struct LogRecord { } #[derive(Value)] -pub struct InlineAttributes<'a> { +pub struct InlineLogRecordAttributes<'a> { #[sval(index = 2)] pub severity_number: SeverityNumber, #[sval(index = 3)] @@ -158,15 +205,18 @@ pub struct InlineAttributes<'a> { pub span_id: &'a sval::BinaryArray<8>, } -pub struct PropsAttributes

(pub P); +pub struct PropsLogRecordAttributes

(pub P); -impl sval::Value for PropsAttributes

{ +impl sval::Value for PropsLogRecordAttributes

{ fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(&'sval self, stream: &mut S) -> sval::Result { let mut trace_id = [0; 32]; let mut span_id = [0; 16]; let mut level = emit_core::level::Level::default(); - stream.tuple_begin(None, None, None, None)?; + stream.record_tuple_begin(None, None, None, None)?; + + stream.record_tuple_value_begin(None, &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), &sval::Index::new(6))?; + stream.seq_begin(None)?; let mut seen = HashSet::new(); self.0.for_each(|k, v| { @@ -189,14 +239,14 @@ impl sval::Value for PropsAttributes

{ _ => { if seen.insert(k.to_owned()) { stream - .tuple_value_begin(None, &sval::Index::new(6)) + .seq_value_begin() .map(|_| ControlFlow::Continue(())) .unwrap_or(ControlFlow::Break(()))?; sval_ref::stream_ref(&mut *stream, EmitValue(v)) .map(|_| ControlFlow::Continue(())) .unwrap_or(ControlFlow::Break(()))?; stream - .tuple_value_end(None, &sval::Index::new(6)) + .seq_value_end() .map(|_| ControlFlow::Continue(())) .unwrap_or(ControlFlow::Break(()))?; } @@ -206,6 +256,9 @@ impl sval::Value for PropsAttributes

{ ControlFlow::Continue(()) }); + stream.seq_end()?; + stream.record_tuple_value_end(None, &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), &sval::Index::new(6))?; + let severity_number = match level { emit_core::level::Level::Debug => 1i32, emit_core::level::Level::Info => 2i32, @@ -213,37 +266,37 @@ impl sval::Value for PropsAttributes

{ emit_core::level::Level::Error => 4i32, }; - stream.tuple_value_begin(None, &sval::Index::new(2))?; + stream.record_tuple_value_begin(None, &sval::Label::new("severityNumber").with_tag(&sval::tags::VALUE_IDENT), &sval::Index::new(2))?; stream.i32(severity_number)?; - stream.tuple_value_end(None, &sval::Index::new(2))?; + stream.record_tuple_value_end(None, &sval::Label::new("severityNumber").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(2))?; - stream.tuple_value_begin(None, &sval::Index::new(3))?; + stream.record_tuple_value_begin(None, &sval::Label::new("severityText").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(3))?; sval::stream_display(&mut *stream, level)?; - stream.tuple_value_end(None, &sval::Index::new(3))?; + stream.record_tuple_value_end(None, &sval::Label::new("severityText").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(3))?; if trace_id != [0; 32] { - stream.tuple_value_begin(None, &sval::Index::new(9))?; + stream.record_tuple_value_begin(None, &sval::Label::new("traceId").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(9))?; stream.binary_begin(Some(32))?; stream.binary_fragment_computed(&trace_id)?; stream.binary_end()?; - stream.tuple_value_end(None, &sval::Index::new(9))?; + stream.record_tuple_value_end(None, &sval::Label::new("traceId").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(9))?; } if span_id != [0; 16] { - stream.tuple_value_begin(None, &sval::Index::new(10))?; + stream.record_tuple_value_begin(None, &sval::Label::new("spanId").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(10))?; stream.binary_begin(Some(16))?; stream.binary_fragment_computed(&span_id)?; stream.binary_end()?; - stream.tuple_value_end(None, &sval::Index::new(10))?; + stream.record_tuple_value_end(None, &sval::Label::new("spanId").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(10))?; } - stream.tuple_end(None, None, None) + stream.record_tuple_end(None, None, None) } } #[derive(Value)] pub enum DisplayValue { - #[sval(index = 1)] + #[sval(label = "stringValue", index = 1)] String(D), } @@ -324,7 +377,7 @@ impl<'a> sval_ref::ValueRef<'a> for EmitValue<'a> { fn binary_begin(&mut self, num_bytes: Option) -> sval::Result { self.any_value_begin(&ANY_VALUE_BYTES_LABEL, &ANY_VALUE_BYTES_INDEX)?; - self.stream.text_begin(num_bytes) + self.stream.binary_begin(num_bytes) } fn binary_fragment(&mut self, fragment: &'sval [u8]) -> sval::Result { @@ -336,7 +389,7 @@ impl<'a> sval_ref::ValueRef<'a> for EmitValue<'a> { } fn binary_end(&mut self) -> sval::Result { - self.stream.text_end()?; + self.stream.binary_end()?; self.any_value_end(&ANY_VALUE_BYTES_LABEL, &ANY_VALUE_BYTES_INDEX) } diff --git a/targets/otlp/src/logs.rs b/targets/otlp/src/logs.rs index 3dcfd8b..29d3382 100644 --- a/targets/otlp/src/logs.rs +++ b/targets/otlp/src/logs.rs @@ -61,12 +61,30 @@ impl emit_core::emitter::Emitter for OtlpLogsEmitter { let observed_time_unix_nano = time_unix_nano; + println!("{}", sval_json::stream_to_string(data::LogRecord { + time_unix_nano, + observed_time_unix_nano, + body: Some(data::DisplayValue::String(sval::Display::new(evt.msg()))), + attributes: data::PropsLogRecordAttributes(evt.props()), + dropped_attributes_count: 0, + flags: Default::default(), + }).unwrap()); + + println!("{}", protoscope(&sval_protobuf::stream_to_protobuf(data::LogRecord { + time_unix_nano, + observed_time_unix_nano, + body: Some(data::DisplayValue::String(sval::Display::new(evt.msg()))), + attributes: data::PropsLogRecordAttributes(evt.props()), + dropped_attributes_count: 0, + flags: Default::default(), + }).to_vec())); + self.inner .emit(sval_protobuf::stream_to_protobuf(data::LogRecord { time_unix_nano, observed_time_unix_nano, body: Some(data::DisplayValue::String(sval::Display::new(evt.msg()))), - attributes: data::PropsAttributes(evt.props()), + attributes: data::PropsLogRecordAttributes(evt.props()), dropped_attributes_count: 0, flags: Default::default(), })) @@ -76,3 +94,30 @@ impl emit_core::emitter::Emitter for OtlpLogsEmitter { self.inner.blocking_flush(timeout) } } + +fn protoscope(encoded: &[u8]) -> String { + use std::{ + io::{Read, Write}, + process::{Command, Stdio}, + }; + + let mut protoscope = Command::new("protoscope") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .expect("failed to call protoscope"); + + let mut stdin = protoscope.stdin.take().expect("missing stdin"); + stdin.write_all(encoded).expect("failed to write"); + drop(stdin); + + let mut buf = String::new(); + protoscope + .stdout + .take() + .expect("missing stdout") + .read_to_string(&mut buf) + .expect("failed to read"); + + buf +}