From 40154b67747dcfba73265c0d225a4422e766615a Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 2 May 2024 17:44:43 -0700 Subject: [PATCH] Add client_requests timing module to metrics - Add a timing span to anything that can be instrumented and returns a Result. Example: ```ignore let client = GatewayClient::new(channel); client.info(req) .with_timing("iot_fetch_info") .await?; ``` This will result in a Prometheus metric, named after the histogram name given to the timing layer, e.g. >> client_request_duration_seconds{name="iot_fetch_info", result="ok", quantile="xxx"} - Install the `ApiTimingLayer` via `client_request_timing_layer` so the timing spans are recorded. Adding `.with_span_events(FmtSpan::CLOSE)` to a regular format layer will print the timing spans to stdout as well. Example: ```ignore tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::CLOSE)) .with(metrics::client_requests::client_request_timing_layer("histogram_name")) .init(); ``` - Remove the unused `install_metrics` function and replace it with a private `install` function that `start_metrics` delegates to. This allows us to start metrics in tests without needing to make a `Settings` struct. 
--- Cargo.lock | 4 + metrics/Cargo.toml | 6 ++ metrics/src/client_requests.rs | 187 +++++++++++++++++++++++++++++++++ metrics/src/lib.rs | 20 ++-- 4 files changed, 204 insertions(+), 13 deletions(-) create mode 100644 metrics/src/client_requests.rs diff --git a/Cargo.lock b/Cargo.lock index b943dc5a8..f6aef61b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5151,12 +5151,16 @@ dependencies = [ name = "poc-metrics" version = "0.1.0" dependencies = [ + "futures", "metrics", "metrics-exporter-prometheus", + "reqwest", "serde", "thiserror", + "tokio", "tower", "tracing", + "tracing-subscriber", ] [[package]] diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 5b16e567d..8eb9a10c2 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -11,5 +11,11 @@ tower = "0.4" thiserror = { workspace = true } serde = { workspace = true } tracing = { workspace = true } +tracing-subscriber = { workspace = true } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } +futures = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } +reqwest = { workspace = true } diff --git a/metrics/src/client_requests.rs b/metrics/src/client_requests.rs new file mode 100644 index 000000000..053c7d578 --- /dev/null +++ b/metrics/src/client_requests.rs @@ -0,0 +1,187 @@ +//! Add a timing span to anything that can be instrumented and returns a Result. +//! +//! Example: +//! ```ignore +//! let client = GatewayClient::new(channel); +//! +//! client.info(req) +//! .with_timing("iot_fetch_info") +//! .await?; +//! ``` +//! +//! This will result in a prometheus metric +//! >> client_request_duration_seconds{name = "iot_fetch_info", quantile="xxx"} +//! +//! Install the `ApiTimingLayer`. +//! +//! Adding `.with_span_events(FmtSpan::CLOSE)` to a regular format layer will +//! print the timing spans to stdout as well. +//! +//! Example: +//! ```ignore +//! tracing_subscriber::registry() +//! 
.with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::CLOSE)) +//! .with(metrics::client_requests::client_request_timing_layer("histogram_name")) +//! .init(); +//! ``` +use futures::{future::Inspect, Future, FutureExt}; +use std::time::Instant; +use tracing::{field::Visit, instrument::Instrumented, span, Instrument, Subscriber}; +use tracing_subscriber::{filter, layer, registry::LookupSpan, Layer}; + +const SPAN_NAME: &str = "metrics::timing"; + +pub fn client_request_timing_layer(histogram_name: &'static str) -> impl layer::Layer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + ApiTimingLayer::new(histogram_name).with_filter(filter::filter_fn(|m| m.name() == SPAN_NAME)) +} + +pub trait ClientMetricTiming: Sized + Instrument + FutureExt { + fn with_timing( + self, + name: &'static str, + ) -> Instrumented)>> + where + Self: Future> + Sized; +} + +// Impl ClientMetricTiming for all futures that return a Result +impl ClientMetricTiming for F +where + F: Future> + Sized, +{ + fn with_timing( + self, + name: &'static str, + ) -> Instrumented)>> { + let span = tracing::info_span!(SPAN_NAME, name, result = tracing::field::Empty); + let inner_span = span.clone(); + self.inspect(move |res| { + inner_span.record("result", res.as_ref().ok().map_or("error", |_| "ok")); + }) + .instrument(span) + } +} + +struct Timing { + name: Option, + start: Instant, + // ok | error | unknown + result: String, +} + +impl Timing { + fn new() -> Self { + Self { + name: None, + start: Instant::now(), + result: "unknown".to_string(), + } + } + + fn record(self, histogram_name: &'static str) { + if let Some(name) = self.name { + metrics::histogram!( + histogram_name, + self.start.elapsed().as_secs_f64(), + "name" => name, + "result" => self.result + ) + } + } +} + +impl Visit for Timing { + fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {} + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + match field.name() { + 
"name" => self.name = Some(value.to_string()), + "result" => self.result = value.to_string(), + _ => (), + } + } +} + +struct ApiTimingLayer { + histogram_name: &'static str, +} + +impl ApiTimingLayer { + fn new(histogram_name: &'static str) -> Self { + Self { histogram_name } + } +} + +impl tracing_subscriber::Layer for ApiTimingLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: layer::Context<'_, S>) { + let span = ctx.span(id).expect("Span not found, this is a bug"); + + let mut timing = Timing::new(); + attrs.values().record(&mut timing); + span.extensions_mut().insert(timing); + } + + fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: layer::Context<'_, S>) { + let span = ctx.span(id).unwrap(); + + if let Some(timing) = span.extensions_mut().get_mut::() { + values.record(timing); + }; + } + + fn on_close(&self, id: tracing::Id, ctx: layer::Context) { + let span = ctx.span(&id).unwrap(); + + if let Some(timing) = span.extensions_mut().remove::() { + timing.record(self.histogram_name); + }; + } +} + +#[cfg(test)] +mod tests { + use super::ClientMetricTiming; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + + #[tokio::test] + async fn test_telemetry() -> Result<(), Box> { + tracing_subscriber::registry() + // Uncomment to view traces and Spans closing + // .with( + // tracing_subscriber::fmt::layer() + // .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE), + // ) + .with(super::client_request_timing_layer("histogram_name")) + .init(); + + // Let the OS assign a port + let addr = { + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + listener.local_addr()? 
+ }; + tracing::info!("listening on {addr}"); + super::super::install(addr)?; + + let success = async { Ok("nothing went wrong") }; + let failure = async { Err("something went wrong") }; + let _: Result<&str, &str> = success.with_timing("success").await; + let _: Result<&str, &str> = failure.with_timing("failing").await; + + // .with_timing() can only be added to futures that return Results. + // let will_not_compile = async { 1 + 2 }.with_timing("not a result"); + + let res = reqwest::get(format!("http://{addr}")).await?; + let body = res.text().await?; + + tracing::info!("response: \n{body}"); + assert!(body.contains(r#"histogram_name_count{name="success",result="ok"} 1"#)); + assert!(body.contains(r#"histogram_name_count{name="failing",result="error"} 1"#)); + + Ok(()) + } +} diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index 5e4b6b60b..02ddbc4f2 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -12,32 +12,26 @@ use std::{ }; use tower::{Layer, Service}; +pub mod client_requests; mod error; pub mod settings; pub fn start_metrics(settings: &Settings) -> Result { let socket: SocketAddr = settings.endpoint.parse()?; - PrometheusBuilder::new() - .with_http_listener(socket) - .install()?; - Ok(()) + install(socket) } -/// Install the Prometheus export gateway -pub fn install_metrics() { - let endpoint = - std::env::var("METRICS_SCRAPE_ENDPOINT").unwrap_or_else(|_| String::from("0.0.0.0:9000")); - let socket: SocketAddr = endpoint - .parse() - .expect("Invalid METRICS_SCRAPE_ENDPOINT value"); +fn install(socket_addr: SocketAddr) -> Result { if let Err(e) = PrometheusBuilder::new() - .with_http_listener(socket) + .with_http_listener(socket_addr) .install() { tracing::error!(target: "poc", "Failed to install Prometheus scrape endpoint: {e}"); } else { - tracing::info!(target: "poc", "Metrics scrape endpoint listening on {endpoint}"); + tracing::info!(target: "poc", "Metrics scrape endpoint listening on {socket_addr}"); } + + Ok(()) } /// Measure the 
duration of a block and record it