From f840824ee2ac6c07e31ee59ee323e158fd6e92be Mon Sep 17 00:00:00 2001
From: Oliver Gould
Date: Thu, 25 Jul 2024 15:25:15 +0000
Subject: [PATCH] feat(outbound): Control retry & timeout policies with headers

Recent changes introduced the ability to control retry and timeout
policies via the control plane. This PR extends that functionality with
a set of request headers that can be used to augment or override these
policies on a per-request basis.

This feature must be enabled by the control plane (via the
OutboundPolicy API response). When enabled, the following headers can be
used to control retries and timeouts:

- `l5d-timeout` / `l5d-request-timeout`: Controls the overall
  request-response stream timeout. The explicit form takes precedence
  when both are set.
- `l5d-response-timeout`: Controls the timeout for the response stream.
- `l5d-retry-timeout`: Controls the timeout for each retry attempt
  (triggering another retry if exceeded).
- `l5d-retry-limit`: Controls the maximum number of retries.
- `l5d-retry-http`: Controls the HTTP status codes that trigger a retry.
- `l5d-retry-grpc`: Controls the gRPC status codes that trigger a retry.

An example request using these headers is sketched below.
---
 .../outbound/src/http/logical/policy/route.rs |   5 +-
 .../http/logical/policy/route/extensions.rs   | 174 +++++++++++-
 .../src/http/logical/policy/route/retry.rs    |   7 +-
 .../app/outbound/src/http/logical/tests.rs    |   6 +-
 .../src/http/logical/tests/headers.rs         | 254 ++++++++++++++++++
 .../src/http/logical/tests/retries.rs         |   7 +
 linkerd/proxy/client-policy/src/grpc.rs       |   4 +-
 linkerd/proxy/client-policy/src/http.rs       |   4 +-
 8 files changed, 442 insertions(+), 19 deletions(-)
 create mode 100644 linkerd/app/outbound/src/http/logical/tests/headers.rs

diff --git a/linkerd/app/outbound/src/http/logical/policy/route.rs b/linkerd/app/outbound/src/http/logical/policy/route.rs
index beeeb5a028..46d2c698c3 100644
--- a/linkerd/app/outbound/src/http/logical/policy/route.rs
+++ b/linkerd/app/outbound/src/http/logical/policy/route.rs
@@ -123,7 +123,8 @@ where
             .push_on_service(svc::LoadShed::layer())
             .push(filters::NewApplyFilters::<Self, _, _>::layer())
             .push(retry::NewHttpRetry::layer(metrics.retry.clone()))
-            // Set request extensions based on the route configuration.
+            // Set request extensions based on the route configuration
+            // and/or l5d request headers.
             .push(extensions::NewSetExtensions::layer())
             .push(metrics::layer(&metrics.requests))
             // Configure a classifier to use in the endpoint stack.
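For illustration, a minimal sketch of a meshed client opting into these
policies for a single request, assuming the route's policy sets
`allow_l5d_request_headers` (the target URI and the use of the `http`
crate's request builder are illustrative, not part of this patch):

    use http::Request;

    // The proxy strips these headers and applies them to this request only;
    // they are ignored unless the control plane enables l5d request headers
    // for the route.
    let req: Request<()> = Request::get("http://web.default.svc.cluster.local/api")
        .header("l5d-retry-http", "gateway-error,429") // retry on 502-504 or 429
        .header("l5d-retry-limit", "3")                // at most 3 retry attempts
        .header("l5d-retry-timeout", "300ms")          // per-attempt timeout
        .header("l5d-request-timeout", "2s")           // overall stream deadline
        .body(())
        .unwrap();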
@@ -198,6 +199,7 @@ impl svc::Param<extensions::Params> for Http {
                 retryable_http_statuses: Some(r.status_ranges),
                 retryable_grpc_statuses: None,
             }),
+            allow_l5d_request_headers: self.params.params.allow_l5d_request_headers,
         }
     }
 }
@@ -251,6 +253,7 @@ impl svc::Param<extensions::Params> for Grpc {
                 retryable_http_statuses: None,
                 retryable_grpc_statuses: Some(r.codes),
             }),
+            allow_l5d_request_headers: self.params.params.allow_l5d_request_headers,
         }
     }
 }
diff --git a/linkerd/app/outbound/src/http/logical/policy/route/extensions.rs b/linkerd/app/outbound/src/http/logical/policy/route/extensions.rs
index 50e0aecadd..59ca528a4f 100644
--- a/linkerd/app/outbound/src/http/logical/policy/route/extensions.rs
+++ b/linkerd/app/outbound/src/http/logical/policy/route/extensions.rs
@@ -1,12 +1,14 @@
 use super::retry::RetryPolicy;
-use linkerd_app_core::{proxy::http, svc};
+use linkerd_app_core::{config::ExponentialBackoff, proxy::http, svc};
 use linkerd_proxy_client_policy as policy;
 use std::task::{Context, Poll};
+use tokio::time;
 
 #[derive(Clone, Debug)]
 pub struct Params {
     pub retry: Option<RetryPolicy>,
     pub timeouts: policy::http::Timeouts,
+    pub allow_l5d_request_headers: bool,
 }
 
 // A request extension that marks the number of times a request has been
@@ -63,11 +65,11 @@ where
     }
 
     fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
-        let retry = self.configure_retry();
+        let retry = self.configure_retry(req.headers_mut());
 
         // Ensure that we get response headers within the retry timeout. Note
         // that this may be cleared by super::retry::RetryPolicy::set_extensions.
-        let mut timeouts = self.configure_timeouts();
+        let mut timeouts = self.configure_timeouts(req.headers_mut());
         timeouts.response_headers = retry.as_ref().and_then(|r| r.timeout);
 
         tracing::debug!(?retry, ?timeouts, "Initializing route extensions");
@@ -90,16 +92,174 @@ where
 }
 
 impl<S> SetExtensions<S> {
-    fn configure_retry(&self) -> Option<RetryPolicy> {
-        self.params.retry.clone()
+    fn configure_retry(&self, req: &mut http::HeaderMap) -> Option<RetryPolicy> {
+        if !self.params.allow_l5d_request_headers {
+            return self.params.retry.clone();
+        }
+
+        let user_retry_http = req
+            .remove("l5d-retry-http")
+            .and_then(|val| val.to_str().ok().and_then(parse_http_conditions));
+        let user_retry_grpc = req
+            .remove("l5d-retry-grpc")
+            .and_then(|val| val.to_str().ok().and_then(parse_grpc_conditions));
+        let user_retry_limit = req
+            .remove("l5d-retry-limit")
+            .and_then(|val| val.to_str().ok().and_then(|v| v.parse::<usize>().ok()));
+        let user_retry_timeout = req.remove("l5d-retry-timeout").and_then(parse_duration);
+
+        if let Some(retry) = self.params.retry.clone() {
+            return Some(RetryPolicy {
+                timeout: user_retry_timeout.or(retry.timeout),
+                retryable_http_statuses: user_retry_http.or(retry.retryable_http_statuses.clone()),
+                retryable_grpc_statuses: user_retry_grpc.or(retry.retryable_grpc_statuses.clone()),
+                max_retries: user_retry_limit.unwrap_or(retry.max_retries),
+                ..retry
+            });
+        }
+
+        match (
+            user_retry_http,
+            user_retry_grpc,
+            user_retry_limit,
+            user_retry_timeout,
+        ) {
+            (None, None, None, None) => None,
+            (retryable_http_statuses, retryable_grpc_statuses, retry_limit, timeout) => {
+                Some(RetryPolicy {
+                    timeout,
+                    retryable_http_statuses,
+                    retryable_grpc_statuses,
+                    max_retries: retry_limit.unwrap_or(1),
+                    max_request_bytes: 64 * 1024,
+                    backoff: Some(ExponentialBackoff::new_unchecked(
+                        std::time::Duration::from_millis(25),
+                        std::time::Duration::from_millis(250),
+                        1.0,
+                    )),
+                })
+            }
+        }
     }
 
-    fn configure_timeouts(&self) -> http::StreamTimeouts {
-        http::StreamTimeouts {
+    fn configure_timeouts(&self, req: &mut http::HeaderMap) -> http::StreamTimeouts {
+        let mut timeouts = http::StreamTimeouts {
             response_headers: None,
             response_end: self.params.timeouts.response,
             idle: self.params.timeouts.idle,
             limit: self.params.timeouts.request.map(Into::into),
-        }
+        };
+
+        if !self.params.allow_l5d_request_headers {
+            return timeouts;
+        }
+
+        // Accept both a shorthand and a longer, more explicit form, the
+        // latter taking precedence.
+        if let Some(t) = req.remove("l5d-timeout").and_then(parse_duration) {
+            timeouts.limit = Some(t.into());
+        }
+        if let Some(t) = req.remove("l5d-request-timeout").and_then(parse_duration) {
+            timeouts.limit = Some(t.into());
+        }
+
+        if let Some(t) = req.remove("l5d-response-timeout").and_then(parse_duration) {
+            timeouts.response_end = Some(t);
+        }
+
+        timeouts
     }
 }
+
+fn parse_http_conditions(s: &str) -> Option<policy::http::StatusRanges> {
+    fn to_code(s: &str) -> Option<u16> {
+        let code = s.parse::<u16>().ok()?;
+        if (100..600).contains(&code) {
+            Some(code)
+        } else {
+            None
+        }
+    }
+
+    Some(policy::http::StatusRanges(
+        s.split(',')
+            .filter_map(|cond| {
+                if cond.eq_ignore_ascii_case("5xx") {
+                    return Some(500..=599);
+                }
+                if cond.eq_ignore_ascii_case("gateway-error") {
+                    return Some(502..=504);
+                }
+
+                if let Some(code) = to_code(cond) {
+                    return Some(code..=code);
+                }
+                if let Some((start, end)) = cond.split_once('-') {
+                    if let (Some(s), Some(e)) = (to_code(start), to_code(end)) {
+                        if s <= e {
+                            return Some(s..=e);
+                        }
+                    }
+                }
+
+                None
+            })
+            .collect(),
+    ))
+}
+
+fn parse_grpc_conditions(s: &str) -> Option<policy::grpc::Codes> {
+    Some(policy::grpc::Codes(std::sync::Arc::new(
+        s.split(',')
+            .filter_map(|cond| {
+                if cond.eq_ignore_ascii_case("cancelled") {
+                    return Some(tonic::Code::Cancelled as u16);
+                }
+                if cond.eq_ignore_ascii_case("deadline-exceeded") {
+                    return Some(tonic::Code::DeadlineExceeded as u16);
+                }
+                if cond.eq_ignore_ascii_case("internal") {
+                    return Some(tonic::Code::Internal as u16);
+                }
+                if cond.eq_ignore_ascii_case("resource-exhausted") {
+                    return Some(tonic::Code::ResourceExhausted as u16);
+                }
+                if cond.eq_ignore_ascii_case("unavailable") {
+                    return Some(tonic::Code::Unavailable as u16);
+                }
+                None
+            })
+            .collect(),
+    )))
+}
+
+// Copied from the policy controller so that we handle the same duration
+// values as the YAML config (see the behavior sketch at the end of this patch).
+fn parse_duration(hv: http::HeaderValue) -> Option<time::Duration> {
+    #[inline]
+    fn parse(s: &str) -> Option<time::Duration> {
+        let s = s.trim();
+        let offset = s.rfind(|c: char| c.is_ascii_digit())?;
+        let (magnitude, unit) = s.split_at(offset + 1);
+        let magnitude = magnitude.parse::<u64>().ok()?;
+
+        let mul = match unit {
+            "" if magnitude == 0 => 0,
+            "ms" => 1,
+            "s" => 1000,
+            "m" => 1000 * 60,
+            "h" => 1000 * 60 * 60,
+            "d" => 1000 * 60 * 60 * 24,
+            _ => return None,
+        };
+
+        let ms = magnitude.checked_mul(mul)?;
+        Some(time::Duration::from_millis(ms))
+    }
+
+    let s = hv.to_str().ok()?;
+    let Some(d) = parse(s) else {
+        tracing::debug!("Invalid duration: {:?}", s);
+        return None;
+    };
+    Some(d)
+}
diff --git a/linkerd/app/outbound/src/http/logical/policy/route/retry.rs b/linkerd/app/outbound/src/http/logical/policy/route/retry.rs
index e47c89bd23..76fe65b4c2 100644
--- a/linkerd/app/outbound/src/http/logical/policy/route/retry.rs
+++ b/linkerd/app/outbound/src/http/logical/policy/route/retry.rs
@@ -85,16 +85,13 @@ impl retry::Policy for RetryPolicy {
         };
         dst.insert(extensions::Attempt(attempt));
 
-        let retries_remain = u16::from(attempt) as usize <= self.max_retries;
-        if retries_remain {
-            dst.insert(self.clone());
-        }
-
         if let Some(mut timeouts) = src.get::<http::StreamTimeouts>().cloned() {
             // If retries are exhausted, remove the response headers timeout,
             // since we're no longer blocked on making a retry decision.
             // This may give the request additional time to be satisfied.
+            let retries_remain = u16::from(attempt) as usize <= self.max_retries;
             if !retries_remain {
+                tracing::debug!("Exhausted retries; removing response headers timeout");
                 timeouts.response_headers = None;
             }
             dst.insert(timeouts);
diff --git a/linkerd/app/outbound/src/http/logical/tests.rs b/linkerd/app/outbound/src/http/logical/tests.rs
index 6006b9c687..dcace6c2b7 100644
--- a/linkerd/app/outbound/src/http/logical/tests.rs
+++ b/linkerd/app/outbound/src/http/logical/tests.rs
@@ -14,6 +14,7 @@ use tracing::Instrument;
 
 mod basic;
 mod failure_accrual;
+mod headers;
 mod retries;
 mod timeouts;
 
@@ -129,10 +130,7 @@ async fn mk_rsp(status: StatusCode, body: impl ToString) -> Result {
 async fn mk_grpc_rsp(code: tonic::Code) -> Result {
     Ok(http::Response::builder()
         .version(::http::Version::HTTP_2)
-        .header(
-            "content-type",
-            http::HeaderValue::from_static("application/grpc"),
-        )
+        .header("content-type", "application/grpc")
         .body(BoxBody::new(MockBody::trailers(async move {
             let mut trls = http::HeaderMap::default();
             trls.insert("grpc-status", (code as u8).to_string().parse().unwrap());
diff --git a/linkerd/app/outbound/src/http/logical/tests/headers.rs b/linkerd/app/outbound/src/http/logical/tests/headers.rs
new file mode 100644
index 0000000000..c2cd136065
--- /dev/null
+++ b/linkerd/app/outbound/src/http/logical/tests/headers.rs
@@ -0,0 +1,254 @@
+use super::*;
+use linkerd_app_core::{proxy::http::StatusCode, trace};
+use linkerd_proxy_client_policy::http::RouteParams as HttpParams;
+use tokio::time;
+use tracing::{info, Instrument};
+
+// === HTTP ===
+
+#[tokio::test(flavor = "current_thread", start_paused = true)]
+async fn http() {
+    let _trace = trace::test::trace_init();
+
+    const TIMEOUT: time::Duration = time::Duration::from_secs(2);
+    let (svc, mut handle) = mock_http(HttpParams {
+        allow_l5d_request_headers: true,
+        ..Default::default()
+    });
+
+    info!("Sending a request that will initially fail and then succeed");
+    tokio::spawn(
+        async move {
+            handle.allow(3);
+            serve(&mut handle, async move {
+                info!("Failing the first request");
+                mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "").await
+            })
+            .await;
+            serve(&mut handle, async move {
+                info!("Delaying the second request");
+                time::sleep(TIMEOUT * 2).await;
+                mk_rsp(StatusCode::IM_A_TEAPOT, "").await
+            })
+            .await;
+            serve(&mut handle, async move {
+                info!("Serving the third request");
+                mk_rsp(StatusCode::NO_CONTENT, "").await
+            })
+            .await;
+            handle
+        }
+        .in_current_span(),
+    );
+
+    info!("Verifying that we see the successful response");
+    let rsp = time::timeout(
+        TIMEOUT * 4,
+        send_req(
+            svc.clone(),
+            http::Request::get("/")
+                .header("l5d-retry-limit", "2")
+                .header("l5d-retry-http", "5xx")
+                .header("l5d-retry-timeout", "100ms")
+                .body(Default::default())
+                .unwrap(),
+        ),
+    )
+    .await
+    .expect("response");
+    assert_eq!(rsp.expect("response").status(), StatusCode::NO_CONTENT);
+}
+
+#[tokio::test(flavor = "current_thread", start_paused = true)]
+async fn http_requires_allow() {
+    let _trace = trace::test::trace_init();
+
+    const TIMEOUT: time::Duration = time::Duration::from_secs(2);
+    let (svc, mut handle) = mock_http(HttpParams {
+        allow_l5d_request_headers: false,
+        ..Default::default()
+    });
+
+    tokio::spawn(
+        async move {
+            handle.allow(2);
+            serve(&mut handle, mk_rsp(StatusCode::INTERNAL_SERVER_ERROR, "")).await;
+            serve(&mut handle, mk_rsp(StatusCode::NO_CONTENT, "")).await;
+            handle
+        }
+        .in_current_span(),
+    );
+
+    info!("Sending a request with retry headers that should be ignored");
+    let rsp = time::timeout(
+        TIMEOUT,
+        send_req(
+            svc.clone(),
+            http::Request::get("/")
+                .header("l5d-retry-limit", "1")
+                .header("l5d-retry-http", "5xx")
+                .body(Default::default())
+                .unwrap(),
+        ),
+    )
+    .await
+    .expect("response");
+
+    info!("Verifying that the request was not retried");
+    assert_eq!(
+        rsp.expect("response").status(),
+        StatusCode::INTERNAL_SERVER_ERROR
+    );
+}
+
+// === gRPC ===
+
+#[tokio::test(flavor = "current_thread", start_paused = true)]
+async fn grpc() {
+    let _trace = trace::test::trace_init();
+
+    const TIMEOUT: time::Duration = time::Duration::from_secs(2);
+    let (svc, mut handle) = mock_http(HttpParams {
+        allow_l5d_request_headers: true,
+        ..Default::default()
+    });
+
+    info!("Sending a request that will initially fail and then succeed");
+    tokio::spawn(
+        async move {
+            handle.allow(3);
+            serve(&mut handle, async move {
+                info!("Failing the first request");
+                mk_grpc_rsp(tonic::Code::Unavailable).await
+            })
+            .await;
+            serve(&mut handle, async move {
+                info!("Delaying the second request");
+                time::sleep(TIMEOUT).await;
+                mk_grpc_rsp(tonic::Code::NotFound).await
+            })
+            .await;
+            serve(&mut handle, async move {
+                info!("Serving the third request");
+                mk_grpc_rsp(tonic::Code::Ok).await
+            })
+            .await;
+            handle
+        }
+        .in_current_span(),
+    );
+
+    info!("Verifying that we see the successful response");
+    let mut rsp = time::timeout(
+        TIMEOUT * 4,
+        send_req(
+            svc.clone(),
+            http::Request::get("/")
+                .version(::http::Version::HTTP_2)
+                .header("content-type", "application/grpc")
+                .header("l5d-retry-limit", "2")
+                .header("l5d-retry-grpc", "unavailable")
+                .header("l5d-retry-timeout", "100ms")
+                .body(Default::default())
+                .unwrap(),
+        ),
+    )
+    .await
+    .expect("response")
+    .expect("response");
+    assert_eq!(rsp.status(), StatusCode::OK);
+    while let Some(res) = rsp.body_mut().data().await {
+        res.expect("data");
+    }
+    let trailers = rsp
+        .body_mut()
+        .trailers()
+        .await
+        .expect("trailers")
+        .expect("trailers");
+    assert_eq!(
+        trailers
+            .get("grpc-status")
+            .expect("grpc-status")
+            .to_str()
+            .unwrap()
+            .parse::<u8>()
+            .unwrap(),
+        tonic::Code::Ok as u8
+    );
+}
+
+#[tokio::test(flavor = "current_thread", start_paused = true)]
+async fn grpc_requires_allow() {
+    let _trace = trace::test::trace_init();
+
+    const TIMEOUT: time::Duration = time::Duration::from_secs(2);
+    let (svc, mut handle) = mock_http(HttpParams {
+        allow_l5d_request_headers: false,
+        ..Default::default()
+    });
+
+    info!("Sending a request with retry headers that should be ignored");
+    tokio::spawn(
+        async move {
+            handle.allow(3);
+            serve(&mut handle, async move {
+                info!("Failing the first request");
+                mk_grpc_rsp(tonic::Code::Unavailable).await
+            })
+            .await;
+            serve(&mut handle, async move {
+                info!("Delaying the second request");
+                time::sleep(TIMEOUT).await;
+                mk_grpc_rsp(tonic::Code::NotFound).await
+            })
+            .await;
+            serve(&mut handle, async move {
+                info!("Serving the third request");
+                mk_grpc_rsp(tonic::Code::Ok).await
+            })
+            .await;
+            handle
+        }
+        .in_current_span(),
+    );
+
+    info!("Verifying that the request was not retried");
+    let mut rsp = time::timeout(
+        TIMEOUT * 4,
+        send_req(
+            svc.clone(),
+            http::Request::get("/")
+                .version(::http::Version::HTTP_2)
+                .header("content-type", "application/grpc")
+                .header("l5d-retry-limit", "2")
+                .header("l5d-retry-grpc", "unavailable")
+                .header("l5d-retry-timeout", "100ms")
+                .body(Default::default())
+                .unwrap(),
+        ),
+    )
+    .await
+    .expect("response")
+    .expect("response");
+    assert_eq!(rsp.status(), StatusCode::OK);
+    while let Some(res) = rsp.body_mut().data().await {
+        res.expect("data");
+    }
+    let trailers = rsp
+        .body_mut()
+        .trailers()
+        .await
+        .expect("trailers")
+        .expect("trailers");
+    assert_eq!(
+        trailers
+            .get("grpc-status")
+            .expect("grpc-status")
+            .to_str()
+            .unwrap()
+            .parse::<u8>()
+            .unwrap(),
+        tonic::Code::Unavailable as u8
+    );
+}
diff --git a/linkerd/app/outbound/src/http/logical/tests/retries.rs b/linkerd/app/outbound/src/http/logical/tests/retries.rs
index 42c284a592..854ca9c836 100644
--- a/linkerd/app/outbound/src/http/logical/tests/retries.rs
+++ b/linkerd/app/outbound/src/http/logical/tests/retries.rs
@@ -33,6 +33,7 @@ async fn http_5xx() {
             timeout: None,
             backoff: None,
         }),
+        ..Default::default()
     });
 
     tokio::spawn(
@@ -70,6 +71,7 @@ async fn http_5xx_limited() {
             timeout: None,
             backoff: None,
         }),
+        ..Default::default()
     });
 
     info!("Sending a request that will initially fail and then succeed");
@@ -125,6 +127,7 @@ async fn http_timeout() {
             timeout: Some(TIMEOUT / 4),
             backoff: None,
         }),
+        ..Default::default()
     });
 
     info!("Sending a request that will initially timeout and then succeed");
@@ -174,6 +177,7 @@ async fn http_timeout_on_limit() {
             timeout: Some(TIMEOUT / 4),
             backoff: None,
         }),
+        ..Default::default()
     });
 
     tokio::spawn(
@@ -225,6 +229,7 @@ async fn http_timeout_with_request_timeout() {
             timeout: Some(TIMEOUT),
             backoff: None,
         }),
+        ..Default::default()
     });
 
     info!("Sending a request that will initially timeout and then succeed");
@@ -319,6 +324,7 @@ async fn grpc_internal() {
             timeout: None,
             backoff: None,
         }),
+        ..Default::default()
     });
 
     info!("Sending a request that will initially fail and then succeed");
@@ -379,6 +385,7 @@ async fn grpc_timeout() {
             max_request_bytes: 1000,
             backoff: None,
         }),
+        ..Default::default()
     });
 
     info!("Sending a request that will initially fail and then succeed");
diff --git a/linkerd/proxy/client-policy/src/grpc.rs b/linkerd/proxy/client-policy/src/grpc.rs
index 86d5e4fd99..b0055c7236 100644
--- a/linkerd/proxy/client-policy/src/grpc.rs
+++ b/linkerd/proxy/client-policy/src/grpc.rs
@@ -13,6 +13,7 @@ pub type Rule = grpc::Rule;
 pub struct RouteParams {
     pub timeouts: crate::http::Timeouts,
     pub retry: Option<Retry>,
+    pub allow_l5d_request_headers: bool,
 }
 
 // TODO HTTP2 settings
@@ -279,7 +280,7 @@ pub mod proto {
         fn try_from_proto(
             timeouts: Option,
             retry: Option,
-            _allow_l5d_request_headers: bool,
+            allow_l5d_request_headers: bool,
         ) -> Result {
             Ok(Self {
                 retry: retry.map(Retry::try_from).transpose()?,
@@ -287,6 +288,7 @@ pub mod proto {
                     .map(crate::http::Timeouts::try_from)
                     .transpose()?
                     .unwrap_or_default(),
+                allow_l5d_request_headers,
             })
         }
     }
diff --git a/linkerd/proxy/client-policy/src/http.rs b/linkerd/proxy/client-policy/src/http.rs
index 3f18e864c6..d9adc612af 100644
--- a/linkerd/proxy/client-policy/src/http.rs
+++ b/linkerd/proxy/client-policy/src/http.rs
@@ -13,6 +13,7 @@ pub type Rule = http::Rule;
 pub struct RouteParams {
     pub timeouts: Timeouts,
     pub retry: Option<Retry>,
+    pub allow_l5d_request_headers: bool,
 }
 
 // TODO: keepalive settings, etc.
@@ -317,7 +318,7 @@ pub mod proto {
         fn try_from_proto(
             timeouts: Option,
             retry: Option,
-            _allow_l5d_request_headers: bool,
+            allow_l5d_request_headers: bool,
        ) -> Result {
             Ok(Self {
                 retry: retry.map(Retry::try_from).transpose()?,
@@ -325,6 +326,7 @@ pub mod proto {
                     .map(Timeouts::try_from)
                     .transpose()?
                     .unwrap_or_default(),
+                allow_l5d_request_headers,
             })
         }
     }
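The duration grammar shared by the timeout headers mirrors the policy
controller's YAML config. The sketch below is illustrative only (it lives
outside the patch; `parse_duration` itself is private to the extensions
module) and restates the grammar implemented above:

    use std::time::Duration;

    // Mirrors the grammar of the patch's parse_duration: an integral
    // magnitude followed by one of ms|s|m|h|d; a bare "0" is also accepted.
    fn parse_duration(s: &str) -> Option<Duration> {
        let s = s.trim();
        let offset = s.rfind(|c: char| c.is_ascii_digit())?;
        let (magnitude, unit) = s.split_at(offset + 1);
        let magnitude = magnitude.parse::<u64>().ok()?;
        let mul = match unit {
            "" if magnitude == 0 => 0,
            "ms" => 1,
            "s" => 1_000,
            "m" => 60_000,
            "h" => 3_600_000,
            "d" => 86_400_000,
            _ => return None,
        };
        Some(Duration::from_millis(magnitude.checked_mul(mul)?))
    }

    fn main() {
        assert_eq!(parse_duration("250ms"), Some(Duration::from_millis(250)));
        assert_eq!(parse_duration("2s"), Some(Duration::from_secs(2)));
        assert_eq!(parse_duration("0"), Some(Duration::ZERO));
        assert_eq!(parse_duration("1.5s"), None); // fractional magnitudes are rejected
        assert_eq!(parse_duration("10x"), None); // unknown unit
    }

The retry-condition headers take comma-separated values: `l5d-retry-http`
accepts `5xx`, `gateway-error`, a single status code (e.g. `429`), or an
inclusive range (e.g. `500-504`), with codes restricted to 100-599;
`l5d-retry-grpc` accepts `cancelled`, `deadline-exceeded`, `internal`,
`resource-exhausted`, and `unavailable`.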