Skip to content

Commit

Permalink
feat(outbound): Discover timeouts and retries
Browse files Browse the repository at this point in the history
This change updates linkerd2-proxy-api to v0.14.0, which introduces per-route
timeout and retry policies. These API responses are bound to the recently
introduced outbound retry policies.

Support for the legacy request_timeout configuration (which in rare situations
may be set by older control planes) is maintained, but is now deprecated in
favor of the newer timeout policies.
  • Loading branch information
olix0r committed Jul 25, 2024
1 parent 2c03996 commit 523fd63
Show file tree
Hide file tree
Showing 15 changed files with 243 additions and 54 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2294,9 +2294,9 @@ dependencies = [

[[package]]
name = "linkerd2-proxy-api"
version = "0.13.1"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65678e4c506a7e5fdf1a664c629a9b658afa70e254dffcd24df72e937b2c0159"
checksum = "26c72fb98d969e1e94e95d52a6fcdf4693764702c369e577934256e72fb5bc61"
dependencies = [
"h2",
"http",
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,6 @@ members = [
[profile.release]
debug = 1
lto = true

[workspace.dependencies]
linkerd2-proxy-api = "0.14.0"
2 changes: 1 addition & 1 deletion linkerd/app/inbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ linkerd-meshtls-rustls = { path = "../../meshtls/rustls", optional = true }
linkerd-proxy-client-policy = { path = "../../proxy/client-policy" }
linkerd-tonic-stream = { path = "../../tonic-stream" }
linkerd-tonic-watch = { path = "../../tonic-watch" }
linkerd2-proxy-api = { version = "0.13", features = ["inbound"] }
linkerd2-proxy-api = { workspace = true, features = ["inbound"] }
once_cell = "1"
parking_lot = "0.12"
rangemap = "1"
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ ipnet = "2"
linkerd-app = { path = "..", features = ["allow-loopback"] }
linkerd-app-core = { path = "../core" }
linkerd-metrics = { path = "../../metrics", features = ["test_util"] }
linkerd2-proxy-api = { version = "0.13", features = [
linkerd2-proxy-api = { workspace = true, features = [
"destination",
"arbitrary",
] }
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ahash = "0.8"
bytes = "1"
http = "0.2"
futures = { version = "0.3", default-features = false }
linkerd2-proxy-api = { version = "0.13", features = ["outbound"] }
linkerd2-proxy-api = { workspace = true, features = ["outbound"] }
once_cell = "1"
parking_lot = "0.12"
pin-project = "1"
Expand Down
2 changes: 1 addition & 1 deletion linkerd/http/route/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tracing = "0.1"
url = "2"

[dependencies.linkerd2-proxy-api]
version = "0.13"
workspace = true
features = ["http-route", "grpc-route"]
optional = true

Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/api-resolve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Implements the Resolve trait using the proxy's gRPC API

[dependencies]
futures = { version = "0.3", default-features = false }
linkerd2-proxy-api = { version = "0.13", features = ["destination"] }
linkerd2-proxy-api = { workspace = true, features = ["destination"] }
linkerd-addr = { path = "../../addr" }
linkerd-error = { path = "../../error" }
linkerd-proxy-core = { path = "../core" }
Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/client-policy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ linkerd-proxy-api-resolve = { path = "../api-resolve" }
linkerd-proxy-core = { path = "../core" }

[dependencies.linkerd2-proxy-api]
version = "0.13"
workspace = true
optional = true
features = ["outbound"]

Expand Down
79 changes: 75 additions & 4 deletions linkerd/proxy/client-policy/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,35 @@ pub mod proto {
#[error("invalid filter: {0}")]
Filter(#[from] InvalidFilter),

#[error("missing {0}")]
Missing(&'static str),

#[error("invalid failure accrual policy: {0}")]
Breaker(#[from] InvalidFailureAccrual),

#[error("{0}")]
Retry(#[from] InvalidRetry),

#[error("invalid request timeout: {0}")]
RequestTimeout(#[from] prost_types::DurationError),

#[error("missing {0}")]
Missing(&'static str),
#[error("{0}")]
Timeout(#[from] crate::http::proto::InvalidTimeouts),
}

#[derive(Debug, thiserror::Error)]
pub enum InvalidRetry {
#[error("invalid max-retries: {0}")]
MaxRetries(u32),

#[error("invalid condition")]
Condition,

#[error("invalid timeout: {0}")]
Timeout(#[from] prost_types::DurationError),

#[error("invalid backoff: {0}")]
Backoff(#[from] crate::proto::InvalidBackoff),
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -219,6 +240,9 @@ pub mod proto {
matches,
backends,
filters,
timeouts,
retry,
allow_l5d_request_headers,
request_timeout,
} = proto;

Expand All @@ -236,8 +260,9 @@ pub mod proto {
.ok_or(InvalidGrpcRoute::Missing("distribution"))?
.try_into()?;

let mut params = RouteParams::default();
params.timeouts.request = request_timeout.map(TryInto::try_into).transpose()?;
let mut params = RouteParams::try_from_proto(timeouts, retry, allow_l5d_request_headers)?;
let legacy = request_timeout.map(TryInto::try_into).transpose()?;
params.timeouts.request = params.timeouts.request.or(legacy);

Ok(Rule {
matches,
Expand All @@ -250,6 +275,52 @@ pub mod proto {
})
}

impl RouteParams {
fn try_from_proto(
timeouts: Option<linkerd2_proxy_api::http_route::Timeouts>,
retry: Option<grpc_route::Retry>,
_allow_l5d_request_headers: bool,
) -> Result<Self, InvalidGrpcRoute> {
Ok(Self {
retry: retry.map(Retry::try_from).transpose()?,
timeouts: timeouts
.map(crate::http::Timeouts::try_from)
.transpose()?
.unwrap_or_default(),
})
}
}

impl TryFrom<outbound::grpc_route::Retry> for Retry {
type Error = InvalidRetry;

fn try_from(retry: outbound::grpc_route::Retry) -> Result<Self, Self::Error> {
let cond = retry.conditions.ok_or(InvalidRetry::Condition)?;
let codes = Codes(Arc::new(
[
cond.cancelled.then_some(tonic::Code::Cancelled as u16),
cond.deadine_exceeded
.then_some(tonic::Code::DeadlineExceeded as u16),
cond.resource_exhausted
.then_some(tonic::Code::ResourceExhausted as u16),
cond.internal.then_some(tonic::Code::Internal as u16),
cond.unavailable.then_some(tonic::Code::Unavailable as u16),
]
.into_iter()
.flatten()
.collect(),
));

Ok(Self {
codes,
max_retries: retry.max_retries as usize,
max_request_bytes: retry.max_request_bytes as _,
backoff: retry.backoff.map(crate::proto::try_backoff).transpose()?,
timeout: retry.timeout.map(time::Duration::try_from).transpose()?,
})
}
}

impl TryFrom<grpc_route::Distribution> for RouteDistribution<Filter> {
type Error = InvalidDistribution;
fn try_from(distribution: grpc_route::Distribution) -> Result<Self, Self::Error> {
Expand Down
118 changes: 116 additions & 2 deletions linkerd/proxy/client-policy/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,37 @@ pub mod proto {

#[error("missing {0}")]
Missing(&'static str),

#[error(transparent)]
Timeout(#[from] InvalidTimeouts),

#[error(transparent)]
Retry(#[from] InvalidRetry),
}

#[derive(Debug, thiserror::Error)]
pub enum InvalidRetry {
#[error("invalid max-retries: {0}")]
MaxRetries(u32),

#[error("invalid condition")]
Condition,

#[error("invalid timeout: {0}")]
Timeout(#[from] prost_types::DurationError),

#[error("invalid backoff: {0}")]
Backoff(#[from] crate::proto::InvalidBackoff),
}

#[derive(Debug, thiserror::Error)]
pub enum InvalidTimeouts {
#[error("invalid response timeout: {0}")]
Response(prost_types::DurationError),
#[error("invalid idle timeout: {0}")]
Idle(prost_types::DurationError),
#[error("invalid request timeout: {0}")]
Request(prost_types::DurationError),
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -247,6 +278,9 @@ pub mod proto {
matches,
backends,
filters,
timeouts,
retry,
allow_l5d_request_headers,
request_timeout,
} = proto;

Expand All @@ -264,8 +298,9 @@ pub mod proto {
.ok_or(InvalidHttpRoute::Missing("distribution"))?
.try_into()?;

let mut params = RouteParams::default();
params.timeouts.request = request_timeout.map(TryInto::try_into).transpose()?;
let mut params = RouteParams::try_from_proto(timeouts, retry, allow_l5d_request_headers)?;
let legacy = request_timeout.map(TryInto::try_into).transpose()?;
params.timeouts.request = params.timeouts.request.or(legacy);

Ok(Rule {
matches,
Expand All @@ -278,6 +313,85 @@ pub mod proto {
})
}

impl RouteParams {
fn try_from_proto(
timeouts: Option<linkerd2_proxy_api::http_route::Timeouts>,
retry: Option<http_route::Retry>,
_allow_l5d_request_headers: bool,
) -> Result<Self, InvalidHttpRoute> {
Ok(Self {
retry: retry.map(Retry::try_from).transpose()?,
timeouts: timeouts
.map(Timeouts::try_from)
.transpose()?
.unwrap_or_default(),
})
}
}

impl TryFrom<linkerd2_proxy_api::http_route::Timeouts> for Timeouts {
type Error = InvalidTimeouts;
fn try_from(
timeouts: linkerd2_proxy_api::http_route::Timeouts,
) -> Result<Self, Self::Error> {
Ok(Self {
response: timeouts
.response
.map(time::Duration::try_from)
.transpose()
.map_err(InvalidTimeouts::Response)?,
idle: timeouts
.idle
.map(time::Duration::try_from)
.transpose()
.map_err(InvalidTimeouts::Response)?,
request: timeouts
.request
.map(time::Duration::try_from)
.transpose()
.map_err(InvalidTimeouts::Request)?,
})
}
}

impl TryFrom<outbound::http_route::Retry> for Retry {
type Error = InvalidRetry;
fn try_from(retry: outbound::http_route::Retry) -> Result<Self, Self::Error> {
fn range(
r: outbound::http_route::retry::conditions::StatusRange,
) -> Result<RangeInclusive<u16>, InvalidRetry> {
let Ok(start) = u16::try_from(r.start) else {
return Err(InvalidRetry::Condition);
};
let Ok(end) = u16::try_from(r.end) else {
return Err(InvalidRetry::Condition);
};
if start == 0 || end == 0 || end > 599 || start > end {
return Err(InvalidRetry::Condition);
}
Ok(start..=end)
}

let status_ranges = StatusRanges(
retry
.conditions
.ok_or(InvalidRetry::Condition)?
.status_ranges
.into_iter()
.map(range)
.collect::<Result<_, _>>()?,
);
Ok(Self {
status_ranges,
max_retries: u16::try_from(retry.max_retries)
.map_err(|_| InvalidRetry::MaxRetries(retry.max_retries))?,
max_request_bytes: retry.max_request_bytes as _,
backoff: retry.backoff.map(crate::proto::try_backoff).transpose()?,
timeout: retry.timeout.map(time::Duration::try_from).transpose()?,
})
}
}

impl TryFrom<http_route::Distribution> for RouteDistribution<Filter> {
type Error = InvalidDistribution;
fn try_from(distribution: http_route::Distribution) -> Result<Self, Self::Error> {
Expand Down
Loading

0 comments on commit 523fd63

Please sign in to comment.