From 475ef04b37e92b4bddcf91e168096e0a01890733 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 23 Jul 2024 21:22:05 +0000 Subject: [PATCH] feat(outbound): Add per-route stream timeout policies This change adds a new timeout configuration to the outbound HTTP and gRPC policies that sets request extensions to configure timeout behavior in the endpoint stack. This ensures that timeouts manifest as endpoint-level errors (i.e. visible to endpoint-level circuit breakers) instead of as a route-level cancellation. Several timeouts are implemented: 1. A request timeout, which is the maximum time a full request-response stream can remain in the proxy. 2. A response headers timeout, which limits the amount of time after the request has been fully sent within which response headers must be fully received. This will be used to implement retry timeouts. 3. A response timeout, which limits the amount of time after the request has been fully sent within which a response may be in flight. 4. An idle timeout, which limits the time a request-response stream may sit in the proxy without any DATA frame activity. NOTE: As these timeouts are enforced in the endpoint stack, it is currently possible for requests to remain in the balancer queue until a failfast timeout is tripped. The configured timeouts only apply to requests that have been dequeued and dispatched to an endpoint. NOTE: A follow-up API change is required to support configuring all timeouts via the API. --- Cargo.lock | 1 + linkerd/app/core/src/errors/respond.rs | 10 + linkerd/app/integration/src/policy.rs | 8 +- .../integration/src/tests/client_policy.rs | 25 +- linkerd/app/outbound/Cargo.toml | 10 +- linkerd/app/outbound/src/discover.rs | 8 +- linkerd/app/outbound/src/http/endpoint.rs | 3 +- .../outbound/src/http/logical/policy/route.rs | 37 +- .../src/http/logical/policy/route/backend.rs | 4 +- .../policy/route/backend/metrics/tests.rs | 6 +- .../http/logical/policy/route/extensions.rs | 81 +++ .../src/http/logical/policy/route/metrics.rs | 1 + .../logical/policy/route/metrics/tests.rs | 10 +- .../src/http/logical/policy/router.rs | 19 +- .../outbound/src/http/logical/policy/tests.rs | 8 +- .../app/outbound/src/http/logical/tests.rs | 51 +- linkerd/app/outbound/src/http/server.rs | 28 +- .../app/test/src/resolver/client_policy.rs | 8 +- linkerd/proxy/client-policy/src/grpc.rs | 35 +- linkerd/proxy/client-policy/src/http.rs | 42 +- linkerd/proxy/client-policy/src/lib.rs | 109 ++-- linkerd/proxy/client-policy/src/opaq.rs | 8 +- linkerd/proxy/http/Cargo.toml | 1 + linkerd/proxy/http/src/lib.rs | 4 +- linkerd/proxy/http/src/stream_timeouts.rs | 488 ++++++++++++++++++ linkerd/proxy/http/src/timeout.rs | 2 +- 26 files changed, 796 insertions(+), 211 deletions(-) create mode 100644 linkerd/app/outbound/src/http/logical/policy/route/extensions.rs create mode 100644 linkerd/proxy/http/src/stream_timeouts.rs diff --git a/Cargo.lock b/Cargo.lock index 1698cc8ac6..6499639e18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1882,6 +1882,7 @@ dependencies = [ "linkerd-proxy-balance", "linkerd-stack", "linkerd-tracing", + "parking_lot", "pin-project", "rand", "thiserror", diff --git a/linkerd/app/core/src/errors/respond.rs b/linkerd/app/core/src/errors/respond.rs index 0b57fb36a6..550204b5aa 100644 --- a/linkerd/app/core/src/errors/respond.rs +++ b/linkerd/app/core/src/errors/respond.rs @@ -126,6 +126,16 @@ impl SyntheticHttpResponse { } } + pub fn gateway_timeout_nonfatal(msg: impl ToString) -> Self { + Self { + close_connection: false, + http_status: 
http::StatusCode::GATEWAY_TIMEOUT, + grpc_status: tonic::Code::Unavailable, + message: Cow::Owned(msg.to_string()), + location: None, + } + } + pub fn unavailable(msg: impl ToString) -> Self { Self { close_connection: true, diff --git a/linkerd/app/integration/src/policy.rs b/linkerd/app/integration/src/policy.rs index 14760ded14..b8cf4b9cef 100644 --- a/linkerd/app/integration/src/policy.rs +++ b/linkerd/app/integration/src/policy.rs @@ -118,11 +118,11 @@ pub fn outbound_default(dst: impl ToString) -> outbound::OutboundPolicy { timeout: Some(Duration::from_secs(10).try_into().unwrap()), http1: Some(proxy_protocol::Http1 { routes: vec![route.clone()], - failure_accrual: None, + ..Default::default() }), http2: Some(proxy_protocol::Http2 { routes: vec![route], - failure_accrual: None, + ..Default::default() }), opaque: Some(proxy_protocol::Opaque { routes: vec![outbound_default_opaque_route(dst)], @@ -150,7 +150,7 @@ pub fn outbound_default_http_route(dst: impl ToString) -> outbound::HttpRoute { }], filters: Vec::new(), backends: Some(http_first_available(std::iter::once(backend(dst)))), - request_timeout: None, + ..Default::default() }], } } @@ -214,7 +214,7 @@ pub fn http_first_available( .map(|backend| http_route::RouteBackend { backend: Some(backend), filters: Vec::new(), - request_timeout: None, + ..Default::default() }) .collect(), }, diff --git a/linkerd/app/integration/src/tests/client_policy.rs b/linkerd/app/integration/src/tests/client_policy.rs index 50de7fa640..ef04b0d3e5 100644 --- a/linkerd/app/integration/src/tests/client_policy.rs +++ b/linkerd/app/integration/src/tests/client_policy.rs @@ -63,11 +63,11 @@ async fn empty_http1_route() { hosts: Vec::new(), rules: Vec::new(), }], - failure_accrual: None, + ..Default::default() }), http2: Some(proxy_protocol::Http2 { routes: vec![policy::outbound_default_http_route(&dst)], - failure_accrual: None, + ..Default::default() }), opaque: Some(proxy_protocol::Opaque { routes: vec![policy::outbound_default_opaque_route(&dst)], @@ -148,7 +148,7 @@ async fn empty_http2_route() { timeout: Some(Duration::from_secs(10).try_into().unwrap()), http1: Some(proxy_protocol::Http1 { routes: vec![policy::outbound_default_http_route(&dst)], - failure_accrual: None, + ..Default::default() }), http2: Some(proxy_protocol::Http2 { routes: vec![outbound::HttpRoute { @@ -156,7 +156,7 @@ async fn empty_http2_route() { hosts: Vec::new(), rules: Vec::new(), }], - failure_accrual: None, + ..Default::default() }), opaque: Some(proxy_protocol::Opaque { routes: vec![policy::outbound_default_opaque_route(&dst)], @@ -223,7 +223,7 @@ async fn header_based_routing() { backends: Some(policy::http_first_available(std::iter::once( policy::backend(dst), ))), - request_timeout: None, + ..Default::default() }; let route = outbound::HttpRoute { @@ -237,7 +237,7 @@ async fn header_based_routing() { backends: Some(policy::http_first_available(std::iter::once( policy::backend(&dst_world), ))), - request_timeout: None, + ..Default::default() }, // x-hello-city: sf | x-hello-city: san francisco mk_header_rule( @@ -266,11 +266,11 @@ async fn header_based_routing() { timeout: Some(Duration::from_secs(10).try_into().unwrap()), http1: Some(proxy_protocol::Http1 { routes: vec![route.clone()], - failure_accrual: None, + ..Default::default() }), http2: Some(proxy_protocol::Http2 { routes: vec![route], - failure_accrual: None, + ..Default::default() }), opaque: Some(proxy_protocol::Opaque { routes: vec![policy::outbound_default_opaque_route(&dst_world)], @@ -400,8 +400,7 @@ async fn 
path_based_routing() { backends: Some(policy::http_first_available(std::iter::once( policy::backend(dst), ))), - - request_timeout: None, + ..Default::default() }; let route = outbound::HttpRoute { @@ -415,7 +414,7 @@ async fn path_based_routing() { backends: Some(policy::http_first_available(std::iter::once( policy::backend(&dst_world), ))), - request_timeout: None, + ..Default::default() }, // /goodbye/* mk_path_rule( @@ -449,11 +448,11 @@ async fn path_based_routing() { timeout: Some(Duration::from_secs(10).try_into().unwrap()), http1: Some(proxy_protocol::Http1 { routes: vec![route.clone()], - failure_accrual: None, + ..Default::default() }), http2: Some(proxy_protocol::Http2 { routes: vec![route], - failure_accrual: None, + ..Default::default() }), opaque: Some(proxy_protocol::Opaque { routes: vec![policy::outbound_default_opaque_route(&dst_world)], diff --git a/linkerd/app/outbound/Cargo.toml b/linkerd/app/outbound/Cargo.toml index 364c17b5ab..258e24837f 100644 --- a/linkerd/app/outbound/Cargo.toml +++ b/linkerd/app/outbound/Cargo.toml @@ -23,13 +23,13 @@ futures = { version = "0.3", default-features = false } linkerd2-proxy-api = { version = "0.13", features = ["outbound"] } once_cell = "1" parking_lot = "0.12" +pin-project = "1" prometheus-client = "0.22" thiserror = "1" tokio = { version = "1", features = ["sync"] } tonic = { version = "0.10", default-features = false } tower = { version = "0.4", features = ["util"] } tracing = "0.1" -pin-project = "1" linkerd-app-core = { path = "../core" } linkerd-app-test = { path = "../test", optional = true } @@ -49,6 +49,10 @@ linkerd-tonic-watch = { path = "../../tonic-watch" } [dev-dependencies] hyper = { version = "0.14", features = ["http1", "http2"] } +tokio = { version = "1", features = ["macros", "sync", "time"] } +tokio-test = "0.4" +tower-test = "0.4" + linkerd-app-test = { path = "../test", features = ["client-policy"] } linkerd-http-prom = { path = "../../http/prom", features = ["test-util"] } linkerd-io = { path = "../../io", features = ["tokio-test"] } @@ -58,7 +62,3 @@ linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [ ] } linkerd-stack = { path = "../../stack", features = ["test-util"] } linkerd-tracing = { path = "../../tracing", features = ["ansi"] } -parking_lot = "0.12" -tokio = { version = "1", features = ["macros", "sync", "time"] } -tokio-test = "0.4" -tower-test = "0.4" diff --git a/linkerd/app/outbound/src/discover.rs b/linkerd/app/outbound/src/discover.rs index 51a3787580..09cff8d985 100644 --- a/linkerd/app/outbound/src/discover.rs +++ b/linkerd/app/outbound/src/discover.rs @@ -237,13 +237,11 @@ fn policy_for_backend( policy: Some(policy::opaq::Policy { meta: meta.clone(), filters: NO_OPAQ_FILTERS.clone(), - failure_policy: Default::default(), - request_timeout: None, + params: Default::default(), distribution: policy::RouteDistribution::FirstAvailable(Arc::new([ policy::RouteBackend { filters: NO_OPAQ_FILTERS.clone(), backend: backend.clone(), - request_timeout: None, }, ])), }), @@ -256,13 +254,11 @@ fn policy_for_backend( policy: policy::http::Policy { meta: meta.clone(), filters: NO_HTTP_FILTERS.clone(), - failure_policy: Default::default(), - request_timeout: None, + params: Default::default(), distribution: policy::RouteDistribution::FirstAvailable(Arc::new([ policy::RouteBackend { filters: NO_HTTP_FILTERS.clone(), backend: backend.clone(), - request_timeout: None, }, ])), }, diff --git a/linkerd/app/outbound/src/http/endpoint.rs b/linkerd/app/outbound/src/http/endpoint.rs index 
60fcd62f97..f9ac5362fa 100644 --- a/linkerd/app/outbound/src/http/endpoint.rs +++ b/linkerd/app/outbound/src/http/endpoint.rs @@ -103,12 +103,13 @@ impl Outbound> { // is only done when the `Closable` parameter is set to true. // This module always strips error headers from responses. .push(NewHandleProxyErrorHeaders::layer()) + .push_on_service(http::BoxRequest::layer()) + .push_on_service(http::EnforceTimeouts::layer()) // Handle connection-level errors eagerly so that we can report 5XX failures in tap // and metrics. HTTP error metrics are not incremented here so that errors are not // double-counted--i.e., endpoint metrics track these responses and error metrics // track proxy errors that occur higher in the stack. .push(ClientRescue::layer(config.emit_headers)) - .push_on_service(http::BoxRequest::layer()) .push(tap::NewTapHttp::layer(rt.tap.clone())) .push( rt.metrics diff --git a/linkerd/app/outbound/src/http/logical/policy/route.rs b/linkerd/app/outbound/src/http/logical/policy/route.rs index ba3278886a..391b792179 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route.rs @@ -7,6 +7,7 @@ use linkerd_proxy_client_policy as policy; use std::{fmt::Debug, hash::Hash, sync::Arc}; pub(crate) mod backend; +pub(crate) mod extensions; pub(crate) mod filters; pub(crate) mod metrics; @@ -27,28 +28,28 @@ pub(crate) struct Matched { } #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub(crate) struct Route { +pub(crate) struct Route { pub(super) parent: T, pub(super) addr: Addr, pub(super) parent_ref: ParentRef, pub(super) route_ref: RouteRef, pub(super) filters: Arc<[F]>, pub(super) distribution: BackendDistribution, - pub(super) failure_policy: E, + pub(super) params: P, } -pub(crate) type MatchedRoute = Matched>; +pub(crate) type MatchedRoute = Matched>; pub(crate) type Http = MatchedRoute< T, http_route::http::r#match::RequestMatch, policy::http::Filter, - policy::http::StatusRanges, + policy::http::RouteParams, >; pub(crate) type Grpc = MatchedRoute< T, http_route::grpc::r#match::RouteMatch, policy::grpc::Filter, - policy::grpc::Codes, + policy::grpc::RouteParams, >; pub(crate) type BackendDistribution = distribute::Distribution>; @@ -70,7 +71,7 @@ struct RouteError { // === impl MatchedRoute === -impl MatchedRoute +impl MatchedRoute where // Parent target. T: Debug + Eq + Hash, @@ -80,11 +81,12 @@ where // Request filter. F: Debug + Eq + Hash, F: Clone + Send + Sync + 'static, - // Failure policy. - E: Clone + Send + Sync + 'static, + // Route params. + P: Clone + Send + Sync + 'static, // Assert that filters can be applied. Self: filters::Apply, Self: svc::Param, + Self: svc::Param, Self: metrics::MkStreamLabel, MatchedBackend: filters::Apply, MatchedBackend: metrics::MkStreamLabel, @@ -119,6 +121,9 @@ where // leaking tasks onto the runtime. .push_on_service(svc::LoadShed::layer()) .push(filters::NewApplyFilters::::layer()) + // Set request extensions based on the route configuration + // AND/OR headers + .push(extensions::NewSetExtensions::layer()) .push(metrics::layer(&metrics.requests)) // Configure a classifier to use in the endpoint stack. 
// FIXME(ver) move this into NewSetExtensions @@ -179,6 +184,14 @@ impl metrics::MkStreamLabel for Http { } } +impl svc::Param for Http { + fn param(&self) -> extensions::Params { + extensions::Params { + timeouts: self.params.params.timeouts.clone(), + } + } +} + impl svc::Param for Http { fn param(&self) -> classify::Request { classify::Request::ClientPolicy(classify::ClientPolicy::Http( @@ -215,6 +228,14 @@ impl metrics::MkStreamLabel for Grpc { } } +impl svc::Param for Grpc { + fn param(&self) -> extensions::Params { + extensions::Params { + timeouts: self.params.params.timeouts.clone(), + } + } +} + impl svc::Param for Grpc { fn param(&self) -> classify::Request { classify::Request::ClientPolicy( diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs index 6b7fd921c1..6ee2c507a3 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs @@ -46,10 +46,10 @@ impl Clone for Backend { // === impl Matched === -impl From<(Backend, super::MatchedRoute)> +impl From<(Backend, super::MatchedRoute)> for MatchedBackend { - fn from((params, route): (Backend, super::MatchedRoute)) -> Self { + fn from((params, route): (Backend, super::MatchedRoute)) -> Self { MatchedBackend { r#match: route.r#match, params, diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs index f974af2a76..bcca14482d 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs @@ -296,9 +296,8 @@ fn mock_http_route_backend_metrics( policy: policy::http::Policy { meta: route_ref.0.clone(), filters: [].into(), - request_timeout: None, - failure_policy: Default::default(), distribution: policy::RouteDistribution::Empty, + params: Default::default(), }, }], }], @@ -353,9 +352,8 @@ fn mock_grpc_route_backend_metrics( policy: policy::grpc::Policy { meta: route_ref.0.clone(), filters: [].into(), - request_timeout: None, - failure_policy: Default::default(), distribution: policy::RouteDistribution::Empty, + params: Default::default(), }, }], }], diff --git a/linkerd/app/outbound/src/http/logical/policy/route/extensions.rs b/linkerd/app/outbound/src/http/logical/policy/route/extensions.rs new file mode 100644 index 0000000000..c58f9cb118 --- /dev/null +++ b/linkerd/app/outbound/src/http/logical/policy/route/extensions.rs @@ -0,0 +1,81 @@ +use linkerd_app_core::{proxy::http, svc}; +use linkerd_proxy_client_policy as policy; +use std::task::{Context, Poll}; + +#[derive(Clone, Debug)] +pub struct Params { + pub timeouts: policy::http::Timeouts, +} + +#[derive(Clone, Debug)] +pub struct NewSetExtensions { + inner: N, +} + +#[derive(Clone, Debug)] +pub struct SetExtensions { + inner: S, + params: Params, +} + +// === impl NewSetExtensions === + +impl NewSetExtensions { + pub fn layer() -> impl svc::Layer + Clone { + svc::layer::mk(|inner| Self { inner }) + } +} + +impl svc::NewService for NewSetExtensions +where + T: svc::Param, + N: svc::NewService, +{ + type Service = SetExtensions; + + fn new_service(&self, target: T) -> Self::Service { + let params = target.param(); + let inner = self.inner.new_service(target); + SetExtensions { params, inner } + } +} + +// === impl SetExtensions === + +impl svc::Service> for SetExtensions +where + S: svc::Service>, +{ + 
type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: http::Request) -> Self::Future { + let timeouts = self.configure_timeouts(); + tracing::debug!(?timeouts, "Setting extensions"); + + let _prior = req.extensions_mut().insert(timeouts); + debug_assert!( + _prior.is_none(), + "StreamTimeouts must only be configured once" + ); + + self.inner.call(req) + } +} + +impl SetExtensions { + fn configure_timeouts(&self) -> http::StreamTimeouts { + http::StreamTimeouts { + response_headers: None, + response_end: self.params.timeouts.response, + idle: self.params.timeouts.idle, + limit: self.params.timeouts.request.map(Into::into), + } + } +} diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs index 5f872c9847..ca5afe6739 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs @@ -108,6 +108,7 @@ impl RouteMetrics { reg.sub_registry_with_prefix("backend"), Self::RESPONSE_BUCKETS.iter().copied(), ); + Self { requests, backend } } diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs index 655c947634..ad73a69cde 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs @@ -266,9 +266,8 @@ pub fn mock_http_route_metrics( policy: policy::http::Policy { meta: route_ref.0.clone(), filters: [].into(), - request_timeout: None, - failure_policy: Default::default(), distribution: policy::RouteDistribution::Empty, + params: policy::http::RouteParams::default(), }, }], }], @@ -286,9 +285,9 @@ pub fn mock_http_route_metrics( addr: std::net::SocketAddr::new([0, 0, 0, 0].into(), 8080).into(), parent_ref: parent_ref.clone(), route_ref: route_ref.clone(), - failure_policy: Default::default(), filters: [].into(), distribution: Default::default(), + params: policy::http::RouteParams::default(), }, }); @@ -313,9 +312,8 @@ pub fn mock_grpc_route_metrics( policy: policy::grpc::Policy { meta: route_ref.0.clone(), filters: [].into(), - request_timeout: None, - failure_policy: Default::default(), distribution: policy::RouteDistribution::Empty, + params: policy::grpc::RouteParams::default(), }, }], }], @@ -333,9 +331,9 @@ pub fn mock_grpc_route_metrics( addr: std::net::SocketAddr::new([0, 0, 0, 0].into(), 8080).into(), parent_ref: parent_ref.clone(), route_ref: route_ref.clone(), - failure_policy: Default::default(), filters: [].into(), distribution: Default::default(), + params: policy::grpc::RouteParams::default(), }, }); diff --git a/linkerd/app/outbound/src/http/logical/policy/router.rs b/linkerd/app/outbound/src/http/logical/policy/router.rs index f661235d5a..f17e363a6a 100644 --- a/linkerd/app/outbound/src/http/logical/policy/router.rs +++ b/linkerd/app/outbound/src/http/logical/policy/router.rs @@ -21,9 +21,9 @@ pub struct Params { } pub type HttpParams = - Params; + Params; pub type GrpcParams = - Params; + Params; #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct Router { @@ -34,9 +34,9 @@ pub(crate) struct Router { } pub(super) type Http = - Router; + Router; pub(super) type Grpc = - Router; + Router; type NewBackendCache = distribute::NewBackendCache, (), N, S>; @@ -62,8 +62,10 @@ 
where Key = route::MatchedRoute, Error = NoRoute, >, - route::MatchedRoute: - route::filters::Apply + svc::Param + route::metrics::MkStreamLabel, + route::MatchedRoute: route::filters::Apply + + svc::Param + + svc::Param + + route::metrics::MkStreamLabel, route::MatchedBackend: route::filters::Apply + route::metrics::MkStreamLabel, { /// Builds a stack that applies routes to distribute requests over a cached @@ -199,8 +201,7 @@ where meta, filters, distribution, - failure_policy, - request_timeout: _, + params, }| { let route_ref = RouteRef(meta); let distribution = mk_distribution(&route_ref, &distribution); @@ -211,7 +212,7 @@ where route_ref, filters, distribution, - failure_policy, + params, } } }; diff --git a/linkerd/app/outbound/src/http/logical/policy/tests.rs b/linkerd/app/outbound/src/http/logical/policy/tests.rs index 28d40e2ef1..ccecc61008 100644 --- a/linkerd/app/outbound/src/http/logical/policy/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/tests.rs @@ -47,12 +47,10 @@ async fn header_based_route() { section: None, }), filters: Arc::new([]), - failure_policy: Default::default(), - request_timeout: None, + params: Default::default(), distribution: policy::RouteDistribution::FirstAvailable(Arc::new([policy::RouteBackend { filters: Arc::new([]), backend, - request_timeout: None, }])), }; @@ -211,8 +209,7 @@ async fn http_filter_request_headers() { matches: vec![route::http::MatchRequest::default()], policy: policy::RoutePolicy { meta: policy::Meta::new_default("turtles"), - failure_policy: Default::default(), - request_timeout: None, + params: Default::default(), filters: Arc::new([policy::http::Filter::RequestHeaders( policy::http::filter::ModifyHeader { add: vec![(PIZZA.clone(), TUBULAR.clone())], @@ -228,7 +225,6 @@ async fn http_filter_request_headers() { ..Default::default() }, )]), - request_timeout: None, }, ])), }, diff --git a/linkerd/app/outbound/src/http/logical/tests.rs b/linkerd/app/outbound/src/http/logical/tests.rs index 51917dcaa9..34833d1fab 100644 --- a/linkerd/app/outbound/src/http/logical/tests.rs +++ b/linkerd/app/outbound/src/http/logical/tests.rs @@ -308,9 +308,7 @@ async fn balancer_doesnt_select_tripped_breakers() { } } -// XXX(ver): Route request timeouts will be reintroduced. #[tokio::test(flavor = "current_thread")] -#[ignore] async fn route_request_timeout() { tokio::time::pause(); let _trace = trace::test::trace_init(); @@ -333,7 +331,7 @@ async fn route_request_timeout() { let backend = default_backend(&dest); // Set a request timeout for the route, and no backend request timeout // on the backend. - let route = timeout_route(backend.clone(), Some(REQUEST_TIMEOUT), None); + let route = timeout_route(backend.clone(), REQUEST_TIMEOUT); watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { addr: dest.into(), meta: ParentRef(client_policy::Meta::new_default("parent")), @@ -350,24 +348,29 @@ async fn route_request_timeout() { let svc = stack.new_service(target); handle.allow(1); - let rsp = send_req(svc.clone(), http::Request::get("/")); - serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await; - assert_eq!( - rsp.await.expect("request must succeed").status(), - http::StatusCode::OK - ); + let call = send_req(svc.clone(), http::Request::get("/")); + let (_req, send_rsp) = handle + .next_request() + .await + .expect("service must receive request"); + tokio::spawn(async move { + time::sleep(REQUEST_TIMEOUT * 2).await; + send_rsp.send_response(::http::Response::default()); + }); - // now, time out... 
- let rsp = send_req(svc.clone(), http::Request::get("/")); - tokio::time::sleep(REQUEST_TIMEOUT).await; - let error = rsp.await.expect_err("request must fail with a timeout"); + // tokio::time::sleep(REQUEST_TIMEOUT).await; + let error = time::timeout(REQUEST_TIMEOUT * 4, call) + .await + .expect("request must fail with a timeout") + .expect_err("request must fail with a timeout"); assert!( error.is::(), "error must originate in the logical stack" ); - assert!(errors::is_caused_by::( - error.as_ref() - )); + assert!( + errors::is_caused_by::(error.as_ref()), + "expected response timeout, got {error}" + ); } #[derive(Clone, Debug)] @@ -535,12 +538,10 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route policy: Policy { meta: Meta::new_default("test_route"), filters: NO_FILTERS.clone(), - failure_policy: Default::default(), - request_timeout: None, + params: http::RouteParams::default(), distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { filters: NO_FILTERS.clone(), backend, - request_timeout: None, }])), }, }], @@ -549,8 +550,7 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route fn timeout_route( backend: client_policy::Backend, - route_timeout: Option, - backend_timeout: Option, + route_timeout: Duration, ) -> client_policy::http::Route { use client_policy::{ http::{self, Filter, Policy, Route, Rule}, @@ -565,12 +565,15 @@ fn timeout_route( policy: Policy { meta: Meta::new_default("test_route"), filters: NO_FILTERS.clone(), - failure_policy: Default::default(), - request_timeout: route_timeout, + params: http::RouteParams { + timeouts: http::Timeouts { + request: Some(route_timeout), + ..Default::default() + }, + }, distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { filters: NO_FILTERS.clone(), backend, - request_timeout: backend_timeout, }])), }, }], diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index 91e150aa35..dc8f77cb1c 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -121,6 +121,24 @@ impl errors::HttpRescue for ServerRescue { fn rescue(&self, error: Error) -> Result { use super::logical::policy::errors as policy; + // No available backend can be found for a request. + if errors::is_caused_by::(&*error) { + // XXX(ver) This should probably be SERVICE_UNAVAILABLE, because + // this is basically no different from a LoadShedError, but that + // would be a change in behavior. + return Ok(errors::SyntheticHttpResponse::gateway_timeout(error)); + } + if errors::is_caused_by::(&*error) { + return Ok(errors::SyntheticHttpResponse::unavailable(error)); + } + + // Handle policy-driven timeouts. + if errors::is_caused_by::(&*error) { + return Ok(errors::SyntheticHttpResponse::gateway_timeout_nonfatal( + error, + )); + } + // A profile configured request timeout was encountered. if errors::is_caused_by::(&*error) { return Ok(errors::SyntheticHttpResponse::gateway_timeout(error)); @@ -132,16 +150,6 @@ impl errors::HttpRescue for ServerRescue { return Ok(errors::SyntheticHttpResponse::bad_gateway(error)); } - // No available backend can be found for a request. - if errors::is_caused_by::(&*error) { - // XXX(ver) This should probably be SERVICE_UNAVAILABLE, because - // this is basically no different from a LoadShedError, but that - // would be a change in behavior. 
- return Ok(errors::SyntheticHttpResponse::gateway_timeout(error)); - } - if errors::is_caused_by::(&*error) { - return Ok(errors::SyntheticHttpResponse::unavailable(error)); - } if errors::is_caused_by::(&*error) { return Ok(errors::SyntheticHttpResponse::bad_gateway(error)); } diff --git a/linkerd/app/test/src/resolver/client_policy.rs b/linkerd/app/test/src/resolver/client_policy.rs index c8860425cf..1b7811e576 100644 --- a/linkerd/app/test/src/resolver/client_policy.rs +++ b/linkerd/app/test/src/resolver/client_policy.rs @@ -72,12 +72,10 @@ impl ClientPolicies { policy: http::Policy { meta: Meta::new_default("default"), filters: Arc::new([]), - failure_policy: Default::default(), - request_timeout: None, + params: Default::default(), distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { filters: Arc::new([]), backend: backend.clone(), - request_timeout: None, }])), }, }], @@ -97,12 +95,10 @@ impl ClientPolicies { policy: Some(opaq::Policy { meta: Meta::new_default("default"), filters: Arc::new([]), - failure_policy: Default::default(), - request_timeout: None, + params: Default::default(), distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { filters: Arc::new([]), backend: backend.clone(), - request_timeout: None, }])), }), }, diff --git a/linkerd/proxy/client-policy/src/grpc.rs b/linkerd/proxy/client-policy/src/grpc.rs index 7f6a05fc2c..66d8609119 100644 --- a/linkerd/proxy/client-policy/src/grpc.rs +++ b/linkerd/proxy/client-policy/src/grpc.rs @@ -4,16 +4,22 @@ use std::sync::Arc; pub use linkerd_http_route::grpc::{filter, find, r#match, RouteMatch}; -pub type Policy = crate::RoutePolicy; +pub type Policy = crate::RoutePolicy; pub type Route = grpc::Route; pub type Rule = grpc::Rule; +#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] +pub struct RouteParams { + pub timeouts: crate::http::Timeouts, +} + // TODO HTTP2 settings #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Grpc { pub routes: Arc<[Route]>, /// Configures how endpoints accrue observed failures. + // TODO(ver) Move this to backends and scope to endpoints. pub failure_accrual: FailureAccrual, } @@ -36,8 +42,7 @@ pub fn default(distribution: crate::RouteDistribution) -> Route { meta: crate::Meta::new_default("default"), filters: Arc::new([]), distribution, - failure_policy: Codes::default(), - request_timeout: None, + params: Default::default(), }, }], } @@ -102,7 +107,6 @@ pub mod proto { r#match::host::{proto::InvalidHostMatch, MatchHost}, }, }; - use std::time::Duration; #[derive(Debug, thiserror::Error)] pub enum InvalidGrpcRoute { @@ -121,14 +125,14 @@ pub mod proto { #[error("invalid filter: {0}")] Filter(#[from] InvalidFilter), - #[error("missing {0}")] - Missing(&'static str), - #[error("invalid failure accrual policy: {0}")] Breaker(#[from] InvalidFailureAccrual), - #[error("invalid duration: {0}")] - Duration(#[from] prost_types::DurationError), + #[error("invalid request timeout: {0}")] + RequestTimeout(#[from] prost_types::DurationError), + + #[error("missing {0}")] + Missing(&'static str), } #[derive(Debug, thiserror::Error)] @@ -199,6 +203,7 @@ pub mod proto { meta: &Arc, proto: outbound::grpc_route::Rule, ) -> Result { + #[allow(deprecated)] let outbound::grpc_route::Rule { matches, backends, @@ -220,7 +225,8 @@ pub mod proto { .ok_or(InvalidGrpcRoute::Missing("distribution"))? 
.try_into()?; - let request_timeout = request_timeout.map(Duration::try_from).transpose()?; + let mut params = RouteParams::default(); + params.timeouts.request = request_timeout.map(TryInto::try_into).transpose()?; Ok(Rule { matches, @@ -228,8 +234,7 @@ pub mod proto { meta: meta.clone(), filters, distribution, - failure_policy: Codes::default(), - request_timeout, + params, }, }) } @@ -280,13 +285,11 @@ pub mod proto { type Error = InvalidBackend; fn try_from( grpc_route::RouteBackend { - backend, - filters, - request_timeout, + backend, filters, .. }: grpc_route::RouteBackend, ) -> Result, InvalidBackend> { let backend = backend.ok_or(InvalidBackend::Missing("backend"))?; - RouteBackend::try_from_proto(backend, filters, request_timeout) + RouteBackend::try_from_proto(backend, filters) } } diff --git a/linkerd/proxy/client-policy/src/http.rs b/linkerd/proxy/client-policy/src/http.rs index 58664c8e64..f7a1ba643f 100644 --- a/linkerd/proxy/client-policy/src/http.rs +++ b/linkerd/proxy/client-policy/src/http.rs @@ -1,13 +1,18 @@ use crate::FailureAccrual; use linkerd_http_route::http; -use std::{ops::RangeInclusive, sync::Arc}; +use std::{ops::RangeInclusive, sync::Arc, time}; pub use linkerd_http_route::http::{filter, find, r#match, RouteMatch}; -pub type Policy = crate::RoutePolicy; +pub type Policy = crate::RoutePolicy; pub type Route = http::Route; pub type Rule = http::Rule; +#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] +pub struct RouteParams { + pub timeouts: Timeouts, +} + // TODO: keepalive settings, etc. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Http1 { @@ -38,6 +43,13 @@ pub enum Filter { #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct StatusRanges(pub Arc<[RangeInclusive]>); +#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] +pub struct Timeouts { + pub response: Option, + pub idle: Option, + pub request: Option, +} + pub fn default(distribution: crate::RouteDistribution) -> Route { Route { hosts: vec![], @@ -47,8 +59,7 @@ pub fn default(distribution: crate::RouteDistribution) -> Route { meta: crate::Meta::new_default("default"), filters: Arc::new([]), distribution, - failure_policy: StatusRanges::default(), - request_timeout: None, + params: RouteParams::default(), }, }], } @@ -92,6 +103,8 @@ impl Default for StatusRanges { } } +// === impl Timeouts === + #[cfg(feature = "proto")] pub mod proto { use super::*; @@ -130,11 +143,11 @@ pub mod proto { #[error("invalid failure accrual policy: {0}")] Breaker(#[from] InvalidFailureAccrual), + #[error("invalid request timeout: {0}")] + RequestTimeout(#[from] prost_types::DurationError), + #[error("missing {0}")] Missing(&'static str), - - #[error("invalid request timeout: {0}")] - Timeout(#[from] prost_types::DurationError), } #[derive(Debug, thiserror::Error)] @@ -218,6 +231,7 @@ pub mod proto { meta: &Arc, proto: outbound::http_route::Rule, ) -> Result { + #[allow(deprecated)] let outbound::http_route::Rule { matches, backends, @@ -239,9 +253,8 @@ pub mod proto { .ok_or(InvalidHttpRoute::Missing("distribution"))? 
.try_into()?; - let request_timeout = request_timeout - .map(std::time::Duration::try_from) - .transpose()?; + let mut params = RouteParams::default(); + params.timeouts.request = request_timeout.map(TryInto::try_into).transpose()?; Ok(Rule { matches, @@ -249,8 +262,7 @@ pub mod proto { meta: meta.clone(), filters, distribution, - failure_policy: StatusRanges::default(), - request_timeout, + params, }, }) } @@ -301,13 +313,11 @@ pub mod proto { type Error = InvalidBackend; fn try_from( http_route::RouteBackend { - backend, - filters, - request_timeout, + backend, filters, .. }: http_route::RouteBackend, ) -> Result { let backend = backend.ok_or(InvalidBackend::Missing("backend"))?; - RouteBackend::try_from_proto(backend, filters, request_timeout) + RouteBackend::try_from_proto(backend, filters) } } diff --git a/linkerd/proxy/client-policy/src/lib.rs b/linkerd/proxy/client-policy/src/lib.rs index ccbfa27be1..8835eed33d 100644 --- a/linkerd/proxy/client-policy/src/lib.rs +++ b/linkerd/proxy/client-policy/src/lib.rs @@ -54,25 +54,12 @@ pub enum Meta { } #[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct RoutePolicy { +pub struct RoutePolicy { pub meta: Arc, pub filters: Arc<[T]>, pub distribution: RouteDistribution, - /// Request timeout applied to HTTP and gRPC routes. - /// - /// Opaque routes are proxied as opaque TCP, and therefore, we have no - /// concept of a "request", so this field is ignored by opaque routes. - /// It's somewhat unfortunate that this field is part of the `RoutePolicy` - /// struct, which is used to represent routes for all protocols, rather than - /// as a filter, which are a generic type that depends on the protocol in - /// use. However, this can't be easily modeled as a filter using the current - /// design for filters, as filters synchronously modify a request or return - /// an error --- a filter cannot wrap the response future in order to add a - /// timeout. - pub request_timeout: Option, - - /// Configures what responses are classified as failures. - pub failure_policy: F, + + pub params: P, } // TODO(ver) Weighted random WITHOUT availability awareness, as required by @@ -90,7 +77,6 @@ pub enum RouteDistribution { pub struct RouteBackend { pub filters: Arc<[T]>, pub backend: Backend, - pub request_timeout: Option, } // TODO(ver) how does configuration like failure accrual fit in here? 
What about @@ -167,8 +153,7 @@ impl ClientPolicy { )) .collect(), distribution: RouteDistribution::Empty, - failure_policy: http::StatusRanges::default(), - request_timeout: None, + params: http::RouteParams::default(), }, }], }]) @@ -422,13 +407,17 @@ pub mod proto { #[derive(Debug, thiserror::Error)] pub enum InvalidFailureAccrual { #[error("invalid backoff: {0}")] + Backoff(#[from] InvalidBackoff), + #[error("missing {0}")] + Missing(&'static str), + } + + #[derive(Debug, thiserror::Error)] + pub enum InvalidBackoff { + #[error(transparent)] Backoff(#[from] linkerd_exp_backoff::InvalidBackoff), - #[error("invalid {field} duration: {error}")] - Duration { - field: &'static str, - #[source] - error: prost_types::DurationError, - }, + #[error("invalid duration: {0}")] + Duration(#[from] prost_types::DurationError), #[error("missing {0}")] Missing(&'static str), } @@ -598,7 +587,6 @@ pub mod proto { pub(crate) fn try_from_proto( backend: outbound::Backend, filters: impl IntoIterator, - request_timeout: Option, ) -> Result where T: TryFrom, @@ -610,20 +598,8 @@ pub mod proto { .map(T::try_from) .collect::, _>>() .map_err(|error| InvalidBackend::Filter(error.into()))?; - let request_timeout = - request_timeout - .map(|d| d.try_into()) - .transpose() - .map_err(|error| InvalidBackend::Duration { - field: "backend request timeout", - error, - })?; - - Ok(RouteBackend { - filters, - backend, - request_timeout, - }) + + Ok(RouteBackend { filters, backend }) } } @@ -723,34 +699,12 @@ pub mod proto { failure_accrual::Kind::ConsecutiveFailures(ConsecutiveFailures { max_failures, backoff, - }) => { - // TODO(eliza): if other failure accrual kinds are added - // that also use exponential backoffs, this could be factored out... - let outbound::ExponentialBackoff { - min_backoff, - max_backoff, - jitter_ratio, - } = backoff.ok_or(InvalidFailureAccrual::Missing( - "consecutive failures backoff", - ))?; - - let duration = |dur: Option, field: &'static str| { - dur.ok_or(InvalidFailureAccrual::Missing(field))? - .try_into() - .map_err(|error| InvalidFailureAccrual::Duration { field, error }) - }; - let min = duration(min_backoff, "min_backoff")?; - let max = duration(max_backoff, "max_backoff")?; - let backoff = linkerd_exp_backoff::ExponentialBackoff::try_new( - min, - max, - jitter_ratio as f64, - )?; - Ok(FailureAccrual::ConsecutiveFailures { - max_failures: max_failures as usize, - backoff, - }) - } + }) => Ok(FailureAccrual::ConsecutiveFailures { + max_failures: max_failures as usize, + backoff: backoff.map(try_backoff).transpose()?.ok_or( + InvalidFailureAccrual::Missing("consecutive failures backoff"), + )?, + }), } } } @@ -763,4 +717,23 @@ pub mod proto { .unwrap_or(Ok(FailureAccrual::None)) } } + + pub(crate) fn try_backoff( + outbound::ExponentialBackoff { + min_backoff, + max_backoff, + jitter_ratio, + }: outbound::ExponentialBackoff, + ) -> Result { + let min = min_backoff + .map(time::Duration::try_from) + .transpose()? + .ok_or(InvalidBackoff::Missing("min_backoff"))?; + let max = max_backoff + .map(time::Duration::try_from) + .transpose()? 
+ .ok_or(InvalidBackoff::Missing("max_backoff"))?; + linkerd_exp_backoff::ExponentialBackoff::try_new(min, max, jitter_ratio as f64) + .map_err(Into::into) + } } diff --git a/linkerd/proxy/client-policy/src/opaq.rs b/linkerd/proxy/client-policy/src/opaq.rs index 863067eecd..4dd7f28303 100644 --- a/linkerd/proxy/client-policy/src/opaq.rs +++ b/linkerd/proxy/client-policy/src/opaq.rs @@ -5,7 +5,7 @@ pub struct Opaque { pub policy: Option, } -pub type Policy = RoutePolicy; +pub type Policy = RoutePolicy; #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] pub struct NonIoErrors; @@ -125,10 +125,8 @@ pub(crate) mod proto { Ok(Policy { meta: meta.clone(), filters: NO_FILTERS.clone(), - failure_policy: NonIoErrors, + params: (), distribution, - // Request timeouts are ignored on opaque routes. - request_timeout: None, }) } @@ -180,7 +178,7 @@ pub(crate) mod proto { opaque_route::RouteBackend { backend }: opaque_route::RouteBackend, ) -> Result { let backend = backend.ok_or(InvalidBackend::Missing("backend"))?; - RouteBackend::try_from_proto(backend, std::iter::empty::<()>(), None) + RouteBackend::try_from_proto(backend, std::iter::empty::<()>()) } } diff --git a/linkerd/proxy/http/Cargo.toml b/linkerd/proxy/http/Cargo.toml index ea24295365..5c05b87b90 100644 --- a/linkerd/proxy/http/Cargo.toml +++ b/linkerd/proxy/http/Cargo.toml @@ -29,6 +29,7 @@ hyper = { version = "0.14", features = [ "runtime", ] } hyper-balance = { path = "../../../hyper-balance" } +parking_lot = "0.12" pin-project = "1" rand = "0.8" thiserror = "1" diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index eb5a928526..9a5b51df89 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -20,6 +20,7 @@ pub mod orig_proto; mod override_authority; mod retain; mod server; +pub mod stream_timeouts; pub mod strip_header; pub mod timeout; pub mod upgrade; @@ -39,12 +40,13 @@ pub use self::{ override_authority::{AuthorityOverride, NewOverrideAuthority}, retain::Retain, server::{NewServeHttp, Params as ServerParams, ServeHttp}, + stream_timeouts::{EnforceTimeouts, StreamTimeouts}, strip_header::StripHeader, timeout::{NewTimeout, ResponseTimeout, ResponseTimeoutError}, version::Version, }; pub use http::{ - header::{self, HeaderName, HeaderValue}, + header::{self, HeaderMap, HeaderName, HeaderValue}, uri, Method, Request, Response, StatusCode, }; pub use hyper::body::HttpBody; diff --git a/linkerd/proxy/http/src/stream_timeouts.rs b/linkerd/proxy/http/src/stream_timeouts.rs new file mode 100644 index 0000000000..8781c9fbd1 --- /dev/null +++ b/linkerd/proxy/http/src/stream_timeouts.rs @@ -0,0 +1,488 @@ +use futures::FutureExt; +use linkerd_error::{Error, Result}; +use linkerd_stack as svc; +use parking_lot::RwLock; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use thiserror::Error; +use tokio::{sync::oneshot, time}; + +/// A request extension set on HTTP requests that expresses deadlines to be +/// enforced by the proxy. +#[derive(Clone, Debug, Default)] +pub struct StreamTimeouts { + /// The maximum amount of time between the body of the request being fully + /// flushed and the response headers being received. + pub response_headers: Option, + + /// The maximum amount of time between the body of the request being fully + /// flushed (or the response headers being received, if that occurs first) + /// and the response being fully received. 
+ pub response_end: Option, + + /// The maximum amount of time the stream may be idle. + pub idle: Option, + + /// Limits the total time the stream may be active in the proxy. + pub limit: Option, +} + +#[derive(Clone, Copy, Debug)] +pub struct StreamLifetime { + /// The deadline for the stream. + pub deadline: time::Instant, + /// The maximum amount of time the stream may be active, used for error reporting. + pub lifetime: time::Duration, +} + +#[derive(Clone, Debug)] +pub struct EnforceTimeouts { + inner: S, +} + +#[derive(Clone, Copy, Debug, Error)] +#[error("timed out waiting for response headers: {0:?}")] +pub struct ResponseHeadersTimeoutError(time::Duration); + +#[derive(Clone, Copy, Debug, Error)] +#[error("timed out waiting for response stream: {0:?}")] +pub struct ResponseStreamTimeoutError(time::Duration); + +#[derive(Clone, Copy, Debug, Error)] +#[error("stream deadline met: {0:?}")] +pub struct StreamDeadlineError(time::Duration); + +#[derive(Clone, Copy, Debug, Error)] +#[error("stream timed out due to idleness: {0:?}")] +pub struct StreamIdleError(time::Duration); + +#[derive(Clone, Copy, Debug, Error)] +pub enum ResponseTimeoutError { + #[error(transparent)] + Headers(#[from] ResponseHeadersTimeoutError), + + #[error(transparent)] + Lifetime(#[from] StreamDeadlineError), +} + +#[derive(Clone, Copy, Debug, Error)] +pub enum BodyTimeoutError { + #[error(transparent)] + Response(#[from] ResponseStreamTimeoutError), + + #[error(transparent)] + Lifetime(#[from] StreamDeadlineError), + + #[error(transparent)] + Idle(#[from] StreamIdleError), +} + +#[derive(Debug)] +#[pin_project] +pub struct ResponseFuture { + #[pin] + inner: F, + + #[pin] + deadline: Option>, + + #[pin] + request_flushed: Option>, + request_flushed_at: Option, + + idle: Option<(IdleTimestamp, time::Duration)>, + + timeouts: StreamTimeouts, +} + +#[derive(Debug, Default)] +#[pin_project] +pub struct RequestBody { + #[pin] + inner: B, + + #[pin] + deadline: Option>, + idle: Option, + + request_flushed: Option>, +} + +#[derive(Debug, Default)] +#[pin_project] +pub struct ResponseBody { + #[pin] + inner: B, + + #[pin] + deadline: Option>, + idle: Option, + + timeouts: StreamTimeouts, +} + +#[derive(Debug)] +#[pin_project] +struct Deadline { + #[pin] + sleep: time::Sleep, + error: E, +} + +type IdleTimestamp = Arc>; + +#[derive(Debug)] +struct Idle { + sleep: Pin>, + timestamp: IdleTimestamp, + timeout: time::Duration, +} + +// === impl StreamLifetime === + +impl From for StreamLifetime { + fn from(lifetime: time::Duration) -> Self { + Self { + deadline: time::Instant::now() + lifetime, + lifetime, + } + } +} + +// === impl EnforceTimeouts === + +impl EnforceTimeouts { + pub fn layer() -> impl svc::layer::Layer + Clone { + svc::layer::mk(|inner| Self { inner }) + } +} + +impl svc::Service> for EnforceTimeouts +where + S: svc::Service>, Response = http::Response>, + S::Error: Into, +{ + type Response = http::Response>; + type Error = Error; + type Future = ResponseFuture; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + let timeouts = req + .extensions() + .get::() + .cloned() + .unwrap_or_default(); + tracing::trace!(?timeouts, "Enforcing timeouts on stream"); + + let (req_idle, rsp_idle) = if let Some(timeout) = timeouts.idle { + let last_update = Arc::new(RwLock::new(time::Instant::now())); + let req = Idle { + sleep: Box::pin(time::sleep(timeout)), + timestamp: 
last_update.clone(), + timeout, + }; + (Some(req), Some((last_update, timeout))) + } else { + (None, None) + }; + + let (tx, rx) = oneshot::channel(); + let inner = self.inner.call(req.map(move |inner| RequestBody { + inner, + request_flushed: Some(tx), + deadline: timeouts.limit.map(|l| Deadline { + sleep: time::sleep_until(l.deadline), + error: StreamDeadlineError(l.lifetime).into(), + }), + idle: req_idle, + })); + ResponseFuture { + inner, + deadline: timeouts.limit.map(|l| Deadline { + sleep: time::sleep_until(l.deadline), + error: StreamDeadlineError(l.lifetime).into(), + }), + request_flushed: Some(rx), + request_flushed_at: None, + timeouts, + idle: rsp_idle, + } + } +} + +// === impl ResponseFuture === + +impl Future for ResponseFuture +where + F: Future, E>>, + E: Into, +{ + type Output = Result>>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + // Mark the time at which the request body was fully flushed and adjust + // the response deadline as necessary. + if let Some(flushed) = this.request_flushed.as_mut().as_pin_mut() { + if let Poll::Ready(res) = flushed.poll(cx) { + tracing::trace!("Request body fully flushed"); + let start = res.unwrap_or_else(|_| time::Instant::now()); + *this.request_flushed = None; + *this.request_flushed_at = Some(start); + + if let Some(timeout) = this.timeouts.response_headers { + let headers_by = start + timeout; + if let Some(deadline) = this.deadline.as_mut().as_pin_mut() { + if headers_by < deadline.sleep.deadline() { + tracing::trace!(?timeout, "Updating response headers deadline"); + let dl = deadline.project(); + *dl.error = ResponseHeadersTimeoutError(timeout).into(); + dl.sleep.reset(headers_by); + } else { + tracing::trace!("Using original stream deadline"); + } + } else { + tracing::trace!(?timeout, "Setting response headers deadline"); + this.deadline.set(Some(Deadline { + sleep: time::sleep_until(headers_by), + error: ResponseHeadersTimeoutError(timeout).into(), + })); + } + } + } + } + + // Poll for the response headers. + let rsp = match this.inner.poll(cx) { + Poll::Ready(Ok(rsp)) => rsp, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), + Poll::Pending => { + // If the response headers are not ready, check the deadline and + // return an error if it is exceeded. + if let Some(deadline) = this.deadline.as_pin_mut() { + let dl = deadline.project(); + if dl.sleep.poll(cx).is_ready() { + // TODO telemetry + return Poll::Ready(Err((*dl.error).into())); + } + } + return Poll::Pending; + } + }; + // We've received response headers, so we prepare the response body to + // timeout. + + // Share the idle state across request and response bodies. Update the + // state to reflect that we've accepted headers. + let idle = this.idle.take().map(|(timestamp, timeout)| { + let now = time::Instant::now(); + *timestamp.write() = now; + Idle { + timestamp, + timeout, + sleep: Box::pin(time::sleep_until(now + timeout)), + } + }); + + // We use the more restrictive of the response-end timeout (as + // measured since the request body was fully flushed) and the stream + // lifetime limit. 
+ let start = this.request_flushed_at.unwrap_or_else(time::Instant::now); + let timeout = match (this.timeouts.response_end, this.timeouts.limit) { + (Some(eos), Some(lim)) if start + eos < lim.deadline => { + Some((start + eos, ResponseStreamTimeoutError(eos).into())) + } + (Some(_), Some(lim)) => Some((lim.deadline, StreamDeadlineError(lim.lifetime).into())), + (Some(eos), None) => Some((start + eos, ResponseStreamTimeoutError(eos).into())), + (None, Some(lim)) => Some((lim.deadline, StreamDeadlineError(lim.lifetime).into())), + (None, None) => None, + }; + + Poll::Ready(Ok(rsp.map(move |inner| ResponseBody { + inner, + deadline: timeout.map(|(t, error)| Deadline { + sleep: time::sleep_until(t), + error, + }), + idle, + timeouts: this.timeouts.clone(), + }))) + } +} + +// === impl RequestBody === + +impl crate::HttpBody for RequestBody +where + B: crate::HttpBody, +{ + type Data = B::Data; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let this = self.project(); + + if let Poll::Ready(res) = this.inner.poll_data(cx) { + if let Some(idle) = this.idle { + idle.reset(time::Instant::now()); + } + return Poll::Ready(res); + } + + if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) { + // TODO telemetry + return Poll::Ready(Some(Err(Error::from(e)))); + } + + Poll::Pending + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let this = self.project(); + + if let Poll::Ready(res) = this.inner.poll_trailers(cx) { + let now = time::Instant::now(); + if let Some(idle) = this.idle { + idle.reset(now); + } + if let Some(tx) = this.request_flushed.take() { + let _ = tx.send(now); + } + return Poll::Ready(res); + } + + if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) { + // TODO telemetry + return Poll::Ready(Err(Error::from(e))); + } + + Poll::Pending + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } +} + +// === impl ResponseBody === + +impl crate::HttpBody for ResponseBody +where + B: crate::HttpBody, +{ + type Data = B::Data; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let this = self.project(); + + if let Poll::Ready(res) = this.inner.poll_data(cx) { + if let Some(idle) = this.idle { + idle.reset(time::Instant::now()); + } + return Poll::Ready(res); + } + + if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) { + // TODO telemetry + return Poll::Ready(Some(Err(Error::from(e)))); + } + + Poll::Pending + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let this = self.project(); + + if let Poll::Ready(res) = this.inner.poll_trailers(cx) { + if let Some(idle) = this.idle { + idle.reset(time::Instant::now()); + }; + return Poll::Ready(res); + } + + if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) { + // TODO telemetry + return Poll::Ready(Err(Error::from(e))); + } + + Poll::Pending + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } +} + +fn poll_body_timeout( + mut deadline: Pin<&mut Option>>, + idle: &mut Option, + cx: &mut Context<'_>, +) -> Poll { + if let Some(dl) = deadline.as_mut().as_pin_mut() { + let d = dl.project(); + if d.sleep.poll(cx).is_ready() { + let error = *d.error; + deadline.set(None); // Prevent polling again. 
+ return Poll::Ready(error); + } + } + + if let Some(idle) = idle { + if let Poll::Ready(e) = idle.poll_idle(cx) { + return Poll::Ready(e.into()); + } + } + + Poll::Pending +} + +// === impl Idle === + +impl Idle { + fn reset(&mut self, now: time::Instant) { + self.sleep.as_mut().reset(now + self.timeout); + *self.timestamp.write() = now; + } + + fn poll_idle(&mut self, cx: &mut Context<'_>) -> Poll { + loop { + if self.sleep.poll_unpin(cx).is_pending() { + return Poll::Pending; + } + + // If the idle timeout has expired, we first need to ensure that the + // other half of the stream hasn't updated the timestamp. If it has, + // reset the timer with the expected idle timeout. + let now = time::Instant::now(); + let expiry = *self.timestamp.read() + self.timeout; + if expiry <= now { + return Poll::Ready(StreamIdleError(self.timeout)); + } + self.sleep.as_mut().reset(expiry); + } + } +} diff --git a/linkerd/proxy/http/src/timeout.rs b/linkerd/proxy/http/src/timeout.rs index bc958029e4..25b10a4af6 100644 --- a/linkerd/proxy/http/src/timeout.rs +++ b/linkerd/proxy/http/src/timeout.rs @@ -3,7 +3,7 @@ use linkerd_stack::{layer, ExtractParam, MapErr, NewService, Timeout, TimeoutErr use std::time::Duration; use thiserror::Error; -/// An HTTP-specific optional timeout layer. +/// DEPRECATED: An HTTP-specific optional timeout layer. /// /// The stack target must implement `HasTimeout`, and if a duration is /// specified for the target, a timeout is applied waiting for HTTP responses.
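
For reference, the route-level timeouts carried by the client policy translate into the new `StreamTimeouts` request extension along the lines of the sketch below. This is a minimal illustration using the types introduced by this patch: the `to_stream_timeouts` helper and the concrete durations are hypothetical, and the crate paths assume the existing `linkerd-proxy-http` and `linkerd-proxy-client-policy` library names. In the proxy itself this translation is performed by `SetExtensions::configure_timeouts` before a request is dispatched to the endpoint stack, where `EnforceTimeouts` reads the extension and applies the deadlines.

    use std::time::Duration;
    use linkerd_proxy_client_policy::http::Timeouts;
    use linkerd_proxy_http::{stream_timeouts::StreamLifetime, StreamTimeouts};

    // Hypothetical helper mirroring `SetExtensions::configure_timeouts`.
    fn to_stream_timeouts(route: &Timeouts) -> StreamTimeouts {
        StreamTimeouts {
            // Not yet populated from policy; reserved for retry timeouts.
            response_headers: None,
            // Bounds how long a response may be in flight once the request
            // body has been fully flushed.
            response_end: route.response,
            // Fails the stream when no DATA frames are observed for this long.
            idle: route.idle,
            // Caps the total lifetime of the request-response stream.
            limit: route.request.map(StreamLifetime::from),
        }
    }

    fn main() {
        // Hypothetical route configuration.
        let route = Timeouts {
            request: Some(Duration::from_secs(10)),
            response: Some(Duration::from_secs(5)),
            idle: Some(Duration::from_secs(1)),
        };
        // The resulting extension is inserted into the request before it is
        // sent to the endpoint stack: `req.extensions_mut().insert(timeouts)`.
        let timeouts = to_stream_timeouts(&route);
        println!("{timeouts:?}");
    }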