From c8591465894a8a048dfd50011fe17beba4428657 Mon Sep 17 00:00:00 2001 From: KodrAus Date: Thu, 25 Jan 2024 08:53:24 +1000 Subject: [PATCH] fill in more otlp configuration --- core/src/props.rs | 44 +++++ targets/otlp/src/client.rs | 317 ++++++++++++++++++++----------------- targets/otlp/src/lib.rs | 32 +++- tests/smoke-test/main.rs | 15 +- 4 files changed, 251 insertions(+), 157 deletions(-) diff --git a/core/src/props.rs b/core/src/props.rs index 6416a54..593501f 100644 --- a/core/src/props.rs +++ b/core/src/props.rs @@ -124,6 +124,50 @@ where } } +#[cfg(feature = "alloc")] +impl Props for alloc::collections::BTreeMap +where + K: Ord + ToStr + Borrow, + V: ToValue, +{ + fn for_each<'kv, F: FnMut(Str<'kv>, Value<'kv>) -> ControlFlow<()>>( + &'kv self, + mut for_each: F, + ) -> ControlFlow<()> { + for (k, v) in self { + for_each(k.to_str(), v.to_value())?; + } + + ControlFlow::Continue(()) + } + + fn get<'v, Q: ToStr>(&'v self, key: Q) -> Option> { + self.get(key.to_str().as_ref()).map(|v| v.to_value()) + } +} + +#[cfg(feature = "std")] +impl Props for std::collections::HashMap +where + K: Eq + std::hash::Hash + ToStr + Borrow, + V: ToValue, +{ + fn for_each<'kv, F: FnMut(Str<'kv>, Value<'kv>) -> ControlFlow<()>>( + &'kv self, + mut for_each: F, + ) -> ControlFlow<()> { + for (k, v) in self { + for_each(k.to_str(), v.to_value())?; + } + + ControlFlow::Continue(()) + } + + fn get<'v, Q: ToStr>(&'v self, key: Q) -> Option> { + self.get(key.to_str().as_ref()).map(|v| v.to_value()) + } +} + impl Props for Empty { fn for_each<'kv, F: FnMut(Str<'kv>, Value<'kv>) -> ControlFlow<()>>( &'kv self, diff --git a/targets/otlp/src/client.rs b/targets/otlp/src/client.rs index a54352a..665174c 100644 --- a/targets/otlp/src/client.rs +++ b/targets/otlp/src/client.rs @@ -1,5 +1,5 @@ use emit_batcher::BatchError; -use std::{fmt, sync::Arc, time::Duration}; +use std::{collections::HashMap, fmt, sync::Arc, time::Duration}; use crate::{ data::{self, default_message_formatter, logs, metrics, traces, PreEncoded}, @@ -10,14 +10,14 @@ use self::http::HttpConnection; mod http; -pub struct OtlpClient { +pub struct Otlp { logs: Option, traces: Option, metrics: Option, sender: emit_batcher::Sender>, } -impl emit::emitter::Emitter for OtlpClient { +impl emit::emitter::Emitter for Otlp { fn emit(&self, evt: &emit::event::Event

) { if let Some(ref encoder) = self.metrics { if let Some(encoded) = encoder.encode_event(evt) { @@ -96,33 +96,45 @@ impl emit_batcher::Channel for Channel { } } -pub struct OtlpClientBuilder { - resource: Option, - scope: Option, - encoding: Encoding, +pub struct OtlpBuilder { + resource: Option, + scope: Option, logs: Option, traces: Option, metrics: Option, } +struct Resource { + attributes: HashMap, emit::value::OwnedValue>, +} + +struct Scope { + name: String, + version: String, + attributes: HashMap, emit::value::OwnedValue>, +} + pub struct OtlpLogsBuilder { encoder: logs::EventEncoder, - transport: Transport, + encoding: Encoding, + transport: OtlpTransportBuilder, } impl OtlpLogsBuilder { - pub fn http(dst: impl Into) -> Self { + pub fn proto(transport: OtlpTransportBuilder) -> Self { OtlpLogsBuilder { encoder: logs::EventEncoder { body: default_message_formatter(), }, - transport: Transport::Http { - url: dst.into(), - headers: Vec::new(), - }, + encoding: Encoding::Proto, + transport, } } + pub fn http_proto(dst: impl Into) -> Self { + Self::proto(OtlpTransportBuilder::http(dst)) + } + pub fn body( mut self, writer: impl Fn( @@ -136,44 +148,29 @@ impl OtlpLogsBuilder { self.encoder.body = Box::new(writer); self } - - pub fn http_headers, V: Into>( - mut self, - http_headers: impl IntoIterator, - ) -> Self { - match self.transport { - Transport::Http { - ref mut headers, .. - } => { - *headers = http_headers - .into_iter() - .map(|(k, v)| (k.into(), v.into())) - .collect(); - } - } - - self - } } pub struct OtlpTracesBuilder { encoder: traces::EventEncoder, - transport: Transport, + encoding: Encoding, + transport: OtlpTransportBuilder, } impl OtlpTracesBuilder { - pub fn http(dst: impl Into) -> Self { + pub fn proto(transport: OtlpTransportBuilder) -> Self { OtlpTracesBuilder { encoder: traces::EventEncoder { name: default_message_formatter(), }, - transport: Transport::Http { - url: dst.into(), - headers: Vec::new(), - }, + encoding: Encoding::Proto, + transport, } } + pub fn http_proto(dst: impl Into) -> Self { + Self::proto(OtlpTransportBuilder::http(dst)) + } + pub fn name( mut self, writer: impl Fn( @@ -187,60 +184,27 @@ impl OtlpTracesBuilder { self.encoder.name = Box::new(writer); self } - - pub fn http_headers, V: Into>( - mut self, - http_headers: impl IntoIterator, - ) -> Self { - match self.transport { - Transport::Http { - ref mut headers, .. - } => { - *headers = http_headers - .into_iter() - .map(|(k, v)| (k.into(), v.into())) - .collect(); - } - } - - self - } } pub struct OtlpMetricsBuilder { encoder: metrics::EventEncoder, - transport: Transport, + encoding: Encoding, + transport: OtlpTransportBuilder, } impl OtlpMetricsBuilder { - pub fn http(dst: impl Into) -> Self { + pub fn proto(transport: OtlpTransportBuilder) -> Self { OtlpMetricsBuilder { encoder: metrics::EventEncoder { name: default_message_formatter(), }, - transport: Transport::Http { - url: dst.into(), - headers: Vec::new(), - }, + encoding: Encoding::Proto, + transport, } } - pub fn http_headers, V: Into>( - mut self, - http_headers: impl IntoIterator, - ) -> Self { - match self.transport { - Transport::Http { - ref mut headers, .. - } => { - *headers = http_headers - .into_iter() - .map(|(k, v)| (k.into(), v.into())) - .collect(); - } - } - - self + pub fn http_proto(dst: impl Into) -> Self { + Self::proto(OtlpTransportBuilder::http(dst)) } } @@ -248,17 +212,55 @@ enum Encoding { Proto, } -enum Transport { - Http { - url: String, - headers: Vec<(String, String)>, - }, +enum Protocol { + Http, } -impl OtlpClientBuilder { - pub fn proto() -> Self { - OtlpClientBuilder { - encoding: Encoding::Proto, +pub struct OtlpTransportBuilder { + protocol: Protocol, + url: String, + headers: Vec<(String, String)>, +} + +impl OtlpTransportBuilder { + pub fn http(dst: impl Into) -> Self { + OtlpTransportBuilder { + protocol: Protocol::Http, + url: dst.into(), + headers: Vec::new(), + } + } + + pub fn headers, V: Into>( + mut self, + headers: impl IntoIterator, + ) -> Self { + self.headers = headers + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(); + + self + } + + fn build( + self, + resource: Option, + scope: Option, + ) -> Result { + Ok(match self.protocol { + Protocol::Http => OtlpTransport::Http { + http: HttpConnection::new(self.url, self.headers)?, + resource, + scope, + }, + }) + } +} + +impl OtlpBuilder { + pub fn new() -> Self { + OtlpBuilder { resource: None, scope: None, logs: None, @@ -267,8 +269,8 @@ impl OtlpClientBuilder { } } - pub fn logs_http(self, dst: impl Into) -> Self { - self.logs(OtlpLogsBuilder::http(dst)) + pub fn logs_http_proto(self, dst: impl Into) -> Self { + self.logs(OtlpLogsBuilder::http_proto(dst)) } pub fn logs(mut self, builder: OtlpLogsBuilder) -> Self { @@ -276,8 +278,8 @@ impl OtlpClientBuilder { self } - pub fn traces_http(self, dst: impl Into) -> Self { - self.traces(OtlpTracesBuilder::http(dst)) + pub fn traces_http_proto(self, dst: impl Into) -> Self { + self.traces(OtlpTracesBuilder::http_proto(dst)) } pub fn traces(mut self, builder: OtlpTracesBuilder) -> Self { @@ -285,8 +287,8 @@ impl OtlpClientBuilder { self } - pub fn metrics_http(self, dst: impl Into) -> Self { - self.metrics(OtlpMetricsBuilder::http(dst)) + pub fn metrics_http_proto(self, dst: impl Into) -> Self { + self.metrics(OtlpMetricsBuilder::http_proto(dst)) } pub fn metrics(mut self, builder: OtlpMetricsBuilder) -> Self { @@ -295,84 +297,91 @@ impl OtlpClientBuilder { } pub fn resource(mut self, attributes: impl emit::props::Props) -> Self { - match self.encoding { - Encoding::Proto => { - let protobuf = sval_protobuf::stream_to_protobuf(data::Resource { - attributes: &data::PropsResourceAttributes(attributes), - dropped_attribute_count: 0, - }); - - self.resource = Some(PreEncoded::Proto(protobuf)); - } - } + let mut resource = Resource { + attributes: HashMap::new(), + }; + + attributes.for_each(|k, v| { + resource.attributes.insert(k.to_owned(), v.to_owned()); + + std::ops::ControlFlow::Continue(()) + }); + + self.resource = Some(resource); self } - pub fn scope(mut self, name: &str, version: &str, attributes: impl emit::props::Props) -> Self { - match self.encoding { - Encoding::Proto => { - let protobuf = sval_protobuf::stream_to_protobuf(data::InstrumentationScope { - name, - version, - attributes: &data::PropsInstrumentationScopeAttributes(attributes), - dropped_attribute_count: 0, - }); - - self.scope = Some(PreEncoded::Proto(protobuf)); - } - } + pub fn scope( + mut self, + name: impl Into, + version: impl Into, + attributes: impl emit::props::Props, + ) -> Self { + let mut scope = Scope { + name: name.into(), + version: version.into(), + attributes: HashMap::new(), + }; + + attributes.for_each(|k, v| { + scope.attributes.insert(k.to_owned(), v.to_owned()); + + std::ops::ControlFlow::Continue(()) + }); + + self.scope = Some(scope); self } - pub fn spawn(self) -> Result { + pub fn spawn(self) -> Result { let (sender, receiver) = emit_batcher::bounded(10_000); let mut logs = None; let mut traces = None; let mut metrics = None; - let client = OtlpSender { + let client = OtlpClient { logs: match self.logs { Some(OtlpLogsBuilder { encoder, - transport: Transport::Http { url, headers }, + encoding: Encoding::Proto, + transport, }) => { logs = Some(encoder); - Some(Arc::new(RawClient::Http { - http: HttpConnection::new(url, headers)?, - resource: self.resource.clone(), - scope: self.scope.clone(), - })) + Some(Arc::new(transport.build( + self.resource.as_ref().map(resource_proto), + self.scope.as_ref().map(scope_proto), + )?)) } None => None, }, traces: match self.traces { Some(OtlpTracesBuilder { encoder, - transport: Transport::Http { url, headers }, + encoding: Encoding::Proto, + transport, }) => { traces = Some(encoder); - Some(Arc::new(RawClient::Http { - http: HttpConnection::new(url, headers)?, - resource: self.resource.clone(), - scope: self.scope.clone(), - })) + Some(Arc::new(transport.build( + self.resource.as_ref().map(resource_proto), + self.scope.as_ref().map(scope_proto), + )?)) } None => None, }, metrics: match self.metrics { Some(OtlpMetricsBuilder { encoder, - transport: Transport::Http { url, headers }, + encoding: Encoding::Proto, + transport, }) => { metrics = Some(encoder); - Some(Arc::new(RawClient::Http { - http: HttpConnection::new(url, headers)?, - resource: self.resource.clone(), - scope: self.scope.clone(), - })) + Some(Arc::new(transport.build( + self.resource.as_ref().map(resource_proto), + self.scope.as_ref().map(scope_proto), + )?)) } None => None, }, @@ -492,7 +501,7 @@ impl OtlpClientBuilder { } }); - Ok(OtlpClient { + Ok(Otlp { logs, traces, metrics, @@ -501,15 +510,35 @@ impl OtlpClientBuilder { } } +fn resource_proto(resource: &Resource) -> PreEncoded { + let protobuf = sval_protobuf::stream_to_protobuf(data::Resource { + attributes: &data::PropsResourceAttributes(&resource.attributes), + dropped_attribute_count: 0, + }); + + PreEncoded::Proto(protobuf) +} + +fn scope_proto(scope: &Scope) -> PreEncoded { + let protobuf = sval_protobuf::stream_to_protobuf(data::InstrumentationScope { + name: &scope.name, + version: &scope.version, + attributes: &data::PropsInstrumentationScopeAttributes(&scope.attributes), + dropped_attribute_count: 0, + }); + + PreEncoded::Proto(protobuf) +} + #[derive(Clone)] -pub struct OtlpSender { - // TODO: Share the client - logs: Option>, - traces: Option>, - metrics: Option>, +pub struct OtlpClient { + // TODO: Share the client when possible + logs: Option>, + traces: Option>, + metrics: Option>, } -enum RawClient { +enum OtlpTransport { Http { http: HttpConnection, resource: Option, @@ -517,7 +546,7 @@ enum RawClient { }, } -impl RawClient { +impl OtlpTransport { pub(crate) async fn send( &self, batch: Vec, @@ -542,7 +571,7 @@ impl RawClient { ctxt.with_future(async move { match self { - RawClient::Http { + OtlpTransport::Http { ref http, ref resource, ref scope, diff --git a/targets/otlp/src/lib.rs b/targets/otlp/src/lib.rs index 79f2503..e4eb7e8 100644 --- a/targets/otlp/src/lib.rs +++ b/targets/otlp/src/lib.rs @@ -6,18 +6,34 @@ mod error; pub use self::{client::*, error::*}; -pub fn proto() -> OtlpClientBuilder { - OtlpClientBuilder::proto() +pub fn new() -> OtlpBuilder { + OtlpBuilder::new() } -pub fn logs_http(dst: impl Into) -> OtlpLogsBuilder { - OtlpLogsBuilder::http(dst) +pub fn http(dst: impl Into) -> OtlpTransportBuilder { + OtlpTransportBuilder::http(dst) } -pub fn traces_http(dst: impl Into) -> OtlpTracesBuilder { - OtlpTracesBuilder::http(dst) +pub fn logs_http_proto(dst: impl Into) -> OtlpLogsBuilder { + OtlpLogsBuilder::http_proto(dst) } -pub fn metrics_http(dst: impl Into) -> OtlpMetricsBuilder { - OtlpMetricsBuilder::http(dst) +pub fn logs_proto(transport: OtlpTransportBuilder) -> OtlpLogsBuilder { + OtlpLogsBuilder::proto(transport) +} + +pub fn traces_http_proto(dst: impl Into) -> OtlpTracesBuilder { + OtlpTracesBuilder::http_proto(dst) +} + +pub fn traces_proto(transport: OtlpTransportBuilder) -> OtlpTracesBuilder { + OtlpTracesBuilder::proto(transport) +} + +pub fn metrics_http_proto(dst: impl Into) -> OtlpMetricsBuilder { + OtlpMetricsBuilder::http_proto(dst) +} + +pub fn metrics_proto(transport: OtlpTransportBuilder) -> OtlpMetricsBuilder { + OtlpMetricsBuilder::proto(transport) } diff --git a/tests/smoke-test/main.rs b/tests/smoke-test/main.rs index 3ac85ae..a0d7026 100644 --- a/tests/smoke-test/main.rs +++ b/tests/smoke-test/main.rs @@ -20,16 +20,21 @@ async fn main() { .init_internal(); let emitter = emit::setup() - .to(emit_otlp::proto() + .to(emit_otlp::new() .logs( - emit_otlp::logs_http("http://localhost:4318/v1/logs") - .body(|evt, f| write!(f, "{}", evt.tpl().braced())), + emit_otlp::logs_proto( + emit_otlp::http("http://localhost:4318/v1/logs") + .headers([("X-ApiKey", "1234")]), + ) + .body(|evt, f| write!(f, "{}", evt.tpl().braced())), ) .traces( - emit_otlp::traces_http("http://localhost:4318/v1/traces") + emit_otlp::traces_http_proto("http://localhost:4318/v1/traces") .name(|evt, f| write!(f, "{}", evt.tpl().braced())), ) - .metrics(emit_otlp::metrics_http("http://localhost:4318/v1/metrics")) + .metrics(emit_otlp::metrics_http_proto( + "http://localhost:4318/v1/metrics", + )) .resource(emit::props! { #[emit::key("service.name")] service_name: "smoke-test-rs",