Skip to content

Commit

Permalink
feat(outbound): Control retry & timeout policies with headers
Browse files Browse the repository at this point in the history
Recent changes have introduced the ability to control retry and timeout via the
control plane. This PR extends this functionality with a set of headers that can
be used to augment/override these policies on a per-request basis.

This feature must be enabled by the control plane (via the OutboundPolicy API
response). When enabled, the following headers can be used to control retries
and timeouts:

- `l5d-timeout` / `l5d-request-timeout`: Controls the overall request-response
  stream timeout.
- `l5d-response-timeout`: Controls the timeout for the response stream.
- `l5d-retry-timeout`: Controls the timeout for each retry attempt (triggering
  another retry if exceeded).
- `l5d-retry-limit`: Controls the maximum number of retries.
- `l5d-retry-http`: Controls the HTTP status codes that trigger a retry.
- `l5d-retry-grpc`: Controls the gRPC status codes that trigger a retry.
  • Loading branch information
olix0r committed Jul 26, 2024
1 parent ab4a702 commit f840824
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 19 deletions.
5 changes: 4 additions & 1 deletion linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ where
.push_on_service(svc::LoadShed::layer())
.push(filters::NewApplyFilters::<Self, _, _>::layer())
.push(retry::NewHttpRetry::layer(metrics.retry.clone()))
// Set request extensions based on the route configuration.
// Set request extensions based on the route configuration
// AND/OR headers
.push(extensions::NewSetExtensions::layer())
.push(metrics::layer(&metrics.requests))
// Configure a classifier to use in the endpoint stack.
Expand Down Expand Up @@ -198,6 +199,7 @@ impl<T> svc::Param<extensions::Params> for Http<T> {
retryable_http_statuses: Some(r.status_ranges),
retryable_grpc_statuses: None,
}),
allow_l5d_request_headers: self.params.params.allow_l5d_request_headers,
}
}
}
Expand Down Expand Up @@ -251,6 +253,7 @@ impl<T> svc::Param<extensions::Params> for Grpc<T> {
retryable_http_statuses: None,
retryable_grpc_statuses: Some(r.codes),
}),
allow_l5d_request_headers: self.params.params.allow_l5d_request_headers,
}
}
}
Expand Down
174 changes: 167 additions & 7 deletions linkerd/app/outbound/src/http/logical/policy/route/extensions.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use super::retry::RetryPolicy;
use linkerd_app_core::{proxy::http, svc};
use linkerd_app_core::{config::ExponentialBackoff, proxy::http, svc};
use linkerd_proxy_client_policy as policy;
use std::task::{Context, Poll};
use tokio::time;

/// Per-route configuration used to initialize request extensions.
#[derive(Clone, Debug)]
pub struct Params {
/// The retry policy configured for the route, if any. When l5d request
/// headers are not allowed, this is applied to requests as-is.
pub retry: Option<RetryPolicy>,
/// The route's configured timeouts; its `response`, `idle`, and `request`
/// values seed the stream timeouts set on each request.
pub timeouts: policy::http::Timeouts,
/// When true, `l5d-*` request headers may augment or override the retry and
/// timeout configuration on a per-request basis.
pub allow_l5d_request_headers: bool,
}

// A request extension that marks the number of times a request has been
Expand Down Expand Up @@ -63,11 +65,11 @@ where
}

fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
let retry = self.configure_retry();
let retry = self.configure_retry(req.headers_mut());

// Ensure that we get response headers within the retry timeout. Note
// that this may be cleared by super::retry::RetryPolicy::set_extensions.
let mut timeouts = self.configure_timeouts();
let mut timeouts = self.configure_timeouts(req.headers_mut());
timeouts.response_headers = retry.as_ref().and_then(|r| r.timeout);

tracing::debug!(?retry, ?timeouts, "Initializing route extensions");
Expand All @@ -90,16 +92,174 @@ where
}

impl<S> SetExtensions<S> {
fn configure_retry(&self) -> Option<RetryPolicy> {
self.params.retry.clone()
fn configure_retry(&self, req: &mut http::HeaderMap) -> Option<RetryPolicy> {
if !self.params.allow_l5d_request_headers {
return self.params.retry.clone();
}

let user_retry_http = req
.remove("l5d-retry-http")
.and_then(|val| val.to_str().ok().and_then(parse_http_conditions));
let user_retry_grpc = req
.remove("l5d-retry-grpc")
.and_then(|val| val.to_str().ok().and_then(parse_grpc_conditions));
let user_retry_limit = req
.remove("l5d-retry-limit")
.and_then(|val| val.to_str().ok().and_then(|v| v.parse::<usize>().ok()));
let user_retry_timeout = req.remove("l5d-retry-timeout").and_then(parse_duration);

if let Some(retry) = self.params.retry.clone() {
return Some(RetryPolicy {
timeout: user_retry_timeout.or(retry.timeout),
retryable_http_statuses: user_retry_http.or(retry.retryable_http_statuses.clone()),
retryable_grpc_statuses: user_retry_grpc.or(retry.retryable_grpc_statuses.clone()),
max_retries: user_retry_limit.unwrap_or(retry.max_retries),
..retry
});
}

match (
user_retry_http,
user_retry_grpc,
user_retry_limit,
user_retry_timeout,
) {
(None, None, None, None) => None,
(retryable_http_statuses, retryable_grpc_statuses, retry_limit, timeout) => {
Some(RetryPolicy {
timeout,
retryable_http_statuses,
retryable_grpc_statuses,
max_retries: retry_limit.unwrap_or(1),
max_request_bytes: 64 * 1024,
backoff: Some(ExponentialBackoff::new_unchecked(
std::time::Duration::from_millis(25),
std::time::Duration::from_millis(250),
1.0,
)),
})
}
}
}

fn configure_timeouts(&self) -> http::StreamTimeouts {
http::StreamTimeouts {
/// Builds the stream timeouts for a single request.
///
/// The route's configured timeouts are the baseline. When l5d request
/// headers are allowed, the `l5d-timeout`, `l5d-request-timeout`, and
/// `l5d-response-timeout` headers are removed from `req`, and any value
/// that parses as a duration overrides the corresponding baseline timeout.
/// The response-headers timeout is always left unset here.
fn configure_timeouts(&self, req: &mut http::HeaderMap) -> http::StreamTimeouts {
    let configured = &self.params.timeouts;
    let mut timeouts = http::StreamTimeouts {
        response_headers: None,
        response_end: configured.response,
        idle: configured.idle,
        limit: configured.request.map(Into::into),
    };

    if self.params.allow_l5d_request_headers {
        // Both the shorthand and the longer, more explicit header are
        // always consumed; when both carry a valid duration the explicit
        // `l5d-request-timeout` wins.
        let shorthand = req.remove("l5d-timeout").and_then(parse_duration);
        let explicit = req.remove("l5d-request-timeout").and_then(parse_duration);
        if let Some(limit) = explicit.or(shorthand) {
            timeouts.limit = Some(limit.into());
        }

        let response = req.remove("l5d-response-timeout").and_then(parse_duration);
        if let Some(response_end) = response {
            timeouts.response_end = Some(response_end);
        }
    }

    timeouts
}
}

/// Parses a comma-separated list of HTTP retry conditions into the set of
/// status ranges that should trigger a retry.
///
/// Each condition is one of:
///
/// - `5xx`: any status in `500..=599`;
/// - `gateway-error`: any status in `502..=504`;
/// - a single status code, e.g. `429`;
/// - an inclusive range of status codes, e.g. `500-503`.
///
/// Keywords are matched case-insensitively. Whitespace around each condition
/// and around range bounds is ignored, so `5xx, 429` is handled the same as
/// `5xx,429`. Status codes must fall within `100..600`, and a range's start
/// must not exceed its end.
///
/// Unrecognized conditions are skipped rather than rejecting the whole value,
/// so this always returns `Some`, possibly wrapping an empty set of ranges.
fn parse_http_conditions(s: &str) -> Option<policy::http::StatusRanges> {
    // Parses a single status code, rejecting values outside of `100..600`.
    fn to_code(s: &str) -> Option<u16> {
        s.trim()
            .parse::<u16>()
            .ok()
            .filter(|code| (100..600).contains(code))
    }

    // Parses a single list item into an inclusive status range.
    fn to_range(cond: &str) -> Option<std::ops::RangeInclusive<u16>> {
        let cond = cond.trim();
        if cond.eq_ignore_ascii_case("5xx") {
            return Some(500..=599);
        }
        if cond.eq_ignore_ascii_case("gateway-error") {
            return Some(502..=504);
        }
        if let Some(code) = to_code(cond) {
            return Some(code..=code);
        }

        let (start, end) = cond.split_once('-')?;
        let (start, end) = (to_code(start)?, to_code(end)?);
        (start <= end).then(|| start..=end)
    }

    Some(policy::http::StatusRanges(
        s.split(',').filter_map(to_range).collect(),
    ))
}

/// Parses a comma-separated list of gRPC retry conditions into the set of
/// gRPC status codes that should trigger a retry.
///
/// The recognized condition names are `cancelled`, `deadline-exceeded`,
/// `internal`, `resource-exhausted`, and `unavailable`. Names are matched
/// case-insensitively and whitespace around each name is ignored, so
/// `cancelled, unavailable` is handled the same as `cancelled,unavailable`.
///
/// Unrecognized names are skipped rather than rejecting the whole value, so
/// this always returns `Some`, possibly wrapping an empty set of codes.
fn parse_grpc_conditions(s: &str) -> Option<policy::grpc::Codes> {
    // Condition names paired with the numeric gRPC status they select.
    let retryable = [
        ("cancelled", tonic::Code::Cancelled as u16),
        ("deadline-exceeded", tonic::Code::DeadlineExceeded as u16),
        ("internal", tonic::Code::Internal as u16),
        ("resource-exhausted", tonic::Code::ResourceExhausted as u16),
        ("unavailable", tonic::Code::Unavailable as u16),
    ];

    Some(policy::grpc::Codes(std::sync::Arc::new(
        s.split(',')
            .filter_map(|cond| {
                let cond = cond.trim();
                retryable
                    .iter()
                    .find(|(name, _)| cond.eq_ignore_ascii_case(name))
                    .map(|&(_, code)| code)
            })
            .collect(),
    )))
}

/// Parses a header value such as `100ms`, `5s`, `2m`, `1h`, or `1d` into a
/// duration.
///
/// The value must be an unsigned integer immediately followed by one of the
/// units `ms`, `s`, `m`, `h`, or `d`; a bare `0` is also accepted. Leading
/// and trailing whitespace is ignored. Returns `None` when the header is not
/// a valid string, when the value is malformed, or when the duration in
/// milliseconds would overflow a `u64`.
///
/// Copied from the policy controller so that we handle the same duration
/// values as we do in the YAML config.
fn parse_duration(hv: http::HeaderValue) -> Option<time::Duration> {
    #[inline]
    fn parse(s: &str) -> Option<time::Duration> {
        let text = s.trim();

        // The magnitude ends at the last ASCII digit; everything after it
        // is the unit suffix.
        let split = text.rfind(|c: char| c.is_ascii_digit())? + 1;
        let magnitude: u64 = text[..split].parse().ok()?;

        let millis_per_unit: u64 = match &text[split..] {
            // A unit may only be omitted when the magnitude is zero.
            "" if magnitude == 0 => 0,
            "ms" => 1,
            "s" => 1000,
            "m" => 1000 * 60,
            "h" => 1000 * 60 * 60,
            "d" => 1000 * 60 * 60 * 24,
            _ => return None,
        };

        magnitude
            .checked_mul(millis_per_unit)
            .map(time::Duration::from_millis)
    }

    let s = hv.to_str().ok()?;
    let parsed = parse(s);
    if parsed.is_none() {
        tracing::debug!("Invalid duration: {:?}", s);
    }
    parsed
}
7 changes: 2 additions & 5 deletions linkerd/app/outbound/src/http/logical/policy/route/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,13 @@ impl retry::Policy for RetryPolicy {
};
dst.insert(extensions::Attempt(attempt));

let retries_remain = u16::from(attempt) as usize <= self.max_retries;
if retries_remain {
dst.insert(self.clone());
}

if let Some(mut timeouts) = src.get::<http::StreamTimeouts>().cloned() {
// If retries are exhausted, remove the response headers timeout,
// since we no longer need the response headers to make a retry
// decision. This may give the request additional time to be satisfied.
let retries_remain = u16::from(attempt) as usize <= self.max_retries;
if !retries_remain {
tracing::debug!("Exhausted retries; removing response headers timeout");
timeouts.response_headers = None;
}
dst.insert(timeouts);
Expand Down
6 changes: 2 additions & 4 deletions linkerd/app/outbound/src/http/logical/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tracing::Instrument;

mod basic;
mod failure_accrual;
mod headers;
mod retries;
mod timeouts;

Expand Down Expand Up @@ -129,10 +130,7 @@ async fn mk_rsp(status: StatusCode, body: impl ToString) -> Result<Response> {
async fn mk_grpc_rsp(code: tonic::Code) -> Result<Response> {
Ok(http::Response::builder()
.version(::http::Version::HTTP_2)
.header(
"content-type",
http::HeaderValue::from_static("application/grpc"),
)
.header("content-type", "application/grpc")
.body(BoxBody::new(MockBody::trailers(async move {
let mut trls = http::HeaderMap::default();
trls.insert("grpc-status", (code as u8).to_string().parse().unwrap());
Expand Down
Loading

0 comments on commit f840824

Please sign in to comment.