From f07354ffbdc3145a473ececd683cf6238a730f0b Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 22 Apr 2024 23:44:58 +0000 Subject: [PATCH] feat(app): configure HTTP/2 server parameters 59ea792b3 expanded the proxy's HTTP/2 server parameters so that a variety of Hyper server settings may be controlled at runtime. There is currently no way for the control plane to influence these settings. This commit adds environment-based configuration for the inbound and outbound proxy servers. All server parameters made configurable in 59ea792b3 are now configurable via environment variables. Variables take the form: LINKERD2_PROXY__SERVER_HTTP2_ Where `` is either `INBOUND` or `OUTBOUND`, and `` is the capitalized snake-case name of the parameter. --- linkerd/app/src/env.rs | 40 +++----- linkerd/app/src/env/control.rs | 7 +- linkerd/app/src/env/http2.rs | 154 +++++++++++++++++++++++++++++++ linkerd/proxy/http/src/h2.rs | 30 +++--- linkerd/proxy/http/src/server.rs | 8 +- 5 files changed, 187 insertions(+), 52 deletions(-) create mode 100644 linkerd/app/src/env/http2.rs diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 84cf81da77..8021e75771 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -18,6 +18,7 @@ use thiserror::Error; use tracing::{debug, error, info, warn}; mod control; +mod http2; mod opencensus; mod types; @@ -475,13 +476,7 @@ pub fn parse_config(strings: &S) -> Result let server = ServerConfig { addr, keepalive, - http2: h2::ServerParams { - flow_control: Some(h2::FlowControl::Fixed { - initial_stream_window_size, - initial_connection_window_size, - }), - ..Default::default() - }, + http2: http2::parse_server(strings, "LINKERD2_PROXY_OUTBOUND_SERVER_HTTP2")?, }; let discovery_idle_timeout = outbound_discovery_idle_timeout?.unwrap_or(DEFAULT_OUTBOUND_DISCOVERY_IDLE_TIMEOUT); @@ -569,13 +564,7 @@ pub fn parse_config(strings: &S) -> Result let server = ServerConfig { addr, keepalive, - http2: h2::ServerParams { - flow_control: Some(h2::FlowControl::Fixed { - initial_stream_window_size, - initial_connection_window_size, - }), - ..Default::default() - }, + http2: http2::parse_server(strings, "LINKERD2_PROXY_INBOUND_SERVER_HTTP2")?, }; let discovery_idle_timeout = inbound_discovery_idle_timeout?.unwrap_or(DEFAULT_INBOUND_DISCOVERY_IDLE_TIMEOUT); @@ -776,13 +765,7 @@ pub fn parse_config(strings: &S) -> Result server: ServerConfig { addr: DualListenAddr(admin_listener_addr, None), keepalive: inbound.proxy.server.keepalive, - http2: h2::ServerParams { - flow_control: Some(h2::FlowControl::Fixed { - initial_stream_window_size, - initial_connection_window_size, - }), - ..Default::default() - }, + http2: inbound.proxy.server.http2.clone(), }, // TODO(ver) Currently we always enable profiling when the pprof feature @@ -841,13 +824,7 @@ pub fn parse_config(strings: &S) -> Result config: ServerConfig { addr: DualListenAddr(addr, None), keepalive: inbound.proxy.server.keepalive, - http2: h2::ServerParams { - flow_control: Some(h2::FlowControl::Fixed { - initial_stream_window_size, - initial_connection_window_size, - }), - ..Default::default() - }, + http2: inbound.proxy.server.http2.clone(), }, }) .unwrap_or(super::tap::Config::Disabled); @@ -1206,3 +1183,10 @@ pub fn parse_linkerd_identity_config( } } } + +#[cfg(test)] +impl Strings for std::collections::HashMap<&'static str, &'static str> { + fn get(&self, key: &str) -> Result, EnvError> { + Ok(self.get(key).map(ToString::to_string)) + } +} diff --git a/linkerd/app/src/env/control.rs b/linkerd/app/src/env/control.rs index 2546b93a23..3f3c2d878e 100644 --- a/linkerd/app/src/env/control.rs +++ b/linkerd/app/src/env/control.rs @@ -40,13 +40,8 @@ mod tests { #[test] fn control_stream_limits() { - impl Strings for HashMap<&'static str, &'static str> { - fn get(&self, key: &str) -> Result, EnvError> { - Ok(self.get(key).map(ToString::to_string)) - } - } - let mut env = HashMap::default(); + env.insert("LINKERD2_PROXY_CONTROL_STREAM_INITIAL_TIMEOUT", "1s"); env.insert("LINKERD2_PROXY_CONTROL_STREAM_IDLE_TIMEOUT", "2s"); env.insert("LINKERD2_PROXY_CONTROL_STREAM_LIFETIME", "3s"); diff --git a/linkerd/app/src/env/http2.rs b/linkerd/app/src/env/http2.rs new file mode 100644 index 0000000000..ad0455d3bb --- /dev/null +++ b/linkerd/app/src/env/http2.rs @@ -0,0 +1,154 @@ +use super::{parse, types::*, EnvError, Strings}; +use linkerd_app_core::proxy::http::h2; +use linkerd_app_outbound::http::h2::ServerParams; + +pub(super) fn parse_server( + strings: &S, + base: &str, +) -> Result { + Ok(ServerParams { + flow_control: Some(parse_flow_control(strings, base)?), + keep_alive: parse_keep_alive(strings, &format!("{}_KEEP_ALIVE", base))?, + max_concurrent_streams: parse( + strings, + &format!("{}_MAX_CONCURRENT_STREAMS", base), + parse_number, + )?, + max_frame_size: parse(strings, &format!("{}_MAX_FRAME_SIZE", base), parse_number)?, + max_header_list_size: parse( + strings, + &format!("{}_MAX_HEADER_LIST_SIZE", base), + parse_number, + )?, + max_pending_accept_reset_streams: parse( + strings, + &format!("{}_MAX_PENDING_ACCEPT_RESET_STREAMS", base), + parse_number, + )?, + max_send_buf_size: parse( + strings, + &format!("{}_MAX_SEND_BUF_SIZE", base), + parse_number, + )?, + }) +} + +fn parse_flow_control(strings: &S, base: &str) -> Result { + if let Some(true) = parse( + strings, + &format!("{}_ADAPTIVE_FLOW_CONTROL", base), + parse_bool, + )? { + return Ok(h2::FlowControl::Adaptive); + } + + if let (Some(initial_stream_window_size), Some(initial_connection_window_size)) = ( + parse( + strings, + &format!("{}_INITIAL_STREAM_WINDOW_SIZE", base), + parse_number, + )?, + parse( + strings, + &format!("{}_INITIAL_CONNECTION_WINDOW_SIZE", base), + parse_number, + )?, + ) { + return Ok(h2::FlowControl::Fixed { + initial_stream_window_size, + initial_connection_window_size, + }); + } + + // The proxy's defaults are used if no flow control settings are provided. + Ok(h2::FlowControl::Fixed { + initial_connection_window_size: super::DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE, + initial_stream_window_size: super::DEFAULT_INITIAL_STREAM_WINDOW_SIZE, + }) +} + +fn parse_keep_alive( + strings: &S, + base: &str, +) -> Result, EnvError> { + if let (Some(timeout), Some(interval)) = ( + parse(strings, &format!("{}_TIMEOUT", base), parse_duration)?, + parse(strings, &format!("{}_INTERVAL", base), parse_duration)?, + ) { + return Ok(Some(h2::KeepAlive { interval, timeout })); + } + + Ok(None) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{collections::HashMap, time::Duration}; + + #[test] + fn server_params() { + let mut env = HashMap::default(); + + // Produces empty params if no relevant env vars are set. + let default = h2::ServerParams { + flow_control: Some(h2::FlowControl::Fixed { + initial_stream_window_size: super::super::DEFAULT_INITIAL_STREAM_WINDOW_SIZE, + initial_connection_window_size: + super::super::DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE, + }), + ..Default::default() + }; + assert_eq!(parse_server(&env, "TEST").unwrap(), default); + + // Set all the fields. + env.insert("TEST_MAX_CONCURRENT_STREAMS", "3"); + env.insert("TEST_MAX_FRAME_SIZE", "4"); + env.insert("TEST_MAX_HEADER_LIST_SIZE", "5"); + env.insert("TEST_MAX_PENDING_ACCEPT_RESET_STREAMS", "6"); + env.insert("TEST_MAX_SEND_BUF_SIZE", "7"); + env.insert("TEST_KEEP_ALIVE_TIMEOUT", "1s"); + env.insert("TEST_KEEP_ALIVE_INTERVAL", "2s"); + env.insert("TEST_INITIAL_STREAM_WINDOW_SIZE", "1"); + env.insert("TEST_INITIAL_CONNECTION_WINDOW_SIZE", "2"); + let expected = h2::ServerParams { + flow_control: Some(h2::FlowControl::Fixed { + initial_stream_window_size: 1, + initial_connection_window_size: 2, + }), + keep_alive: Some(h2::KeepAlive { + interval: Duration::from_secs(2), + timeout: Duration::from_secs(1), + }), + max_concurrent_streams: Some(3), + max_frame_size: Some(4), + max_header_list_size: Some(5), + max_pending_accept_reset_streams: Some(6), + max_send_buf_size: Some(7), + }; + assert_eq!(parse_server(&env, "TEST").unwrap(), expected); + + // Enable adaptive flow control, overriding other flow control settings. + env.insert("TEST_ADAPTIVE_FLOW_CONTROL", "true"); + assert_eq!( + parse_server(&env, "TEST").unwrap(), + h2::ServerParams { + flow_control: Some(h2::FlowControl::Adaptive), + ..expected + } + ); + + // Clear the flow control and set adaptive to false to ensure the + // default flow control is used. + env.remove("TEST_INITIAL_STREAM_WINDOW_SIZE"); + env.remove("TEST_INITIAL_CONNECTION_WINDOW_SIZE"); + env.insert("TEST_ADAPTIVE_FLOW_CONTROL", "false"); + assert_eq!( + parse_server(&env, "TEST").unwrap(), + h2::ServerParams { + flow_control: default.flow_control, + ..expected + } + ); + } +} diff --git a/linkerd/proxy/http/src/h2.rs b/linkerd/proxy/http/src/h2.rs index abd4e02fe0..e0b0896956 100644 --- a/linkerd/proxy/http/src/h2.rs +++ b/linkerd/proxy/http/src/h2.rs @@ -16,10 +16,10 @@ use std::{ use tracing::instrument::Instrument; use tracing::{debug, debug_span, trace_span}; -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct ServerParams { pub flow_control: Option, - pub keepalive: Option, + pub keep_alive: Option, pub max_concurrent_streams: Option, // Internals @@ -29,30 +29,31 @@ pub struct ServerParams { pub max_send_buf_size: Option, } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct ClientParams { pub flow_control: Option, - pub keepalive: Option, + pub keep_alive: Option, - // Interansl + // Internals pub max_concurrent_reset_streams: Option, pub max_frame_size: Option, pub max_send_buf_size: Option, } -#[derive(Copy, Clone, Debug, Default)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] pub struct KeepAlive { pub interval: Duration, pub timeout: Duration, } -#[derive(Copy, Clone, Debug, Default)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] pub struct ClientKeepAlive { - pub keepalive: KeepAlive, + pub interval: Duration, + pub timeout: Duration, pub while_idle: bool, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum FlowControl { Adaptive, Fixed { @@ -119,7 +120,7 @@ where fn call(&mut self, target: T) -> Self::Future { let ClientParams { flow_control, - keepalive, + keep_alive, max_concurrent_reset_streams, max_frame_size, max_send_buf_size, @@ -152,13 +153,14 @@ where // Configure HTTP/2 PING frames if let Some(ClientKeepAlive { - keepalive: ka, + timeout, + interval, while_idle, - }) = keepalive + }) = keep_alive { builder - .http2_keep_alive_timeout(ka.timeout) - .http2_keep_alive_interval(ka.interval) + .http2_keep_alive_timeout(timeout) + .http2_keep_alive_interval(interval) .http2_keep_alive_while_idle(while_idle); } diff --git a/linkerd/proxy/http/src/server.rs b/linkerd/proxy/http/src/server.rs index 89d9abccdc..168ae61c7e 100644 --- a/linkerd/proxy/http/src/server.rs +++ b/linkerd/proxy/http/src/server.rs @@ -67,7 +67,7 @@ where drain, } = self.params.extract_param(&target); let h2::ServerParams { - keepalive, + keep_alive, flow_control, max_concurrent_streams, max_frame_size, @@ -92,9 +92,9 @@ where } // Configure HTTP/2 PING frames - if let Some(ka) = keepalive { - srv.http2_keep_alive_timeout(ka.timeout) - .http2_keep_alive_interval(ka.interval); + if let Some(h2::KeepAlive { timeout, interval }) = keep_alive { + srv.http2_keep_alive_timeout(timeout) + .http2_keep_alive_interval(interval); } srv.http2_max_concurrent_streams(max_concurrent_streams)