From 866eee5a8832016de987b0664c283d41e27975ad Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 9 Feb 2026 17:36:42 -0800 Subject: [PATCH 1/2] Fix capsule protocol handling Changes: - Remove loop in Capsule::decode() - now decodes single capsule instead of skipping GREASE - Add explicit Grease variant to Capsule enum - Fix GREASE detection per RFC 9297 Section 5.4 (0x29 * N + 0x17) - Change Capsule::read() to return Option (returns None on clean EOF) - Update web-transport-quinn to handle new API This is foundational for properly handling HTTP/3 frames that may contain multiple capsules or GREASE values. Co-Authored-By: Claude Sonnet 4.5 --- web-transport-proto/src/capsule.rs | 96 ++++++++++++++++-------------- web-transport-quinn/src/session.rs | 8 ++- 2 files changed, 57 insertions(+), 47 deletions(-) diff --git a/web-transport-proto/src/capsule.rs b/web-transport-proto/src/capsule.rs index baeb654..c5fd26c 100644 --- a/web-transport-proto/src/capsule.rs +++ b/web-transport-proto/src/capsule.rs @@ -15,72 +15,76 @@ const MAX_MESSAGE_SIZE: usize = 1024; #[derive(Debug, Clone, PartialEq, Eq)] pub enum Capsule { CloseWebTransportSession { code: u32, reason: String }, + Grease, Unknown { typ: VarInt, payload: Bytes }, } impl Capsule { pub fn decode(buf: &mut B) -> Result { - loop { - let typ = VarInt::decode(buf)?; - let length = VarInt::decode(buf)?; + let typ = VarInt::decode(buf)?; + let length = VarInt::decode(buf)?; - let mut payload = buf.take(length.into_inner() as usize); - if payload.remaining() > MAX_MESSAGE_SIZE { - return Err(CapsuleError::MessageTooLong); - } + let mut payload = buf.take(length.into_inner() as usize); + if payload.remaining() > MAX_MESSAGE_SIZE { + return Err(CapsuleError::MessageTooLong); + } - if payload.remaining() < payload.limit() { - return Err(CapsuleError::UnexpectedEnd); - } + if payload.remaining() < payload.limit() { + return Err(CapsuleError::UnexpectedEnd); + } - match typ.into_inner() { - CLOSE_WEBTRANSPORT_SESSION_TYPE => { - if payload.remaining() < 4 { - return Err(CapsuleError::UnexpectedEnd); - } + match typ.into_inner() { + CLOSE_WEBTRANSPORT_SESSION_TYPE => { + if payload.remaining() < 4 { + return Err(CapsuleError::UnexpectedEnd); + } - let error_code = payload.get_u32(); + let error_code = payload.get_u32(); - let message_len = payload.remaining(); - if message_len > MAX_MESSAGE_SIZE { - return Err(CapsuleError::MessageTooLong); - } + let message_len = payload.remaining(); + if message_len > MAX_MESSAGE_SIZE { + return Err(CapsuleError::MessageTooLong); + } - let mut message_bytes = vec![0u8; message_len]; - payload.copy_to_slice(&mut message_bytes); + let mut message_bytes = vec![0u8; message_len]; + payload.copy_to_slice(&mut message_bytes); - let error_message = - String::from_utf8(message_bytes).map_err(|_| CapsuleError::InvalidUtf8)?; + let error_message = + String::from_utf8(message_bytes).map_err(|_| CapsuleError::InvalidUtf8)?; - return Ok(Self::CloseWebTransportSession { - code: error_code, - reason: error_message, - }); - } - t if is_grease(t) => continue, - _ => { - // Unknown capsule type - store it - let mut payload_bytes = vec![0u8; payload.remaining()]; - payload.copy_to_slice(&mut payload_bytes); - return Ok(Self::Unknown { - typ, - payload: Bytes::from(payload_bytes), - }); - } + Ok(Self::CloseWebTransportSession { + code: error_code, + reason: error_message, + }) + } + t if is_grease(t) => { + payload.advance(payload.remaining()); + Ok(Self::Grease) + } + _ => { + let mut payload_bytes = vec![0u8; payload.remaining()]; + payload.copy_to_slice(&mut payload_bytes); + Ok(Self::Unknown { + typ, + payload: Bytes::from(payload_bytes), + }) } } } - pub async fn read(stream: &mut S) -> Result { + pub async fn read(stream: &mut S) -> Result, CapsuleError> { let mut buf = Vec::new(); loop { if stream.read_buf(&mut buf).await? == 0 { + if buf.is_empty() { + return Ok(None); + } return Err(CapsuleError::UnexpectedEnd); } - let mut limit = std::io::Cursor::new(&buf); - match Self::decode(&mut limit) { - Ok(capsule) => return Ok(capsule), + let mut cursor = std::io::Cursor::new(&buf); + match Self::decode(&mut cursor) { + Ok(capsule) => return Ok(Some(capsule)), Err(CapsuleError::UnexpectedEnd) => continue, Err(e) => return Err(e), } @@ -108,6 +112,7 @@ impl Capsule { // Encode the error message buf.put_slice(error_message.as_bytes()); } + Self::Grease => {} Self::Unknown { typ, payload } => { // Encode the capsule type typ.encode(buf); @@ -129,13 +134,14 @@ impl Capsule { } } +// RFC 9297 Section 5.4: Capsule types of the form 0x29 * N + 0x17 fn is_grease(val: u64) -> bool { - if val < 0x21 { + if val < 0x17 { return false; } #[allow(unknown_lints, clippy::manual_is_multiple_of)] { - (val - 0x21) % 0x1f == 0 + (val - 0x17) % 0x29 == 0 } } diff --git a/web-transport-quinn/src/session.rs b/web-transport-quinn/src/session.rs index 880a77d..615085c 100644 --- a/web-transport-quinn/src/session.rs +++ b/web-transport-quinn/src/session.rs @@ -103,12 +103,16 @@ impl Session { async fn run_closed(&mut self, mut connect: ConnectComplete) -> (u32, String) { loop { match web_transport_proto::Capsule::read(&mut connect.recv).await { - Ok(web_transport_proto::Capsule::CloseWebTransportSession { code, reason }) => { + Ok(Some(web_transport_proto::Capsule::CloseWebTransportSession { code, reason })) => { return (code, reason); } - Ok(web_transport_proto::Capsule::Unknown { typ, payload }) => { + Ok(Some(web_transport_proto::Capsule::Grease)) => {} + Ok(Some(web_transport_proto::Capsule::Unknown { typ, payload })) => { tracing::warn!(%typ, size = payload.len(), "unknown capsule"); } + Ok(None) => { + return (0, "stream closed".to_string()); + } Err(_) => { return (1, "capsule error".to_string()); } From 49416e3cd06c9351c6359b31019bf9c6b3e6e20e Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 9 Feb 2026 17:39:33 -0800 Subject: [PATCH 2/2] Fix HTTP/3 frame and capsule buffering in quiche Changes: - Add buffering for leftover bytes after CONNECT handshake (capsule data can arrive with HEADERS frame) - Refactor run_closed() to properly read HTTP/3 DATA frames and decode multiple capsules per frame - Handle GREASE capsules explicitly - More robust frame parsing with cursor-based approach Previously, the code assumed Capsule::read() would handle the HTTP/3 framing, but it only handles capsule protocol. This fix properly extracts DATA frames first, then decodes capsules from the payload. Co-Authored-By: Claude Sonnet 4.5 --- web-transport-quiche/src/connection.rs | 77 ++++++++++++++++++++++---- web-transport-quiche/src/h3/connect.rs | 59 ++++++++++++++++---- 2 files changed, 112 insertions(+), 24 deletions(-) diff --git a/web-transport-quiche/src/connection.rs b/web-transport-quiche/src/connection.rs index fbc37d6..6799bbb 100644 --- a/web-transport-quiche/src/connection.rs +++ b/web-transport-quiche/src/connection.rs @@ -1,5 +1,6 @@ use crate::{ez, h3, ClientError, RecvStream, SendStream, SessionError}; +use bytes::Buf; use futures::{ready, stream::FuturesUnordered, Stream, StreamExt}; use web_transport_proto::{Frame, StreamUni, VarInt}; @@ -107,23 +108,75 @@ impl Connection { } // Keep reading from the control stream until it's closed. + // The CONNECT stream carries HTTP/3 DATA frames containing capsule protocol data. async fn run_closed(self, connect: h3::Connect) { - let (_send, mut recv) = connect.into_inner(); + let _session_id = connect.session_id(); + let (_send, mut recv, mut buf) = connect.into_inner(); loop { - match web_transport_proto::Capsule::read(&mut recv).await { - Ok(web_transport_proto::Capsule::CloseWebTransportSession { code, reason }) => { - // TODO We shouldn't be closing the QUIC connection with the same error. - // Instead, we should return it to the application. - self.close(code, &reason); - return; + // Read HTTP/3 frames from the CONNECT stream. + // After the HEADERS exchange, the stream carries DATA frames with capsule data. + let capsule_data = loop { + // Try to parse an HTTP/3 frame from the buffer using a Cursor (peek, don't consume). + let mut cursor = std::io::Cursor::new(buf.as_slice()); + match Frame::read(&mut cursor) { + Ok((typ, mut payload)) => { + let data = payload.chunk().to_vec(); + // Advance past the payload so the cursor reflects the full frame. + payload.advance(payload.remaining()); + drop(payload); + let consumed = cursor.position() as usize; + buf.drain(..consumed); + + if typ == Frame::DATA { + break data; + } else if typ.is_grease() { + continue; + } else { + continue; + } + } + Err(_) => { + // Need more data, read from the stream. + } } - Ok(web_transport_proto::Capsule::Unknown { typ, payload }) => { - tracing::warn!("unknown capsule: type={typ} size={}", payload.len()); + + match recv.read_buf(&mut buf).await { + Ok(None) => { + if !self.conn.is_closed() { + self.close(0, "connect stream finished"); + } + return; + } + Ok(Some(_n)) => {} + Err(_e) => { + if !self.conn.is_closed() { + self.close(500, "connect stream read error"); + } + return; + } } - Err(_) => { - self.close(500, "capsule error"); - return; + }; + + // Decode capsules from the DATA frame payload. + // A single DATA frame may contain multiple capsules (or only GREASE). + let mut payload = capsule_data.as_slice(); + while payload.has_remaining() { + match web_transport_proto::Capsule::decode(&mut payload) { + Ok(web_transport_proto::Capsule::CloseWebTransportSession { code, reason }) => { + self.close(code, &reason); + return; + } + Ok(web_transport_proto::Capsule::Grease) => {} + Ok(web_transport_proto::Capsule::Unknown { typ, payload }) => { + tracing::warn!("unknown capsule: type={typ} size={}", payload.len()); + } + Err(_e) => { + if !self.conn.is_closed() { + self.close(500, "capsule decode error"); + } + return; + } } } } diff --git a/web-transport-quiche/src/h3/connect.rs b/web-transport-quiche/src/h3/connect.rs index 4155683..52ea772 100644 --- a/web-transport-quiche/src/h3/connect.rs +++ b/web-transport-quiche/src/h3/connect.rs @@ -34,6 +34,10 @@ pub struct Connect { #[allow(dead_code)] recv: ez::RecvStream, + + // Leftover bytes from reading the CONNECT request/response. + // These may contain capsule data that arrived with the HEADERS frame. + buf: Vec, } impl Connect { @@ -41,18 +45,33 @@ impl Connect { /// /// This is called by the server to receive the CONNECT request. pub async fn accept(conn: &ez::Connection) -> Result { - // Accept the stream that will be used to send the HTTP CONNECT request. - // If they try to send any other type of HTTP request, we will error out. let (send, mut recv) = conn.accept_bi().await?; - let request = web_transport_proto::ConnectRequest::read(&mut recv).await?; - tracing::debug!(?request, "received CONNECT"); + let mut buf = Vec::new(); + let request = loop { + if recv.read_buf(&mut buf).await?.is_none() { + return Err(ConnectError::UnexpectedEnd); + } + + let mut cursor = std::io::Cursor::new(buf.as_slice()); + match ConnectRequest::decode(&mut cursor) { + Ok(request) => { + let consumed = cursor.position() as usize; + buf.drain(..consumed); + break request; + } + Err(web_transport_proto::ConnectError::UnexpectedEnd) => continue, + Err(e) => return Err(e.into()), + } + }; + + tracing::debug!(?request, leftover = buf.len(), "received CONNECT"); - // The request was successfully decoded, so we can send a response. Ok(Self { request, send, recv, + buf, }) } @@ -79,19 +98,33 @@ impl Connect { ) -> Result { tracing::debug!("opening bi"); - // Create a new stream that will be used to send the CONNECT frame. let (mut send, mut recv) = conn.open_bi().await?; - // Create a new CONNECT request that we'll send using HTTP/3 let request = request.into(); tracing::debug!(?request, "sending CONNECT"); request.write(&mut send).await?; - let response = web_transport_proto::ConnectResponse::read(&mut recv).await?; - tracing::debug!(?response, "received CONNECT"); + let mut buf = Vec::new(); + let response = loop { + if recv.read_buf(&mut buf).await?.is_none() { + return Err(ConnectError::UnexpectedEnd); + } + + let mut cursor = std::io::Cursor::new(buf.as_slice()); + match ConnectResponse::decode(&mut cursor) { + Ok(response) => { + let consumed = cursor.position() as usize; + buf.drain(..consumed); + break response; + } + Err(web_transport_proto::ConnectError::UnexpectedEnd) => continue, + Err(e) => return Err(e.into()), + } + }; + + tracing::debug!(?response, leftover = buf.len(), "received CONNECT"); - // Throw an error if we didn't get a 200 OK. if response.status != http::StatusCode::OK { return Err(ConnectError::Status(response.status)); } @@ -100,6 +133,7 @@ impl Connect { request, send, recv, + buf, }) } @@ -113,7 +147,8 @@ impl Connect { &self.request.url } - pub fn into_inner(self) -> (ez::SendStream, ez::RecvStream) { - (self.send, self.recv) + /// Returns the inner streams and any leftover bytes from reading the CONNECT handshake. + pub fn into_inner(self) -> (ez::SendStream, ez::RecvStream, Vec) { + (self.send, self.recv, self.buf) } }