From 39f9c179753315358ae925428105cf031d5258b0 Mon Sep 17 00:00:00 2001 From: KodrAus Date: Mon, 22 Jan 2024 19:43:23 +1000 Subject: [PATCH] diagnostic improvements to HTTP client --- core/src/value.rs | 22 +++++ macros/src/in_ctxt.rs | 13 +-- targets/file/src/lib.rs | 10 +- targets/otlp/Cargo.toml | 2 +- targets/otlp/src/client.rs | 76 ++++++++------ targets/otlp/src/client/http.rs | 32 ++++-- targets/otlp/src/data.rs | 6 ++ targets/otlp/src/data/generated.rs | 1 - targets/otlp/src/data/metrics.rs | 154 ++++++++++++++++++++++++++++- 9 files changed, 260 insertions(+), 56 deletions(-) diff --git a/core/src/value.rs b/core/src/value.rs index c4030df..619f570 100644 --- a/core/src/value.rs +++ b/core/src/value.rs @@ -83,6 +83,10 @@ impl<'v> Value<'v> { pub fn to_usize(&self) -> Option { self.0.to_u64()?.try_into().ok() } + + pub fn to_i64(&self) -> Option { + self.0.to_i64() + } } pub trait Visitor<'v> { @@ -238,6 +242,24 @@ impl<'v> FromValue<'v> for f64 { } } +impl ToValue for i64 { + fn to_value(&self) -> Value { + Value::from(*self) + } +} + +impl<'v> From for Value<'v> { + fn from(value: i64) -> Self { + Value(value.into()) + } +} + +impl<'v> FromValue<'v> for i64 { + fn from_value(value: Value<'v>) -> Option { + value.to_i64() + } +} + #[cfg(feature = "alloc")] mod alloc_support { use super::*; diff --git a/macros/src/in_ctxt.rs b/macros/src/in_ctxt.rs index 55bd46b..0ed2693 100644 --- a/macros/src/in_ctxt.rs +++ b/macros/src/in_ctxt.rs @@ -1,8 +1,5 @@ use proc_macro2::TokenStream; -use syn::{ - parse::Parse, spanned::Spanned, Block, Expr, ExprAsync, ExprBlock, Item, ItemFn, Signature, - Stmt, -}; +use syn::{spanned::Spanned, Block, Expr, ExprAsync, ExprBlock, Item, ItemFn, Signature, Stmt}; use crate::props::Props; @@ -11,14 +8,6 @@ pub struct ExpandTokens { pub input: TokenStream, } -struct Args {} - -impl Parse for Args { - fn parse(_: syn::parse::ParseStream) -> syn::Result { - Ok(Args {}) - } -} - pub fn expand_tokens(opts: ExpandTokens) -> Result { let props = syn::parse2::(opts.input)?; diff --git a/targets/file/src/lib.rs b/targets/file/src/lib.rs index 6321aec..b45dc81 100644 --- a/targets/file/src/lib.rs +++ b/targets/file/src/lib.rs @@ -78,7 +78,7 @@ impl FileSetBuilder { let _ = receiver.blocking_exec(|mut batch: Buffer| { use emit_batcher::Channel as _; - emit::debug!(rt: emit::runtime::internal(), "writing file batch of {batch_size: batch.remaining()} events"); + let batch_size = batch.remaining(); let (mut file, path) = match active_file.take() { Some(file) => file, @@ -89,8 +89,10 @@ impl FileSetBuilder { let mut path = PathBuf::from(dir.clone()); - if let Err(e) = fs::create_dir_all(&path) { - return Err(emit_batcher::BatchError::retry(e, batch)); + if let Err(err) = fs::create_dir_all(&path) { + emit::warn!(rt: emit::runtime::internal(), "failed to create root directory {#[emit::as_debug] path}: {err}"); + + return Err(emit_batcher::BatchError::retry(err, batch)); } let file_ts = file_ts(self.roll_by, parts); @@ -180,6 +182,8 @@ impl FileSetBuilder { active_file = Some((file, path)); + emit::debug!(rt: emit::runtime::internal(), "wrote file batch of {batch_size} events"); + Ok(()) }); }); diff --git a/targets/otlp/Cargo.toml b/targets/otlp/Cargo.toml index 8105f4f..34bbc5a 100644 --- a/targets/otlp/Cargo.toml +++ b/targets/otlp/Cargo.toml @@ -4,7 +4,7 @@ version = "0.0.0" edition = "2021" [features] -default = ["http"] +default = ["http", "grpc"] http = ["dep:hyper", "dep:hyper-util"] grpc = ["dep:prost", "dep:serde", "emit/serde"] decode_responses = ["dep:prost"] diff --git a/targets/otlp/src/client.rs b/targets/otlp/src/client.rs index 6fd9bba..5c5ce81 100644 --- a/targets/otlp/src/client.rs +++ b/targets/otlp/src/client.rs @@ -476,43 +476,61 @@ impl RawClient { ) -> Result>>, decode: Option)>, ) -> Result<(), BatchError>> { - match self { - RawClient::Http { - ref http, - ref resource, - ref scope, - } => { - emit::debug!(rt: emit::runtime::internal(), "sending OTLP batch of {batch_size: batch.len()} events"); - - let res = http - .send(encode(resource.as_ref(), scope.as_ref(), &batch)?) - .await - .map_err(|err| { - emit::warn!(rt: emit::runtime::internal(), "failed to send OTLP request: {err}"); - - BatchError::no_retry(err) - })?; - - if let Some(decode) = decode { - let status = res.status(); - let body = res - .read_to_vec() + use emit::IdRng as _; + + let rt = emit::runtime::internal(); + + let ctxt = emit::frame::Frame::new( + rt.ctxt(), + emit::props! { + trace_id: rt.gen_trace_id(), + span_id: rt.gen_span_id(), + }, + ); + + ctxt.with_future(async move { + match self { + RawClient::Http { + ref http, + ref resource, + ref scope, + } => { + let batch_size = batch.len(); + + let timer = emit::clock::Timer::start(rt.clock()); + + let res = http + .send(encode(resource.as_ref(), scope.as_ref(), &batch)?) .await .map_err(|err| { - emit::warn!(rt: emit::runtime::internal(), "failed to read OTLP response: {err}"); + emit::warn!(rt, extent: timer, "OTLP batch of {batch_size} failed to send: {err}"); - BatchError::no_retry(err) + BatchError::retry(err, batch) })?; - if status >= 200 && status < 300 { - decode(Ok(&body)); - } else { - decode(Err(&body)); + emit::debug!(rt, extent: timer, "OTLP batch of {batch_size} responded {status_code: res.status()}"); + + if let Some(decode) = decode { + let status = res.status(); + let body = res + .read_to_vec() + .await + .map_err(|err| { + emit::warn!(rt, "failed to read OTLP response: {err}"); + + BatchError::no_retry(err) + })?; + + if status >= 200 && status < 300 { + decode(Ok(&body)); + } else { + decode(Err(&body)); + } } } } - } - Ok(()) + Ok(()) + }).await } } diff --git a/targets/otlp/src/client/http.rs b/targets/otlp/src/client/http.rs index 12eee9b..46fe0ea 100644 --- a/targets/otlp/src/client/http.rs +++ b/targets/otlp/src/client/http.rs @@ -119,9 +119,13 @@ async fn send_request( uri: &Uri, body: PreEncoded, ) -> Result, Error> { + let rt = emit::runtime::internal(); + let res = sender - .send_request( - Request::builder() + .send_request({ + use emit::{Ctxt as _, Props as _}; + + let req = Request::builder() .uri(uri) .method(Method::POST) .header("host", uri.authority().unwrap().as_str()) @@ -130,10 +134,26 @@ async fn send_request( match body { PreEncoded::Proto(_) => "application/x-protobuf", }, - ) - .body(HttpBody(Some(body.into_cursor()))) - .map_err(Error::new)?, - ) + ); + + // Propagate traceparent for the batch + let mut trace_id = None; + let mut span_id = None; + + rt.ctxt().with_current(|props| { + trace_id = props.pull::(); + span_id = props.pull::(); + }); + + let req = if let (Some(trace_id), Some(span_id)) = (trace_id, span_id) { + req.header("traceparent", format!("00-{trace_id}-{span_id}-00")) + } else { + req + }; + + req.body(HttpBody(Some(body.into_cursor()))) + .map_err(Error::new)? + }) .await .map_err(Error::new)?; diff --git a/targets/otlp/src/data.rs b/targets/otlp/src/data.rs index ebe6e52..0d702f0 100644 --- a/targets/otlp/src/data.rs +++ b/targets/otlp/src/data.rs @@ -29,6 +29,12 @@ impl PreEncoded { PreEncoded::Proto(buf) => PreEncodedCursor::Proto(buf.into_cursor()), } } + + pub fn to_vec(&self) -> Vec { + match self { + PreEncoded::Proto(buf) => buf.to_vec().into_owned(), + } + } } pub(crate) enum PreEncodedCursor { diff --git a/targets/otlp/src/data/generated.rs b/targets/otlp/src/data/generated.rs index 362f10b..a43443c 100644 --- a/targets/otlp/src/data/generated.rs +++ b/targets/otlp/src/data/generated.rs @@ -49,7 +49,6 @@ pub(crate) mod collector { } } -#[cfg(feature = "grpc")] pub(crate) mod any_value { use std::fmt; diff --git a/targets/otlp/src/data/metrics.rs b/targets/otlp/src/data/metrics.rs index 0723861..15025de 100644 --- a/targets/otlp/src/data/metrics.rs +++ b/targets/otlp/src/data/metrics.rs @@ -1,6 +1,14 @@ +use std::{borrow::Cow, ops::ControlFlow}; + +use emit::{ + value::FromValue, + well_known::{METRIC_KIND_KEY, METRIC_KIND_SUM, METRIC_NAME_KEY, METRIC_VALUE_KEY}, + Props, +}; use emit_batcher::BatchError; +use sval_protobuf::buf::ProtoBuf; -use super::{AnyValue, MessageFormatter, MessageRenderer, PreEncoded}; +use super::{MessageFormatter, PreEncoded}; pub(crate) struct EventEncoder { pub name: Box, @@ -11,16 +19,154 @@ impl EventEncoder { &self, evt: &emit::event::Event, ) -> Option { - todo!() + use prost::Message; + + if let (Some(metric_name), Some(metric_value), metric_kind) = ( + evt.props().get(METRIC_NAME_KEY), + evt.props().get(METRIC_VALUE_KEY), + evt.props().get(METRIC_KIND_KEY), + ) { + use crate::data::generated::{common::v1::*, metrics::v1::*}; + + let metric_value = metric_value.pull::()?; + let metric_name = metric_name + .to_cow_str() + .unwrap_or_else(|| Cow::Owned(metric_name.to_string())); + + let mut attributes = Vec::new(); + + evt.props() + .filter(|k, _| k != METRIC_NAME_KEY && k != METRIC_VALUE_KEY) + .for_each(|k, v| { + let key = k.to_cow().into_owned(); + let value = crate::data::generated::any_value::to_value(v); + + attributes.push(KeyValue { key, value }); + + ControlFlow::Continue(()) + }); + + let (time_unix_nano, start_time_unix_nano, aggregation_temporality) = evt + .extent() + .map(|extent| { + let range = extent.as_range(); + + ( + range.start.to_unix_time().as_nanos() as u64, + range.end.to_unix_time().as_nanos() as u64, + if extent.is_span() { + AggregationTemporality::Delta + } else { + AggregationTemporality::Cumulative + } as i32, + ) + }) + .unwrap_or_default(); + + let data_point = NumberDataPoint { + attributes, + start_time_unix_nano, + time_unix_nano, + exemplars: Vec::new(), + flags: 0, + value: Some(match metric_value { + MetricValue::F64(value) => number_data_point::Value::AsDouble(value), + MetricValue::I64(value) => number_data_point::Value::AsInt(value), + }), + }; + + let data = match metric_kind.and_then(|kind| kind.to_cow_str()).as_deref() { + Some(METRIC_KIND_SUM) => Some(metric::Data::Sum(Sum { + aggregation_temporality, + is_monotonic: false, + data_points: vec![data_point], + })), + _ => Some(metric::Data::Gauge(Gauge { + data_points: vec![data_point], + })), + }; + + let msg = Metric { + name: metric_name.into_owned(), + description: String::new(), + unit: String::new(), + data, + }; + + let mut buf = Vec::new(); + msg.encode(&mut buf).unwrap(); + + return Some(PreEncoded::Proto(ProtoBuf::pre_encoded(buf))); + } + + None + } +} + +enum MetricValue { + F64(f64), + I64(i64), +} + +impl<'v> FromValue<'v> for MetricValue { + fn from_value(value: emit::Value<'v>) -> Option { + if let Some(value) = value.to_i64() { + return Some(MetricValue::I64(value)); + } + + let value = value.as_f64(); + if value.is_finite() { + return Some(MetricValue::F64(value)); + } + + None } } pub(crate) fn encode_request( resource: Option<&PreEncoded>, scope: Option<&PreEncoded>, - log_records: &[PreEncoded], + metrics: &[PreEncoded], ) -> Result>> { - todo!() + use prost::Message; + + use crate::data::generated::{ + collector::metrics::v1::*, common::v1::*, metrics::v1::*, resource::v1::*, + }; + + let resource = if let Some(resource) = resource { + Some(Resource::decode(&*resource.to_vec()).unwrap()) + } else { + None + }; + + let scope = if let Some(scope) = scope { + Some(InstrumentationScope::decode(&*scope.to_vec()).unwrap()) + } else { + None + }; + + let metrics = metrics + .iter() + .map(|metric| Metric::decode(&*metric.to_vec()).unwrap()) + .collect(); + + let msg = ExportMetricsServiceRequest { + resource_metrics: vec![ResourceMetrics { + resource, + scope_metrics: vec![ScopeMetrics { + scope, + metrics, + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + + let mut buf = Vec::new(); + msg.encode(&mut buf).unwrap(); + + Ok(PreEncoded::Proto(ProtoBuf::pre_encoded(buf))) } #[cfg(feature = "decode_responses")]