Skip to content

Commit

Permalink
feat(app): configure HTTP/2 server parameters
Browse files Browse the repository at this point in the history
59ea792 expanded the proxy's HTTP/2 server parameters so that a variety of
Hyper server settings may be controlled at runtime. There is currently no way
for the control plane to influence these settings.

This commit adds environment-based configuration for the inbound and outbound
proxy servers. All server parameters made configurable in 59ea792 are now
configurable via environment variables. Variables take the form:

    LINKERD2_PROXY_<PROXY>_SERVER_HTTP2_<PARAM>

Where `<PROXY>` is either `INBOUND` or `OUTBOUND`, and `<PARAM>` is the
capitalized snake-case name of the parameter.
  • Loading branch information
olix0r committed Apr 23, 2024
1 parent bd562d7 commit f07354f
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 52 deletions.
40 changes: 12 additions & 28 deletions linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use thiserror::Error;
use tracing::{debug, error, info, warn};

mod control;
mod http2;
mod opencensus;
mod types;

Expand Down Expand Up @@ -475,13 +476,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let server = ServerConfig {
addr,
keepalive,
http2: h2::ServerParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}),
..Default::default()
},
http2: http2::parse_server(strings, "LINKERD2_PROXY_OUTBOUND_SERVER_HTTP2")?,
};
let discovery_idle_timeout =
outbound_discovery_idle_timeout?.unwrap_or(DEFAULT_OUTBOUND_DISCOVERY_IDLE_TIMEOUT);
Expand Down Expand Up @@ -569,13 +564,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let server = ServerConfig {
addr,
keepalive,
http2: h2::ServerParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}),
..Default::default()
},
http2: http2::parse_server(strings, "LINKERD2_PROXY_INBOUND_SERVER_HTTP2")?,
};
let discovery_idle_timeout =
inbound_discovery_idle_timeout?.unwrap_or(DEFAULT_INBOUND_DISCOVERY_IDLE_TIMEOUT);
Expand Down Expand Up @@ -776,13 +765,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
server: ServerConfig {
addr: DualListenAddr(admin_listener_addr, None),
keepalive: inbound.proxy.server.keepalive,
http2: h2::ServerParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}),
..Default::default()
},
http2: inbound.proxy.server.http2.clone(),
},

// TODO(ver) Currently we always enable profiling when the pprof feature
Expand Down Expand Up @@ -841,13 +824,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
config: ServerConfig {
addr: DualListenAddr(addr, None),
keepalive: inbound.proxy.server.keepalive,
http2: h2::ServerParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}),
..Default::default()
},
http2: inbound.proxy.server.http2.clone(),
},
})
.unwrap_or(super::tap::Config::Disabled);
Expand Down Expand Up @@ -1206,3 +1183,10 @@ pub fn parse_linkerd_identity_config<S: Strings>(
}
}
}

#[cfg(test)]
impl Strings for std::collections::HashMap<&'static str, &'static str> {
fn get(&self, key: &str) -> Result<Option<String>, EnvError> {
Ok(self.get(key).map(ToString::to_string))
}
}
7 changes: 1 addition & 6 deletions linkerd/app/src/env/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,8 @@ mod tests {

#[test]
fn control_stream_limits() {
impl Strings for HashMap<&'static str, &'static str> {
fn get(&self, key: &str) -> Result<Option<String>, EnvError> {
Ok(self.get(key).map(ToString::to_string))
}
}

let mut env = HashMap::default();

env.insert("LINKERD2_PROXY_CONTROL_STREAM_INITIAL_TIMEOUT", "1s");
env.insert("LINKERD2_PROXY_CONTROL_STREAM_IDLE_TIMEOUT", "2s");
env.insert("LINKERD2_PROXY_CONTROL_STREAM_LIFETIME", "3s");
Expand Down
154 changes: 154 additions & 0 deletions linkerd/app/src/env/http2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use super::{parse, types::*, EnvError, Strings};
use linkerd_app_core::proxy::http::h2;
use linkerd_app_outbound::http::h2::ServerParams;

pub(super) fn parse_server<S: Strings>(
strings: &S,
base: &str,
) -> Result<h2::ServerParams, EnvError> {
Ok(ServerParams {
flow_control: Some(parse_flow_control(strings, base)?),
keep_alive: parse_keep_alive(strings, &format!("{}_KEEP_ALIVE", base))?,
max_concurrent_streams: parse(
strings,
&format!("{}_MAX_CONCURRENT_STREAMS", base),
parse_number,
)?,
max_frame_size: parse(strings, &format!("{}_MAX_FRAME_SIZE", base), parse_number)?,
max_header_list_size: parse(
strings,
&format!("{}_MAX_HEADER_LIST_SIZE", base),
parse_number,
)?,
max_pending_accept_reset_streams: parse(
strings,
&format!("{}_MAX_PENDING_ACCEPT_RESET_STREAMS", base),
parse_number,
)?,
max_send_buf_size: parse(
strings,
&format!("{}_MAX_SEND_BUF_SIZE", base),
parse_number,
)?,
})
}

fn parse_flow_control<S: Strings>(strings: &S, base: &str) -> Result<h2::FlowControl, EnvError> {
if let Some(true) = parse(
strings,
&format!("{}_ADAPTIVE_FLOW_CONTROL", base),
parse_bool,
)? {
return Ok(h2::FlowControl::Adaptive);
}

if let (Some(initial_stream_window_size), Some(initial_connection_window_size)) = (
parse(
strings,
&format!("{}_INITIAL_STREAM_WINDOW_SIZE", base),
parse_number,
)?,
parse(
strings,
&format!("{}_INITIAL_CONNECTION_WINDOW_SIZE", base),
parse_number,
)?,
) {
return Ok(h2::FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
});
}

// The proxy's defaults are used if no flow control settings are provided.
Ok(h2::FlowControl::Fixed {
initial_connection_window_size: super::DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE,
initial_stream_window_size: super::DEFAULT_INITIAL_STREAM_WINDOW_SIZE,
})
}

fn parse_keep_alive<S: Strings>(
strings: &S,
base: &str,
) -> Result<Option<h2::KeepAlive>, EnvError> {
if let (Some(timeout), Some(interval)) = (
parse(strings, &format!("{}_TIMEOUT", base), parse_duration)?,
parse(strings, &format!("{}_INTERVAL", base), parse_duration)?,
) {
return Ok(Some(h2::KeepAlive { interval, timeout }));
}

Ok(None)
}

#[cfg(test)]
mod tests {
use super::*;
use std::{collections::HashMap, time::Duration};

#[test]
fn server_params() {
let mut env = HashMap::default();

// Produces empty params if no relevant env vars are set.
let default = h2::ServerParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size: super::super::DEFAULT_INITIAL_STREAM_WINDOW_SIZE,
initial_connection_window_size:
super::super::DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE,
}),
..Default::default()
};
assert_eq!(parse_server(&env, "TEST").unwrap(), default);

// Set all the fields.
env.insert("TEST_MAX_CONCURRENT_STREAMS", "3");
env.insert("TEST_MAX_FRAME_SIZE", "4");
env.insert("TEST_MAX_HEADER_LIST_SIZE", "5");
env.insert("TEST_MAX_PENDING_ACCEPT_RESET_STREAMS", "6");
env.insert("TEST_MAX_SEND_BUF_SIZE", "7");
env.insert("TEST_KEEP_ALIVE_TIMEOUT", "1s");
env.insert("TEST_KEEP_ALIVE_INTERVAL", "2s");
env.insert("TEST_INITIAL_STREAM_WINDOW_SIZE", "1");
env.insert("TEST_INITIAL_CONNECTION_WINDOW_SIZE", "2");
let expected = h2::ServerParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size: 1,
initial_connection_window_size: 2,
}),
keep_alive: Some(h2::KeepAlive {
interval: Duration::from_secs(2),
timeout: Duration::from_secs(1),
}),
max_concurrent_streams: Some(3),
max_frame_size: Some(4),
max_header_list_size: Some(5),
max_pending_accept_reset_streams: Some(6),
max_send_buf_size: Some(7),
};
assert_eq!(parse_server(&env, "TEST").unwrap(), expected);

// Enable adaptive flow control, overriding other flow control settings.
env.insert("TEST_ADAPTIVE_FLOW_CONTROL", "true");
assert_eq!(
parse_server(&env, "TEST").unwrap(),
h2::ServerParams {
flow_control: Some(h2::FlowControl::Adaptive),
..expected
}
);

// Clear the flow control and set adaptive to false to ensure the
// default flow control is used.
env.remove("TEST_INITIAL_STREAM_WINDOW_SIZE");
env.remove("TEST_INITIAL_CONNECTION_WINDOW_SIZE");
env.insert("TEST_ADAPTIVE_FLOW_CONTROL", "false");
assert_eq!(
parse_server(&env, "TEST").unwrap(),
h2::ServerParams {
flow_control: default.flow_control,
..expected
}
);
}
}
30 changes: 16 additions & 14 deletions linkerd/proxy/http/src/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use std::{
use tracing::instrument::Instrument;
use tracing::{debug, debug_span, trace_span};

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ServerParams {
pub flow_control: Option<FlowControl>,
pub keepalive: Option<KeepAlive>,
pub keep_alive: Option<KeepAlive>,
pub max_concurrent_streams: Option<u32>,

// Internals
Expand All @@ -29,30 +29,31 @@ pub struct ServerParams {
pub max_send_buf_size: Option<usize>,
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ClientParams {
pub flow_control: Option<FlowControl>,
pub keepalive: Option<ClientKeepAlive>,
pub keep_alive: Option<ClientKeepAlive>,

// Interansl
// Internals
pub max_concurrent_reset_streams: Option<usize>,
pub max_frame_size: Option<u32>,
pub max_send_buf_size: Option<usize>,
}

#[derive(Copy, Clone, Debug, Default)]
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct KeepAlive {
pub interval: Duration,
pub timeout: Duration,
}

#[derive(Copy, Clone, Debug, Default)]
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct ClientKeepAlive {
pub keepalive: KeepAlive,
pub interval: Duration,
pub timeout: Duration,
pub while_idle: bool,
}

#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum FlowControl {
Adaptive,
Fixed {
Expand Down Expand Up @@ -119,7 +120,7 @@ where
fn call(&mut self, target: T) -> Self::Future {
let ClientParams {
flow_control,
keepalive,
keep_alive,
max_concurrent_reset_streams,
max_frame_size,
max_send_buf_size,
Expand Down Expand Up @@ -152,13 +153,14 @@ where

// Configure HTTP/2 PING frames
if let Some(ClientKeepAlive {
keepalive: ka,
timeout,
interval,
while_idle,
}) = keepalive
}) = keep_alive
{
builder
.http2_keep_alive_timeout(ka.timeout)
.http2_keep_alive_interval(ka.interval)
.http2_keep_alive_timeout(timeout)
.http2_keep_alive_interval(interval)
.http2_keep_alive_while_idle(while_idle);
}

Expand Down
8 changes: 4 additions & 4 deletions linkerd/proxy/http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
drain,
} = self.params.extract_param(&target);
let h2::ServerParams {
keepalive,
keep_alive,
flow_control,
max_concurrent_streams,
max_frame_size,
Expand All @@ -92,9 +92,9 @@ where
}

// Configure HTTP/2 PING frames
if let Some(ka) = keepalive {
srv.http2_keep_alive_timeout(ka.timeout)
.http2_keep_alive_interval(ka.interval);
if let Some(h2::KeepAlive { timeout, interval }) = keep_alive {
srv.http2_keep_alive_timeout(timeout)
.http2_keep_alive_interval(interval);
}

srv.http2_max_concurrent_streams(max_concurrent_streams)
Expand Down

0 comments on commit f07354f

Please sign in to comment.