Skip to content

Commit a980729

Browse files
committed
chore(http): test HTTP/2 flow control exhaustion
This change adds tests that exercises the server's behavior when clients exhaust their HTTP/2 receive windows, i.e. so that the server is unable to send additional data to a client. These tests currently only document the existing behavior, but they will be extended to validate mitigation strategies as they are implemented.
1 parent edd01ad commit a980729

File tree

2 files changed

+243
-0
lines changed

2 files changed

+243
-0
lines changed

linkerd/proxy/http/src/server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ use std::{
1313
use tower::Service;
1414
use tracing::{debug, Instrument};
1515

16+
#[cfg(test)]
17+
mod tests;
18+
1619
/// Configures HTTP server behavior.
1720
#[derive(Clone, Debug)]
1821
pub struct Params {
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
use std::vec;
2+
3+
use super::*;
4+
use bytes::Bytes;
5+
use http_body::Body;
6+
use linkerd_stack::CloneParam;
7+
use tokio::time;
8+
use tower::ServiceExt;
9+
use tower_test::mock;
10+
use tracing::info_span;
11+
12+
/// Tests how the server behaves when the client connection window is exhausted.
13+
#[tokio::test(flavor = "current_thread", start_paused = true)]
14+
async fn h2_connection_window_exhaustion() {
15+
let _trace = linkerd_tracing::test::with_default_filter(LOG_LEVEL);
16+
17+
// Setup a HTTP/2 server with consumers and producers that are mocked for
18+
// tests.
19+
const CONCURRENCY: u32 = 3;
20+
const CLIENT_STREAM_WINDOW: u32 = 65535;
21+
const CLIENT_CONN_WINDOW: u32 = CONCURRENCY * CLIENT_STREAM_WINDOW;
22+
23+
let mut server = TestServer::connect_h2(
24+
// A basic HTTP/2 server configuration with no overrides.
25+
H2Settings::default(),
26+
// An HTTP/2 client with constrained connection and stream windows to
27+
// force window exhaustion.
28+
hyper::client::conn::Builder::new()
29+
.http2_initial_connection_window_size(CLIENT_CONN_WINDOW)
30+
.http2_initial_stream_window_size(CLIENT_STREAM_WINDOW),
31+
)
32+
.await;
33+
34+
// Mocked response data to fill up the stream and connection windows.
35+
let bytes = (0..CLIENT_STREAM_WINDOW).map(|_| b'a').collect::<Bytes>();
36+
37+
let mut retain = vec![];
38+
39+
// Create a number of requests that will use up most of the connection
40+
// window, with enough capacity for only one more request.
41+
for _ in 0..CONCURRENCY - 1 {
42+
let rx = timeout(server.respond(bytes.clone()))
43+
.await
44+
.expect("timed out");
45+
retain.push(rx);
46+
}
47+
48+
// Ensure that a full request can be processed, as there is still connection window.
49+
let rx = timeout(server.respond(bytes.clone()))
50+
.await
51+
.expect("timed out");
52+
let body = timeout(rx.collect().instrument(info_span!("collect")))
53+
.await
54+
.expect("response timed out")
55+
.expect("response");
56+
assert_eq!(body.to_bytes(), bytes);
57+
58+
// Exhaust the connection window by issuing another request without
59+
// consuming its response.
60+
let rx = timeout(server.respond(bytes.clone()))
61+
.await
62+
.expect("timed out");
63+
retain.push(rx);
64+
65+
// Issue another request and try to consume its response body. This will
66+
// block indefinitely, since the connection window is exhausted.
67+
let mut rx = timeout(server.respond(bytes.clone()))
68+
.await
69+
.expect("timed out");
70+
tokio::select! {
71+
_ = time::sleep(time::Duration::from_secs(2)) => {}
72+
_ = rx.data() => panic!("unexpected data"),
73+
}
74+
75+
// Release some pending connection window by dropping a response body.
76+
// We should then be able to continue reading the pending response data.
77+
drop(retain.pop());
78+
let body = timeout(rx.collect().instrument(info_span!("collect")))
79+
.await
80+
.expect("response timed out")
81+
.expect("response");
82+
assert_eq!(body.to_bytes(), bytes);
83+
}
84+
85+
/// Tests how the server behaves when the client stream window is exhausted.
86+
#[tokio::test(flavor = "current_thread", start_paused = true)]
87+
async fn h2_stream_window_exhaustion() {
88+
let _trace = linkerd_tracing::test::with_default_filter(LOG_LEVEL);
89+
90+
// Setup a HTTP/2 server with consumers and producers that are mocked for
91+
// tests.
92+
const CLIENT_STREAM_WINDOW: u32 = 1024;
93+
94+
let mut server = TestServer::connect_h2(
95+
// A basic HTTP/2 server configuration with no overrides.
96+
H2Settings::default(),
97+
// An HTTP/2 client with stream windows to force window exhaustion.
98+
hyper::client::conn::Builder::new().http2_initial_stream_window_size(CLIENT_STREAM_WINDOW),
99+
)
100+
.await;
101+
102+
let (mut tx, mut body) = timeout(server.get()).await.expect("timed out");
103+
104+
let chunk = (0..CLIENT_STREAM_WINDOW).map(|_| b'a').collect::<Bytes>();
105+
tracing::info!(sz = chunk.len(), "Sending chunk");
106+
tx.try_send_data(chunk.clone()).expect("send data");
107+
tokio::task::yield_now().await;
108+
109+
tracing::info!(sz = chunk.len(), "Buffering chunk in channel");
110+
tx.try_send_data(chunk.clone()).expect("send data");
111+
tokio::task::yield_now().await;
112+
113+
tracing::info!(sz = chunk.len(), "Confirming stream window exhaustion");
114+
assert!(
115+
timeout(futures::future::poll_fn(|cx| tx.poll_ready(cx)))
116+
.await
117+
.is_err(),
118+
"stream window should be exhausted"
119+
);
120+
121+
tracing::info!("Once the pending data is read, the stream window should be replenished");
122+
let data = body.data().await.expect("data").expect("data");
123+
assert_eq!(data, chunk);
124+
125+
let data = body.data().await.expect("data").expect("data");
126+
assert_eq!(data, chunk);
127+
128+
timeout(body.data()).await.expect_err("no more chunks");
129+
130+
tracing::info!(sz = chunk.len(), "Confirming stream window availability");
131+
timeout(futures::future::poll_fn(|cx| tx.poll_ready(cx)))
132+
.await
133+
.expect("timed out")
134+
.expect("ready");
135+
}
136+
137+
// === Utilities ===
138+
139+
const LOG_LEVEL: &str = "h2::proto=trace,hyper=trace,linkerd=trace,info";
140+
141+
struct TestServer {
142+
client: hyper::client::conn::SendRequest<BoxBody>,
143+
server: Handle,
144+
}
145+
146+
type Mock = mock::Mock<http::Request<BoxBody>, http::Response<BoxBody>>;
147+
type Handle = mock::Handle<http::Request<BoxBody>, http::Response<BoxBody>>;
148+
149+
/// Allows us to configure a server from the Params type.
150+
#[derive(Clone, Debug)]
151+
struct NewMock(mock::Mock<http::Request<BoxBody>, http::Response<BoxBody>>);
152+
153+
impl NewService<()> for NewMock {
154+
type Service = NewMock;
155+
fn new_service(&self, _: ()) -> Self::Service {
156+
self.clone()
157+
}
158+
}
159+
160+
impl NewService<ClientHandle> for NewMock {
161+
type Service = Mock;
162+
fn new_service(&self, _: ClientHandle) -> Self::Service {
163+
self.0.clone()
164+
}
165+
}
166+
167+
fn drain() -> drain::Watch {
168+
let (mut sig, drain) = drain::channel();
169+
tokio::spawn(async move {
170+
sig.closed().await;
171+
});
172+
drain
173+
}
174+
175+
async fn timeout<F: Future>(inner: F) -> Result<F::Output, time::error::Elapsed> {
176+
time::timeout(time::Duration::from_secs(2), inner).await
177+
}
178+
179+
impl TestServer {
180+
#[tracing::instrument(skip_all)]
181+
async fn connect(params: Params, client: &mut hyper::client::conn::Builder) -> Self {
182+
// Build the HTTP server with a mocked inner service so that we can handle
183+
// requests.
184+
let (mock, server) = mock::pair();
185+
let svc = NewServeHttp::new(CloneParam::from(params), NewMock(mock)).new_service(());
186+
187+
let (sio, cio) = io::duplex(20 * 1024 * 1024); // 20 MB
188+
tokio::spawn(svc.oneshot(sio).instrument(info_span!("server")));
189+
190+
// Build a real HTTP/2 client using the mocked socket.
191+
let (client, task) = client
192+
.executor(crate::executor::TracingExecutor)
193+
.handshake::<_, BoxBody>(cio)
194+
.await
195+
.expect("client connect");
196+
tokio::spawn(task.instrument(info_span!("client")));
197+
198+
Self { client, server }
199+
}
200+
201+
async fn connect_h2(h2: H2Settings, client: &mut hyper::client::conn::Builder) -> Self {
202+
Self::connect(
203+
// A basic HTTP/2 server configuration with no overrides.
204+
Params {
205+
drain: drain(),
206+
version: Version::H2,
207+
h2,
208+
},
209+
// An HTTP/2 client with constrained connection and stream windows to accomodate
210+
client.http2_only(true),
211+
)
212+
.await
213+
}
214+
215+
/// Issues a request through the client to the mocked server and processes the
216+
/// response. The mocked response body sender and the readable response body are
217+
/// returned.
218+
#[tracing::instrument(skip(self))]
219+
async fn get(&mut self) -> (hyper::body::Sender, hyper::Body) {
220+
self.server.allow(1);
221+
let mut call0 = self
222+
.client
223+
.send_request(http::Request::new(BoxBody::default()));
224+
let (_req, next) = tokio::select! {
225+
_ = (&mut call0) => unreachable!("client cannot receive a response"),
226+
next = self.server.next_request() => next.expect("server not dropped"),
227+
};
228+
let (tx, rx) = hyper::Body::channel();
229+
next.send_response(http::Response::new(BoxBody::new(rx)));
230+
let rsp = call0.await.expect("response");
231+
(tx, rsp.into_body())
232+
}
233+
234+
#[tracing::instrument(skip(self))]
235+
async fn respond(&mut self, body: Bytes) -> hyper::Body {
236+
let (mut tx, rx) = self.get().await;
237+
tx.send_data(body.clone()).await.expect("send data");
238+
rx
239+
}
240+
}

0 commit comments

Comments
 (0)