diff --git a/targets/otlp/Cargo.toml b/targets/otlp/Cargo.toml index 1bbe86f..e61f2ab 100644 --- a/targets/otlp/Cargo.toml +++ b/targets/otlp/Cargo.toml @@ -4,7 +4,9 @@ version = "0.0.0" edition = "2021" [features] -grpc = ["dep:prost"] +default = ["http"] +http = ["dep:hyper", "dep:hyper-util"] +grpc = ["dep:prost", "dep:serde", "emit_core/serde"] [dependencies.emit_core] path = "../../core" @@ -15,26 +17,40 @@ path = "../../batcher" features = ["tokio"] [dependencies.sval] -version = "2.9" +version = "2.10" features = ["std"] [dependencies.sval_ref] -version = "2.9" +version = "2.10" [dependencies.sval_derive] -version = "2.9" +version = "2.10" features = ["std", "flatten"] [dependencies.sval_protobuf] git = "https://github.com/KodrAus/sval_protobuf" +features = ["bytes"] [dependencies.tokio] version = "1" features = ["rt-multi-thread", "sync"] -[dependencies.reqwest] -version = "0.11" +[dependencies.hyper] +optional = true +version = "1.0.0-rc.4" +features = ["client", "http1"] + +[dependencies.hyper-util] +optional = true +git = "https://github.com/hyperium/hyper-util" + +[dependencies.bytes] +version = "1" [dependencies.prost] version = "0.11" optional = true + +[dependencies.serde] +version = "1" +optional = true diff --git a/targets/otlp/gen/main.rs b/targets/otlp/gen/main.rs index 06bccc7..8fc40a6 100644 --- a/targets/otlp/gen/main.rs +++ b/targets/otlp/gen/main.rs @@ -1,7 +1,7 @@ fn main() -> Result<(), Box> { let mut config = prost_build::Config::new(); - config.out_dir("../src/proto"); + config.out_dir("../src/data/generated"); config.compile_protos( &[ diff --git a/targets/otlp/src/client.rs b/targets/otlp/src/client.rs index 7556017..a577d98 100644 --- a/targets/otlp/src/client.rs +++ b/targets/otlp/src/client.rs @@ -4,6 +4,10 @@ use sval_protobuf::buf::ProtoBuf; use crate::data::{self, PreEncoded}; +use self::http::HttpConnection; + +mod http; + pub(super) struct OtlpClient { sender: emit_batcher::Sender, } @@ -65,7 +69,7 @@ impl OtlpClientBuilder { self, mut on_batch: impl FnMut(OtlpSender, Vec) -> F + Send + 'static, ) -> OtlpClient { - let (sender, receiver) = emit_batcher::bounded(1024); + let (sender, receiver) = emit_batcher::bounded(10_000); let client = OtlpSender { client: Arc::new(match self.dst { @@ -74,10 +78,9 @@ impl OtlpClientBuilder { resource, scope, } => RawClient::HttpProto { - url, + http: HttpConnection::new(&url).expect("failed to open connection"), resource: resource.map(PreEncoded::Proto), scope: scope.map(PreEncoded::Proto), - client: reqwest::Client::new(), }, }), }; @@ -95,10 +98,9 @@ pub struct OtlpSender { enum RawClient { HttpProto { - url: String, + http: HttpConnection, resource: Option, scope: Option, - client: reqwest::Client, }, } @@ -106,29 +108,21 @@ impl OtlpSender { pub(crate) async fn emit( self, batch: Vec, - // TODO: Encode proto encode: impl FnOnce( Option<&PreEncoded>, Option<&PreEncoded>, &[T], - ) -> Result, BatchError>, + ) -> Result>, ) -> Result<(), BatchError> { match *self.client { RawClient::HttpProto { - ref url, + ref http, ref resource, ref scope, - ref client, } => { - let body = encode(resource.as_ref(), scope.as_ref(), &batch)?; - - client - .request(reqwest::Method::POST, url) - .header("content-type", "application/x-protobuf") - .body(body) - .send() + http.send(encode(resource.as_ref(), scope.as_ref(), &batch)?) .await - .map_err(|e| BatchError::retry(e, batch))?; + .map_err(|e| BatchError::no_retry(e))?; } } diff --git a/targets/otlp/src/client/http.rs b/targets/otlp/src/client/http.rs new file mode 100644 index 0000000..9d269b4 --- /dev/null +++ b/targets/otlp/src/client/http.rs @@ -0,0 +1,152 @@ +use std::{ + error, fmt, + pin::Pin, + sync::Mutex, + task::{Context, Poll}, +}; + +use bytes::Buf; +use hyper::{ + body::{Body, Frame, SizeHint}, + client::conn::{http1, http1::SendRequest}, + Method, Request, Uri, +}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpStream; + +use crate::data::{PreEncoded, PreEncodedCursor}; + +pub(crate) struct Error(Box); + +impl Error { + fn new(e: impl error::Error + Send + Sync + 'static) -> Self { + Error(Box::new(e)) + } +} + +impl error::Error for Error { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + self.0.source() + } +} + +impl fmt::Debug for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.0, f) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} + +pub(crate) struct HttpConnection { + uri: Uri, + sender: Mutex>>, +} + +impl HttpConnection { + pub fn new(url: &str) -> Result { + Ok(HttpConnection { + uri: url.parse().map_err(Error::new)?, + sender: Mutex::new(None), + }) + } + + fn poison(&self) -> Option> { + self.sender.lock().unwrap().take() + } + + fn unpoison(&self, sender: SendRequest) { + *self.sender.lock().unwrap() = Some(sender); + } + + pub async fn send(&self, body: PreEncoded) -> Result<(), Error> { + let mut sender = match self.poison() { + Some(sender) => sender, + None => { + let io = TokioIo::new( + TcpStream::connect(( + self.uri.host().unwrap(), + self.uri.port_u16().unwrap_or(80), + )) + .await + .map_err(Error::new)?, + ); + + let (sender, conn) = http1::handshake(io).await.map_err(Error::new)?; + + tokio::task::spawn(async move { + let _ = conn.await; + }); + + sender + } + }; + + send_request(&mut sender, &self.uri, body).await?; + + self.unpoison(sender); + + Ok(()) + } +} + +async fn send_request( + sender: &mut SendRequest, + uri: &Uri, + body: PreEncoded, +) -> Result<(), Error> { + sender + .send_request( + Request::builder() + .uri(uri) + .method(Method::POST) + .header("host", uri.authority().unwrap().as_str()) + .header( + "content-type", + match body { + PreEncoded::Proto(_) => "application/x-protobuf", + }, + ) + .body(HttpBody(Some(body.into_cursor()))) + .map_err(Error::new)?, + ) + .await + .map_err(Error::new)?; + + Ok(()) +} + +pub(crate) struct HttpBody(Option); + +impl Body for HttpBody { + type Data = PreEncodedCursor; + + type Error = std::convert::Infallible; + + fn poll_frame( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + if let Some(buf) = self.get_mut().0.take() { + Poll::Ready(Some(Ok(Frame::data(buf)))) + } else { + Poll::Ready(None) + } + } + + fn is_end_stream(&self) -> bool { + self.0.is_none() + } + + fn size_hint(&self) -> SizeHint { + if let Some(ref buf) = self.0 { + SizeHint::with_exact(buf.remaining() as u64) + } else { + SizeHint::with_exact(0) + } + } +} diff --git a/targets/otlp/src/data.rs b/targets/otlp/src/data.rs index cc71f1e..4046bdb 100644 --- a/targets/otlp/src/data.rs +++ b/targets/otlp/src/data.rs @@ -1,5 +1,8 @@ +use std::borrow::Cow; + +use bytes::Buf; use sval_derive::Value; -use sval_protobuf::buf::ProtoBuf; +use sval_protobuf::buf::{ProtoBuf, ProtoBufCursor}; mod any_value; mod export_logs_service; @@ -7,7 +10,10 @@ mod instrumentation_scope; mod log_record; mod resource; -pub use self::{ +#[cfg(feature = "grpc")] +pub(crate) mod generated; + +pub(crate) use self::{ any_value::*, export_logs_service::*, instrumentation_scope::*, log_record::*, resource::*, }; @@ -16,3 +22,41 @@ pub use self::{ pub(crate) enum PreEncoded { Proto(ProtoBuf), } + +impl PreEncoded { + pub fn into_cursor(self) -> PreEncodedCursor { + match self { + PreEncoded::Proto(buf) => PreEncodedCursor::Proto(buf.into_cursor()), + } + } + + pub fn to_vec(&self) -> Cow<[u8]> { + match self { + PreEncoded::Proto(buf) => buf.to_vec(), + } + } +} + +pub(crate) enum PreEncodedCursor { + Proto(ProtoBufCursor), +} + +impl Buf for PreEncodedCursor { + fn remaining(&self) -> usize { + match self { + PreEncodedCursor::Proto(cursor) => cursor.remaining(), + } + } + + fn chunk(&self) -> &[u8] { + match self { + PreEncodedCursor::Proto(cursor) => cursor.chunk(), + } + } + + fn advance(&mut self, cnt: usize) { + match self { + PreEncodedCursor::Proto(cursor) => cursor.advance(cnt), + } + } +} diff --git a/targets/otlp/src/data/any_value.rs b/targets/otlp/src/data/any_value.rs index 8c83d4e..1547d97 100644 --- a/targets/otlp/src/data/any_value.rs +++ b/targets/otlp/src/data/any_value.rs @@ -23,7 +23,6 @@ const ANY_VALUE_ARRAY_INDEX: sval::Index = sval::Index::new(5); const ANY_VALUE_KVLIST_INDEX: sval::Index = sval::Index::new(6); const ANY_VALUE_BYTES_INDEX: sval::Index = sval::Index::new(7); -// TODO: Use the consts here #[derive(Value)] pub enum AnyValue< 'a, @@ -32,31 +31,41 @@ pub enum AnyValue< KV: ?Sized = KvList<'a>, BV: ?Sized = sval::BinarySlice, > { - #[sval(label = "stringValue", index = 1)] + #[sval(label = ANY_VALUE_STRING_LABEL, index = ANY_VALUE_STRING_INDEX)] String(&'a SV), - #[sval(label = "boolValue", index = 2)] + #[sval(label = ANY_VALUE_BOOL_LABEL, index = ANY_VALUE_BOOL_INDEX)] Bool(bool), - #[sval(label = "intValue", index = 3)] + #[sval(label = ANY_VALUE_INT_LABEL, index = ANY_VALUE_INT_INDEX)] Int(i64), - #[sval(label = "doubleValue", index = 4)] + #[sval(label = ANY_VALUE_DOUBLE_LABEL, index = ANY_VALUE_DOUBLE_INDEX)] Double(f64), - #[sval(label = "arrayValue", index = 5)] + #[sval(label = ANY_VALUE_ARRAY_LABEL, index = ANY_VALUE_ARRAY_INDEX)] Array(&'a AV), - #[sval(label = "kvlistValue", index = 6)] + #[sval(label = ANY_VALUE_KVLIST_LABEL, index = ANY_VALUE_KVLIST_INDEX)] Kvlist(&'a KV), - #[sval(label = "bytesValue", index = 7)] + #[sval(label = ANY_VALUE_BYTES_LABEL, index = ANY_VALUE_BYTES_INDEX)] Bytes(&'a BV), } +const ARRAY_VALUES_LABEL: sval::Label = + sval::Label::new("values").with_tag(&sval::tags::VALUE_IDENT); + +const ARRAY_VALUES_INDEX: sval::Index = sval::Index::new(1); + #[derive(Value)] pub struct ArrayValue<'a> { - #[sval(index = 1)] + #[sval(label = ARRAY_VALUES_LABEL, index = ARRAY_VALUES_INDEX)] pub values: &'a [AnyValue<'a>], } +const KVLIST_VALUES_LABEL: sval::Label = + sval::Label::new("values").with_tag(&sval::tags::VALUE_IDENT); + +const KVLIST_VALUES_INDEX: sval::Index = sval::Index::new(1); + #[derive(Value)] pub struct KvList<'a> { - #[sval(index = 1)] + #[sval(label = KVLIST_VALUES_LABEL, index = KVLIST_VALUES_INDEX)] pub values: &'a [KeyValue<&'a str, &'a AnyValue<'a>>], } @@ -67,27 +76,14 @@ const KEY_VALUE_VALUE_LABEL: sval::Label = const KEY_VALUE_KEY_INDEX: sval::Index = sval::Index::new(1); const KEY_VALUE_VALUE_INDEX: sval::Index = sval::Index::new(2); +#[derive(Value)] pub struct KeyValue { + #[sval(label = KEY_VALUE_KEY_LABEL, index = KEY_VALUE_KEY_INDEX)] pub key: K, + #[sval(label = KEY_VALUE_VALUE_LABEL, index = KEY_VALUE_VALUE_INDEX)] pub value: V, } -impl sval::Value for KeyValue { - fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(&'sval self, stream: &mut S) -> sval::Result { - stream.record_tuple_begin(None, None, None, Some(2))?; - - stream.record_tuple_value_begin(None, &KEY_VALUE_KEY_LABEL, &KEY_VALUE_KEY_INDEX)?; - stream.value(&self.key)?; - stream.record_tuple_value_end(None, &KEY_VALUE_KEY_LABEL, &KEY_VALUE_KEY_INDEX)?; - - stream.record_tuple_value_begin(None, &KEY_VALUE_VALUE_LABEL, &KEY_VALUE_VALUE_INDEX)?; - stream.value(&self.value)?; - stream.record_tuple_value_end(None, &KEY_VALUE_VALUE_LABEL, &KEY_VALUE_VALUE_INDEX)?; - - stream.record_tuple_end(None, None, None) - } -} - impl<'a, K: sval_ref::ValueRef<'a>, V: sval_ref::ValueRef<'a>> sval_ref::ValueRef<'a> for KeyValue { @@ -231,8 +227,8 @@ impl<'a> sval_ref::ValueRef<'a> for EmitValue<'a> { self.stream.record_tuple_begin(None, None, None, Some(1))?; self.stream.record_tuple_value_begin( None, - &sval::Label::new("values").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(1), + &ARRAY_VALUES_LABEL, + &ARRAY_VALUES_INDEX, )?; self.stream.seq_begin(num_entries) } @@ -249,8 +245,8 @@ impl<'a> sval_ref::ValueRef<'a> for EmitValue<'a> { self.stream.seq_end()?; self.stream.record_tuple_value_end( None, - &sval::Label::new("values").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(1), + &ARRAY_VALUES_LABEL, + &ARRAY_VALUES_INDEX, )?; self.stream.record_tuple_end(None, None, None)?; self.any_value_end(&ANY_VALUE_ARRAY_LABEL, &ANY_VALUE_ARRAY_INDEX) @@ -265,8 +261,8 @@ impl<'a> sval_ref::ValueRef<'a> for EmitValue<'a> { self.stream.record_tuple_begin(None, None, None, Some(1))?; self.stream.record_tuple_value_begin( None, - &sval::Label::new("values").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(1), + &ARRAY_VALUES_LABEL, + &ARRAY_VALUES_INDEX, )?; self.stream.seq_begin(num_entries) } @@ -278,34 +274,31 @@ impl<'a> sval_ref::ValueRef<'a> for EmitValue<'a> { self.stream.record_tuple_begin(None, None, None, Some(2))?; self.stream.record_tuple_value_begin( None, - &sval::Label::new("key").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(1), + &KEY_VALUE_KEY_LABEL, + &KEY_VALUE_KEY_INDEX, ) } fn map_key_end(&mut self) -> sval::Result { self.in_map_key = false; - self.stream.record_tuple_value_end( - None, - &sval::Label::new("key").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(1), - ) + self.stream + .record_tuple_value_end(None, &KEY_VALUE_KEY_LABEL, &KEY_VALUE_KEY_INDEX) } fn map_value_begin(&mut self) -> sval::Result { self.stream.record_tuple_value_begin( None, - &sval::Label::new("value").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(2), + &KEY_VALUE_VALUE_LABEL, + &KEY_VALUE_VALUE_INDEX, ) } fn map_value_end(&mut self) -> sval::Result { self.stream.record_tuple_value_end( None, - &sval::Label::new("value").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(2), + &KEY_VALUE_VALUE_LABEL, + &KEY_VALUE_VALUE_INDEX, )?; self.stream.record_tuple_end(None, None, None)?; self.stream.seq_value_end() @@ -315,8 +308,8 @@ impl<'a> sval_ref::ValueRef<'a> for EmitValue<'a> { self.stream.seq_end()?; self.stream.record_tuple_value_end( None, - &sval::Label::new("values").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(1), + &KVLIST_VALUES_LABEL, + &KVLIST_VALUES_INDEX, )?; self.stream.record_tuple_end(None, None, None)?; self.any_value_end(&ANY_VALUE_KVLIST_LABEL, &ANY_VALUE_KVLIST_INDEX) diff --git a/targets/otlp/src/data/export_logs_service.rs b/targets/otlp/src/data/export_logs_service.rs index fb2e0ff..a0e7868 100644 --- a/targets/otlp/src/data/export_logs_service.rs +++ b/targets/otlp/src/data/export_logs_service.rs @@ -4,26 +4,26 @@ use super::{InstrumentationScope, LogRecord, Resource}; #[derive(Value)] pub struct ExportLogsServiceRequest<'a, RL: ?Sized = [ResourceLogs<'a>]> { - #[sval(index = 1)] + #[sval(label = "resourceLogs", index = 1)] pub resource_logs: &'a RL, } #[derive(Value)] pub struct ResourceLogs<'a, R: ?Sized = Resource<'a>, SL: ?Sized = [ScopeLogs<'a>]> { - #[sval(index = 1)] + #[sval(label = "resource", index = 1)] pub resource: &'a R, - #[sval(index = 2)] + #[sval(label = "scopeLogs", index = 2)] pub scope_logs: &'a SL, - #[sval(index = 3)] + #[sval(label = "schemaUrl", index = 3)] pub schema_url: &'a str, } #[derive(Value)] pub struct ScopeLogs<'a, IS: ?Sized = InstrumentationScope<'a>, LR: ?Sized = &'a [LogRecord<'a>]> { - #[sval(index = 1)] + #[sval(label = "scope", index = 1)] pub scope: &'a IS, - #[sval(index = 2)] + #[sval(label = "logRecords", index = 2)] pub log_records: &'a LR, - #[sval(index = 3)] + #[sval(label = "schemaUrl", index = 3)] pub schema_url: &'a str, } diff --git a/targets/otlp/src/data/generated.rs b/targets/otlp/src/data/generated.rs new file mode 100644 index 0000000..d9838ff --- /dev/null +++ b/targets/otlp/src/data/generated.rs @@ -0,0 +1,516 @@ +#[path = ""] +pub(crate) mod logs { + #[path = "./generated/opentelemetry.proto.logs.v1.rs"] + pub(crate) mod v1; +} + +#[path = ""] +pub(crate) mod trace { + #[path = "./generated/opentelemetry.proto.trace.v1.rs"] + pub(crate) mod v1; +} + +#[path = ""] +pub(crate) mod common { + #[path = "./generated/opentelemetry.proto.common.v1.rs"] + pub(crate) mod v1; +} + +#[path = ""] +pub(crate) mod resource { + #[path = "./generated/opentelemetry.proto.resource.v1.rs"] + pub(crate) mod v1; +} + +#[path = ""] +pub(crate) mod collector { + #[path = ""] + pub(crate) mod logs { + #[path = "./generated/opentelemetry.proto.collector.logs.v1.rs"] + pub(crate) mod v1; + } + + #[path = ""] + pub(crate) mod trace { + #[path = "./generated/opentelemetry.proto.collector.trace.v1.rs"] + pub(crate) mod v1; + } +} + +use std::fmt; + +use serde::ser::{ + Error, Serialize, SerializeMap, SerializeSeq, SerializeStruct, SerializeStructVariant, + SerializeTuple, SerializeTupleStruct, SerializeTupleVariant, Serializer, StdError, +}; + +use common::v1::{any_value::Value, AnyValue, ArrayValue, KeyValue, KeyValueList}; + +pub(crate) fn to_value(value: emit_core::value::Value) -> Option { + value.serialize(ValueSerializer).ok() +} + +struct ValueSerializer; + +struct ValueSerializeSeq { + value: ArrayValue, +} + +struct ValueSerializeTuple { + value: ArrayValue, +} + +struct ValueSerializeTupleStruct { + value: ArrayValue, +} + +struct ValueSerializeMap { + value: KeyValueList, +} + +struct ValueSerializeStruct { + value: KeyValueList, +} + +struct ValueSerializeTupleVariant { + variant: &'static str, + value: ArrayValue, +} + +struct ValueSerializeStructVariant { + variant: &'static str, + value: KeyValueList, +} + +#[derive(Debug)] +struct ValueError(String); + +impl fmt::Display for ValueError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} + +impl Error for ValueError { + fn custom(msg: T) -> Self + where + T: fmt::Display, + { + ValueError(msg.to_string()) + } +} + +impl StdError for ValueError {} + +impl Serializer for ValueSerializer { + type Ok = AnyValue; + + type Error = ValueError; + + type SerializeSeq = ValueSerializeSeq; + + type SerializeTuple = ValueSerializeTuple; + + type SerializeTupleStruct = ValueSerializeTupleStruct; + + type SerializeTupleVariant = ValueSerializeTupleVariant; + + type SerializeMap = ValueSerializeMap; + + type SerializeStruct = ValueSerializeStruct; + + type SerializeStructVariant = ValueSerializeStructVariant; + + fn serialize_bool(self, v: bool) -> Result { + Ok(AnyValue { + value: Some(Value::BoolValue(v)), + }) + } + + fn serialize_i8(self, v: i8) -> Result { + self.serialize_i64(v as i64) + } + + fn serialize_i16(self, v: i16) -> Result { + self.serialize_i64(v as i64) + } + + fn serialize_i32(self, v: i32) -> Result { + self.serialize_i64(v as i64) + } + + fn serialize_i64(self, v: i64) -> Result { + Ok(AnyValue { + value: Some(Value::IntValue(v)), + }) + } + + fn serialize_i128(self, v: i128) -> Result { + if let Ok(v) = v.try_into() { + self.serialize_i64(v) + } else { + self.collect_str(&v) + } + } + + fn serialize_u8(self, v: u8) -> Result { + self.serialize_i64(v as i64) + } + + fn serialize_u16(self, v: u16) -> Result { + self.serialize_i64(v as i64) + } + + fn serialize_u32(self, v: u32) -> Result { + self.serialize_i64(v as i64) + } + + fn serialize_u64(self, v: u64) -> Result { + if let Ok(v) = v.try_into() { + self.serialize_i64(v) + } else { + self.collect_str(&v) + } + } + + fn serialize_u128(self, v: u128) -> Result { + if let Ok(v) = v.try_into() { + self.serialize_i64(v) + } else { + self.collect_str(&v) + } + } + + fn serialize_f32(self, v: f32) -> Result { + self.serialize_f64(v as f64) + } + + fn serialize_f64(self, v: f64) -> Result { + Ok(AnyValue { + value: Some(Value::DoubleValue(v)), + }) + } + + fn serialize_char(self, v: char) -> Result { + self.collect_str(&v) + } + + fn serialize_str(self, v: &str) -> Result { + Ok(AnyValue { + value: Some(Value::StringValue(v.to_owned())), + }) + } + + fn serialize_bytes(self, v: &[u8]) -> Result { + Ok(AnyValue { + value: Some(Value::BytesValue(v.to_owned())), + }) + } + + fn serialize_none(self) -> Result { + Ok(AnyValue { value: None }) + } + + fn serialize_some(self, value: &T) -> Result + where + T: serde::Serialize, + { + value.serialize(self) + } + + fn serialize_unit(self) -> Result { + Ok(AnyValue { value: None }) + } + + fn serialize_unit_struct(self, name: &'static str) -> Result { + name.serialize(self) + } + + fn serialize_unit_variant( + self, + _: &'static str, + _: u32, + variant: &'static str, + ) -> Result { + variant.serialize(self) + } + + fn serialize_newtype_struct( + self, + _: &'static str, + value: &T, + ) -> Result + where + T: serde::Serialize, + { + value.serialize(self) + } + + fn serialize_newtype_variant( + self, + _: &'static str, + _: u32, + variant: &'static str, + value: &T, + ) -> Result + where + T: serde::Serialize, + { + let mut map = self.serialize_map(Some(1))?; + map.serialize_entry(variant, value)?; + map.end() + } + + fn serialize_seq(self, _: Option) -> Result { + Ok(ValueSerializeSeq { + value: ArrayValue { values: Vec::new() }, + }) + } + + fn serialize_tuple(self, _: usize) -> Result { + Ok(ValueSerializeTuple { + value: ArrayValue { values: Vec::new() }, + }) + } + + fn serialize_tuple_struct( + self, + _: &'static str, + _: usize, + ) -> Result { + Ok(ValueSerializeTupleStruct { + value: ArrayValue { values: Vec::new() }, + }) + } + + fn serialize_tuple_variant( + self, + _: &'static str, + _: u32, + variant: &'static str, + _: usize, + ) -> Result { + Ok(ValueSerializeTupleVariant { + variant, + value: ArrayValue { values: Vec::new() }, + }) + } + + fn serialize_map(self, _: Option) -> Result { + Ok(ValueSerializeMap { + value: KeyValueList { values: Vec::new() }, + }) + } + + fn serialize_struct( + self, + _: &'static str, + _: usize, + ) -> Result { + Ok(ValueSerializeStruct { + value: KeyValueList { values: Vec::new() }, + }) + } + + fn serialize_struct_variant( + self, + _: &'static str, + _: u32, + variant: &'static str, + _: usize, + ) -> Result { + Ok(ValueSerializeStructVariant { + variant, + value: KeyValueList { values: Vec::new() }, + }) + } +} + +impl SerializeSeq for ValueSerializeSeq { + type Ok = AnyValue; + + type Error = ValueError; + + fn serialize_element(&mut self, value: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + Ok(self.value.values.push(value.serialize(ValueSerializer)?)) + } + + fn end(self) -> Result { + Ok(AnyValue { + value: Some(Value::ArrayValue(self.value)), + }) + } +} + +impl SerializeTuple for ValueSerializeTuple { + type Ok = AnyValue; + + type Error = ValueError; + + fn serialize_element(&mut self, value: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + Ok(self.value.values.push(value.serialize(ValueSerializer)?)) + } + + fn end(self) -> Result { + Ok(AnyValue { + value: Some(Value::ArrayValue(self.value)), + }) + } +} + +impl SerializeTupleStruct for ValueSerializeTupleStruct { + type Ok = AnyValue; + + type Error = ValueError; + + fn serialize_field(&mut self, value: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + Ok(self.value.values.push(value.serialize(ValueSerializer)?)) + } + + fn end(self) -> Result { + Ok(AnyValue { + value: Some(Value::ArrayValue(self.value)), + }) + } +} + +impl SerializeTupleVariant for ValueSerializeTupleVariant { + type Ok = AnyValue; + + type Error = ValueError; + + fn serialize_field(&mut self, value: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + self.value.values.push(value.serialize(ValueSerializer)?); + + Ok(()) + } + + fn end(self) -> Result { + Ok(AnyValue { + value: Some(Value::KvlistValue(KeyValueList { + values: vec![KeyValue { + key: self.variant.to_owned(), + value: Some(AnyValue { + value: Some(Value::ArrayValue(self.value)), + }), + }], + })), + }) + } +} + +impl SerializeMap for ValueSerializeMap { + type Ok = AnyValue; + + type Error = ValueError; + + fn serialize_key(&mut self, key: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + let key = match key.serialize(ValueSerializer)? { + AnyValue { + value: Some(Value::StringValue(key)), + } => key, + key => format!("{:?}", key), + }; + + self.value.values.push(KeyValue { key, value: None }); + + Ok(()) + } + + fn serialize_value(&mut self, value: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + self.value + .values + .last_mut() + .ok_or_else(|| Error::custom("missing key"))? + .value = Some(value.serialize(ValueSerializer)?); + + Ok(()) + } + + fn end(self) -> Result { + Ok(AnyValue { + value: Some(Value::KvlistValue(self.value)), + }) + } +} + +impl SerializeStruct for ValueSerializeStruct { + type Ok = AnyValue; + + type Error = ValueError; + + fn serialize_field( + &mut self, + key: &'static str, + value: &T, + ) -> Result<(), Self::Error> + where + T: Serialize, + { + let key = key.to_owned(); + let value = Some(value.serialize(ValueSerializer)?); + + self.value.values.push(KeyValue { key, value }); + + Ok(()) + } + + fn end(self) -> Result { + Ok(AnyValue { + value: Some(Value::KvlistValue(self.value)), + }) + } +} + +impl SerializeStructVariant for ValueSerializeStructVariant { + type Ok = AnyValue; + + type Error = ValueError; + + fn serialize_field( + &mut self, + key: &'static str, + value: &T, + ) -> Result<(), Self::Error> + where + T: Serialize, + { + self.value.values.push(KeyValue { + key: key.to_owned(), + value: Some(value.serialize(ValueSerializer)?), + }); + + Ok(()) + } + + fn end(self) -> Result { + Ok(AnyValue { + value: Some(Value::KvlistValue(KeyValueList { + values: vec![KeyValue { + key: self.variant.to_owned(), + value: Some(AnyValue { + value: Some(Value::KvlistValue(self.value)), + }), + }], + })), + }) + } +} diff --git a/targets/otlp/src/proto/opentelemetry.proto.collector.logs.v1.rs b/targets/otlp/src/data/generated/opentelemetry.proto.collector.logs.v1.rs similarity index 100% rename from targets/otlp/src/proto/opentelemetry.proto.collector.logs.v1.rs rename to targets/otlp/src/data/generated/opentelemetry.proto.collector.logs.v1.rs diff --git a/targets/otlp/src/proto/opentelemetry.proto.collector.trace.v1.rs b/targets/otlp/src/data/generated/opentelemetry.proto.collector.trace.v1.rs similarity index 100% rename from targets/otlp/src/proto/opentelemetry.proto.collector.trace.v1.rs rename to targets/otlp/src/data/generated/opentelemetry.proto.collector.trace.v1.rs diff --git a/targets/otlp/src/proto/opentelemetry.proto.common.v1.rs b/targets/otlp/src/data/generated/opentelemetry.proto.common.v1.rs similarity index 100% rename from targets/otlp/src/proto/opentelemetry.proto.common.v1.rs rename to targets/otlp/src/data/generated/opentelemetry.proto.common.v1.rs diff --git a/targets/otlp/src/proto/opentelemetry.proto.logs.v1.rs b/targets/otlp/src/data/generated/opentelemetry.proto.logs.v1.rs similarity index 100% rename from targets/otlp/src/proto/opentelemetry.proto.logs.v1.rs rename to targets/otlp/src/data/generated/opentelemetry.proto.logs.v1.rs diff --git a/targets/otlp/src/proto/opentelemetry.proto.resource.v1.rs b/targets/otlp/src/data/generated/opentelemetry.proto.resource.v1.rs similarity index 100% rename from targets/otlp/src/proto/opentelemetry.proto.resource.v1.rs rename to targets/otlp/src/data/generated/opentelemetry.proto.resource.v1.rs diff --git a/targets/otlp/src/proto/opentelemetry.proto.trace.v1.rs b/targets/otlp/src/data/generated/opentelemetry.proto.trace.v1.rs similarity index 100% rename from targets/otlp/src/proto/opentelemetry.proto.trace.v1.rs rename to targets/otlp/src/data/generated/opentelemetry.proto.trace.v1.rs diff --git a/targets/otlp/src/data/instrumentation_scope.rs b/targets/otlp/src/data/instrumentation_scope.rs index e1e5551..2d4b58d 100644 --- a/targets/otlp/src/data/instrumentation_scope.rs +++ b/targets/otlp/src/data/instrumentation_scope.rs @@ -4,18 +4,18 @@ use super::{AnyValue, KeyValue}; #[derive(Value)] pub struct InstrumentationScope<'a, A: ?Sized = InlineInstrumentationScopeAttributes<'a>> { - #[sval(index = 1)] + #[sval(label = "name", index = 1)] pub name: &'a str, - #[sval(index = 2)] + #[sval(label = "version", index = 2)] pub version: &'a str, #[sval(flatten)] pub attributes: &'a A, - #[sval(index = 4)] + #[sval(label = "droppedAttributeCount", index = 4)] pub dropped_attribute_count: u32, } #[derive(Value)] pub struct InlineInstrumentationScopeAttributes<'a> { - #[sval(index = 1)] + #[sval(label = "attributes", index = 1)] pub attributes: &'a [KeyValue<&'a str, &'a AnyValue<'a>>], } diff --git a/targets/otlp/src/data/log_record.rs b/targets/otlp/src/data/log_record.rs index fdc2c35..9873284 100644 --- a/targets/otlp/src/data/log_record.rs +++ b/targets/otlp/src/data/log_record.rs @@ -15,31 +15,60 @@ pub enum SeverityNumber { #[derive(Value)] pub struct LogRecord<'a, B: ?Sized = AnyValue<'a>, A: ?Sized = InlineLogRecordAttributes<'a>> { - #[sval(index = 1, data_tag = "sval_protobuf::tags::PROTOBUF_I64")] + #[sval( + label = "timeUnixNano", + index = 1, + data_tag = "sval_protobuf::tags::PROTOBUF_I64" + )] pub time_unix_nano: u64, - #[sval(index = 11, data_tag = "sval_protobuf::tags::PROTOBUF_I64")] + #[sval( + label = "observedTimeUnixNano", + index = 11, + data_tag = "sval_protobuf::tags::PROTOBUF_I64" + )] pub observed_time_unix_nano: u64, - #[sval(index = 7)] + #[sval(label = "droppedAttributesCount", index = 7)] pub dropped_attributes_count: u32, - #[sval(index = 8, data_tag = "sval_protobuf::tags::PROTOBUF_I32")] + #[sval( + label = "flags", + index = 8, + data_tag = "sval_protobuf::tags::PROTOBUF_I32" + )] pub flags: u32, - #[sval(index = 5)] + #[sval(label = "body", index = 5)] pub body: &'a B, #[sval(flatten)] pub attributes: &'a A, } +const LOG_RECORD_SEVERITY_NUMBER_LABEL: sval::Label = + sval::Label::new("severityNumber").with_tag(&sval::tags::VALUE_IDENT); +const LOG_RECORD_SEVERITY_TEXT_LABEL: sval::Label = + sval::Label::new("severityText").with_tag(&sval::tags::VALUE_IDENT); +const LOG_RECORD_ATTRIBUTES_LABEL: sval::Label = + sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT); +const LOG_RECORD_TRACE_ID_LABEL: sval::Label = + sval::Label::new("traceId").with_tag(&sval::tags::VALUE_IDENT); +const LOG_RECORD_SPAN_ID_LABEL: sval::Label = + sval::Label::new("spanId").with_tag(&sval::tags::VALUE_IDENT); + +const LOG_RECORD_SEVERITY_NUMBER_INDEX: sval::Index = sval::Index::new(2); +const LOG_RECORD_SEVERITY_TEXT_INDEX: sval::Index = sval::Index::new(3); +const LOG_RECORD_ATTRIBUTES_INDEX: sval::Index = sval::Index::new(6); +const LOG_RECORD_TRACE_ID_INDEX: sval::Index = sval::Index::new(9); +const LOG_RECORD_SPAN_ID_INDEX: sval::Index = sval::Index::new(10); + #[derive(Value)] pub struct InlineLogRecordAttributes<'a> { - #[sval(index = 2)] + #[sval(label = LOG_RECORD_SEVERITY_NUMBER_LABEL, index = LOG_RECORD_SEVERITY_NUMBER_INDEX)] pub severity_number: SeverityNumber, - #[sval(index = 3)] + #[sval(label = LOG_RECORD_SEVERITY_TEXT_LABEL, index = LOG_RECORD_SEVERITY_TEXT_INDEX)] pub severity_text: &'a str, - #[sval(index = 6)] + #[sval(label = LOG_RECORD_ATTRIBUTES_LABEL, index = LOG_RECORD_ATTRIBUTES_INDEX)] pub attributes: &'a [KeyValue<&'a str, &'a AnyValue<'a>>], - #[sval(index = 9)] + #[sval(label = LOG_RECORD_TRACE_ID_LABEL, index = LOG_RECORD_TRACE_ID_INDEX)] pub trace_id: &'a sval::BinaryArray<16>, - #[sval(index = 10)] + #[sval(label = LOG_RECORD_SPAN_ID_LABEL, index = LOG_RECORD_SPAN_ID_INDEX)] pub span_id: &'a sval::BinaryArray<8>, } @@ -55,8 +84,8 @@ impl sval::Value for EmitLogRecordAttributes

{ stream.record_tuple_value_begin( None, - &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(6), + &LOG_RECORD_ATTRIBUTES_LABEL, + &LOG_RECORD_ATTRIBUTES_INDEX, )?; stream.seq_begin(None)?; @@ -109,8 +138,8 @@ impl sval::Value for EmitLogRecordAttributes

{ stream.seq_end()?; stream.record_tuple_value_end( None, - &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(6), + &LOG_RECORD_ATTRIBUTES_LABEL, + &LOG_RECORD_ATTRIBUTES_INDEX, )?; let severity_number = match level { @@ -122,57 +151,57 @@ impl sval::Value for EmitLogRecordAttributes

{ stream.record_tuple_value_begin( None, - &sval::Label::new("severityNumber").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(2), + &LOG_RECORD_SEVERITY_NUMBER_LABEL, + &LOG_RECORD_SEVERITY_NUMBER_INDEX, )?; stream.i32(severity_number)?; stream.record_tuple_value_end( None, - &sval::Label::new("severityNumber").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(2), + &LOG_RECORD_SEVERITY_NUMBER_LABEL, + &LOG_RECORD_SEVERITY_NUMBER_INDEX, )?; stream.record_tuple_value_begin( None, - &sval::Label::new("severityText").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(3), + &LOG_RECORD_SEVERITY_TEXT_LABEL, + &LOG_RECORD_SEVERITY_TEXT_INDEX, )?; sval::stream_display(&mut *stream, level)?; stream.record_tuple_value_end( None, - &sval::Label::new("severityText").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(3), + &LOG_RECORD_SEVERITY_TEXT_LABEL, + &LOG_RECORD_SEVERITY_TEXT_INDEX, )?; if trace_id != [0; 32] { stream.record_tuple_value_begin( None, - &sval::Label::new("traceId").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(9), + &LOG_RECORD_TRACE_ID_LABEL, + &LOG_RECORD_TRACE_ID_INDEX, )?; stream.binary_begin(Some(32))?; stream.binary_fragment_computed(&trace_id)?; stream.binary_end()?; stream.record_tuple_value_end( None, - &sval::Label::new("traceId").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(9), + &LOG_RECORD_TRACE_ID_LABEL, + &LOG_RECORD_TRACE_ID_INDEX, )?; } if span_id != [0; 16] { stream.record_tuple_value_begin( None, - &sval::Label::new("spanId").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(10), + &LOG_RECORD_SPAN_ID_LABEL, + &LOG_RECORD_SPAN_ID_INDEX, )?; stream.binary_begin(Some(16))?; stream.binary_fragment_computed(&span_id)?; stream.binary_end()?; stream.record_tuple_value_end( None, - &sval::Label::new("spanId").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(10), + &LOG_RECORD_SPAN_ID_LABEL, + &LOG_RECORD_SPAN_ID_INDEX, )?; } diff --git a/targets/otlp/src/data/resource.rs b/targets/otlp/src/data/resource.rs index 4b59266..7a66d1c 100644 --- a/targets/otlp/src/data/resource.rs +++ b/targets/otlp/src/data/resource.rs @@ -12,6 +12,11 @@ pub struct Resource<'a, A: ?Sized = InlineResourceAttributes<'a>> { pub dropped_attribute_count: u32, } +const RESOURCE_ATTRIBUTES_LABEL: sval::Label = + sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT); + +const RESOURCE_ATTRIBUTES_INDEX: sval::Index = sval::Index::new(1); + #[derive(Value)] pub struct InlineResourceAttributes<'a> { #[sval(index = 1)] @@ -26,8 +31,8 @@ impl sval::Value for EmitResourceAttributes

{ stream.record_tuple_value_begin( None, - &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(1), + &RESOURCE_ATTRIBUTES_LABEL, + &RESOURCE_ATTRIBUTES_INDEX, )?; stream.seq_begin(None)?; @@ -61,8 +66,8 @@ impl sval::Value for EmitResourceAttributes

{ stream.seq_end()?; stream.record_tuple_value_end( None, - &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), - &sval::Index::new(1), + &RESOURCE_ATTRIBUTES_LABEL, + &RESOURCE_ATTRIBUTES_INDEX, )?; stream.record_tuple_end(None, None, None) diff --git a/targets/otlp/src/lib.rs b/targets/otlp/src/lib.rs index 36c0819..b309159 100644 --- a/targets/otlp/src/lib.rs +++ b/targets/otlp/src/lib.rs @@ -1,6 +1,3 @@ -#[cfg(feature = "grpc")] -mod proto; - mod client; mod data; diff --git a/targets/otlp/src/logs.rs b/targets/otlp/src/logs.rs index c42a049..05cbc9c 100644 --- a/targets/otlp/src/logs.rs +++ b/targets/otlp/src/logs.rs @@ -1,6 +1,6 @@ use crate::{ client::{OtlpClient, OtlpClientBuilder}, - data, + data::{self, PreEncoded}, }; use std::time::Duration; use sval_protobuf::buf::ProtoBuf; @@ -30,8 +30,8 @@ impl OtlpLogsEmitterBuilder { OtlpLogsEmitter { inner: self.inner.spawn(|client, batch| { client.emit(batch, |ref resource, ref scope, batch| { - let protobuf = - sval_protobuf::stream_to_protobuf(data::ExportLogsServiceRequest { + Ok(PreEncoded::Proto(sval_protobuf::stream_to_protobuf( + data::ExportLogsServiceRequest { resource_logs: &[data::ResourceLogs { resource, scope_logs: &[data::ScopeLogs { @@ -41,9 +41,8 @@ impl OtlpLogsEmitterBuilder { }], schema_url: "", }], - }); - - Ok(protobuf.to_vec().into_owned()) + }, + ))) }) }), } diff --git a/targets/otlp/src/proto.rs b/targets/otlp/src/proto.rs deleted file mode 100644 index 44e8c9b..0000000 --- a/targets/otlp/src/proto.rs +++ /dev/null @@ -1,38 +0,0 @@ -#[path = ""] -pub(crate) mod logs { - #[path = "./proto/opentelemetry.proto.logs.v1.rs"] - pub(crate) mod v1; -} - -#[path = ""] -pub(crate) mod trace { - #[path = "./proto/opentelemetry.proto.trace.v1.rs"] - pub(crate) mod v1; -} - -#[path = ""] -pub(crate) mod common { - #[path = "./proto/opentelemetry.proto.common.v1.rs"] - pub(crate) mod v1; -} - -#[path = ""] -pub(crate) mod resource { - #[path = "./proto/opentelemetry.proto.resource.v1.rs"] - pub(crate) mod v1; -} - -#[path = ""] -pub(crate) mod collector { - #[path = ""] - pub(crate) mod logs { - #[path = "./proto/opentelemetry.proto.collector.logs.v1.rs"] - pub(crate) mod v1; - } - - #[path = ""] - pub(crate) mod trace { - #[path = "./proto/opentelemetry.proto.collector.trace.v1.rs"] - pub(crate) mod v1; - } -} diff --git a/tests/smoke-test/main.rs b/tests/smoke-test/main.rs index edcb177..613b049 100644 --- a/tests/smoke-test/main.rs +++ b/tests/smoke-test/main.rs @@ -22,20 +22,11 @@ async fn main() { }) .spawn(), ) + .and_to(emit_term::stdout()) .init(); - let mut handles = Vec::new(); - - for i in 0..10 { - handles.push(tokio::spawn(async move { - for n in i * 5_000..i * 5_000 + 5_000 { - let _ = in_ctxt(n).await; - } - })); - } - - for handle in handles { - let _ = handle.await; + for i in 0..100 { + let _ = in_ctxt(i).await; } emitter.blocking_flush(Duration::from_secs(5)); @@ -55,6 +46,8 @@ async fn in_ctxt(a: i32) -> Result<(), io::Error> { emit::info!("working on {#[emit::as_serde] work}"); + tokio::time::sleep(Duration::from_millis(100)).await; + if a % 2 == 0 { Ok(()) } else {