diff --git a/Cargo.lock b/Cargo.lock index 9f000fb2b0..1698cc8ac6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1277,6 +1277,7 @@ dependencies = [ "linkerd-app-test", "linkerd-distribute", "linkerd-http-classify", + "linkerd-http-prom", "linkerd-http-retry", "linkerd-http-route", "linkerd-identity", @@ -1493,6 +1494,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-http-prom" +version = "0.1.0" +dependencies = [ + "futures", + "http", + "http-body", + "linkerd-error", + "linkerd-http-box", + "linkerd-metrics", + "linkerd-stack", + "parking_lot", + "pin-project", + "prometheus-client", + "thiserror", + "tokio", +] + [[package]] name = "linkerd-http-retry" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 424923246d..8446291d00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ "linkerd/http/classify", "linkerd/http/h2", "linkerd/http/metrics", + "linkerd/http/prom", "linkerd/http/retry", "linkerd/http/route", "linkerd/identity", diff --git a/linkerd/app/outbound/Cargo.toml b/linkerd/app/outbound/Cargo.toml index 0d4a68c889..364c17b5ab 100644 --- a/linkerd/app/outbound/Cargo.toml +++ b/linkerd/app/outbound/Cargo.toml @@ -35,6 +35,7 @@ linkerd-app-core = { path = "../core" } linkerd-app-test = { path = "../test", optional = true } linkerd-distribute = { path = "../../distribute" } linkerd-http-classify = { path = "../../http/classify" } +linkerd-http-prom = { path = "../../http/prom" } linkerd-http-retry = { path = "../../http/retry" } linkerd-http-route = { path = "../../http/route" } linkerd-identity = { path = "../../identity" } @@ -49,6 +50,7 @@ linkerd-tonic-watch = { path = "../../tonic-watch" } [dev-dependencies] hyper = { version = "0.14", features = ["http1", "http2"] } linkerd-app-test = { path = "../test", features = ["client-policy"] } +linkerd-http-prom = { path = "../../http/prom", features = ["test-util"] } linkerd-io = { path = "../../io", features = ["tokio-test"] } linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] } linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [ diff --git a/linkerd/app/outbound/src/http.rs b/linkerd/app/outbound/src/http.rs index a76cfbb674..e5f18fa578 100644 --- a/linkerd/app/outbound/src/http.rs +++ b/linkerd/app/outbound/src/http.rs @@ -32,8 +32,8 @@ pub struct Http(T); #[derive(Clone, Debug, Default)] pub struct HttpMetrics { balancer: concrete::BalancerMetrics, - http_route: policy::RouteMetrics, - grpc_route: policy::RouteMetrics, + http_route: policy::HttpRouteMetrics, + grpc_route: policy::GrpcRouteMetrics, } pub fn spawn_routes( @@ -132,12 +132,12 @@ where impl HttpMetrics { pub fn register(registry: &mut prom::Registry) -> Self { let http = registry.sub_registry_with_prefix("http"); - let http_route = policy::RouteMetrics::register(http.sub_registry_with_prefix("route")); + let http_route = policy::HttpRouteMetrics::register(http.sub_registry_with_prefix("route")); let balancer = concrete::BalancerMetrics::register(http.sub_registry_with_prefix("balancer")); let grpc = registry.sub_registry_with_prefix("grpc"); - let grpc_route = policy::RouteMetrics::register(grpc.sub_registry_with_prefix("route")); + let grpc_route = policy::GrpcRouteMetrics::register(grpc.sub_registry_with_prefix("route")); Self { balancer, diff --git a/linkerd/app/outbound/src/http/logical/policy.rs b/linkerd/app/outbound/src/http/logical/policy.rs index 254f3dfa49..70353a44c1 100644 --- a/linkerd/app/outbound/src/http/logical/policy.rs +++ b/linkerd/app/outbound/src/http/logical/policy.rs @@ -8,7 +8,7 @@ mod router; mod tests; pub use self::{ - route::{errors, RouteMetrics}, + route::{errors, GrpcRouteMetrics, HttpRouteMetrics}, router::{GrpcParams, HttpParams}, }; pub use linkerd_proxy_client_policy::{ClientPolicy, FailureAccrual}; @@ -50,8 +50,8 @@ where /// routing configurations to route requests over cached inner backend /// services. pub(super) fn layer( - http_metrics: route::RouteMetrics, - grpc_metrics: route::RouteMetrics, + http_metrics: route::HttpRouteMetrics, + grpc_metrics: route::GrpcRouteMetrics, ) -> impl svc::Layer> + Clone where // Inner stack. diff --git a/linkerd/app/outbound/src/http/logical/policy/route.rs b/linkerd/app/outbound/src/http/logical/policy/route.rs index 7ba5290989..565e72b39d 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route.rs @@ -1,6 +1,6 @@ use super::super::Concrete; -use crate::RouteRef; -use linkerd_app_core::{classify, metrics::prom, proxy::http, svc, Addr, Error, Result}; +use crate::{ParentRef, RouteRef}; +use linkerd_app_core::{classify, proxy::http, svc, Addr, Error, Result}; use linkerd_distribute as distribute; use linkerd_http_route as http_route; use linkerd_proxy_client_policy as policy; @@ -8,14 +8,13 @@ use std::{fmt::Debug, hash::Hash, sync::Arc}; pub(crate) mod backend; pub(crate) mod filters; +pub(crate) mod metrics; pub(crate) use self::backend::{Backend, MatchedBackend}; pub use self::filters::errors; +use self::metrics::labels::Route as RouteLabels; -#[derive(Clone, Debug, Default)] -pub struct RouteMetrics { - backend: backend::RouteBackendMetrics, -} +pub use self::metrics::{GrpcRouteMetrics, HttpRouteMetrics}; /// A target type that includes a summary of exactly how a request was matched. /// This match state is required to apply route filters. @@ -31,6 +30,7 @@ pub(crate) struct Matched { pub(crate) struct Route { pub(super) parent: T, pub(super) addr: Addr, + pub(super) parent_ref: ParentRef, pub(super) route_ref: RouteRef, pub(super) filters: Arc<[F]>, pub(super) distribution: BackendDistribution, @@ -55,6 +55,11 @@ pub(crate) type Grpc = MatchedRoute< pub(crate) type BackendDistribution = distribute::Distribution>; pub(crate) type NewDistribute = distribute::NewDistribute, (), N>; +pub type Metrics = metrics::RouteMetrics< + ::StreamLabel, + ::StreamLabel, +>; + /// Wraps errors with route metadata. #[derive(Debug, thiserror::Error)] #[error("route {}: {source}", route.0)] @@ -64,28 +69,6 @@ struct RouteError { source: Error, } -// === impl RouteMetrics === - -impl RouteMetrics { - pub fn register(reg: &mut prom::Registry) -> Self { - Self { - backend: backend::RouteBackendMetrics::register( - reg.sub_registry_with_prefix("backend"), - ), - } - } - - #[cfg(test)] - pub(crate) fn request_count( - &self, - p: crate::ParentRef, - r: RouteRef, - b: crate::BackendRef, - ) -> backend::RequestCount { - self.backend.request_count(p, r, b) - } -} - // === impl MatchedRoute === impl MatchedRoute @@ -103,13 +86,15 @@ where // Assert that filters can be applied. Self: filters::Apply, Self: svc::Param, + Self: metrics::MkStreamLabel, MatchedBackend: filters::Apply, + MatchedBackend: metrics::MkStreamLabel, { /// Builds a route stack that applies policy filters to requests and /// distributes requests over each route's backends. These [`Concrete`] /// backends are expected to be cached/shared by the inner stack. pub(crate) fn layer( - metrics: RouteMetrics, + metrics: Metrics>, ) -> impl svc::Layer> + Clone where // Inner stack. @@ -134,10 +119,11 @@ where // consideration, so we must eagerly fail requests to prevent // leaking tasks onto the runtime. .push_on_service(svc::LoadShed::layer()) - // TODO(ver) attach the `E` typed failure policy to requests. .push(filters::NewApplyFilters::::layer()) - // Sets an optional request timeout. .push(http::NewTimeout::layer()) + .push(metrics::layer(&metrics.requests)) + // Configure a classifier to use in the endpoint stack. + // FIXME(ver) move this into NewSetExtensions .push(classify::NewClassify::layer()) .push(svc::NewMapErr::layer_with(|rt: &Self| { let route = rt.params.route_ref.clone(); @@ -152,18 +138,29 @@ where } } -impl svc::Param> for MatchedRoute { +impl svc::Param> for MatchedRoute { fn param(&self) -> BackendDistribution { self.params.distribution.clone() } } -impl svc::Param for MatchedRoute { +impl svc::Param for MatchedRoute { + fn param(&self) -> RouteLabels { + RouteLabels( + self.params.parent_ref.clone(), + self.params.route_ref.clone(), + ) + } +} + +impl svc::Param for MatchedRoute { fn param(&self) -> http::timeout::ResponseTimeout { http::timeout::ResponseTimeout(self.params.request_timeout) } } +// === impl Http === + impl filters::Apply for Http { #[inline] fn apply_request(&self, req: &mut ::http::Request) -> Result<()> { @@ -176,14 +173,30 @@ impl filters::Apply for Http { } } +impl metrics::MkStreamLabel for Http { + type StatusLabels = metrics::labels::HttpRouteRsp; + type DurationLabels = metrics::labels::Route; + type StreamLabel = metrics::LabelHttpRouteRsp; + + fn mk_stream_labeler(&self, _: &::http::Request) -> Option { + let parent = self.params.parent_ref.clone(); + let route = self.params.route_ref.clone(); + Some(metrics::LabelHttpRsp::from(metrics::labels::Route::from(( + parent, route, + )))) + } +} + impl svc::Param for Http { fn param(&self) -> classify::Request { classify::Request::ClientPolicy(classify::ClientPolicy::Http( - self.params.failure_policy.clone(), + policy::http::StatusRanges::default(), )) } } +// === impl Grpc === + impl filters::Apply for Grpc { #[inline] fn apply_request(&self, req: &mut ::http::Request) -> Result<()> { @@ -196,10 +209,24 @@ impl filters::Apply for Grpc { } } +impl metrics::MkStreamLabel for Grpc { + type StatusLabels = metrics::labels::GrpcRouteRsp; + type DurationLabels = metrics::labels::Route; + type StreamLabel = metrics::LabelGrpcRouteRsp; + + fn mk_stream_labeler(&self, _: &::http::Request) -> Option { + let parent = self.params.parent_ref.clone(); + let route = self.params.route_ref.clone(); + Some(metrics::LabelGrpcRsp::from(metrics::labels::Route::from(( + parent, route, + )))) + } +} + impl svc::Param for Grpc { fn param(&self) -> classify::Request { - classify::Request::ClientPolicy(classify::ClientPolicy::Grpc( - self.params.failure_policy.clone(), - )) + classify::Request::ClientPolicy( + classify::ClientPolicy::Grpc(policy::grpc::Codes::default()), + ) } } diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs index 1ee9b2f258..8877457fe7 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs @@ -1,15 +1,12 @@ use super::{super::Concrete, filters}; use crate::{BackendRef, ParentRef, RouteRef}; use linkerd_app_core::{proxy::http, svc, Error, Result}; +use linkerd_http_prom::record_response::MkStreamLabel; use linkerd_http_route as http_route; use linkerd_proxy_client_policy as policy; use std::{fmt::Debug, hash::Hash, sync::Arc}; -mod count_reqs; -mod metrics; - -pub use self::count_reqs::RequestCount; -pub use self::metrics::RouteBackendMetrics; +pub(super) mod metrics; #[derive(Debug, PartialEq, Eq, Hash)] pub(crate) struct Backend { @@ -25,6 +22,8 @@ pub(crate) type Http = pub(crate) type Grpc = MatchedBackend; +pub type Metrics = metrics::RouteBackendMetrics<::StreamLabel>; + /// Wraps errors with backend metadata. #[derive(Debug, thiserror::Error)] #[error("backend {}: {source}", backend.0)] @@ -71,7 +70,7 @@ where F: Clone + Send + Sync + 'static, // Assert that filters can be applied. Self: filters::Apply, - RouteBackendMetrics: svc::ExtractParam, + Self: metrics::MkStreamLabel, { /// Builds a stack that applies per-route-backend policy filters over an /// inner [`Concrete`] stack. @@ -79,7 +78,7 @@ where /// This [`MatchedBackend`] must implement [`filters::Apply`] to apply these /// filters. pub(crate) fn layer( - metrics: RouteBackendMetrics, + metrics: Metrics, ) -> impl svc::Layer> + Clone where // Inner stack. @@ -103,7 +102,7 @@ where ) .push(filters::NewApplyFilters::::layer()) .push(http::NewTimeout::layer()) - .push(count_reqs::NewCountRequests::layer_via(metrics.clone())) + .push(metrics::layer(&metrics)) .push(svc::NewMapErr::layer_with(|t: &Self| { let backend = t.params.concrete.backend_ref.clone(); move |source| { @@ -155,6 +154,21 @@ impl filters::Apply for Http { } } +impl metrics::MkStreamLabel for Http { + type StatusLabels = metrics::labels::HttpRouteBackendRsp; + type DurationLabels = metrics::labels::RouteBackend; + type StreamLabel = metrics::LabelHttpRouteBackendRsp; + + fn mk_stream_labeler(&self, _: &::http::Request) -> Option { + let parent = self.params.concrete.parent_ref.clone(); + let route = self.params.route_ref.clone(); + let backend = self.params.concrete.backend_ref.clone(); + Some(metrics::LabelHttpRsp::from( + metrics::labels::RouteBackend::from((parent, route, backend)), + )) + } +} + impl filters::Apply for Grpc { #[inline] fn apply_request(&self, req: &mut ::http::Request) -> Result<()> { @@ -165,3 +179,18 @@ impl filters::Apply for Grpc { filters::apply_grpc_response(&self.params.filters, rsp) } } + +impl metrics::MkStreamLabel for Grpc { + type StatusLabels = metrics::labels::GrpcRouteBackendRsp; + type DurationLabels = metrics::labels::RouteBackend; + type StreamLabel = metrics::LabelGrpcRouteBackendRsp; + + fn mk_stream_labeler(&self, _: &::http::Request) -> Option { + let parent = self.params.concrete.parent_ref.clone(); + let route = self.params.route_ref.clone(); + let backend = self.params.concrete.backend_ref.clone(); + Some(metrics::LabelGrpcRsp::from( + metrics::labels::RouteBackend::from((parent, route, backend)), + )) + } +} diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs index 9f909183e3..0d79f95b5a 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs @@ -1,61 +1,122 @@ +#![allow(warnings)] + use crate::{BackendRef, ParentRef, RouteRef}; +use futures::Stream; use linkerd_app_core::{ metrics::prom::{self, encoding::*, EncodeLabelSetMut}, svc, }; +use linkerd_http_prom::{ + record_response::{self, NewResponseDuration, StreamLabel}, + NewCountRequests, RequestCount, RequestCountFamilies, +}; + +pub use super::super::metrics::*; +pub use linkerd_http_prom::record_response::MkStreamLabel; + +#[cfg(test)] +mod tests; + +#[derive(Debug)] +pub struct RouteBackendMetrics { + requests: RequestCountFamilies, + responses: ResponseMetrics, +} -#[derive(Clone, Debug, Default)] -pub struct RouteBackendMetrics { - metrics: super::count_reqs::RequestCountFamilies, +type ResponseMetrics = record_response::ResponseMetrics< + ::DurationLabels, + ::StatusLabels, +>; + +pub fn layer( + metrics: &RouteBackendMetrics, +) -> impl svc::Layer< + N, + Service = NewCountRequests< + ExtractRequestCount, + NewResponseDuration>, N>, + >, +> + Clone +where + T: MkStreamLabel, + N: svc::NewService, + NewCountRequests< + ExtractRequestCount, + NewResponseDuration>, N>, + >: svc::NewService, + NewResponseDuration>, N>: + svc::NewService, +{ + let RouteBackendMetrics { + requests, + responses, + } = metrics.clone(); + svc::layer::mk(move |inner| { + use svc::Layer; + NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer( + NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone())) + .layer(inner), + ) + }) } -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -struct RouteBackendLabels(ParentRef, RouteRef, BackendRef); +#[derive(Clone, Debug)] +pub struct ExtractRequestCount(RequestCountFamilies); // === impl RouteBackendMetrics === -impl RouteBackendMetrics { - pub fn register(reg: &mut prom::Registry) -> Self { +impl RouteBackendMetrics { + pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator) -> Self { + let requests = RequestCountFamilies::register(reg); + let responses = record_response::ResponseMetrics::register(reg, histo); Self { - metrics: super::count_reqs::RequestCountFamilies::register(reg), + requests, + responses, } } #[cfg(test)] - pub(crate) fn request_count( + pub(crate) fn backend_request_count( &self, p: ParentRef, r: RouteRef, b: BackendRef, - ) -> super::count_reqs::RequestCount { - self.metrics.metrics(&RouteBackendLabels(p, r, b)) + ) -> linkerd_http_prom::RequestCount { + self.requests.metrics(&labels::RouteBackend(p, r, b)) } -} -impl svc::ExtractParam for RouteBackendMetrics -where - T: svc::Param + svc::Param + svc::Param, -{ - fn extract_param(&self, t: &T) -> super::count_reqs::RequestCount { - self.metrics - .metrics(&RouteBackendLabels(t.param(), t.param(), t.param())) + #[cfg(test)] + pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter { + self.responses.get_statuses(l) } } -// === impl RouteBackendLabels === +impl Default for RouteBackendMetrics { + fn default() -> Self { + Self { + requests: Default::default(), + responses: Default::default(), + } + } +} -impl EncodeLabelSetMut for RouteBackendLabels { - fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result { - let Self(parent, route, backend) = self; - parent.encode_label_set(enc)?; - route.encode_label_set(enc)?; - backend.encode_label_set(enc)?; - Ok(()) +impl Clone for RouteBackendMetrics { + fn clone(&self) -> Self { + Self { + requests: self.requests.clone(), + responses: self.responses.clone(), + } } } -impl EncodeLabelSet for RouteBackendLabels { - fn encode(&self, mut enc: LabelSetEncoder<'_>) -> std::fmt::Result { - self.encode_label_set(&mut enc) +// === impl ExtractRequestCount === + +impl svc::ExtractParam for ExtractRequestCount +where + T: svc::Param + svc::Param + svc::Param, +{ + fn extract_param(&self, t: &T) -> RequestCount { + self.0 + .metrics(&labels::RouteBackend(t.param(), t.param(), t.param())) } } diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs new file mode 100644 index 0000000000..b8848dd699 --- /dev/null +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs @@ -0,0 +1,394 @@ +use super::{ + super::{Backend, Grpc, Http}, + labels, + test_util::*, + LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics, +}; +use crate::http::{concrete, logical::Concrete}; +use linkerd2_proxy_api::outbound::backend; +use linkerd_app_core::{ + metrics::prom::Counter, + svc::{self, http::BoxBody, Layer, NewService, Service, ServiceExt}, + transport::{Remote, ServerAddr}, +}; +use linkerd_proxy_client_policy as policy; + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn http_request_statuses() { + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::RouteBackendMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let backend_ref = crate::BackendRef(policy::Meta::new_default("backend")); + let (mut svc, mut handle) = + mock_http_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref); + + let route_backend = + labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone()); + + let requests = + metrics.backend_request_count(parent_ref.clone(), route_ref.clone(), backend_ref.clone()); + assert_eq!(requests.get(), 0); + + // Send one request and ensure it's counted. + let ok = metrics.get_statuses(&labels::Rsp( + route_backend.clone(), + labels::HttpRsp { + status: Some(http::StatusCode::OK), + error: None, + }, + )); + send_assert_incremented(&ok, &mut handle, &mut svc, Default::default(), |tx| { + tx.send_response( + http::Response::builder() + .status(200) + .body(BoxBody::default()) + .unwrap(), + ) + }) + .await; + assert_eq!(requests.get(), 1); + + // Send another request and ensure it's counted with a different response + // status. + let no_content = metrics.get_statuses(&labels::Rsp( + route_backend.clone(), + labels::HttpRsp { + status: Some(http::StatusCode::NO_CONTENT), + error: None, + }, + )); + send_assert_incremented( + &no_content, + &mut handle, + &mut svc, + Default::default(), + |tx| { + tx.send_response( + http::Response::builder() + .status(204) + .body(BoxBody::default()) + .unwrap(), + ) + }, + ) + .await; + assert_eq!(requests.get(), 2); + + // Emit a response with an error and ensure it's counted. + let unknown = metrics.get_statuses(&labels::Rsp( + route_backend.clone(), + labels::HttpRsp { + status: None, + error: Some(labels::Error::Unknown), + }, + )); + send_assert_incremented(&unknown, &mut handle, &mut svc, Default::default(), |tx| { + tx.send_error("a spooky ghost") + }) + .await; + assert_eq!(requests.get(), 3); + + // Emit a successful response with a body that fails and ensure that both + // the status and error are recorded. + let mixed = metrics.get_statuses(&labels::Rsp( + route_backend.clone(), + labels::HttpRsp { + status: Some(http::StatusCode::OK), + error: Some(labels::Error::Unknown), + }, + )); + send_assert_incremented(&mixed, &mut handle, &mut svc, Default::default(), |tx| { + tx.send_response( + http::Response::builder() + .status(200) + .body(BoxBody::new(MockBody::new(async { + Err("a spooky ghost".into()) + }))) + .unwrap(), + ) + }) + .await; + assert_eq!(requests.get(), 4); + + assert_eq!(unknown.get(), 1); + assert_eq!(ok.get(), 1); + assert_eq!(no_content.get(), 1); + assert_eq!(mixed.get(), 1); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn grpc_request_statuses_ok() { + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::RouteBackendMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let backend_ref = crate::BackendRef(policy::Meta::new_default("backend")); + let (mut svc, mut handle) = + mock_grpc_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref); + + let requests = + metrics.backend_request_count(parent_ref.clone(), route_ref.clone(), backend_ref.clone()); + assert_eq!(requests.get(), 0); + + let ok = metrics.get_statuses(&labels::Rsp( + labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone()), + labels::GrpcRsp { + status: Some(tonic::Code::Ok), + error: None, + }, + )); + send_assert_incremented( + &ok, + &mut handle, + &mut svc, + http::Request::builder() + .method("POST") + .uri("http://host/svc/method") + .body(Default::default()) + .unwrap(), + |tx| { + tx.send_response( + http::Response::builder() + .body(BoxBody::new(MockBody::trailers(async move { + let mut trailers = http::HeaderMap::new(); + trailers.insert("grpc-status", http::HeaderValue::from_static("0")); + Ok(Some(trailers)) + }))) + .unwrap(), + ) + }, + ) + .await; + assert_eq!(requests.get(), 1); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn grpc_request_statuses_not_found() { + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::RouteBackendMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let backend_ref = crate::BackendRef(policy::Meta::new_default("backend")); + let (mut svc, mut handle) = + mock_grpc_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref); + + let not_found = metrics.get_statuses(&labels::Rsp( + labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone()), + labels::GrpcRsp { + status: Some(tonic::Code::NotFound), + error: None, + }, + )); + send_assert_incremented( + ¬_found, + &mut handle, + &mut svc, + http::Request::builder() + .method("POST") + .uri("http://host/svc/method") + .body(Default::default()) + .unwrap(), + |tx| { + tx.send_response( + http::Response::builder() + .body(BoxBody::new(MockBody::trailers(async move { + let mut trailers = http::HeaderMap::new(); + trailers.insert("grpc-status", http::HeaderValue::from_static("5")); + Ok(Some(trailers)) + }))) + .unwrap(), + ) + }, + ) + .await; +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn grpc_request_statuses_error_response() { + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::RouteBackendMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let backend_ref = crate::BackendRef(policy::Meta::new_default("backend")); + let (mut svc, mut handle) = + mock_grpc_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref); + + let unknown = metrics.get_statuses(&labels::Rsp( + labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone()), + labels::GrpcRsp { + status: None, + error: Some(labels::Error::Unknown), + }, + )); + send_assert_incremented( + &unknown, + &mut handle, + &mut svc, + http::Request::builder() + .method("POST") + .uri("http://host/svc/method") + .body(Default::default()) + .unwrap(), + |tx| tx.send_error("a spooky ghost"), + ) + .await; +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn grpc_request_statuses_error_body() { + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::RouteBackendMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let backend_ref = crate::BackendRef(policy::Meta::new_default("backend")); + let (mut svc, mut handle) = + mock_grpc_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref); + + let unknown = metrics.get_statuses(&labels::Rsp( + labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone()), + labels::GrpcRsp { + status: None, + error: Some(labels::Error::Unknown), + }, + )); + send_assert_incremented( + &unknown, + &mut handle, + &mut svc, + http::Request::builder() + .method("POST") + .uri("http://host/svc/method") + .body(Default::default()) + .unwrap(), + |tx| { + tx.send_response( + http::Response::builder() + .body(BoxBody::new(MockBody::new(async { + Err("a spooky ghost".into()) + }))) + .unwrap(), + ) + }, + ) + .await; +} + +// === Util === + +fn mock_http_route_backend_metrics( + metrics: &RouteBackendMetrics, + parent_ref: &crate::ParentRef, + route_ref: &crate::RouteRef, + backend_ref: &crate::BackendRef, +) -> (svc::BoxHttp, Handle) { + let req = http::Request::builder().body(()).unwrap(); + let (r#match, _) = policy::route::find( + &[policy::http::Route { + hosts: vec![], + rules: vec![policy::route::Rule { + matches: vec![policy::http::r#match::MatchRequest::default()], + policy: policy::http::Policy { + meta: route_ref.0.clone(), + filters: [].into(), + request_timeout: None, + failure_policy: Default::default(), + distribution: policy::RouteDistribution::Empty, + }, + }], + }], + &req, + ) + .expect("find default route"); + + let (tx, handle) = tower_test::mock::pair::, http::Response>(); + let svc = super::layer(metrics) + .layer(move |_t: Http<()>| tx.clone()) + .new_service(Http { + r#match, + params: Backend { + route_ref: route_ref.clone(), + request_timeout: None, + filters: [].into(), + concrete: Concrete { + target: concrete::Dispatch::Forward( + Remote(ServerAddr(std::net::SocketAddr::new( + [0, 0, 0, 0].into(), + 8080, + ))), + Default::default(), + ), + authority: None, + failure_accrual: Default::default(), + parent: (), + parent_ref: parent_ref.clone(), + backend_ref: backend_ref.clone(), + }, + }, + }); + + (svc::BoxHttp::new(svc), handle) +} + +fn mock_grpc_route_backend_metrics( + metrics: &RouteBackendMetrics, + parent_ref: &crate::ParentRef, + route_ref: &crate::RouteRef, + backend_ref: &crate::BackendRef, +) -> (svc::BoxHttp, Handle) { + let req = http::Request::builder() + .method("POST") + .uri("http://host/svc/method") + .body(()) + .unwrap(); + let (r#match, _) = policy::route::find( + &[policy::grpc::Route { + hosts: vec![], + rules: vec![policy::route::Rule { + matches: vec![policy::grpc::r#match::MatchRoute::default()], + policy: policy::grpc::Policy { + meta: route_ref.0.clone(), + filters: [].into(), + request_timeout: None, + failure_policy: Default::default(), + distribution: policy::RouteDistribution::Empty, + }, + }], + }], + &req, + ) + .expect("find default route"); + + let (tx, handle) = tower_test::mock::pair::, http::Response>(); + let svc = super::layer(metrics) + .layer(move |_t: Grpc<()>| tx.clone()) + .new_service(Grpc { + r#match, + params: Backend { + route_ref: route_ref.clone(), + request_timeout: None, + filters: [].into(), + concrete: Concrete { + target: concrete::Dispatch::Forward( + Remote(ServerAddr(std::net::SocketAddr::new( + [0, 0, 0, 0].into(), + 8080, + ))), + Default::default(), + ), + authority: None, + failure_accrual: Default::default(), + parent: (), + parent_ref: parent_ref.clone(), + backend_ref: backend_ref.clone(), + }, + }, + }); + + (svc::BoxHttp::new(svc), handle) +} diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs new file mode 100644 index 0000000000..5f872c9847 --- /dev/null +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs @@ -0,0 +1,245 @@ +use super::backend::metrics as backend; +use linkerd_app_core::{ + metrics::prom::{self, EncodeLabelSetMut}, + svc, +}; +use linkerd_http_prom::record_response::{self, StreamLabel}; + +pub use linkerd_http_prom::record_response::MkStreamLabel; + +pub mod labels; +#[cfg(test)] +pub(super) mod test_util; +#[cfg(test)] +mod tests; + +pub type RequestMetrics = record_response::RequestMetrics< + ::DurationLabels, + ::StatusLabels, +>; + +#[derive(Debug)] +pub struct RouteMetrics { + pub(super) requests: RequestMetrics, + pub(super) backend: backend::RouteBackendMetrics, +} + +pub type HttpRouteMetrics = RouteMetrics; +pub type GrpcRouteMetrics = RouteMetrics; + +/// Tracks HTTP streams to produce response labels. +#[derive(Clone, Debug)] +pub struct LabelHttpRsp { + parent: L, + status: Option, + error: Option, +} + +/// Tracks gRPC streams to produce response labels. +#[derive(Clone, Debug)] +pub struct LabelGrpcRsp { + parent: L, + status: Option, + error: Option, +} + +pub type LabelHttpRouteRsp = LabelHttpRsp; +pub type LabelGrpcRouteRsp = LabelGrpcRsp; + +pub type LabelHttpRouteBackendRsp = LabelHttpRsp; +pub type LabelGrpcRouteBackendRsp = LabelGrpcRsp; + +pub type NewRecordDuration = + record_response::NewRecordResponse, M, N>; + +#[derive(Clone, Debug)] +pub struct ExtractRecordDurationParams(pub M); + +pub fn layer( + metrics: &RequestMetrics, +) -> impl svc::Layer, N>> + Clone +where + T: Clone + MkStreamLabel, +{ + NewRecordDuration::layer_via(ExtractRecordDurationParams(metrics.clone())) +} + +// === impl RouteMetrics === + +impl RouteMetrics { + // There are two histograms for which we need to register metrics: request + // durations, measured on routes, and response durations, measured on + // route-backends. + // + // Response duration is probably the more meaninful metric + // operationally--and it includes more backend metadata--so we opt to + // preserve higher fidelity for response durations (especially for lower + // values). + // + // We elide several buckets for request durations to be conservative about + // the costs of tracking these two largely overlapping histograms + const REQUEST_BUCKETS: &'static [f64] = &[0.05, 0.5, 1.0, 10.0]; + const RESPONSE_BUCKETS: &'static [f64] = &[0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 10.0]; +} + +impl Default for RouteMetrics { + fn default() -> Self { + Self { + requests: Default::default(), + backend: Default::default(), + } + } +} + +impl Clone for RouteMetrics { + fn clone(&self) -> Self { + Self { + requests: self.requests.clone(), + backend: self.backend.clone(), + } + } +} + +impl RouteMetrics { + pub fn register(reg: &mut prom::Registry) -> Self { + let requests = RequestMetrics::::register(reg, Self::REQUEST_BUCKETS.iter().copied()); + + let backend = backend::RouteBackendMetrics::register( + reg.sub_registry_with_prefix("backend"), + Self::RESPONSE_BUCKETS.iter().copied(), + ); + Self { requests, backend } + } + + #[cfg(test)] + pub(crate) fn backend_request_count( + &self, + p: crate::ParentRef, + r: crate::RouteRef, + b: crate::BackendRef, + ) -> linkerd_http_prom::RequestCount { + self.backend.backend_request_count(p, r, b) + } +} + +// === impl ExtractRequestDurationParams === + +impl svc::ExtractParam, T> for ExtractRecordDurationParams +where + T: Clone + MkStreamLabel, + M: Clone, +{ + fn extract_param(&self, target: &T) -> record_response::Params { + record_response::Params { + labeler: target.clone(), + metric: self.0.clone(), + } + } +} + +// === impl LabelHttpRsp === + +impl

From

for LabelHttpRsp

{ + fn from(parent: P) -> Self { + Self { + parent, + status: None, + error: None, + } + } +} + +impl

StreamLabel for LabelHttpRsp

+where + P: EncodeLabelSetMut + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, +{ + type StatusLabels = labels::Rsp; + type DurationLabels = P; + + fn init_response(&mut self, rsp: &http::Response) { + self.status = Some(rsp.status()); + } + + fn end_response(&mut self, res: Result, &linkerd_app_core::Error>) { + if let Err(e) = res { + match labels::Error::new_or_status(e) { + Ok(l) => self.error = Some(l), + Err(code) => match http::StatusCode::from_u16(code) { + Ok(s) => self.status = Some(s), + // This is kind of pathological, so mark it as an unkown error. + Err(_) => self.error = Some(labels::Error::Unknown), + }, + } + } + } + + fn status_labels(&self) -> Self::StatusLabels { + labels::Rsp( + self.parent.clone(), + labels::HttpRsp { + status: self.status, + error: self.error, + }, + ) + } + + fn duration_labels(&self) -> Self::DurationLabels { + self.parent.clone() + } +} + +// === impl LabelGrpcRsp === + +impl

From

for LabelGrpcRsp

{ + fn from(parent: P) -> Self { + Self { + parent, + status: None, + error: None, + } + } +} + +impl

StreamLabel for LabelGrpcRsp

+where + P: EncodeLabelSetMut + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, +{ + type StatusLabels = labels::Rsp; + type DurationLabels = P; + + fn init_response(&mut self, rsp: &http::Response) { + self.status = rsp + .headers() + .get("grpc-status") + .map(|v| tonic::Code::from_bytes(v.as_bytes())); + } + + fn end_response(&mut self, res: Result, &linkerd_app_core::Error>) { + match res { + Ok(Some(trailers)) => { + self.status = trailers + .get("grpc-status") + .map(|v| tonic::Code::from_bytes(v.as_bytes())); + } + Ok(None) => {} + Err(e) => match labels::Error::new_or_status(e) { + Ok(l) => self.error = Some(l), + Err(code) => self.status = Some(tonic::Code::from_i32(i32::from(code))), + }, + } + } + + fn status_labels(&self) -> Self::StatusLabels { + labels::Rsp( + self.parent.clone(), + labels::GrpcRsp { + status: self.status, + error: self.error, + }, + ) + } + + fn duration_labels(&self) -> Self::DurationLabels { + self.parent.clone() + } +} diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs new file mode 100644 index 0000000000..6bb7f4046b --- /dev/null +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs @@ -0,0 +1,241 @@ +//! Prometheus label types. +use linkerd_app_core::{errors, metrics::prom::EncodeLabelSetMut, Error as BoxError}; +use prometheus_client::encoding::*; + +use crate::{BackendRef, ParentRef, RouteRef}; + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct Route(pub ParentRef, pub RouteRef); + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct RouteBackend(pub ParentRef, pub RouteRef, pub BackendRef); + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct Rsp(pub P, pub L); + +pub type RouteRsp = Rsp; +pub type HttpRouteRsp = RouteRsp; +pub type GrpcRouteRsp = RouteRsp; + +pub type RouteBackendRsp = Rsp; +pub type HttpRouteBackendRsp = RouteBackendRsp; +pub type GrpcRouteBackendRsp = RouteBackendRsp; + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct HttpRsp { + pub status: Option, + pub error: Option, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct GrpcRsp { + pub status: Option, + pub error: Option, +} + +#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)] +pub enum Error { + FailFast, + LoadShed, + Timeout, + Cancel, + Refused, + EnhanceYourCalm, + Reset, + GoAway, + Io, + Unknown, +} + +// === impl Route === + +impl From<(ParentRef, RouteRef)> for Route { + fn from((parent, route): (ParentRef, RouteRef)) -> Self { + Self(parent, route) + } +} + +impl EncodeLabelSetMut for Route { + fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result { + let Self(parent, route) = self; + parent.encode_label_set(enc)?; + route.encode_label_set(enc)?; + Ok(()) + } +} + +impl EncodeLabelSet for Route { + fn encode(&self, mut enc: LabelSetEncoder<'_>) -> std::fmt::Result { + self.encode_label_set(&mut enc) + } +} + +// === impl RouteBackend === + +impl From<(ParentRef, RouteRef, BackendRef)> for RouteBackend { + fn from((parent, route, backend): (ParentRef, RouteRef, BackendRef)) -> Self { + Self(parent, route, backend) + } +} + +impl EncodeLabelSetMut for RouteBackend { + fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result { + let Self(parent, route, backend) = self; + parent.encode_label_set(enc)?; + route.encode_label_set(enc)?; + backend.encode_label_set(enc)?; + Ok(()) + } +} + +impl EncodeLabelSet for RouteBackend { + fn encode(&self, mut enc: LabelSetEncoder<'_>) -> std::fmt::Result { + self.encode_label_set(&mut enc) + } +} + +// === impl Rsp === + +impl EncodeLabelSetMut for Rsp { + fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result { + let Self(route, rsp) = self; + route.encode_label_set(enc)?; + rsp.encode_label_set(enc)?; + Ok(()) + } +} + +impl EncodeLabelSet for Rsp { + fn encode(&self, mut enc: LabelSetEncoder<'_>) -> std::fmt::Result { + self.encode_label_set(&mut enc) + } +} + +// === impl HttpRsp === + +impl EncodeLabelSetMut for HttpRsp { + fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result { + let Self { status, error } = self; + + ("http_status", status.map(|c| c.as_u16())).encode(enc.encode_label())?; + ("error", *error).encode(enc.encode_label())?; + + Ok(()) + } +} + +impl EncodeLabelSet for HttpRsp { + fn encode(&self, mut enc: LabelSetEncoder<'_>) -> std::fmt::Result { + self.encode_label_set(&mut enc) + } +} + +// === impl GrpcRsp === + +impl EncodeLabelSetMut for GrpcRsp { + fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result { + let Self { status, error } = self; + + ( + "grpc_status", + match status.unwrap_or(tonic::Code::Unknown) { + tonic::Code::Ok => "OK", + tonic::Code::Cancelled => "CANCELLED", + tonic::Code::InvalidArgument => "INVALID_ARGUMENT", + tonic::Code::DeadlineExceeded => "DEADLINE_EXCEEDED", + tonic::Code::NotFound => "NOT_FOUND", + tonic::Code::AlreadyExists => "ALREADY_EXISTS", + tonic::Code::PermissionDenied => "PERMISSION_DENIED", + tonic::Code::ResourceExhausted => "RESOURCE_EXHAUSTED", + tonic::Code::FailedPrecondition => "FAILED_PRECONDITION", + tonic::Code::Aborted => "ABORTED", + tonic::Code::OutOfRange => "OUT_OF_RANGE", + tonic::Code::Unimplemented => "UNIMPLEMENTED", + tonic::Code::Internal => "INTERNAL", + tonic::Code::Unavailable => "UNAVAILABLE", + tonic::Code::DataLoss => "DATA_LOSS", + tonic::Code::Unauthenticated => "UNAUTHENTICATED", + _ => "UNKNOWN", + }, + ) + .encode(enc.encode_label())?; + + ("error", *error).encode(enc.encode_label())?; + + Ok(()) + } +} + +impl EncodeLabelSet for GrpcRsp { + fn encode(&self, mut enc: LabelSetEncoder<'_>) -> std::fmt::Result { + self.encode_label_set(&mut enc) + } +} + +// === impl Error === + +impl Error { + pub fn new_or_status(error: &BoxError) -> Result { + use super::super::super::errors as policy; + use crate::http::h2::{H2Error, Reason}; + + // No available backend can be found for a request. + if errors::is_caused_by::(&**error) { + return Ok(Self::FailFast); + } + if errors::is_caused_by::(&**error) { + return Ok(Self::LoadShed); + } + + if let Some(policy::HttpRouteRedirect { status, .. }) = errors::cause_ref(&**error) { + return Err(status.as_u16()); + } + + // Policy-driven request failures. + if let Some(policy::HttpRouteInjectedFailure { status, .. }) = errors::cause_ref(&**error) { + return Err(status.as_u16()); + } + if let Some(policy::GrpcRouteInjectedFailure { code, .. }) = errors::cause_ref(&**error) { + return Err(*code); + } + + // HTTP/2 errors. + if let Some(h2e) = errors::cause_ref::(&**error) { + if h2e.is_reset() { + match h2e.reason() { + Some(Reason::CANCEL) => return Ok(Self::Cancel), + Some(Reason::REFUSED_STREAM) => return Ok(Self::Refused), + Some(Reason::ENHANCE_YOUR_CALM) => return Ok(Self::EnhanceYourCalm), + _ => return Ok(Self::Reset), + } + } + if h2e.is_go_away() { + return Ok(Self::GoAway); + } + if h2e.is_io() { + return Ok(Self::Io); + } + } + + tracing::debug!(?error, "Unlabeled error"); + Ok(Self::Unknown) + } +} + +impl EncodeLabelValue for Error { + fn encode(&self, enc: &mut LabelValueEncoder<'_>) -> std::fmt::Result { + use std::fmt::Write; + match self { + Self::FailFast => enc.write_str("FAIL_FAST"), + Self::LoadShed => enc.write_str("LOAD_SHED"), + Self::Timeout => enc.write_str("TIMEOUT"), + Self::Cancel => enc.write_str("CANCEL"), + Self::Refused => enc.write_str("REFUSED"), + Self::EnhanceYourCalm => enc.write_str("ENHANCE_YOUR_CALM"), + Self::Reset => enc.write_str("RESET"), + Self::GoAway => enc.write_str("GO_AWAY"), + Self::Io => enc.write_str("IO"), + Self::Unknown => enc.write_str("UNKNOWN"), + } + } +} diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/test_util.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/test_util.rs new file mode 100644 index 0000000000..fd43587c4d --- /dev/null +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/test_util.rs @@ -0,0 +1,110 @@ +use hyper::body::HttpBody; +use linkerd_app_core::{ + metrics::prom::Counter, + svc::{self, http::BoxBody, Service, ServiceExt}, +}; + +pub use self::mock_body::MockBody; + +pub async fn send_assert_incremented( + counter: &Counter, + handle: &mut Handle, + svc: &mut svc::BoxHttp, + req: http::Request, + send: impl FnOnce(SendResponse), +) { + handle.allow(1); + assert_eq!(counter.get(), 0); + svc.ready().await.expect("ready"); + let mut call = svc.call(req); + let (_req, tx) = tokio::select! { + _ = (&mut call) => unreachable!(), + res = handle.next_request() => res.unwrap(), + }; + assert_eq!(counter.get(), 0); + send(tx); + if let Ok(mut rsp) = call.await { + if !rsp.body().is_end_stream() { + assert_eq!(counter.get(), 0); + while let Some(Ok(_)) = rsp.body_mut().data().await {} + let _ = rsp.body_mut().trailers().await; + } + } + assert_eq!(counter.get(), 1); +} + +pub type Handle = tower_test::mock::Handle, http::Response>; +pub type SendResponse = tower_test::mock::SendResponse>; + +mod mock_body { + use bytes::Bytes; + use hyper::body::HttpBody; + use linkerd_app_core::{Error, Result}; + use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + }; + + #[derive(Default)] + #[pin_project::pin_project] + pub struct MockBody { + #[pin] + data: Option>>, + #[pin] + trailers: Option>>>, + } + + impl MockBody { + pub fn new(data: impl Future> + Send + 'static) -> Self { + Self { + data: Some(Box::pin(data)), + trailers: None, + } + } + + pub fn trailers( + trailers: impl Future>> + Send + 'static, + ) -> Self { + Self { + data: None, + trailers: Some(Box::pin(trailers)), + } + } + } + + impl HttpBody for MockBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut this = self.project(); + if let Some(rx) = this.data.as_mut().as_pin_mut() { + let ready = futures::ready!(rx.poll(cx)); + *this.data = None; + return Poll::Ready(ready.err().map(Err)); + } + Poll::Ready(None) + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let mut this = self.project(); + if let Some(rx) = this.trailers.as_mut().as_pin_mut() { + let ready = futures::ready!(rx.poll(cx)); + *this.trailers = None; + return Poll::Ready(ready); + } + Poll::Ready(Ok(None)) + } + + fn is_end_stream(&self) -> bool { + self.data.is_none() && self.trailers.is_none() + } + } +} diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs new file mode 100644 index 0000000000..8aa3f08970 --- /dev/null +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs @@ -0,0 +1,345 @@ +use super::{ + super::{Grpc, Http, Route}, + labels, + test_util::*, + LabelGrpcRouteRsp, LabelHttpRouteRsp, RequestMetrics, +}; +use linkerd_app_core::svc::{self, http::BoxBody, Layer, NewService}; +use linkerd_proxy_client_policy as policy; + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn http_request_statuses() { + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::HttpRouteMetrics::default().requests; + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = mock_http_route_metrics(&metrics, &parent_ref, &route_ref); + + // Send one request and ensure it's counted. + let ok = metrics.get_statuses(&labels::Rsp( + labels::Route(parent_ref.clone(), route_ref.clone()), + labels::HttpRsp { + status: Some(http::StatusCode::OK), + error: None, + }, + )); + send_assert_incremented(&ok, &mut handle, &mut svc, Default::default(), |tx| { + tx.send_response( + http::Response::builder() + .status(200) + .body(BoxBody::default()) + .unwrap(), + ) + }) + .await; + + // Send another request and ensure it's counted with a different response + // status. + let no_content = metrics.get_statuses(&labels::Rsp( + labels::Route(parent_ref.clone(), route_ref.clone()), + labels::HttpRsp { + status: Some(http::StatusCode::NO_CONTENT), + error: None, + }, + )); + send_assert_incremented( + &no_content, + &mut handle, + &mut svc, + Default::default(), + |tx| { + tx.send_response( + http::Response::builder() + .status(204) + .body(BoxBody::default()) + .unwrap(), + ) + }, + ) + .await; + + // Emit a response with an error and ensure it's counted. + let unknown = metrics.get_statuses(&labels::Rsp( + labels::Route(parent_ref.clone(), route_ref.clone()), + labels::HttpRsp { + status: None, + error: Some(labels::Error::Unknown), + }, + )); + send_assert_incremented(&unknown, &mut handle, &mut svc, Default::default(), |tx| { + tx.send_error("a spooky ghost") + }) + .await; + + // Emit a successful response with a body that fails and ensure that both + // the status and error are recorded. + let mixed = metrics.get_statuses(&labels::Rsp( + labels::Route(parent_ref, route_ref), + labels::HttpRsp { + status: Some(http::StatusCode::OK), + error: Some(labels::Error::Unknown), + }, + )); + send_assert_incremented(&mixed, &mut handle, &mut svc, Default::default(), |tx| { + tx.send_response( + http::Response::builder() + .status(200) + .body(BoxBody::new(MockBody::new(async { + Err("a spooky ghost".into()) + }))) + .unwrap(), + ) + }) + .await; + + assert_eq!(unknown.get(), 1); + assert_eq!(ok.get(), 1); + assert_eq!(no_content.get(), 1); + assert_eq!(mixed.get(), 1); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn grpc_request_statuses_ok() { + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::GrpcRouteMetrics::default().requests; + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref); + + // Send one request and ensure it's counted. + let ok = metrics.get_statuses(&labels::Rsp( + labels::Route(parent_ref.clone(), route_ref.clone()), + labels::GrpcRsp { + status: Some(tonic::Code::Ok), + error: None, + }, + )); + send_assert_incremented( + &ok, + &mut handle, + &mut svc, + http::Request::builder() + .method("POST") + .uri("http://host/svc/method") + .body(Default::default()) + .unwrap(), + |tx| { + tx.send_response( + http::Response::builder() + .body(BoxBody::new(MockBody::trailers(async move { + let mut trailers = http::HeaderMap::new(); + trailers.insert("grpc-status", http::HeaderValue::from_static("0")); + Ok(Some(trailers)) + }))) + .unwrap(), + ) + }, + ) + .await; +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn grpc_request_statuses_not_found() { + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::GrpcRouteMetrics::default().requests; + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref); + + // Send another request and ensure it's counted with a different response + // status. + let not_found = metrics.get_statuses(&labels::Rsp( + labels::Route(parent_ref.clone(), route_ref.clone()), + labels::GrpcRsp { + status: Some(tonic::Code::NotFound), + error: None, + }, + )); + send_assert_incremented( + ¬_found, + &mut handle, + &mut svc, + http::Request::builder() + .method("POST") + .uri("http://host/svc/method") + .body(Default::default()) + .unwrap(), + |tx| { + tx.send_response( + http::Response::builder() + .body(BoxBody::new(MockBody::trailers(async move { + let mut trailers = http::HeaderMap::new(); + trailers.insert("grpc-status", http::HeaderValue::from_static("5")); + Ok(Some(trailers)) + }))) + .unwrap(), + ) + }, + ) + .await; +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn grpc_request_statuses_error_response() { + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::GrpcRouteMetrics::default().requests; + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref); + + let unknown = metrics.get_statuses(&labels::Rsp( + labels::Route(parent_ref.clone(), route_ref.clone()), + labels::GrpcRsp { + status: None, + error: Some(labels::Error::Unknown), + }, + )); + send_assert_incremented( + &unknown, + &mut handle, + &mut svc, + http::Request::builder() + .method("POST") + .uri("http://host/svc/method") + .body(Default::default()) + .unwrap(), + |tx| tx.send_error("a spooky ghost"), + ) + .await; +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn grpc_request_statuses_error_body() { + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::GrpcRouteMetrics::default().requests; + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref); + + let unknown = metrics.get_statuses(&labels::Rsp( + labels::Route(parent_ref.clone(), route_ref.clone()), + labels::GrpcRsp { + status: None, + error: Some(labels::Error::Unknown), + }, + )); + send_assert_incremented( + &unknown, + &mut handle, + &mut svc, + http::Request::builder() + .method("POST") + .uri("http://host/svc/method") + .body(Default::default()) + .unwrap(), + |tx| { + tx.send_response( + http::Response::builder() + .body(BoxBody::new(MockBody::new(async { + Err("a spooky ghost".into()) + }))) + .unwrap(), + ) + }, + ) + .await; +} + +// === Utils === + +pub fn mock_http_route_metrics( + metrics: &RequestMetrics, + parent_ref: &crate::ParentRef, + route_ref: &crate::RouteRef, +) -> (svc::BoxHttp, Handle) { + let req = http::Request::builder().body(()).unwrap(); + let (r#match, _) = policy::route::find( + &[policy::http::Route { + hosts: vec![], + rules: vec![policy::route::Rule { + matches: vec![policy::http::r#match::MatchRequest::default()], + policy: policy::http::Policy { + meta: route_ref.0.clone(), + filters: [].into(), + request_timeout: None, + failure_policy: Default::default(), + distribution: policy::RouteDistribution::Empty, + }, + }], + }], + &req, + ) + .expect("find default route"); + + let (tx, handle) = tower_test::mock::pair::, http::Response>(); + let svc = super::layer(metrics) + .layer(move |_t: Http<()>| tx.clone()) + .new_service(Http { + r#match, + params: Route { + parent: (), + addr: std::net::SocketAddr::new([0, 0, 0, 0].into(), 8080).into(), + parent_ref: parent_ref.clone(), + route_ref: route_ref.clone(), + failure_policy: Default::default(), + request_timeout: None, + filters: [].into(), + distribution: Default::default(), + }, + }); + + (svc::BoxHttp::new(svc), handle) +} + +pub fn mock_grpc_route_metrics( + metrics: &RequestMetrics, + parent_ref: &crate::ParentRef, + route_ref: &crate::RouteRef, +) -> (svc::BoxHttp, Handle) { + let req = http::Request::builder() + .method("POST") + .uri("http://host/svc/method") + .body(()) + .unwrap(); + let (r#match, _) = policy::route::find( + &[policy::grpc::Route { + hosts: vec![], + rules: vec![policy::route::Rule { + matches: vec![policy::grpc::r#match::MatchRoute::default()], + policy: policy::grpc::Policy { + meta: route_ref.0.clone(), + filters: [].into(), + request_timeout: None, + failure_policy: Default::default(), + distribution: policy::RouteDistribution::Empty, + }, + }], + }], + &req, + ) + .expect("find default route"); + + let (tx, handle) = tower_test::mock::pair::, http::Response>(); + let svc = super::layer(metrics) + .layer(move |_t: Grpc<()>| tx.clone()) + .new_service(Grpc { + r#match, + params: Route { + parent: (), + addr: std::net::SocketAddr::new([0, 0, 0, 0].into(), 8080).into(), + parent_ref: parent_ref.clone(), + route_ref: route_ref.clone(), + failure_policy: Default::default(), + request_timeout: None, + filters: [].into(), + distribution: Default::default(), + }, + }); + + (svc::BoxHttp::new(svc), handle) +} diff --git a/linkerd/app/outbound/src/http/logical/policy/router.rs b/linkerd/app/outbound/src/http/logical/policy/router.rs index cb517849f8..696b1b69c1 100644 --- a/linkerd/app/outbound/src/http/logical/policy/router.rs +++ b/linkerd/app/outbound/src/http/logical/policy/router.rs @@ -42,7 +42,7 @@ type NewBackendCache = distribute::NewBackendCache, (), N, // === impl Router === -impl Router +impl Router where // Parent target type. T: Clone + Debug + Eq + Hash + Send + Sync + 'static, @@ -53,24 +53,26 @@ where // Request filter. F: Debug + Eq + Hash, F: Clone + Send + Sync + 'static, - // Failure policy. - E: Debug + Eq + Hash, - E: Clone + Send + Sync + 'static, + // Route policy. + P: Debug + Eq + Hash, + P: Clone + Send + Sync + 'static, // Assert that we can route for the given match and filter types. Self: svc::router::SelectRoute< http::Request, - Key = route::MatchedRoute, + Key = route::MatchedRoute, Error = NoRoute, >, - route::MatchedRoute: route::filters::Apply + svc::Param, - route::MatchedBackend: route::filters::Apply, - route::backend::RouteBackendMetrics: - svc::ExtractParam>, + route::MatchedRoute: + route::filters::Apply + svc::Param + route::metrics::MkStreamLabel, + route::MatchedBackend: route::filters::Apply + route::metrics::MkStreamLabel, { /// Builds a stack that applies routes to distribute requests over a cached /// set of inner services so that. pub(super) fn layer( - metrics: route::RouteMetrics, + metrics: route::Metrics< + route::MatchedRoute, + route::MatchedBackend, + >, ) -> impl svc::Layer> + Clone where // Inner stack. @@ -100,14 +102,14 @@ where } } -impl From<(Params, T)> for Router +impl From<(Params, T)> for Router where T: Eq + Hash + Clone + Debug, M: Clone, F: Clone, - E: Clone, + P: Clone, { - fn from((rts, parent): (Params, T)) -> Self { + fn from((rts, parent): (Params, T)) -> Self { let Params { addr, meta: parent_ref, @@ -118,6 +120,7 @@ where let mk_concrete = { let parent = parent.clone(); + let parent_ref = parent_ref.clone(); move |backend_ref: BackendRef, target: concrete::Dispatch| { // XXX With policies we don't have a top-level authority name at // the moment. So, instead, we use the concrete addr used for @@ -189,23 +192,29 @@ where } }; - let mk_policy = |policy::RoutePolicy:: { - meta, - filters, - distribution, - failure_policy, - request_timeout, - }| { - let route_ref = RouteRef(meta); - let distribution = mk_distribution(&route_ref, &distribution); - route::Route { - addr: addr.clone(), - parent: parent.clone(), - route_ref, - filters, - failure_policy, - distribution, - request_timeout, + let mk_policy = { + let addr = addr.clone(); + let parent = parent.clone(); + let parent_ref = parent_ref.clone(); + move |policy::RoutePolicy:: { + meta, + filters, + distribution, + failure_policy, + request_timeout, + }| { + let route_ref = RouteRef(meta); + let distribution = mk_distribution(&route_ref, &distribution); + route::Route { + addr: addr.clone(), + parent: parent.clone(), + parent_ref: parent_ref.clone(), + route_ref, + filters, + distribution, + failure_policy, + request_timeout, + } } }; @@ -274,7 +283,7 @@ where } } -impl svc::Param for Router +impl svc::Param for Router where T: Eq + Hash + Clone + Debug, { @@ -283,7 +292,7 @@ where } } -impl svc::Param>> for Router +impl svc::Param>> for Router where T: Eq + Hash + Clone + Debug, { diff --git a/linkerd/app/outbound/src/http/logical/policy/tests.rs b/linkerd/app/outbound/src/http/logical/policy/tests.rs index f2b0e62e29..28d40e2ef1 100644 --- a/linkerd/app/outbound/src/http/logical/policy/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/tests.rs @@ -125,17 +125,17 @@ async fn header_based_route() { failure_accrual: Default::default(), }); - let metrics = RouteMetrics::default(); + let metrics = HttpRouteMetrics::default(); let router = Policy::layer(metrics.clone(), Default::default()) .layer(inner) .new_service(Policy::from((routes, ()))); - let default_reqs = metrics.request_count( + let default_reqs = metrics.backend_request_count( parent_ref.clone(), default_route_ref.clone(), default_backend_ref.clone(), ); - let special_reqs = metrics.request_count( + let special_reqs = metrics.backend_request_count( parent_ref.clone(), special_route_ref.clone(), special_backend_ref.clone(), diff --git a/linkerd/http/prom/Cargo.toml b/linkerd/http/prom/Cargo.toml new file mode 100644 index 0000000000..36c253ed29 --- /dev/null +++ b/linkerd/http/prom/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "linkerd-http-prom" +version = "0.1.0" +edition = "2021" +publish = false +license = "Apache-2.0" + +[features] +test-util = [] + +[dependencies] +futures = { version = "0.3", default-features = false } +http = "0.2" +http-body = "0.4" +parking_lot = "0.12" +pin-project = "1" +prometheus-client = "0.22" +thiserror = "1" +tokio = { version = "1", features = ["time"] } + +linkerd-error = { path = "../../error" } +linkerd-http-box = { path = "../box" } +linkerd-metrics = { path = "../../metrics" } +linkerd-stack = { path = "../../stack" } diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend/count_reqs.rs b/linkerd/http/prom/src/count_reqs.rs similarity index 68% rename from linkerd/app/outbound/src/http/logical/policy/route/backend/count_reqs.rs rename to linkerd/http/prom/src/count_reqs.rs index 384e16d918..e2092fbdbc 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend/count_reqs.rs +++ b/linkerd/http/prom/src/count_reqs.rs @@ -1,11 +1,16 @@ -use linkerd_app_core::{metrics::prom, svc}; +use linkerd_stack as svc; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{counter::Counter, family::Family}, + registry::Registry, +}; use std::task::{Context, Poll}; #[derive(Clone, Debug)] -pub struct RequestCountFamilies(prom::Family); +pub struct RequestCountFamilies(Family); #[derive(Clone, Debug)] -pub struct RequestCount(prom::Counter); +pub struct RequestCount(Counter); #[derive(Clone, Debug)] pub struct NewCountRequests { @@ -16,27 +21,7 @@ pub struct NewCountRequests { #[derive(Clone, Debug)] pub struct CountRequests { inner: S, - requests: prom::Counter, -} - -impl RequestCountFamilies -where - L: prom::encoding::EncodeLabelSet + std::fmt::Debug + std::hash::Hash, - L: Eq + Clone + Send + Sync + 'static, -{ - pub fn register(registry: &mut prom::Registry) -> Self { - let requests = prom::Family::default(); - registry.register( - "requests", - "The total number of requests dispatched", - requests.clone(), - ); - Self(requests) - } - - pub fn metrics(&self, labels: &L) -> RequestCount { - RequestCount(self.0.get_or_create(labels).clone()) - } + requests: Counter, } // === impl NewCountRequests === @@ -46,7 +31,7 @@ impl NewCountRequests { Self { extract, inner } } - pub fn layer_via(extract: X) -> impl svc::Layer + Clone { + pub fn layer_via(extract: X) -> impl svc::layer::Layer + Clone { svc::layer::mk(move |inner| Self::new(extract.clone(), inner)) } } @@ -59,16 +44,16 @@ where type Service = CountRequests; fn new_service(&self, target: T) -> Self::Service { - let RequestCount(counter) = self.extract.extract_param(&target); + let rc = self.extract.extract_param(&target); let inner = self.inner.new_service(target); - CountRequests::new(counter, inner) + CountRequests::new(rc, inner) } } // === impl CountRequests === impl CountRequests { - fn new(requests: prom::Counter, inner: S) -> Self { + pub(crate) fn new(RequestCount(requests): RequestCount, inner: S) -> Self { Self { requests, inner } } } @@ -92,18 +77,41 @@ where } } +// === impl RequestCountFamilies === + impl Default for RequestCountFamilies where - L: prom::encoding::EncodeLabelSet + std::fmt::Debug + std::hash::Hash, - L: Eq + Clone + Send + Sync + 'static, + L: EncodeLabelSet + std::fmt::Debug + std::hash::Hash, + L: Eq + Clone, { fn default() -> Self { - Self(prom::Family::default()) + Self(Family::default()) } } +impl RequestCountFamilies +where + L: EncodeLabelSet + std::fmt::Debug + std::hash::Hash, + L: Eq + Clone + Send + Sync + 'static, +{ + pub fn register(registry: &mut Registry) -> Self { + let requests = Family::default(); + registry.register( + "requests", + "The total number of requests dispatched", + requests.clone(), + ); + Self(requests) + } + + pub fn metrics(&self, labels: &L) -> RequestCount { + RequestCount(self.0.get_or_create(labels).clone()) + } +} + +// === impl RequestCount === + impl RequestCount { - #[cfg(test)] pub fn get(&self) -> u64 { self.0.get() } diff --git a/linkerd/http/prom/src/lib.rs b/linkerd/http/prom/src/lib.rs new file mode 100644 index 0000000000..4f00f842b4 --- /dev/null +++ b/linkerd/http/prom/src/lib.rs @@ -0,0 +1,7 @@ +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![forbid(unsafe_code)] + +mod count_reqs; +pub mod record_response; + +pub use self::count_reqs::{CountRequests, NewCountRequests, RequestCount, RequestCountFamilies}; diff --git a/linkerd/http/prom/src/record_response.rs b/linkerd/http/prom/src/record_response.rs new file mode 100644 index 0000000000..3810fd65ee --- /dev/null +++ b/linkerd/http/prom/src/record_response.rs @@ -0,0 +1,328 @@ +use http_body::Body; +use linkerd_error::Error; +use linkerd_http_box::BoxBody; +use linkerd_metrics::prom::Counter; +use linkerd_stack as svc; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + family::{Family, MetricConstructor}, + histogram::Histogram, + }, +}; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::{sync::oneshot, time}; + +mod request; +mod response; + +pub use self::{ + request::{NewRequestDuration, RecordRequestDuration, RequestMetrics}, + response::{NewResponseDuration, RecordResponseDuration, ResponseMetrics}, +}; + +/// A strategy for labeling request/responses streams for status and duration +/// metrics. +/// +/// This is specifically to support higher-cardinality status counters and +/// lower-cardinality stream duration histograms. +pub trait MkStreamLabel { + type DurationLabels: EncodeLabelSet + + Clone + + Eq + + std::fmt::Debug + + std::hash::Hash + + Send + + Sync + + 'static; + type StatusLabels: EncodeLabelSet + + Clone + + Eq + + std::fmt::Debug + + std::hash::Hash + + Send + + Sync + + 'static; + + type StreamLabel: StreamLabel< + DurationLabels = Self::DurationLabels, + StatusLabels = Self::StatusLabels, + >; + + /// Returns None when the request should not be recorded. + fn mk_stream_labeler(&self, req: &http::Request) -> Option; +} + +pub trait StreamLabel: Send + 'static { + type DurationLabels: EncodeLabelSet + + Clone + + Eq + + std::fmt::Debug + + std::hash::Hash + + Send + + Sync + + 'static; + type StatusLabels: EncodeLabelSet + + Clone + + Eq + + std::fmt::Debug + + std::hash::Hash + + Send + + Sync + + 'static; + + fn init_response(&mut self, rsp: &http::Response); + fn end_response(&mut self, trailers: Result, &Error>); + + fn status_labels(&self) -> Self::StatusLabels; + fn duration_labels(&self) -> Self::DurationLabels; +} + +/// A set of parameters that can be used to construct a `RecordResponse` layer. +pub struct Params { + pub labeler: L, + pub metric: M, +} + +#[derive(Clone, Debug, thiserror::Error)] +#[error("request was cancelled before completion")] +pub struct RequestCancelled(()); + +/// Builds RecordResponse instances by extracing M-typed parameters from stack +/// targets +#[derive(Clone, Debug)] +pub struct NewRecordResponse { + inner: N, + extract: X, + _marker: std::marker::PhantomData (L, M)>, +} + +/// A Service that can record a request/response durations. +#[derive(Clone, Debug)] +pub struct RecordResponse { + inner: S, + labeler: L, + metric: M, +} + +#[pin_project::pin_project] +pub struct ResponseFuture +where + L: StreamLabel, +{ + #[pin] + inner: F, + state: Option>, +} + +/// Notifies the response labeler when the response body is flushed. +#[pin_project::pin_project(PinnedDrop)] +struct ResponseBody { + #[pin] + inner: BoxBody, + state: Option>, +} + +struct ResponseState { + labeler: L, + statuses: Family, + duration: DurationFamily, + start: oneshot::Receiver, +} + +type DurationFamily = Family; + +#[derive(Clone, Debug)] +struct MkDurationHistogram(Arc<[f64]>); + +// === impl MkDurationHistogram === + +impl MetricConstructor for MkDurationHistogram { + fn new_metric(&self) -> Histogram { + Histogram::new(self.0.iter().copied()) + } +} + +// === impl NewRecordResponse === + +impl NewRecordResponse +where + M: MkStreamLabel, +{ + pub fn new(extract: X, inner: N) -> Self { + Self { + extract, + inner, + _marker: std::marker::PhantomData, + } + } + + pub fn layer_via(extract: X) -> impl svc::layer::Layer + Clone + where + X: Clone, + { + svc::layer::mk(move |inner| Self::new(extract.clone(), inner)) + } +} + +impl NewRecordResponse +where + M: MkStreamLabel, +{ + pub fn layer() -> impl svc::layer::Layer + Clone { + Self::layer_via(()) + } +} + +impl svc::NewService for NewRecordResponse +where + L: MkStreamLabel, + X: svc::ExtractParam, T>, + N: svc::NewService, +{ + type Service = RecordResponse; + + fn new_service(&self, target: T) -> Self::Service { + let Params { labeler, metric } = self.extract.extract_param(&target); + let inner = self.inner.new_service(target); + RecordResponse::new(labeler, metric, inner) + } +} + +// === impl RecordResponse === + +impl RecordResponse +where + L: MkStreamLabel, +{ + pub(crate) fn new(labeler: L, metric: M, inner: S) -> Self { + Self { + inner, + labeler, + metric, + } + } +} + +// === impl ResponseFuture === + +impl Future for ResponseFuture +where + L: StreamLabel, + F: Future, Error>>, +{ + type Output = Result, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let res = futures::ready!(this.inner.poll(cx)).map_err(Into::into); + let mut state = this.state.take(); + match res { + Ok(rsp) => { + if let Some(ResponseState { labeler, .. }) = state.as_mut() { + labeler.init_response(&rsp); + } + + let (head, inner) = rsp.into_parts(); + if inner.is_end_stream() { + end_stream(&mut state, Ok(None)); + } + Poll::Ready(Ok(http::Response::from_parts( + head, + BoxBody::new(ResponseBody { inner, state }), + ))) + } + Err(error) => { + end_stream(&mut state, Err(&error)); + Poll::Ready(Err(error)) + } + } + } +} + +// === impl ResponseBody === + +impl http_body::Body for ResponseBody +where + L: StreamLabel, +{ + type Data = ::Data; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut this = self.project(); + let res = + futures::ready!(this.inner.as_mut().poll_data(cx)).map(|res| res.map_err(Into::into)); + if let Some(Err(error)) = res.as_ref() { + end_stream(this.state, Err(error)); + } else if (*this.inner).is_end_stream() { + end_stream(this.state, Ok(None)); + } + Poll::Ready(res) + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Error>> { + let this = self.project(); + let res = futures::ready!(this.inner.poll_trailers(cx)).map_err(Into::into); + end_stream(this.state, res.as_ref().map(Option::as_ref)); + Poll::Ready(res) + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } +} + +#[pin_project::pinned_drop] +impl PinnedDrop for ResponseBody +where + L: StreamLabel, +{ + fn drop(self: Pin<&mut Self>) { + let this = self.project(); + if this.state.is_some() { + end_stream(this.state, Err(&RequestCancelled(()).into())); + } + } +} + +fn end_stream( + state: &mut Option>, + res: Result, &Error>, +) where + L: StreamLabel, +{ + let Some(ResponseState { + duration, + statuses: total, + mut start, + mut labeler, + }) = state.take() + else { + return; + }; + + labeler.end_response(res); + + total.get_or_create(&labeler.status_labels()).inc(); + + let elapsed = if let Ok(start) = start.try_recv() { + time::Instant::now().saturating_duration_since(start) + } else { + time::Duration::ZERO + }; + duration + .get_or_create(&labeler.duration_labels()) + .observe(elapsed.as_secs_f64()); +} diff --git a/linkerd/http/prom/src/record_response/request.rs b/linkerd/http/prom/src/record_response/request.rs new file mode 100644 index 0000000000..6daa6b2cfb --- /dev/null +++ b/linkerd/http/prom/src/record_response/request.rs @@ -0,0 +1,129 @@ +use linkerd_error::Error; +use linkerd_http_box::BoxBody; +use linkerd_metrics::prom::Counter; +use linkerd_stack as svc; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::family::Family, + registry::{Registry, Unit}, +}; +use std::{ + sync::Arc, + task::{Context, Poll}, +}; +use tokio::{sync::oneshot, time}; + +use super::{DurationFamily, MkDurationHistogram, MkStreamLabel}; + +/// Metrics type that tracks completed requests. +#[derive(Debug)] +pub struct RequestMetrics { + duration: DurationFamily, + statuses: Family, +} + +pub type NewRequestDuration = super::NewRecordResponse< + L, + X, + RequestMetrics<::DurationLabels, ::StatusLabels>, + N, +>; + +pub type RecordRequestDuration = super::RecordResponse< + L, + RequestMetrics<::DurationLabels, ::StatusLabels>, + S, +>; + +// === impl RequestMetrics === + +impl RequestMetrics +where + DurL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, + StatL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, +{ + pub fn register(reg: &mut Registry, histo: impl IntoIterator) -> Self { + let duration = + DurationFamily::new_with_constructor(MkDurationHistogram(histo.into_iter().collect())); + reg.register_with_unit( + "request_duration", + "The time between request initialization and response completion", + Unit::Seconds, + duration.clone(), + ); + + let statuses = Family::default(); + reg.register( + "request_statuses", + "Completed request-response streams", + statuses.clone(), + ); + + Self { duration, statuses } + } +} + +#[cfg(feature = "test-util")] +impl RequestMetrics +where + StatL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, + DurL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, +{ + pub fn get_statuses(&self, labels: &StatL) -> Counter { + (*self.statuses.get_or_create(labels)).clone() + } +} + +impl Default for RequestMetrics +where + StatL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, + DurL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, +{ + fn default() -> Self { + Self { + duration: DurationFamily::new_with_constructor(MkDurationHistogram(Arc::new([]))), + statuses: Default::default(), + } + } +} + +impl Clone for RequestMetrics { + fn clone(&self) -> Self { + Self { + duration: self.duration.clone(), + statuses: self.statuses.clone(), + } + } +} + +impl svc::Service> for RecordRequestDuration +where + L: MkStreamLabel, + S: svc::Service, Response = http::Response, Error = Error>, +{ + type Response = http::Response; + type Error = S::Error; + type Future = super::ResponseFuture; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + let state = self.labeler.mk_stream_labeler(&req).map(|labeler| { + let (tx, start) = oneshot::channel(); + tx.send(time::Instant::now()).unwrap(); + let RequestMetrics { statuses, duration } = self.metric.clone(); + super::ResponseState { + labeler, + start, + duration, + statuses, + } + }); + + let inner = self.inner.call(req); + super::ResponseFuture { state, inner } + } +} diff --git a/linkerd/http/prom/src/record_response/response.rs b/linkerd/http/prom/src/record_response/response.rs new file mode 100644 index 0000000000..d11267aa32 --- /dev/null +++ b/linkerd/http/prom/src/record_response/response.rs @@ -0,0 +1,192 @@ +use linkerd_error::Error; +use linkerd_http_box::BoxBody; +use linkerd_metrics::prom::Counter; +use linkerd_stack as svc; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::family::Family, + registry::{Registry, Unit}, +}; +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::{sync::oneshot, time}; + +use super::{DurationFamily, MkDurationHistogram, MkStreamLabel}; + +#[derive(Debug)] +pub struct ResponseMetrics { + duration: DurationFamily, + statuses: Family, +} + +pub type NewResponseDuration = super::NewRecordResponse< + L, + X, + ResponseMetrics<::DurationLabels, ::StatusLabels>, + N, +>; + +pub type RecordResponseDuration = super::RecordResponse< + L, + ResponseMetrics<::DurationLabels, ::StatusLabels>, + S, +>; + +/// Notifies the response body when the request body is flushed. +#[pin_project::pin_project(PinnedDrop)] +struct RequestBody { + #[pin] + inner: B, + flushed: Option>, +} + +// === impl ResponseMetrics === + +impl ResponseMetrics +where + DurL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, + StatL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, +{ + pub fn register(reg: &mut Registry, histo: impl IntoIterator) -> Self { + let duration = + DurationFamily::new_with_constructor(MkDurationHistogram(histo.into_iter().collect())); + reg.register_with_unit( + "response_duration", + "The time between request completion and response completion", + Unit::Seconds, + duration.clone(), + ); + + let statuses = Family::default(); + reg.register("response_statuses", "Completed responses", statuses.clone()); + + Self { duration, statuses } + } +} + +#[cfg(feature = "test-util")] +impl ResponseMetrics +where + StatL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, + DurL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, +{ + pub fn get_statuses(&self, labels: &StatL) -> Counter { + (*self.statuses.get_or_create(labels)).clone() + } +} + +impl Default for ResponseMetrics +where + StatL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, + DurL: EncodeLabelSet + Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, +{ + fn default() -> Self { + Self { + duration: DurationFamily::new_with_constructor(MkDurationHistogram(Arc::new([]))), + statuses: Default::default(), + } + } +} + +impl Clone for ResponseMetrics { + fn clone(&self) -> Self { + Self { + duration: self.duration.clone(), + statuses: self.statuses.clone(), + } + } +} + +impl svc::Service> for RecordResponseDuration +where + M: MkStreamLabel, + S: svc::Service, Response = http::Response, Error = Error>, +{ + type Response = http::Response; + type Error = Error; + type Future = super::ResponseFuture; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: http::Request) -> Self::Future { + // If there's a labeler, wrap the request body to record the time that + // the respond flushes. + let state = if let Some(labeler) = self.labeler.mk_stream_labeler(&req) { + let (tx, start) = oneshot::channel(); + req = req.map(|inner| { + BoxBody::new(RequestBody { + inner, + flushed: Some(tx), + }) + }); + let ResponseMetrics { duration, statuses } = self.metric.clone(); + Some(super::ResponseState { + labeler, + start, + duration, + statuses, + }) + } else { + None + }; + + let inner = self.inner.call(req); + super::ResponseFuture { state, inner } + } +} + +// === impl ResponseBody === + +impl http_body::Body for RequestBody +where + B: http_body::Body, +{ + type Data = B::Data; + type Error = B::Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut this = self.project(); + let res = futures::ready!(this.inner.as_mut().poll_data(cx)); + if (*this.inner).is_end_stream() { + if let Some(tx) = this.flushed.take() { + let _ = tx.send(time::Instant::now()); + } + } + Poll::Ready(res) + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, B::Error>> { + let this = self.project(); + let res = futures::ready!(this.inner.poll_trailers(cx)); + if let Some(tx) = this.flushed.take() { + let _ = tx.send(time::Instant::now()); + } + Poll::Ready(res) + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } +} + +#[pin_project::pinned_drop] +impl PinnedDrop for RequestBody { + fn drop(self: Pin<&mut Self>) { + let this = self.project(); + if let Some(tx) = this.flushed.take() { + let _ = tx.send(time::Instant::now()); + } + } +} diff --git a/linkerd/http/route/src/grpc/tests.rs b/linkerd/http/route/src/grpc/tests.rs index 402bcb8ed0..4360ff1290 100644 --- a/linkerd/http/route/src/grpc/tests.rs +++ b/linkerd/http/route/src/grpc/tests.rs @@ -13,6 +13,25 @@ impl Default for Policy { } } +#[test] +fn default() { + let rts = vec![Route { + hosts: vec![], + rules: vec![Rule { + matches: vec![MatchRoute::default()], + policy: Policy::Expected, + }], + }]; + + let req = http::Request::builder() + .method(http::Method::POST) + .uri("http://foo.example.com/foo/bar") + .body(()) + .unwrap(); + let (_, policy) = find(&rts, &req).expect("must match"); + assert_eq!(*policy, Policy::Expected, "incorrect rule matched"); +} + /// Given two equivalent routes, choose the explicit hostname match and not /// the wildcard. #[test]