Skip to content

Commit

Permalink
chore(http): split h2 client and server config
Browse files Browse the repository at this point in the history
Hyper supports a variety of configuration options for for the HTTP/2 client and
servers. The proxy's http client and server types do not support modifying these
parameters. We want to support configuring HTTP/2 server and client keep-alives;
and if we are exposing these parameters to the control plane, it also makes
sense to include the others, especially the per-connection stream concurrency
limit and the initial window sizes.

This commit splits the h2 'Settings' type into 'ClientParams' and
'ServerParams', including the appropriate per-connection settings for each.
These parameters are wired into the hyper client and server builders.

These parameters are not yet exposed to the control plane via configuration
interfaces.
  • Loading branch information
olix0r committed Apr 22, 2024
1 parent 5268124 commit 1fbea22
Show file tree
Hide file tree
Showing 17 changed files with 217 additions and 99 deletions.
2 changes: 1 addition & 1 deletion linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Config {
move |t: &Http| {
http::ServerParams {
version: t.version,
h2: Default::default(),
http2: Default::default(),
drain: drain.clone(),
}
}
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ use std::time::Duration;
pub struct ServerConfig {
pub addr: DualListenAddr,
pub keepalive: Keepalive,
pub h2_settings: h2::Settings,
pub http2: h2::ServerParams,
}

#[derive(Clone, Debug)]
pub struct ConnectConfig {
pub backoff: ExponentialBackoff,
pub timeout: Duration,
pub keepalive: Keepalive,
pub h1_settings: h1::PoolSettings,
pub h2_settings: h2::Settings,
pub http1: h1::PoolSettings,
pub http2: h2::ClientParams,
}

#[derive(Clone, Debug)]
Expand Down
9 changes: 4 additions & 5 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Config {
.push(tls::Client::layer(identity))
.push_connect_timeout(self.connect.timeout)
.push_map_target(|(_version, target)| target)
.push(self::client::layer(self.connect.h2_settings))
.push(self::client::layer(self.connect.http2))
.push_on_service(svc::MapErr::layer_boxed())
.into_new_service();

Expand Down Expand Up @@ -330,7 +330,6 @@ mod client {
svc, tls,
transport::{Remote, ServerAddr},
};
use linkerd_proxy_http::h2::Settings as H2Settings;
use std::net::SocketAddr;

#[derive(Clone, Hash, Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -368,11 +367,11 @@ mod client {
// === impl Layer ===

pub fn layer<C, B>(
settings: H2Settings,
) -> impl svc::Layer<C, Service = http::h2::Connect<C, B>> + Copy
h2: http::h2::ClientParams,
) -> impl svc::Layer<C, Service = http::h2::Connect<C, B>> + Clone
where
http::h2::Connect<C, B>: tower::Service<Target>,
{
svc::layer::mk(move |mk_conn| http::h2::Connect::new(mk_conn, settings))
svc::layer::mk(move |mk_conn| http::h2::Connect::new(mk_conn, h2.clone()))
}
}
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ impl<C> Inbound<C> {
.check_service::<Http>()
.push_map_target(|(_version, target)| target)
.push(http::client::layer(
config.proxy.connect.h1_settings,
config.proxy.connect.h2_settings,
config.proxy.connect.http1,
config.proxy.connect.http2.clone(),
))
.check_service::<Http>()
.push_on_service(svc::MapErr::layer_boxed())
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,15 @@ impl<H> Inbound<H> {
HSvc::Future: Send,
{
self.map_stack(|config, rt, http| {
let h2 = config.proxy.server.h2_settings;
let h2 = config.proxy.server.http2.clone();
let drain = rt.drain.clone();

http.check_new_service::<T, http::Request<http::BoxBody>>()
.unlift_new()
.check_new_new_service::<T, http::ClientHandle, http::Request<_>>()
.push(http::NewServeHttp::layer(move |t: &T| http::ServerParams {
version: t.param(),
h2,
http2: h2.clone(),
drain: drain.clone(),
}))
.check_new_service::<T, I>()
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/inbound/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub fn default_config() -> Config {
server: config::ServerConfig {
addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None),
keepalive: Keepalive(None),
h2_settings: h2::Settings::default(),
http2: h2::ServerParams::default(),
},
connect: config::ConnectConfig {
keepalive: Keepalive(None),
Expand All @@ -70,11 +70,11 @@ pub fn default_config() -> Config {
0.1,
)
.unwrap(),
h1_settings: h1::PoolSettings {
http1: h1::PoolSettings {
max_idle: 1,
idle_timeout: Duration::from_secs(1),
},
h2_settings: h2::Settings::default(),
http2: h2::ClientParams::default(),
},
max_in_flight_requests: 10_000,
detect_protocol_timeout: Duration::from_secs(10),
Expand Down
6 changes: 2 additions & 4 deletions linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ impl<C> Outbound<C> {
{
self.map_stack(|config, _, inner| {
let config::ConnectConfig {
h1_settings,
h2_settings,
..
http1, ref http2, ..
} = config.proxy.connect;

// Initiates an HTTP client on the underlying transport. Prior-knowledge HTTP/2
Expand All @@ -65,7 +63,7 @@ impl<C> Outbound<C> {
svc::stack(inner.into_inner().into_service())
.check_service::<Connect<T>>()
.push_map_target(|(version, inner)| Connect { version, inner })
.push(http::client::layer(h1_settings, h2_settings))
.push(http::client::layer(http1, http2.clone()))
.push_on_service(svc::MapErr::layer_boxed())
.check_service::<T>()
.into_new_service()
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/outbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) struct ServerRescue {

#[derive(Clone, Debug)]
pub struct ExtractServerParams {
h2: http::h2::Settings,
h2: http::h2::ServerParams,
drain: drain::Watch,
}

Expand Down Expand Up @@ -86,7 +86,7 @@ impl<N> Outbound<N> {
http.unlift_new()
.push(svc::ArcNewService::layer())
.push(http::NewServeHttp::layer(ExtractServerParams {
h2: config.proxy.server.h2_settings,
h2: config.proxy.server.http2.clone(),
drain: rt.drain.clone(),
}))
})
Expand Down Expand Up @@ -194,7 +194,7 @@ where
fn extract_param(&self, t: &T) -> http::ServerParams {
http::ServerParams {
version: t.param(),
h2: self.h2,
http2: self.h2.clone(),
drain: self.drain.clone(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,11 @@ impl<N> Outbound<N> {
http.check_new_service::<Http<T>, http::Request<_>>()
.unlift_new()
.push(http::NewServeHttp::layer({
let h2 = config.proxy.server.h2_settings;
let h2 = config.proxy.server.http2.clone();
let drain = rt.drain.clone();
move |http: &Http<T>| http::ServerParams {
version: http.version,
h2,
http2: h2.clone(),
drain: drain.clone()
}
}))
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ impl<N> Outbound<N> {
let opaq = self.clone().into_stack();

let http = self.with_stack(http).map_stack(|config, rt, stk| {
let h2 = config.proxy.server.h2_settings;
let h2 = config.proxy.server.http2.clone();
let drain = rt.drain.clone();
stk.unlift_new()
.push(http::NewServeHttp::layer(move |t: &Http<T>| {
http::ServerParams {
version: t.version,
h2,
http2: h2.clone(),
drain: drain.clone(),
}
}))
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/outbound/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(crate) fn default_config() -> Config {
server: config::ServerConfig {
addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None),
keepalive: Keepalive(None),
h2_settings: h2::Settings::default(),
http2: h2::ServerParams::default(),
},
connect: config::ConnectConfig {
keepalive: Keepalive(None),
Expand All @@ -37,11 +37,11 @@ pub(crate) fn default_config() -> Config {
0.1,
)
.unwrap(),
h1_settings: h1::PoolSettings {
http1: h1::PoolSettings {
max_idle: 1,
idle_timeout: Duration::from_secs(1),
},
h2_settings: h2::Settings::default(),
http2: h2::ClientParams::default(),
},
max_in_flight_requests: 10_000,
detect_protocol_timeout: Duration::from_secs(3),
Expand Down
70 changes: 49 additions & 21 deletions linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,21 +425,13 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
);
let dst_profile_networks = parse(strings, ENV_DESTINATION_PROFILE_NETWORKS, parse_networks);

let initial_stream_window_size = parse(strings, ENV_INITIAL_STREAM_WINDOW_SIZE, parse_number);
let initial_connection_window_size =
parse(strings, ENV_INITIAL_CONNECTION_WINDOW_SIZE, parse_number);

let tap = parse_tap_config(strings);

let h2_settings = h2::Settings {
initial_stream_window_size: Some(
initial_stream_window_size?.unwrap_or(DEFAULT_INITIAL_STREAM_WINDOW_SIZE),
),
initial_connection_window_size: Some(
initial_connection_window_size?.unwrap_or(DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE),
),
..Default::default()
};
let initial_stream_window_size = parse(strings, ENV_INITIAL_STREAM_WINDOW_SIZE, parse_number)?
.unwrap_or(DEFAULT_INITIAL_STREAM_WINDOW_SIZE);
let initial_connection_window_size =
parse(strings, ENV_INITIAL_CONNECTION_WINDOW_SIZE, parse_number)?
.unwrap_or(DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE);

let dst_profile_suffixes = dst_profile_suffixes?
.unwrap_or_else(|| parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap());
Expand Down Expand Up @@ -481,7 +473,13 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let server = ServerConfig {
addr,
keepalive,
h2_settings,
http2: h2::ServerParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}),
..Default::default()
},
};
let discovery_idle_timeout =
outbound_discovery_idle_timeout?.unwrap_or(DEFAULT_OUTBOUND_DISCOVERY_IDLE_TIMEOUT);
Expand All @@ -502,8 +500,14 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
OUTBOUND_CONNECT_BASE,
DEFAULT_OUTBOUND_CONNECT_BACKOFF,
)?,
h2_settings,
h1_settings: h1::PoolSettings {
http2: h2::ClientParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}),
..Default::default()
},
http1: h1::PoolSettings {
max_idle,
idle_timeout: connection_pool_timeout
.unwrap_or(DEFAULT_OUTBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT),
Expand Down Expand Up @@ -563,7 +567,13 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let server = ServerConfig {
addr,
keepalive,
h2_settings,
http2: h2::ServerParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}),
..Default::default()
},
};
let discovery_idle_timeout =
inbound_discovery_idle_timeout?.unwrap_or(DEFAULT_INBOUND_DISCOVERY_IDLE_TIMEOUT);
Expand All @@ -584,8 +594,14 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
INBOUND_CONNECT_BASE,
DEFAULT_INBOUND_CONNECT_BACKOFF,
)?,
h2_settings,
h1_settings: h1::PoolSettings {
http2: h2::ClientParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}),
..Default::default()
},
http1: h1::PoolSettings {
max_idle,
idle_timeout: connection_pool_timeout,
},
Expand Down Expand Up @@ -758,7 +774,13 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
server: ServerConfig {
addr: DualListenAddr(admin_listener_addr, None),
keepalive: inbound.proxy.server.keepalive,
h2_settings,
http2: h2::ServerParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}),
..Default::default()
},
},

// TODO(ver) Currently we always enable profiling when the pprof feature
Expand Down Expand Up @@ -817,7 +839,13 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
config: ServerConfig {
addr: DualListenAddr(addr, None),
keepalive: inbound.proxy.server.keepalive,
h2_settings,
http2: h2::ServerParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}),
..Default::default()
},
},
})
.unwrap_or(super::tap::Config::Disabled);
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{collections::HashSet, pin::Pin};
use tower::util::{service_fn, ServiceExt};

#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Config {
Disabled,
Enabled {
Expand Down
Loading

0 comments on commit 1fbea22

Please sign in to comment.