From b730382e61050b5709bb1e15f06837853bafc0d7 Mon Sep 17 00:00:00 2001
From: "Herman J. Radtke III"
Date: Sat, 13 Apr 2024 14:52:46 -0400
Subject: [PATCH 1/8] feat(http1): add support for receiving trailer fields

Closes #2703
---
 src/body/incoming.rs     |  13 ++
 src/proto/h1/conn.rs     |  51 ++++---
 src/proto/h1/decode.rs   | 288 ++++++++++++++++++++++++++++++++++-----
 src/proto/h1/dispatch.rs |  40 ++++--
 tests/client.rs          | 115 +++++++++++++++-
 tests/server.rs          |  27 +++-
 6 files changed, 467 insertions(+), 67 deletions(-)

diff --git a/src/body/incoming.rs b/src/body/incoming.rs
index e0beba6108..b7d1591cc4 100644
--- a/src/body/incoming.rs
+++ b/src/body/incoming.rs
@@ -403,6 +403,19 @@ impl Sender {
             .map_err(|err| err.into_inner().expect("just sent Ok"))
     }

+    #[cfg(feature = "http1")]
+    pub(crate) fn try_send_trailers(
+        &mut self,
+        trailers: HeaderMap,
+    ) -> Result<(), Option<HeaderMap>> {
+        let tx = match self.trailers_tx.take() {
+            Some(tx) => tx,
+            None => return Err(None),
+        };
+
+        tx.send(trailers).map_err(|err| Some(err))
+    }
+
     #[cfg(test)]
     pub(crate) fn abort(mut self) {
         self.send_error(crate::Error::new_body_write_aborted());
diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs
index f880f97dcb..7c12e22d2f 100644
--- a/src/proto/h1/conn.rs
+++ b/src/proto/h1/conn.rs
@@ -11,6 +11,7 @@ use bytes::{Buf, Bytes};
 use futures_util::ready;
 use http::header::{HeaderValue, CONNECTION, TE};
 use http::{HeaderMap, Method, Version};
+use http_body::Frame;
 use httparse::ParserConfig;

 use super::io::Buffered;
@@ -311,33 +312,41 @@ where
     pub(crate) fn poll_read_body(
         &mut self,
         cx: &mut Context<'_>,
-    ) -> Poll<Option<io::Result<Bytes>>> {
+    ) -> Poll<Option<io::Result<Frame<Bytes>>>> {
         debug_assert!(self.can_read_body());

         let (reading, ret) = match self.state.reading {
             Reading::Body(ref mut decoder) => {
                 match ready!(decoder.decode(cx, &mut self.io)) {
-                    Ok(slice) => {
-                        let (reading, chunk) = if decoder.is_eof() {
-                            debug!("incoming body completed");
-                            (
-                                Reading::KeepAlive,
-                                if !slice.is_empty() {
-                                    Some(Ok(slice))
-                                } else {
-                                    None
-                                },
-                            )
-                        } else if slice.is_empty() {
-                            error!("incoming body unexpectedly ended");
-                            // This should be unreachable, since all 3 decoders
-                            // either set eof=true or return an Err when reading
-                            // an empty slice...
-                            (Reading::Closed, None)
-                        } else {
-                            return Poll::Ready(Some(Ok(slice)));
-                        };
-                        (reading, Poll::Ready(chunk))
+                    Ok(frame) => {
+                        if frame.is_data() {
+                            let slice = frame.data_ref().unwrap_or_else(|| unreachable!());
+                            let (reading, maybe_frame) = if decoder.is_eof() {
+                                debug!("incoming body completed");
+                                (
+                                    Reading::KeepAlive,
+                                    if !slice.is_empty() {
+                                        Some(Ok(frame))
+                                    } else {
+                                        None
+                                    },
+                                )
+                            } else if slice.is_empty() {
+                                error!("incoming body unexpectedly ended");
+                                // This should be unreachable, since all 3 decoders
+                                // either set eof=true or return an Err when reading
+                                // an empty slice...
+                                (Reading::Closed, None)
+                            } else {
+                                return Poll::Ready(Some(Ok(frame)));
+                            };
+                            (reading, Poll::Ready(maybe_frame))
+                        } else if frame.is_trailers() {
+                            (Reading::Closed, Poll::Ready(Some(Ok(frame))))
+                        } else {
+                            trace!("discarding unknown frame");
+                            (Reading::Closed, Poll::Ready(None))
+                        }
                     }
                     Err(e) => {
                         debug!("incoming body decode error: {}", e);
diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs
index 05f3cc6e60..226d18d3a7 100644
--- a/src/proto/h1/decode.rs
+++ b/src/proto/h1/decode.rs
@@ -4,8 +4,10 @@ use std::io;
 use std::task::{Context, Poll};
 use std::usize;

-use bytes::Bytes;
+use bytes::{BufMut, Bytes, BytesMut};
 use futures_util::ready;
+use http::{HeaderMap, HeaderName, HeaderValue};
+use http_body::Frame;

 use super::io::MemRead;
 use super::DecodedLength;
@@ -26,7 +28,8 @@ pub(crate) struct Decoder {
     kind: Kind,
 }

-#[derive(Debug, Clone, Copy, PartialEq)]
+// FIXME: can we keep Copy trait?
+#[derive(Debug, Clone, PartialEq)]
 enum Kind {
     /// A Reader used when a Content-Length header is passed with a positive integer.
     Length(u64),
@@ -35,6 +38,8 @@ enum Kind {
         state: ChunkedState,
         chunk_len: u64,
         extensions_cnt: u64,
+        trailers_buf: Option<BytesMut>,
+        trailers_cnt: u64,
     },
     /// A Reader used for responses that don't indicate a length or chunked.
     ///
@@ -87,6 +92,8 @@ impl Decoder {
                 state: ChunkedState::new(),
                 chunk_len: 0,
                 extensions_cnt: 0,
+                trailers_buf: None,
+                trailers_cnt: 0,
             },
         }
     }
@@ -123,12 +130,12 @@ impl Decoder {
         &mut self,
         cx: &mut Context<'_>,
         body: &mut R,
-    ) -> Poll<Result<Bytes, io::Error>> {
+    ) -> Poll<Result<Frame<Bytes>, io::Error>> {
         trace!("decode; state={:?}", self.kind);
         match self.kind {
             Length(ref mut remaining) => {
                 if *remaining == 0 {
-                    Poll::Ready(Ok(Bytes::new()))
+                    Poll::Ready(Ok(Frame::data(Bytes::new())))
                 } else {
                     let to_read = *remaining as usize;
                     let buf = ready!(body.read_mem(cx, to_read))?;
@@ -143,37 +150,64 @@ impl Decoder {
                 } else {
                     *remaining -= num;
                 }
-                Poll::Ready(Ok(buf))
+                Poll::Ready(Ok(Frame::data(buf)))
             }
         }
         Chunked {
             ref mut state,
             ref mut chunk_len,
             ref mut extensions_cnt,
+            ref mut trailers_buf,
+            ref mut trailers_cnt,
         } => {
             loop {
                 let mut buf = None;
                 // advances the chunked state
-                *state = ready!(state.step(cx, body, chunk_len, extensions_cnt, &mut buf))?;
+                *state = ready!(state.step(
+                    cx,
+                    body,
+                    chunk_len,
+                    extensions_cnt,
+                    &mut buf,
+                    trailers_buf,
+                    trailers_cnt,
+                ))?;
                 if *state == ChunkedState::End {
                     trace!("end of chunked");
-                    return Poll::Ready(Ok(Bytes::new()));
+
+                    if trailers_buf.is_some() {
+                        trace!("found possible trailers");
+                        match decode_trailers(
+                            &mut trailers_buf.take().expect("Trailer is None"),
+                            // decoder enforces that trailers count will not exceed usize::MAX
+                            *trailers_cnt as usize,
+                        ) {
+                            Ok(headers) => {
+                                return Poll::Ready(Ok(Frame::trailers(headers)));
+                            }
+                            Err(e) => {
+                                return Poll::Ready(Err(e));
+                            }
+                        }
+                    }
+
+                    return Poll::Ready(Ok(Frame::data(Bytes::new())));
                 }
                 if let Some(buf) = buf {
-                    return Poll::Ready(Ok(buf));
+                    return Poll::Ready(Ok(Frame::data(buf)));
                 }
             }
         }
         Eof(ref mut is_eof) => {
             if *is_eof {
-                Poll::Ready(Ok(Bytes::new()))
+                Poll::Ready(Ok(Frame::data(Bytes::new())))
             } else {
                 // 8192 chosen because its about 2 packets, there probably
                 // won't be that much available, so don't have MemReaders
                 // allocate buffers to big
                 body.read_mem(cx, 8192).map_ok(|slice| {
                     *is_eof = slice.is_empty();
-                    slice
+                    Frame::data(slice)
                 })
             }
         }
     }
@@ -181,7 +215,7 @@ impl Decoder {
     }

     #[cfg(test)]
-    async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error> {
+    async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Frame<Bytes>, io::Error> {
         futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await
     }
 }
@@ -227,6 +261,8 @@ impl ChunkedState {
         size: &mut u64,
         extensions_cnt: &mut u64,
         buf: &mut Option<Bytes>,
+        trailers_buf: &mut Option<BytesMut>,
+        trailers_cnt: &mut u64,
     ) -> Poll<Result<ChunkedState, io::Error>> {
         use self::ChunkedState::*;
         match *self {
@@ -238,10 +274,10 @@ impl ChunkedState {
             Body => ChunkedState::read_body(cx, body, size, buf),
             BodyCr => ChunkedState::read_body_cr(cx, body),
             BodyLf => ChunkedState::read_body_lf(cx, body),
-            Trailer => ChunkedState::read_trailer(cx, body),
-            TrailerLf => ChunkedState::read_trailer_lf(cx, body),
-            EndCr => ChunkedState::read_end_cr(cx, body),
-            EndLf => ChunkedState::read_end_lf(cx, body),
+            Trailer => ChunkedState::read_trailer(cx, body, trailers_buf),
+            TrailerLf => ChunkedState::read_trailer_lf(cx, body, trailers_buf, trailers_cnt),
+            EndCr => ChunkedState::read_end_cr(cx, body, trailers_buf),
+            EndLf => ChunkedState::read_end_lf(cx, body, trailers_buf),
             End => Poll::Ready(Ok(ChunkedState::End)),
         }
     }
@@ -442,19 +478,46 @@ impl ChunkedState {
     fn read_trailer<R: MemRead>(
         cx: &mut Context<'_>,
         rdr: &mut R,
+        trailers_buf: &mut Option<BytesMut>,
     ) -> Poll<Result<ChunkedState, io::Error>> {
         trace!("read_trailer");
-        match byte!(rdr, cx) {
+        let byte = byte!(rdr, cx);
+
+        trailers_buf
+            .as_mut()
+            .expect("trailers_buf is None")
+            .put_u8(byte);
+
+        match byte {
             b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)),
             _ => Poll::Ready(Ok(ChunkedState::Trailer)),
         }
     }
+
     fn read_trailer_lf<R: MemRead>(
         cx: &mut Context<'_>,
         rdr: &mut R,
+        trailers_buf: &mut Option<BytesMut>,
+        trailers_cnt: &mut u64,
     ) -> Poll<Result<ChunkedState, io::Error>> {
-        match byte!(rdr, cx) {
-            b'\n' => Poll::Ready(Ok(ChunkedState::EndCr)),
+        let byte = byte!(rdr, cx);
+        match byte {
+            b'\n' => {
+                if *trailers_cnt == usize::MAX as u64 {
+                    return Poll::Ready(Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        "chunk trailers count overflow",
+                    )));
+                }
+                *trailers_cnt += 1;
+
+                trailers_buf
+                    .as_mut()
+                    .expect("trailers_buf is None")
+                    .put_u8(byte);
+
+                Poll::Ready(Ok(ChunkedState::EndCr))
+            }
             _ => Poll::Ready(Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
                 "Invalid trailer end LF",
@@ -465,18 +528,46 @@ impl ChunkedState {
     fn read_end_cr<R: MemRead>(
         cx: &mut Context<'_>,
         rdr: &mut R,
+        trailers_buf: &mut Option<BytesMut>,
     ) -> Poll<Result<ChunkedState, io::Error>> {
-        match byte!(rdr, cx) {
-            b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)),
-            _ => Poll::Ready(Ok(ChunkedState::Trailer)),
+        let byte = byte!(rdr, cx);
+        match byte {
+            b'\r' => {
+                if let Some(trailers_buf) = trailers_buf {
+                    trailers_buf.put_u8(byte);
+                }
+                Poll::Ready(Ok(ChunkedState::EndLf))
+            }
+            byte => {
+                match trailers_buf {
+                    None => {
+                        // 64 will fit a single Expires header without reallocating
+                        let mut buf = BytesMut::with_capacity(64);
+                        buf.put_u8(byte);
+                        *trailers_buf = Some(buf);
+                    }
+                    Some(ref mut trailers_buf) => {
+                        trailers_buf.put_u8(byte);
+                    }
+                }
+
+                Poll::Ready(Ok(ChunkedState::Trailer))
+            }
         }
     }
     fn read_end_lf<R: MemRead>(
         cx: &mut Context<'_>,
         rdr: &mut R,
+        trailers_buf: &mut Option<BytesMut>,
     ) -> Poll<Result<ChunkedState, io::Error>> {
-        match byte!(rdr, cx) {
-            b'\n' => Poll::Ready(Ok(ChunkedState::End)),
+        let byte = byte!(rdr, cx);
+        match byte {
+            b'\n' => {
+                if let Some(trailers_buf) = trailers_buf {
+                    trailers_buf.put_u8(byte);
+                }
+                Poll::Ready(Ok(ChunkedState::End))
+            }
             _ => Poll::Ready(Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
                 "Invalid chunk end LF",
@@ -485,6 +576,48 @@ impl ChunkedState {
     }
 }

+// TODO: disallow Transfer-Encoding, Content-Length, Trailer, etc in trailers ??
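+//
+// For orientation: `buf` here is the raw trailer section that follows the
+// last chunk. For a body that ends with:
+//
+//     0\r\n
+//     Expires: Wed, 21 Oct 2015 07:28:00 GMT\r\n
+//     \r\n
+//
+// `buf` holds `Expires: ...\r\n\r\n`, which httparse can parse like an
+// ordinary header block, and `count` is the number of CRLF-terminated
+// trailer lines the chunked state machine counted.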
+fn decode_trailers(buf: &mut BytesMut, count: usize) -> Result<HeaderMap, io::Error> {
+    let mut trailers = HeaderMap::new();
+    let mut headers = vec![httparse::EMPTY_HEADER; count];
+    let res = httparse::parse_headers(&buf, &mut headers);
+    match res {
+        Ok(httparse::Status::Complete((_, headers))) => {
+            for header in headers.iter() {
+                use std::convert::TryFrom;
+                let name = match HeaderName::try_from(header.name) {
+                    Ok(name) => name,
+                    Err(_) => {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidInput,
+                            format!("Invalid header name: {:?}", &header),
+                        ));
+                    }
+                };
+
+                let value = match HeaderValue::from_bytes(header.value) {
+                    Ok(value) => value,
+                    Err(_) => {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidInput,
+                            format!("Invalid header value: {:?}", &header),
+                        ));
+                    }
+                };
+
+                trailers.insert(name, value);
+            }
+
+            Ok(trailers)
+        }
+        Ok(httparse::Status::Partial) => Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            "Partial header",
+        )),
+        Err(e) => Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
+    }
+}
+
 #[derive(Debug)]
 struct IncompleteBody;

@@ -554,9 +687,18 @@ mod tests {
         let rdr = &mut s.as_bytes();
         let mut size = 0;
         let mut ext_cnt = 0;
+        let mut trailers_cnt = 0;
         loop {
             let result = futures_util::future::poll_fn(|cx| {
-                state.step(cx, rdr, &mut size, &mut ext_cnt, &mut None)
+                state.step(
+                    cx,
+                    rdr,
+                    &mut size,
+                    &mut ext_cnt,
+                    &mut None,
+                    &mut None,
+                    &mut trailers_cnt,
+                )
             })
             .await;
             let desc = format!("read_size failed for {:?}", s);
@@ -573,9 +715,18 @@ mod tests {
         let rdr = &mut s.as_bytes();
         let mut size = 0;
         let mut ext_cnt = 0;
+        let mut trailers_cnt = 0;
         loop {
             let result = futures_util::future::poll_fn(|cx| {
-                state.step(cx, rdr, &mut size, &mut ext_cnt, &mut None)
+                state.step(
+                    cx,
+                    rdr,
+                    &mut size,
+                    &mut ext_cnt,
+                    &mut None,
+                    &mut None,
+                    &mut trailers_cnt,
+                )
             })
             .await;
             state = match result {
@@ -639,7 +790,16 @@ mod tests {
     async fn test_read_sized_early_eof() {
         let mut bytes = &b"foo bar"[..];
         let mut decoder = Decoder::length(10);
-        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
+        assert_eq!(
+            decoder
+                .decode_fut(&mut bytes)
+                .await
+                .unwrap()
+                .data_ref()
+                .unwrap()
+                .len(),
+            7
+        );
         let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
         assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
     }
@@ -652,7 +812,16 @@ mod tests {
             foo bar\
             "[..];
         let mut decoder = Decoder::chunked();
-        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
+        assert_eq!(
+            decoder
+                .decode_fut(&mut bytes)
+                .await
+                .unwrap()
+                .data_ref()
+                .unwrap()
+                .len(),
+            7
+        );
         let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
         assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
     }
@@ -664,7 +833,9 @@ mod tests {
         let buf = Decoder::chunked()
             .decode_fut(&mut mock_buf)
             .await
-            .expect("decode");
+            .expect("decode")
+            .into_data()
+            .expect("unknown frame type");
         assert_eq!(16, buf.len());
         let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
         assert_eq!("1234567890abcdef", &result);
@@ -685,7 +856,12 @@ mod tests {
         let mut mock_buf = Bytes::from(scratch);
         let mut decoder = Decoder::chunked();

-        let buf1 = decoder.decode_fut(&mut mock_buf).await.expect("decode1");
+        let buf1 = decoder
+            .decode_fut(&mut mock_buf)
+            .await
+            .expect("decode1")
+            .into_data()
+            .expect("unknown frame type");
         assert_eq!(&buf1[..], b"A");

         let err = decoder
@@ -713,17 +889,32 @@ mod tests {
         let mut decoder = Decoder::chunked();

         // normal read
-        let buf = decoder.decode_fut(&mut mock_buf).await.unwrap();
+        let buf = decoder
+ .decode_fut(&mut mock_buf) + .await + .unwrap() + .into_data() + .expect("unknown frame type"); assert_eq!(16, buf.len()); let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String"); assert_eq!("1234567890abcdef", &result); // eof read - let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode"); + let buf = decoder + .decode_fut(&mut mock_buf) + .await + .expect("decode") + .into_data() + .expect("unknown frame type"); assert_eq!(0, buf.len()); // ensure read after eof also returns eof - let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode"); + let buf = decoder + .decode_fut(&mut mock_buf) + .await + .expect("decode") + .into_data() + .expect("unknown frame type"); assert_eq!(0, buf.len()); } @@ -751,7 +942,9 @@ mod tests { let buf = decoder .decode_fut(&mut ins) .await - .expect("unexpected decode error"); + .expect("unexpected decode error") + .into_data() + .expect("unexpected frame type"); if buf.is_empty() { break; // eof } @@ -811,7 +1004,12 @@ mod tests { let mut decoder = Decoder::chunked(); rt.block_on(async { let mut raw = content.clone(); - let chunk = decoder.decode_fut(&mut raw).await.unwrap(); + let chunk = decoder + .decode_fut(&mut raw) + .await + .unwrap() + .into_data() + .unwrap(); assert_eq!(chunk.len(), LEN); }); }); @@ -830,7 +1028,12 @@ mod tests { let mut decoder = Decoder::length(LEN as u64); rt.block_on(async { let mut raw = content.clone(); - let chunk = decoder.decode_fut(&mut raw).await.unwrap(); + let chunk = decoder + .decode_fut(&mut raw) + .await + .unwrap() + .into_data() + .unwrap(); assert_eq!(chunk.len(), LEN); }); }); @@ -843,4 +1046,19 @@ mod tests { .build() .expect("rt build") } + + #[test] + fn decode_trailers_test() { + let mut buf = BytesMut::new(); + buf.extend_from_slice( + b"Expires: Wed, 21 Oct 2015 07:28:00 GMT\r\nX-Stream-Error: failed to decode\r\n\r\n", + ); + let headers = decode_trailers(&mut buf, 2).expect("decode_trailers"); + assert_eq!(headers.len(), 2); + assert_eq!( + headers.get("Expires").unwrap(), + "Wed, 21 Oct 2015 07:28:00 GMT" + ); + assert_eq!(headers.get("X-Stream-Error").unwrap(), "failed to decode"); + } } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 3a4faf0487..be0a88c1f9 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -213,17 +213,39 @@ where } } match self.conn.poll_read_body(cx) { - Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) { - Ok(()) => { - self.body_tx = Some(body); - } - Err(_canceled) => { - if self.conn.can_read_body() { - trace!("body receiver dropped before eof, closing"); - self.conn.close_read(); + Poll::Ready(Some(Ok(frame))) => { + if frame.is_data() { + let chunk = frame.into_data().unwrap_or_else(|_| unreachable!()); + match body.try_send_data(chunk) { + Ok(()) => { + self.body_tx = Some(body); + } + Err(_canceled) => { + if self.conn.can_read_body() { + trace!("body receiver dropped before eof, closing"); + self.conn.close_read(); + } + } + } + } else if frame.is_trailers() { + let trailers = + frame.into_trailers().unwrap_or_else(|_| unreachable!()); + match body.try_send_trailers(trailers) { + Ok(()) => { + self.body_tx = Some(body); + } + Err(_canceled) => { + if self.conn.can_read_body() { + trace!("body receiver dropped before eof, closing"); + self.conn.close_read(); + } + } } + } else { + // we should have dropped all unknown frames in poll_read_body + error!("unexpected frame"); } - }, + } Poll::Ready(None) => { // just drop, the body will close automatically } diff --git 
a/tests/client.rs b/tests/client.rs index 43e1f08acb..e262677573 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -34,6 +34,17 @@ where b.collect().await.map(|c| c.to_bytes()) } +async fn concat_with_trailers(b: B) -> Result<(Bytes, Option), B::Error> +where + B: hyper::body::Body, +{ + let collect = b.collect().await?; + let trailers = collect.trailers().cloned(); + let bytes = collect.to_bytes(); + + Ok((bytes, trailers)) +} + async fn tcp_connect(addr: &SocketAddr) -> std::io::Result> { TcpStream::connect(*addr).await.map(TokioIo::new) } @@ -122,6 +133,9 @@ macro_rules! test { status: $client_status:ident, headers: { $($response_header_name:expr => $response_header_val:expr,)* }, body: $response_body:expr, + $(trailers: {$( + $response_trailer_name:expr => $response_trailer_val:expr, + )*},)? ) => ( #[test] fn $name() { @@ -158,12 +172,23 @@ macro_rules! test { ); )* - let body = rt.block_on(concat(res)) + let (body, _trailers) = rt.block_on(concat_with_trailers(res)) .expect("body concat wait"); let expected_res_body = Option::<&[u8]>::from($response_body) .unwrap_or_default(); assert_eq!(body.as_ref(), expected_res_body); + + $($( + assert_eq!( + _trailers.as_ref().expect("trailers is None") + .get($response_trailer_name) + .expect(concat!("trailer header '", stringify!($response_trailer_name), "'")), + $response_trailer_val, + "trailer '{}'", + stringify!($response_trailer_name), + ); + )*)? } ); ( @@ -679,6 +704,94 @@ test! { body: None, } +test! { + name: client_res_body_chunked_with_trailer, + + server: + expected: "GET / HTTP/1.1\r\nte: trailers\r\nhost: {addr}\r\n\r\n", + reply: "\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + trailer: chunky-trailer\r\n\ + \r\n\ + 5\r\n\ + hello\r\n\ + 0\r\n\ + chunky-trailer: header data\r\n\ + \r\n\ + ", + + client: + request: { + method: GET, + url: "http://{addr}/", + headers: { + "te" => "trailers", + }, + }, + response: + status: OK, + headers: { + "Transfer-Encoding" => "chunked", + }, + body: &b"hello"[..], + trailers: { + "chunky-trailer" => "header data", + }, +} + +test! { + name: client_res_body_chunked_with_pathological_trailers, + + server: + expected: "GET / HTTP/1.1\r\nte: trailers\r\nhost: {addr}\r\n\r\n", + reply: "\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + trailer: chunky-trailer1, chunky-trailer2, chunky-trailer3, chunky-trailer4, chunky-trailer5\r\n\ + \r\n\ + 5\r\n\ + hello\r\n\ + 0\r\n\ + chunky-trailer1: header data1\r\n\ + chunky-trailer2: header data2\r\n\ + chunky-trailer3: header data3\r\n\ + chunky-trailer4: header data4\r\n\ + chunky-trailer5: header data5\r\n\ + sneaky-trailer: i should not be sent\r\n\ + transfer-encoding: chunked\r\n\ + content-length: 5\r\n\ + trailer: foo\r\n\ + \r\n\ + ", + + client: + request: { + method: GET, + url: "http://{addr}/", + headers: { + "te" => "trailers", + }, + }, + response: + status: OK, + headers: { + "Transfer-Encoding" => "chunked", + }, + body: &b"hello"[..], + trailers: { + "chunky-trailer1" => "header data1", + "chunky-trailer2" => "header data2", + "chunky-trailer3" => "header data3", + "chunky-trailer4" => "header data4", + "chunky-trailer5" => "header data5", + "sneaky-trailer" => "i should not be sent", + "transfer-encoding" => "chunked", + "content-length" => "5", + "trailer" => "foo", + }, +} + test! 
     name: client_get_req_body_sized,

diff --git a/tests/server.rs b/tests/server.rs
index 661b98d180..1f17272765 100644
--- a/tests/server.rs
+++ b/tests/server.rs
@@ -2656,7 +2656,7 @@ async fn http2_keep_alive_count_server_pings() {
 }

 #[test]
-fn http1_trailer_fields() {
+fn http1_trailer_send_fields() {
     let body = futures_util::stream::once(async move { Ok("hello".into()) });
     let mut headers = HeaderMap::new();
     headers.insert("chunky-trailer", "header data".parse().unwrap());
@@ -2743,6 +2743,31 @@ fn http1_trailer_fields_not_allowed() {
     assert_eq!(body, expected_body);
 }

+#[test]
+fn http1_trailer_recv_fields() {
+    let server = serve();
+    let mut req = connect(server.addr());
+    req.write_all(
+        b"\
+        POST / HTTP/1.1\r\n\
+        trailer: chunky-trailer\r\n\
+        host: example.domain\r\n\
+        transfer-encoding: chunked\r\n\
+        \r\n\
+        5\r\n\
+        hello\r\n\
+        0\r\n\
+        chunky-trailer: header data\r\n\
+        \r\n\
+        ",
+    )
+    .expect("writing");
+
+    assert_eq!(server.body(), b"hello");
+
+    // FIXME: add support for checking trailers that server received
+}
+
 // -------------------------------------------------
 // the Server that is used to run all the tests with
 // -------------------------------------------------

From 1d5eaf4979628d51a274c560bbab88e21864b3ed Mon Sep 17 00:00:00 2001
From: "Herman J. Radtke III"
Date: Wed, 17 Apr 2024 06:18:15 -0400
Subject: [PATCH 2/8] fix(http1): add server test for recv trailers

---
 tests/server.rs | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/tests/server.rs b/tests/server.rs
index 1f17272765..24df55d378 100644
--- a/tests/server.rs
+++ b/tests/server.rs
@@ -2765,7 +2765,11 @@ fn http1_trailer_recv_fields() {

     assert_eq!(server.body(), b"hello");

-    // FIXME: add support for checking trailers that server received
+    let trailers = server.trailers();
+    assert_eq!(
+        trailers.get("chunky-trailer"),
+        Some(&"header data".parse().unwrap())
+    );
 }

 // -------------------------------------------------
@@ -2775,6 +2779,7 @@ fn http1_trailer_recv_fields() {
 struct Serve {
     addr: SocketAddr,
     msg_rx: mpsc::Receiver<Msg>,
+    trailers_rx: mpsc::Receiver<HeaderMap>,
     reply_tx: Mutex<spmc::Sender<Reply>>,
     shutdown_signal: Option<oneshot::Sender<()>>,
     thread: Option<thread::JoinHandle<()>>,
@@ -2808,6 +2813,10 @@ impl Serve {
         Ok(buf)
     }

+    fn trailers(&self) -> HeaderMap {
+        self.trailers_rx.recv().expect("trailers")
+    }
+
     fn reply(&self) -> ReplyBuilder<'_> {
         ReplyBuilder { tx: &self.reply_tx }
     }
@@ -2921,6 +2930,7 @@ impl Drop for Serve {
 #[derive(Clone)]
 struct TestService {
     tx: mpsc::Sender<Msg>,
+    trailers_tx: mpsc::Sender<HeaderMap>,
     reply: spmc::Receiver<Reply>,
 }

@@ -2951,6 +2961,7 @@ impl Service<Request<IncomingBody>> for TestService {
     fn call(&self, mut req: Request<IncomingBody>) -> Self::Future {
         let tx = self.tx.clone();
+        let trailers_tx = self.trailers_tx.clone();
         let replies = self.reply.clone();

         Box::pin(async move {
@@ -2960,6 +2971,9 @@ impl Service<Request<IncomingBody>> for TestService {
                         if frame.is_data() {
                             tx.send(Msg::Chunk(frame.into_data().unwrap().to_vec()))
                                 .unwrap();
+                        } else if frame.is_trailers() {
+                            let trailers = frame.into_trailers().unwrap();
+                            trailers_tx.send(trailers).unwrap();
                         }
                     }
                     Err(err) => {
@@ -3088,6 +3102,7 @@ impl ServeOptions {
         let (addr_tx, addr_rx) = mpsc::channel();
         let (msg_tx, msg_rx) = mpsc::channel();
+        let (trailers_tx, trailers_rx) = mpsc::channel();
         let (reply_tx, reply_rx) = spmc::channel();
         let (shutdown_tx, mut shutdown_rx) = oneshot::channel();

@@ -3111,6 +3126,7 @@ impl ServeOptions {
             loop {
                 let msg_tx = msg_tx.clone();
+                let trailers_tx = trailers_tx.clone();
                 let reply_rx = reply_rx.clone();

                 tokio::select!
{ @@ -3123,6 +3139,7 @@ impl ServeOptions { let reply_rx = reply_rx.clone(); let service = TestService { tx: msg_tx, + trailers_tx, reply: reply_rx, }; @@ -3150,6 +3167,7 @@ impl ServeOptions { Serve { msg_rx, + trailers_rx, reply_tx: Mutex::new(reply_tx), addr, shutdown_signal: Some(shutdown_tx), From cf15b338e5416d0d9437899d62d5e761508c4fe0 Mon Sep 17 00:00:00 2001 From: "Herman J. Radtke III" Date: Wed, 17 Apr 2024 06:19:33 -0400 Subject: [PATCH 3/8] fix(http1): clarify trailers client test --- tests/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/client.rs b/tests/client.rs index e262677573..84c3290caf 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -758,7 +758,7 @@ test! { chunky-trailer3: header data3\r\n\ chunky-trailer4: header data4\r\n\ chunky-trailer5: header data5\r\n\ - sneaky-trailer: i should not be sent\r\n\ + sneaky-trailer: not in trailer header\r\n\ transfer-encoding: chunked\r\n\ content-length: 5\r\n\ trailer: foo\r\n\ From 9785bfa78611e70e111f16940bb171e325799dd5 Mon Sep 17 00:00:00 2001 From: "Herman J. Radtke III" Date: Wed, 17 Apr 2024 08:56:48 -0400 Subject: [PATCH 4/8] fix(http1): fix broken test --- tests/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/client.rs b/tests/client.rs index 84c3290caf..6c9d3d7587 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -785,7 +785,7 @@ test! { "chunky-trailer3" => "header data3", "chunky-trailer4" => "header data4", "chunky-trailer5" => "header data5", - "sneaky-trailer" => "i should not be sent", + "sneaky-trailer" => "not in trailer header", "transfer-encoding" => "chunked", "content-length" => "5", "trailer" => "foo", From 545f8c48d3da589cf7a4154aa6433db1bbe07076 Mon Sep 17 00:00:00 2001 From: "Herman J. Radtke III" Date: Wed, 24 Apr 2024 06:53:03 -0400 Subject: [PATCH 5/8] fix(http1): improve trailer limit The trailer limit is now 1024 instead of usize::MAX. There is also a test proving that the trailer limit is respected. --- src/proto/h1/decode.rs | 48 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index 226d18d3a7..16a45cc797 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -19,6 +19,9 @@ use self::Kind::{Chunked, Eof, Length}; /// This limit is currentlty applied for the entire body, not per chunk. const CHUNKED_EXTENSIONS_LIMIT: u64 = 1024 * 16; +/// Maximum number of trailers allowed in a chunked body. +const TRAILERS_LIMIT: u64 = 1024; + /// Decoders to handle different Transfer-Encodings. /// /// If a message body does not include a Transfer-Encoding, it *should* @@ -28,7 +31,6 @@ pub(crate) struct Decoder { kind: Kind, } -// FIXME: can we keep Copy trait? #[derive(Debug, Clone, PartialEq)] enum Kind { /// A Reader used when a Content-Length header is passed with a positive integer. 
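A brief aside before the enforcement hunks below: the trailers frame this
decoder produces surfaces to applications through hyper's `Incoming` body. A
rough consumption sketch — illustrative only, and assuming the separate
`http-body-util` crate for its `BodyExt::frame` adapter — could look like:

    use http_body_util::BodyExt;
    use hyper::body::Incoming;
    use hyper::header::HeaderMap;

    // Drain a body frame by frame: data frames arrive first, then at most
    // one trailers frame once the chunked trailer section is decoded.
    async fn drain(mut body: Incoming) -> hyper::Result<Option<HeaderMap>> {
        let mut trailers = None;
        while let Some(frame) = body.frame().await {
            let frame = frame?;
            if frame.is_trailers() {
                trailers = frame.into_trailers().ok();
            }
            // data frames are simply discarded in this sketch
        }
        Ok(trailers)
    }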
@@ -177,9 +179,18 @@ impl Decoder { if trailers_buf.is_some() { trace!("found possible trailers"); + + // decoder enforces that trailers count will not exceed TRAILERS_LIMIT + // which is also less than usize::MAX + if *trailers_cnt >= TRAILERS_LIMIT || *trailers_cnt > usize::MAX as u64 + { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidData, + "chunk trailers count overflow", + ))); + } match decode_trailers( &mut trailers_buf.take().expect("Trailer is None"), - // decoder enforces that trailers count will not exceed usize::MAX *trailers_cnt as usize, ) { Ok(headers) => { @@ -503,7 +514,7 @@ impl ChunkedState { let byte = byte!(rdr, cx); match byte { b'\n' => { - if *trailers_cnt == usize::MAX as u64 { + if *trailers_cnt == TRAILERS_LIMIT { return Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidData, "chunk trailers count overflow", @@ -1048,7 +1059,7 @@ mod tests { } #[test] - fn decode_trailers_test() { + fn test_decode_trailers() { let mut buf = BytesMut::new(); buf.extend_from_slice( b"Expires: Wed, 21 Oct 2015 07:28:00 GMT\r\nX-Stream-Error: failed to decode\r\n\r\n", @@ -1061,4 +1072,33 @@ mod tests { ); assert_eq!(headers.get("X-Stream-Error").unwrap(), "failed to decode"); } + + #[tokio::test] + async fn test_trailer_limit_enforced() { + let mut scratch = vec![]; + scratch.extend(b"10\r\n1234567890abcdef\r\n0\r\n"); + for i in 0..=TRAILERS_LIMIT { + scratch.extend(format!("trailer{}: {}\r\n", i, i).as_bytes()); + } + scratch.extend(b"\r\n"); + let mut mock_buf = Bytes::from(scratch); + + let mut decoder = Decoder::chunked(); + + // ready chunked body + let buf = decoder + .decode_fut(&mut mock_buf) + .await + .unwrap() + .into_data() + .expect("unknown frame type"); + assert_eq!(16, buf.len()); + + // eof read + let err = decoder + .decode_fut(&mut mock_buf) + .await + .expect_err("trailers over limit"); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + } } From f194be13cd6fd15823101a8c775252269136c07c Mon Sep 17 00:00:00 2001 From: "Herman J. Radtke III" Date: Sat, 27 Apr 2024 08:43:00 -0400 Subject: [PATCH 6/8] fix(http1): enforce trailer bytes limit The size of the trailer fields is now limited. The limit accounts for a single, very large trailer field or many trailer fields that exceed the limit in aggregate. --- src/proto/h1/decode.rs | 112 ++++++++++++++++++++++++++++++++++------- 1 file changed, 94 insertions(+), 18 deletions(-) diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index 16a45cc797..3435c3bd1e 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -19,8 +19,11 @@ use self::Kind::{Chunked, Eof, Length}; /// This limit is currentlty applied for the entire body, not per chunk. const CHUNKED_EXTENSIONS_LIMIT: u64 = 1024 * 16; -/// Maximum number of trailers allowed in a chunked body. -const TRAILERS_LIMIT: u64 = 1024; +/// Maximum number of trailer fields allowed in a chunked body. +const TRAILERS_FIELD_LIMIT: u64 = 1024; + +/// Maximum number of bytes allowed for all trailer fields. +const TRAILER_LIMIT: u64 = TRAILERS_FIELD_LIMIT * 64; /// Decoders to handle different Transfer-Encodings. 
/// @@ -180,9 +183,10 @@ impl Decoder { if trailers_buf.is_some() { trace!("found possible trailers"); - // decoder enforces that trailers count will not exceed TRAILERS_LIMIT + // decoder enforces that trailers count will not exceed TRAILERS_FIELD_LIMIT // which is also less than usize::MAX - if *trailers_cnt >= TRAILERS_LIMIT || *trailers_cnt > usize::MAX as u64 + if *trailers_cnt >= TRAILERS_FIELD_LIMIT + || *trailers_cnt > usize::MAX as u64 { return Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidData, @@ -261,6 +265,20 @@ macro_rules! or_overflow { ) } +macro_rules! put_u8 { + ($trailers_buf:expr, $byte:expr) => { + $trailers_buf.put_u8($byte); + + // check if trailer_buf exceeds TRAILER_LIMIT + if $trailers_buf.len() as u64 >= TRAILER_LIMIT { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidData, + "chunk trailers bytes over limit", + ))); + } + }; +} + impl ChunkedState { fn new() -> ChunkedState { ChunkedState::Start @@ -494,10 +512,7 @@ impl ChunkedState { trace!("read_trailer"); let byte = byte!(rdr, cx); - trailers_buf - .as_mut() - .expect("trailers_buf is None") - .put_u8(byte); + put_u8!(trailers_buf.as_mut().expect("trailers_buf is None"), byte); match byte { b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)), @@ -514,7 +529,7 @@ impl ChunkedState { let byte = byte!(rdr, cx); match byte { b'\n' => { - if *trailers_cnt == TRAILERS_LIMIT { + if *trailers_cnt == TRAILERS_FIELD_LIMIT { return Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidData, "chunk trailers count overflow", @@ -522,10 +537,7 @@ impl ChunkedState { } *trailers_cnt += 1; - trailers_buf - .as_mut() - .expect("trailers_buf is None") - .put_u8(byte); + put_u8!(trailers_buf.as_mut().expect("trailers_buf is None"), byte); Poll::Ready(Ok(ChunkedState::EndCr)) } @@ -545,7 +557,7 @@ impl ChunkedState { match byte { b'\r' => { if let Some(trailers_buf) = trailers_buf { - trailers_buf.put_u8(byte); + put_u8!(trailers_buf, byte); } Poll::Ready(Ok(ChunkedState::EndLf)) } @@ -558,7 +570,7 @@ impl ChunkedState { *trailers_buf = Some(buf); } Some(ref mut trailers_buf) => { - trailers_buf.put_u8(byte); + put_u8!(trailers_buf, byte); } } @@ -575,7 +587,7 @@ impl ChunkedState { match byte { b'\n' => { if let Some(trailers_buf) = trailers_buf { - trailers_buf.put_u8(byte); + put_u8!(trailers_buf, byte); } Poll::Ready(Ok(ChunkedState::End)) } @@ -1074,10 +1086,10 @@ mod tests { } #[tokio::test] - async fn test_trailer_limit_enforced() { + async fn test_trailer_field_limit_enforced() { let mut scratch = vec![]; scratch.extend(b"10\r\n1234567890abcdef\r\n0\r\n"); - for i in 0..=TRAILERS_LIMIT { + for i in 0..=TRAILERS_FIELD_LIMIT { scratch.extend(format!("trailer{}: {}\r\n", i, i).as_bytes()); } scratch.extend(b"\r\n"); @@ -1094,6 +1106,70 @@ mod tests { .expect("unknown frame type"); assert_eq!(16, buf.len()); + // eof read + let err = decoder + .decode_fut(&mut mock_buf) + .await + .expect_err("trailer fields over limit"); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + } + + #[tokio::test] + async fn test_trailer_limit_huge_trailer() { + let mut scratch = vec![]; + scratch.extend(b"10\r\n1234567890abcdef\r\n0\r\n"); + scratch.extend( + format!( + "huge_trailer: {}\r\n", + "x".repeat(TRAILER_LIMIT.try_into().unwrap()) + ) + .as_bytes(), + ); + scratch.extend(b"\r\n"); + let mut mock_buf = Bytes::from(scratch); + + let mut decoder = Decoder::chunked(); + + // ready chunked body + let buf = decoder + .decode_fut(&mut mock_buf) + .await + .unwrap() + .into_data() + .expect("unknown frame type"); + 
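+        // the 16-byte data frame still decodes cleanly; the oversized
+        // trailer is only rejected (InvalidData) on the next decode below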
+        assert_eq!(16, buf.len());
+
+        // eof read
+        let err = decoder
+            .decode_fut(&mut mock_buf)
+            .await
+            .expect_err("trailers over limit");
+        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
+    }
+
+    #[tokio::test]
+    async fn test_trailer_limit_many_small_trailers() {
+        let mut scratch = vec![];
+        scratch.extend(b"10\r\n1234567890abcdef\r\n0\r\n");
+
+        for i in 0..=TRAILERS_FIELD_LIMIT {
+            scratch.extend(format!("trailer{}: {}\r\n", i, "x".repeat(64)).as_bytes());
+        }
+
+        scratch.extend(b"\r\n");
+        let mut mock_buf = Bytes::from(scratch);
+
+        let mut decoder = Decoder::chunked();
+
+        // ready chunked body
+        let buf = decoder
+            .decode_fut(&mut mock_buf)
+            .await
+            .unwrap()
+            .into_data()
+            .expect("unknown frame type");
+        assert_eq!(16, buf.len());
+
+        // eof read
+        let err = decoder
+            .decode_fut(&mut mock_buf)
+            .await
+            .expect_err("trailers over limit");
+        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
+    }
 }

From 6c01c879e9f62a543e8865e536dd7d1561619a1f Mon Sep 17 00:00:00 2001
From: "Herman J. Radtke III"
Date: Wed, 1 May 2024 06:47:13 -0400
Subject: [PATCH 7/8] feat(http1): trailer limits are configurable

Trailer parsing now honors h1_max_headers option. It also supports a
future h1_max_header_size option.
---
 src/proto/h1/conn.rs   |  14 ++++-
 src/proto/h1/decode.rs | 139 ++++++++++++++++++++++++++---------------
 src/proto/h1/role.rs   |   2 +-
 3 files changed, 101 insertions(+), 54 deletions(-)

diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs
index a9a0c2ec02..4ef39d7332 100644
--- a/src/proto/h1/conn.rs
+++ b/src/proto/h1/conn.rs
@@ -267,10 +267,20 @@ where
                 self.try_keep_alive(cx);
             }
         } else if msg.expect_continue && msg.head.version.gt(&Version::HTTP_10) {
-            self.state.reading = Reading::Continue(Decoder::new(msg.decode));
+            let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
+            self.state.reading = Reading::Continue(Decoder::new(
+                msg.decode,
+                self.state.h1_max_headers,
+                h1_max_header_size,
+            ));
             wants = wants.add(Wants::EXPECT);
         } else {
-            self.state.reading = Reading::Body(Decoder::new(msg.decode));
+            let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
+            self.state.reading = Reading::Body(Decoder::new(
+                msg.decode,
+                self.state.h1_max_headers,
+                h1_max_header_size,
+            ));
         }

         self.state.allow_trailer_fields = msg
diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs
index 3435c3bd1e..d045ad0706 100644
--- a/src/proto/h1/decode.rs
+++ b/src/proto/h1/decode.rs
@@ -10,6 +10,7 @@ use http::{HeaderMap, HeaderName, HeaderValue};
 use http_body::Frame;

 use super::io::MemRead;
+use super::role::DEFAULT_MAX_HEADERS;
 use super::DecodedLength;

 use self::Kind::{Chunked, Eof, Length};
@@ -19,11 +20,10 @@
 /// This limit is currentlty applied for the entire body, not per chunk.
 const CHUNKED_EXTENSIONS_LIMIT: u64 = 1024 * 16;

-/// Maximum number of trailer fields allowed in a chunked body.
-const TRAILERS_FIELD_LIMIT: u64 = 1024;
-
-/// Maximum number of bytes allowed for all trailer fields.
-const TRAILER_LIMIT: u64 = TRAILERS_FIELD_LIMIT * 64;
+/// Maximum number of bytes allowed for all trailer fields.
+///
+/// TODO: remove this when we land h1_max_header_size support
+const TRAILER_LIMIT: usize = 1024 * 16;

 /// Decoders to handle different Transfer-Encodings.
 ///
@@ -44,7 +44,9 @@ enum Kind {
         chunk_len: u64,
         extensions_cnt: u64,
         trailers_buf: Option<BytesMut>,
-        trailers_cnt: u64,
+        trailers_cnt: usize,
+        h1_max_headers: Option<usize>,
+        h1_max_header_size: Option<usize>,
     },
     /// A Reader used for responses that don't indicate a length or chunked.
     ///
@@ -91,7 +93,10 @@ impl Decoder {
         }
     }

-    pub(crate) fn chunked() -> Decoder {
+    pub(crate) fn chunked(
+        h1_max_headers: Option<usize>,
+        h1_max_header_size: Option<usize>,
+    ) -> Decoder {
         Decoder {
             kind: Kind::Chunked {
                 state: ChunkedState::new(),
@@ -99,6 +104,8 @@ impl Decoder {
                 extensions_cnt: 0,
                 trailers_buf: None,
                 trailers_cnt: 0,
+                h1_max_headers,
+                h1_max_header_size,
             },
         }
     }
@@ -109,9 +116,13 @@ impl Decoder {
         }
     }

-    pub(super) fn new(len: DecodedLength) -> Self {
+    pub(super) fn new(
+        len: DecodedLength,
+        h1_max_headers: Option<usize>,
+        h1_max_header_size: Option<usize>,
+    ) -> Self {
         match len {
-            DecodedLength::CHUNKED => Decoder::chunked(),
+            DecodedLength::CHUNKED => Decoder::chunked(h1_max_headers, h1_max_header_size),
             DecodedLength::CLOSE_DELIMITED => Decoder::eof(),
             length => Decoder::length(length.danger_len()),
         }
@@ -164,7 +175,11 @@ impl Decoder {
                 ref mut extensions_cnt,
                 ref mut trailers_buf,
                 ref mut trailers_cnt,
+                ref h1_max_headers,
+                ref h1_max_header_size,
             } => {
+                let h1_max_headers = h1_max_headers.unwrap_or(DEFAULT_MAX_HEADERS);
+                let h1_max_header_size = h1_max_header_size.unwrap_or(TRAILER_LIMIT);
                 loop {
                     let mut buf = None;
                     // advances the chunked state
@@ -176,6 +191,8 @@ impl Decoder {
                         &mut buf,
                         trailers_buf,
                         trailers_cnt,
+                        h1_max_headers,
+                        h1_max_header_size
                     ))?;
                     if *state == ChunkedState::End {
                         trace!("end of chunked");

                         if trailers_buf.is_some() {
                             trace!("found possible trailers");

-                            // decoder enforces that trailers count will not exceed TRAILERS_FIELD_LIMIT
-                            // which is also less than usize::MAX
-                            if *trailers_cnt >= TRAILERS_FIELD_LIMIT
-                                || *trailers_cnt > usize::MAX as u64
-                            {
+                            // decoder enforces that trailers count will not exceed h1_max_headers
+                            if *trailers_cnt >= h1_max_headers {
                                 return Poll::Ready(Err(io::Error::new(
                                     io::ErrorKind::InvalidData,
                                     "chunk trailers count overflow",
                                 )));
                             }
                             match decode_trailers(
                                 &mut trailers_buf.take().expect("Trailer is None"),
-                                *trailers_cnt as usize,
+                                *trailers_cnt,
                             ) {
                                 Ok(headers) => {
                                     return Poll::Ready(Ok(Frame::trailers(headers)));
                                 }
@@ -266,11 +280,10 @@
 macro_rules! put_u8 {
-    ($trailers_buf:expr, $byte:expr) => {
+    ($trailers_buf:expr, $byte:expr, $limit:expr) => {
         $trailers_buf.put_u8($byte);

-        // check if trailer_buf exceeds TRAILER_LIMIT
-        if $trailers_buf.len() as u64 >= TRAILER_LIMIT {
+        if $trailers_buf.len() >= $limit {
             return Poll::Ready(Err(io::Error::new(
                 io::ErrorKind::InvalidData,
                 "chunk trailers bytes over limit",
             )));
         }
     };
 }
@@ -291,7 +304,9 @@ impl ChunkedState {
         extensions_cnt: &mut u64,
         buf: &mut Option<Bytes>,
         trailers_buf: &mut Option<BytesMut>,
-        trailers_cnt: &mut u64,
+        trailers_cnt: &mut usize,
+        h1_max_headers: usize,
+        h1_max_header_size: usize,
     ) -> Poll<Result<ChunkedState, io::Error>> {
         use self::ChunkedState::*;
         match *self {
@@ -303,10 +318,17 @@ impl ChunkedState {
             Body => ChunkedState::read_body(cx, body, size, buf),
             BodyCr => ChunkedState::read_body_cr(cx, body),
             BodyLf => ChunkedState::read_body_lf(cx, body),
-            Trailer => ChunkedState::read_trailer(cx, body, trailers_buf),
-            TrailerLf => ChunkedState::read_trailer_lf(cx, body, trailers_buf, trailers_cnt),
-            EndCr => ChunkedState::read_end_cr(cx, body, trailers_buf),
-            EndLf => ChunkedState::read_end_lf(cx, body, trailers_buf),
+            Trailer => ChunkedState::read_trailer(cx, body, trailers_buf, h1_max_header_size),
+            TrailerLf => ChunkedState::read_trailer_lf(
+                cx,
+                body,
+                trailers_buf,
+                trailers_cnt,
+                h1_max_headers,
+                h1_max_header_size,
+            ),
+            EndCr => ChunkedState::read_end_cr(cx, body, trailers_buf, h1_max_header_size),
+            EndLf => ChunkedState::read_end_lf(cx, body, trailers_buf, h1_max_header_size),
             End => Poll::Ready(Ok(ChunkedState::End)),
         }
     }
@@ -508,11 +530,16 @@ impl ChunkedState {
     fn read_trailer<R: MemRead>(
         cx: &mut Context<'_>,
         rdr: &mut R,
         trailers_buf: &mut Option<BytesMut>,
+        h1_max_header_size: usize,
     ) -> Poll<Result<ChunkedState, io::Error>> {
         trace!("read_trailer");
         let byte = byte!(rdr, cx);

-        put_u8!(trailers_buf.as_mut().expect("trailers_buf is None"), byte);
+        put_u8!(
+            trailers_buf.as_mut().expect("trailers_buf is None"),
+            byte,
+            h1_max_header_size
+        );

         match byte {
             b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)),
@@ -524,12 +551,14 @@ impl ChunkedState {
     fn read_trailer_lf<R: MemRead>(
         cx: &mut Context<'_>,
         rdr: &mut R,
         trailers_buf: &mut Option<BytesMut>,
-        trailers_cnt: &mut u64,
+        trailers_cnt: &mut usize,
+        h1_max_headers: usize,
+        h1_max_header_size: usize,
     ) -> Poll<Result<ChunkedState, io::Error>> {
         let byte = byte!(rdr, cx);
         match byte {
             b'\n' => {
-                if *trailers_cnt == TRAILERS_FIELD_LIMIT {
+                if *trailers_cnt >= h1_max_headers {
                     return Poll::Ready(Err(io::Error::new(
                         io::ErrorKind::InvalidData,
                         "chunk trailers count overflow",
                     )));
                 }
                 *trailers_cnt += 1;

-                put_u8!(trailers_buf.as_mut().expect("trailers_buf is None"), byte);
+                put_u8!(
+                    trailers_buf.as_mut().expect("trailers_buf is None"),
+                    byte,
+                    h1_max_header_size
+                );

                 Poll::Ready(Ok(ChunkedState::EndCr))
             }
@@ -552,12 +585,13 @@ impl ChunkedState {
     fn read_end_cr<R: MemRead>(
         cx: &mut Context<'_>,
         rdr: &mut R,
         trailers_buf: &mut Option<BytesMut>,
+        h1_max_header_size: usize,
     ) -> Poll<Result<ChunkedState, io::Error>> {
         let byte = byte!(rdr, cx);
         match byte {
             b'\r' => {
                 if let Some(trailers_buf) = trailers_buf {
-                    put_u8!(trailers_buf, byte);
+                    put_u8!(trailers_buf, byte, h1_max_header_size);
                 }
                 Poll::Ready(Ok(ChunkedState::EndLf))
             }
@@ -570,7 +604,7 @@ impl ChunkedState {
                         *trailers_buf = Some(buf);
                     }
                     Some(ref mut trailers_buf) => {
-                        put_u8!(trailers_buf, byte);
+                        put_u8!(trailers_buf, byte, h1_max_header_size);
                     }
                 }

@@ -582,12 +616,13 @@ impl ChunkedState {
     fn read_end_lf<R: MemRead>(
         cx: &mut Context<'_>,
         rdr: &mut R,
         trailers_buf: &mut Option<BytesMut>,
+        h1_max_header_size: usize,
     ) -> Poll<Result<ChunkedState, io::Error>> {
         let byte = byte!(rdr, cx);
         match byte {
             b'\n' => {
                 if let Some(trailers_buf) = trailers_buf {
-                    put_u8!(trailers_buf, byte);
+                    put_u8!(trailers_buf,
byte, h1_max_header_size); } Poll::Ready(Ok(ChunkedState::End)) } @@ -721,6 +756,8 @@ mod tests { &mut None, &mut None, &mut trailers_cnt, + DEFAULT_MAX_HEADERS, + TRAILER_LIMIT, ) }) .await; @@ -749,6 +786,8 @@ mod tests { &mut None, &mut None, &mut trailers_cnt, + DEFAULT_MAX_HEADERS, + TRAILER_LIMIT, ) }) .await; @@ -834,7 +873,7 @@ mod tests { 9\r\n\ foo bar\ "[..]; - let mut decoder = Decoder::chunked(); + let mut decoder = Decoder::chunked(None, None); assert_eq!( decoder .decode_fut(&mut bytes) @@ -853,7 +892,7 @@ mod tests { #[tokio::test] async fn test_read_chunked_single_read() { let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..]; - let buf = Decoder::chunked() + let buf = Decoder::chunked(None, None) .decode_fut(&mut mock_buf) .await .expect("decode") @@ -878,7 +917,7 @@ mod tests { scratch.extend(b"0\r\n\r\n"); let mut mock_buf = Bytes::from(scratch); - let mut decoder = Decoder::chunked(); + let mut decoder = Decoder::chunked(None, None); let buf1 = decoder .decode_fut(&mut mock_buf) .await @@ -899,7 +938,7 @@ mod tests { #[tokio::test] async fn test_read_chunked_trailer_with_missing_lf() { let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\nbad\r\r\n"[..]; - let mut decoder = Decoder::chunked(); + let mut decoder = Decoder::chunked(None, None); decoder.decode_fut(&mut mock_buf).await.expect("decode"); let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err(); assert_eq!(e.kind(), io::ErrorKind::InvalidInput); @@ -909,7 +948,7 @@ mod tests { #[tokio::test] async fn test_read_chunked_after_eof() { let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..]; - let mut decoder = Decoder::chunked(); + let mut decoder = Decoder::chunked(None, None); // normal read let buf = decoder @@ -999,7 +1038,7 @@ mod tests { async fn test_read_chunked_async() { let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n"; let expected = "foobar"; - all_async_cases(content, expected, Decoder::chunked()).await; + all_async_cases(content, expected, Decoder::chunked(None, None)).await; } #[cfg(not(miri))] @@ -1086,16 +1125,17 @@ mod tests { } #[tokio::test] - async fn test_trailer_field_limit_enforced() { + async fn test_trailer_max_headers_enforced() { + let h1_max_headers = 10; let mut scratch = vec![]; scratch.extend(b"10\r\n1234567890abcdef\r\n0\r\n"); - for i in 0..=TRAILERS_FIELD_LIMIT { + for i in 0..h1_max_headers { scratch.extend(format!("trailer{}: {}\r\n", i, i).as_bytes()); } scratch.extend(b"\r\n"); let mut mock_buf = Bytes::from(scratch); - let mut decoder = Decoder::chunked(); + let mut decoder = Decoder::chunked(Some(h1_max_headers), None); // ready chunked body let buf = decoder @@ -1115,20 +1155,15 @@ mod tests { } #[tokio::test] - async fn test_trailer_limit_huge_trailer() { + async fn test_trailer_max_header_size_huge_trailer() { + let max_header_size = 1024; let mut scratch = vec![]; scratch.extend(b"10\r\n1234567890abcdef\r\n0\r\n"); - scratch.extend( - format!( - "huge_trailer: {}\r\n", - "x".repeat(TRAILER_LIMIT.try_into().unwrap()) - ) - .as_bytes(), - ); + scratch.extend(format!("huge_trailer: {}\r\n", "x".repeat(max_header_size)).as_bytes()); scratch.extend(b"\r\n"); let mut mock_buf = Bytes::from(scratch); - let mut decoder = Decoder::chunked(); + let mut decoder = Decoder::chunked(None, Some(max_header_size)); // ready chunked body let buf = decoder @@ -1148,18 +1183,20 @@ mod tests { } #[tokio::test] - async fn test_trailer_limit_many_small_trailers() { + async fn test_trailer_max_header_size_many_small_trailers() { + let max_headers = 10; + let header_size = 64; 
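+        // no single field approaches the byte limit configured below
+        // (max_headers * header_size = 640), but ten lines of roughly
+        // "trailerN: " + 64 bytes + CRLF overflow it in aggregate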
let mut scratch = vec![]; scratch.extend(b"10\r\n1234567890abcdef\r\n0\r\n"); - for i in 0..=TRAILERS_FIELD_LIMIT { - scratch.extend(format!("trailer{}: {}\r\n", i, "x".repeat(64)).as_bytes()); + for i in 0..max_headers { + scratch.extend(format!("trailer{}: {}\r\n", i, "x".repeat(header_size)).as_bytes()); } scratch.extend(b"\r\n"); let mut mock_buf = Bytes::from(scratch); - let mut decoder = Decoder::chunked(); + let mut decoder = Decoder::chunked(None, Some(max_headers * header_size)); // ready chunked body let buf = decoder diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index 958340775a..b8fa7d7063 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -30,7 +30,7 @@ use crate::proto::h1::{ use crate::proto::RequestHead; use crate::proto::{BodyLength, MessageHead, RequestLine}; -const DEFAULT_MAX_HEADERS: usize = 100; +pub(crate) const DEFAULT_MAX_HEADERS: usize = 100; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific #[cfg(feature = "server")] const MAX_URI_LEN: usize = (u16::MAX - 1) as usize; From a65aa2550243f7c74b9d28d9437d471f8dfcfc1a Mon Sep 17 00:00:00 2001 From: "Herman J. Radtke III" Date: Thu, 2 May 2024 07:28:11 -0400 Subject: [PATCH 8/8] fix(http1): failing nightly Decoder test --- src/proto/h1/decode.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index d045ad0706..7e34074d76 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -1063,7 +1063,7 @@ mod tests { b.bytes = LEN as u64; b.iter(|| { - let mut decoder = Decoder::chunked(); + let mut decoder = Decoder::chunked(None, None); rt.block_on(async { let mut raw = content.clone(); let chunk = decoder
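To close the series, a usage sketch of what these patches enable on the
client side — illustrative only; it assumes the separate `http-body-util`
crate for the `BodyExt`/`Collected` adapters and `hyper_util::rt::TokioIo`
for the connection wrapper, with error handling condensed:

    use http_body_util::{BodyExt, Empty};
    use hyper::body::Bytes;
    use hyper::Request;

    // `io` is an established TCP connection wrapped in TokioIo.
    let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
    tokio::spawn(conn);

    let req = Request::get("http://example.com/")
        .header("te", "trailers") // advertise that trailers are acceptable
        .body(Empty::<Bytes>::new())?;

    let res = sender.send_request(req).await?;
    let collected = res.into_body().collect().await?;
    let trailers = collected.trailers().cloned(); // Option<HeaderMap> from the chunked trailer section
    let body = collected.to_bytes();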