Skip to content

Commit

Permalink
feat(outbound): Add response metrics to policy router (#3086)
Browse files Browse the repository at this point in the history
The outbound policy router includes a requests counter that measures the number
of requests dispatched to each route-backend; but this does not provide
visibility into success rate or response time. Before introducing timeouts and
retires on outbound routes, this change introduces visibility into per-route
response metrics.

The route_request_statuses counters measure responses from the application's
point of view. Once retries are introduced, this will provide visibility into
the _effective_ success rate of each route.

    outbound_http_route_request_statuses_total{parent...,route...,http_status="200",error="TIMEOUT"} 0
    outbound_grpc_route_request_statuses_total{parent...,route...,grpc_status="NOT_FOUND",error="TIMEOUT"} 0

A coarse histogram is introduced at this scope to track the total duration of
requests dispatched to each route, covering all retries and all response stream
processing:

    outbound_http_route_request_duration_seconds_sum{parent...,route...} 0
    outbound_http_route_request_duration_seconds_count{parent...,route...} 0
    outbound_http_route_request_duration_seconds_bucket{le="0.05",parent...,route...} 0
    outbound_http_route_request_duration_seconds_bucket{le="0.5",parent...,route...} 0
    outbound_http_route_request_duration_seconds_bucket{le="1.0",parent...,route...} 0
    outbound_http_route_request_duration_seconds_bucket{le="10.0",parent...,route...} 0
    outbound_http_route_request_duration_seconds_bucket{le="+Inf",parent...,route...} 0

The route_backend_response_statuses counters measure the responses from
individual backends. This reflects the _actual_ success rate of each route as
served by the backend services.

    outbound_http_route_backend_response_statuses_total{parent...,route...,backend...,http_status="...",error="..."} 0
    outbound_grpc_route_backend_response_statuses_total{parent...,route...,backend...,grpc_status="...",error="..."} 0

A slightly more detailed histogram is introduced at this scope to track the time
spend processing responses from each backend (i.e. after the request has been
fully dispatched):

    outbound_http_route_backend_response_duration_seconds_sum{parent...,route...,backend...} 0
    outbound_http_route_backend_response_duration_seconds_count{parent...,route...,backend...} 0
    outbound_http_route_backend_response_duration_seconds_bucket{le="0.025",parent...,route...,backend...} 0
    outbound_http_route_backend_response_duration_seconds_bucket{le="0.05",parent...,route...,backend...} 0
    outbound_http_route_backend_response_duration_seconds_bucket{le="0.1",parent...,route...,backend...} 0
    outbound_http_route_backend_response_duration_seconds_bucket{le="0.25",parent...,route...,backend...} 0
    outbound_http_route_backend_response_duration_seconds_bucket{le="0.5",parent...,route...,backend...} 0
    outbound_http_route_backend_response_duration_seconds_bucket{le="1.0",parent...,route...,backend...} 0
    outbound_http_route_backend_response_duration_seconds_bucket{le="10.0",parent...,route...,backend...} 0
    outbound_http_route_backend_response_duration_seconds_bucket{le="+Inf",parent...,route...,backend...} 0

Note that duration histograms omit status code labels, as they needlessly
inflate metrics cardinality. The histograms that we have introduced here are
generally much more constrained, as we much choose broadly applicable buckets
and want to avoid cardinality explosion when many routes are used.
  • Loading branch information
olix0r authored Jul 23, 2024
1 parent b310d63 commit 7c99d15
Show file tree
Hide file tree
Showing 22 changed files with 2,339 additions and 149 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,7 @@ dependencies = [
"linkerd-app-test",
"linkerd-distribute",
"linkerd-http-classify",
"linkerd-http-prom",
"linkerd-http-retry",
"linkerd-http-route",
"linkerd-identity",
Expand Down Expand Up @@ -1493,6 +1494,24 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-http-prom"
version = "0.1.0"
dependencies = [
"futures",
"http",
"http-body",
"linkerd-error",
"linkerd-http-box",
"linkerd-metrics",
"linkerd-stack",
"parking_lot",
"pin-project",
"prometheus-client",
"thiserror",
"tokio",
]

[[package]]
name = "linkerd-http-retry"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ members = [
"linkerd/http/classify",
"linkerd/http/h2",
"linkerd/http/metrics",
"linkerd/http/prom",
"linkerd/http/retry",
"linkerd/http/route",
"linkerd/identity",
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test", optional = true }
linkerd-distribute = { path = "../../distribute" }
linkerd-http-classify = { path = "../../http/classify" }
linkerd-http-prom = { path = "../../http/prom" }
linkerd-http-retry = { path = "../../http/retry" }
linkerd-http-route = { path = "../../http/route" }
linkerd-identity = { path = "../../identity" }
Expand All @@ -49,6 +50,7 @@ linkerd-tonic-watch = { path = "../../tonic-watch" }
[dev-dependencies]
hyper = { version = "0.14", features = ["http1", "http2"] }
linkerd-app-test = { path = "../test", features = ["client-policy"] }
linkerd-http-prom = { path = "../../http/prom", features = ["test-util"] }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/outbound/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub struct Http<T>(T);
#[derive(Clone, Debug, Default)]
pub struct HttpMetrics {
balancer: concrete::BalancerMetrics,
http_route: policy::RouteMetrics,
grpc_route: policy::RouteMetrics,
http_route: policy::HttpRouteMetrics,
grpc_route: policy::GrpcRouteMetrics,
}

pub fn spawn_routes<T>(
Expand Down Expand Up @@ -132,12 +132,12 @@ where
impl HttpMetrics {
pub fn register(registry: &mut prom::Registry) -> Self {
let http = registry.sub_registry_with_prefix("http");
let http_route = policy::RouteMetrics::register(http.sub_registry_with_prefix("route"));
let http_route = policy::HttpRouteMetrics::register(http.sub_registry_with_prefix("route"));
let balancer =
concrete::BalancerMetrics::register(http.sub_registry_with_prefix("balancer"));

let grpc = registry.sub_registry_with_prefix("grpc");
let grpc_route = policy::RouteMetrics::register(grpc.sub_registry_with_prefix("route"));
let grpc_route = policy::GrpcRouteMetrics::register(grpc.sub_registry_with_prefix("route"));

Self {
balancer,
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/outbound/src/http/logical/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod router;
mod tests;

pub use self::{
route::{errors, RouteMetrics},
route::{errors, GrpcRouteMetrics, HttpRouteMetrics},
router::{GrpcParams, HttpParams},
};
pub use linkerd_proxy_client_policy::{ClientPolicy, FailureAccrual};
Expand Down Expand Up @@ -50,8 +50,8 @@ where
/// routing configurations to route requests over cached inner backend
/// services.
pub(super) fn layer<N, S>(
http_metrics: route::RouteMetrics,
grpc_metrics: route::RouteMetrics,
http_metrics: route::HttpRouteMetrics,
grpc_metrics: route::GrpcRouteMetrics,
) -> impl svc::Layer<N, Service = svc::ArcNewCloneHttp<Self>> + Clone
where
// Inner stack.
Expand Down
101 changes: 64 additions & 37 deletions linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use super::super::Concrete;
use crate::RouteRef;
use linkerd_app_core::{classify, metrics::prom, proxy::http, svc, Addr, Error, Result};
use crate::{ParentRef, RouteRef};
use linkerd_app_core::{classify, proxy::http, svc, Addr, Error, Result};
use linkerd_distribute as distribute;
use linkerd_http_route as http_route;
use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc};

pub(crate) mod backend;
pub(crate) mod filters;
pub(crate) mod metrics;

pub(crate) use self::backend::{Backend, MatchedBackend};
pub use self::filters::errors;
use self::metrics::labels::Route as RouteLabels;

#[derive(Clone, Debug, Default)]
pub struct RouteMetrics {
backend: backend::RouteBackendMetrics,
}
pub use self::metrics::{GrpcRouteMetrics, HttpRouteMetrics};

/// A target type that includes a summary of exactly how a request was matched.
/// This match state is required to apply route filters.
Expand All @@ -31,6 +30,7 @@ pub(crate) struct Matched<M, P> {
pub(crate) struct Route<T, F, E> {
pub(super) parent: T,
pub(super) addr: Addr,
pub(super) parent_ref: ParentRef,
pub(super) route_ref: RouteRef,
pub(super) filters: Arc<[F]>,
pub(super) distribution: BackendDistribution<T, F>,
Expand All @@ -55,6 +55,11 @@ pub(crate) type Grpc<T> = MatchedRoute<
pub(crate) type BackendDistribution<T, F> = distribute::Distribution<Backend<T, F>>;
pub(crate) type NewDistribute<T, F, N> = distribute::NewDistribute<Backend<T, F>, (), N>;

pub type Metrics<R, B> = metrics::RouteMetrics<
<R as metrics::MkStreamLabel>::StreamLabel,
<B as metrics::MkStreamLabel>::StreamLabel,
>;

/// Wraps errors with route metadata.
#[derive(Debug, thiserror::Error)]
#[error("route {}: {source}", route.0)]
Expand All @@ -64,28 +69,6 @@ struct RouteError {
source: Error,
}

// === impl RouteMetrics ===

impl RouteMetrics {
pub fn register(reg: &mut prom::Registry) -> Self {
Self {
backend: backend::RouteBackendMetrics::register(
reg.sub_registry_with_prefix("backend"),
),
}
}

#[cfg(test)]
pub(crate) fn request_count(
&self,
p: crate::ParentRef,
r: RouteRef,
b: crate::BackendRef,
) -> backend::RequestCount {
self.backend.request_count(p, r, b)
}
}

// === impl MatchedRoute ===

impl<T, M, F, E> MatchedRoute<T, M, F, E>
Expand All @@ -103,13 +86,15 @@ where
// Assert that filters can be applied.
Self: filters::Apply,
Self: svc::Param<classify::Request>,
Self: metrics::MkStreamLabel,
MatchedBackend<T, M, F>: filters::Apply,
MatchedBackend<T, M, F>: metrics::MkStreamLabel,
{
/// Builds a route stack that applies policy filters to requests and
/// distributes requests over each route's backends. These [`Concrete`]
/// backends are expected to be cached/shared by the inner stack.
pub(crate) fn layer<N, S>(
metrics: RouteMetrics,
metrics: Metrics<Self, MatchedBackend<T, M, F>>,
) -> impl svc::Layer<N, Service = svc::ArcNewCloneHttp<Self>> + Clone
where
// Inner stack.
Expand All @@ -134,10 +119,11 @@ where
// consideration, so we must eagerly fail requests to prevent
// leaking tasks onto the runtime.
.push_on_service(svc::LoadShed::layer())
// TODO(ver) attach the `E` typed failure policy to requests.
.push(filters::NewApplyFilters::<Self, _, _>::layer())
// Sets an optional request timeout.
.push(http::NewTimeout::layer())
.push(metrics::layer(&metrics.requests))
// Configure a classifier to use in the endpoint stack.
// FIXME(ver) move this into NewSetExtensions
.push(classify::NewClassify::layer())
.push(svc::NewMapErr::layer_with(|rt: &Self| {
let route = rt.params.route_ref.clone();
Expand All @@ -152,18 +138,29 @@ where
}
}

impl<T: Clone, M, F, E> svc::Param<BackendDistribution<T, F>> for MatchedRoute<T, M, F, E> {
impl<T: Clone, M, F, P> svc::Param<BackendDistribution<T, F>> for MatchedRoute<T, M, F, P> {
fn param(&self) -> BackendDistribution<T, F> {
self.params.distribution.clone()
}
}

impl<T, M, F, E> svc::Param<http::timeout::ResponseTimeout> for MatchedRoute<T, M, F, E> {
impl<T: Clone, M, F, P> svc::Param<RouteLabels> for MatchedRoute<T, M, F, P> {
fn param(&self) -> RouteLabels {
RouteLabels(
self.params.parent_ref.clone(),
self.params.route_ref.clone(),
)
}
}

impl<T, M, F, P> svc::Param<http::timeout::ResponseTimeout> for MatchedRoute<T, M, F, P> {
fn param(&self) -> http::timeout::ResponseTimeout {
http::timeout::ResponseTimeout(self.params.request_timeout)
}
}

// === impl Http ===

impl<T> filters::Apply for Http<T> {
#[inline]
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
Expand All @@ -176,14 +173,30 @@ impl<T> filters::Apply for Http<T> {
}
}

impl<T> metrics::MkStreamLabel for Http<T> {
type StatusLabels = metrics::labels::HttpRouteRsp;
type DurationLabels = metrics::labels::Route;
type StreamLabel = metrics::LabelHttpRouteRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
let parent = self.params.parent_ref.clone();
let route = self.params.route_ref.clone();
Some(metrics::LabelHttpRsp::from(metrics::labels::Route::from((
parent, route,
))))
}
}

impl<T> svc::Param<classify::Request> for Http<T> {
fn param(&self) -> classify::Request {
classify::Request::ClientPolicy(classify::ClientPolicy::Http(
self.params.failure_policy.clone(),
policy::http::StatusRanges::default(),
))
}
}

// === impl Grpc ===

impl<T> filters::Apply for Grpc<T> {
#[inline]
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
Expand All @@ -196,10 +209,24 @@ impl<T> filters::Apply for Grpc<T> {
}
}

impl<T> metrics::MkStreamLabel for Grpc<T> {
type StatusLabels = metrics::labels::GrpcRouteRsp;
type DurationLabels = metrics::labels::Route;
type StreamLabel = metrics::LabelGrpcRouteRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
let parent = self.params.parent_ref.clone();
let route = self.params.route_ref.clone();
Some(metrics::LabelGrpcRsp::from(metrics::labels::Route::from((
parent, route,
))))
}
}

impl<T> svc::Param<classify::Request> for Grpc<T> {
fn param(&self) -> classify::Request {
classify::Request::ClientPolicy(classify::ClientPolicy::Grpc(
self.params.failure_policy.clone(),
))
classify::Request::ClientPolicy(
classify::ClientPolicy::Grpc(policy::grpc::Codes::default()),
)
}
}
45 changes: 37 additions & 8 deletions linkerd/app/outbound/src/http/logical/policy/route/backend.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use super::{super::Concrete, filters};
use crate::{BackendRef, ParentRef, RouteRef};
use linkerd_app_core::{proxy::http, svc, Error, Result};
use linkerd_http_prom::record_response::MkStreamLabel;
use linkerd_http_route as http_route;
use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc};

mod count_reqs;
mod metrics;

pub use self::count_reqs::RequestCount;
pub use self::metrics::RouteBackendMetrics;
pub(super) mod metrics;

#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct Backend<T, F> {
Expand All @@ -25,6 +22,8 @@ pub(crate) type Http<T> =
pub(crate) type Grpc<T> =
MatchedBackend<T, http_route::grpc::r#match::RouteMatch, policy::grpc::Filter>;

pub type Metrics<T> = metrics::RouteBackendMetrics<<T as MkStreamLabel>::StreamLabel>;

/// Wraps errors with backend metadata.
#[derive(Debug, thiserror::Error)]
#[error("backend {}: {source}", backend.0)]
Expand Down Expand Up @@ -71,15 +70,15 @@ where
F: Clone + Send + Sync + 'static,
// Assert that filters can be applied.
Self: filters::Apply,
RouteBackendMetrics: svc::ExtractParam<RequestCount, Self>,
Self: metrics::MkStreamLabel,
{
/// Builds a stack that applies per-route-backend policy filters over an
/// inner [`Concrete`] stack.
///
/// This [`MatchedBackend`] must implement [`filters::Apply`] to apply these
/// filters.
pub(crate) fn layer<N, S>(
metrics: RouteBackendMetrics,
metrics: Metrics<Self>,
) -> impl svc::Layer<N, Service = svc::ArcNewCloneHttp<Self>> + Clone
where
// Inner stack.
Expand All @@ -103,7 +102,7 @@ where
)
.push(filters::NewApplyFilters::<Self, _, _>::layer())
.push(http::NewTimeout::layer())
.push(count_reqs::NewCountRequests::layer_via(metrics.clone()))
.push(metrics::layer(&metrics))
.push(svc::NewMapErr::layer_with(|t: &Self| {
let backend = t.params.concrete.backend_ref.clone();
move |source| {
Expand Down Expand Up @@ -155,6 +154,21 @@ impl<T> filters::Apply for Http<T> {
}
}

impl<T> metrics::MkStreamLabel for Http<T> {
type StatusLabels = metrics::labels::HttpRouteBackendRsp;
type DurationLabels = metrics::labels::RouteBackend;
type StreamLabel = metrics::LabelHttpRouteBackendRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
let parent = self.params.concrete.parent_ref.clone();
let route = self.params.route_ref.clone();
let backend = self.params.concrete.backend_ref.clone();
Some(metrics::LabelHttpRsp::from(
metrics::labels::RouteBackend::from((parent, route, backend)),
))
}
}

impl<T> filters::Apply for Grpc<T> {
#[inline]
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
Expand All @@ -165,3 +179,18 @@ impl<T> filters::Apply for Grpc<T> {
filters::apply_grpc_response(&self.params.filters, rsp)
}
}

impl<T> metrics::MkStreamLabel for Grpc<T> {
type StatusLabels = metrics::labels::GrpcRouteBackendRsp;
type DurationLabels = metrics::labels::RouteBackend;
type StreamLabel = metrics::LabelGrpcRouteBackendRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
let parent = self.params.concrete.parent_ref.clone();
let route = self.params.route_ref.clone();
let backend = self.params.concrete.backend_ref.clone();
Some(metrics::LabelGrpcRsp::from(
metrics::labels::RouteBackend::from((parent, route, backend)),
))
}
}
Loading

0 comments on commit 7c99d15

Please sign in to comment.