Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 51 additions & 45 deletions web-transport-proto/src/capsule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,72 +15,76 @@ const MAX_MESSAGE_SIZE: usize = 1024;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Capsule {
CloseWebTransportSession { code: u32, reason: String },
Grease,
Unknown { typ: VarInt, payload: Bytes },
}

impl Capsule {
pub fn decode<B: Buf>(buf: &mut B) -> Result<Self, CapsuleError> {
loop {
let typ = VarInt::decode(buf)?;
let length = VarInt::decode(buf)?;
let typ = VarInt::decode(buf)?;
let length = VarInt::decode(buf)?;

let mut payload = buf.take(length.into_inner() as usize);
if payload.remaining() > MAX_MESSAGE_SIZE {
return Err(CapsuleError::MessageTooLong);
}
let mut payload = buf.take(length.into_inner() as usize);
if payload.remaining() > MAX_MESSAGE_SIZE {
return Err(CapsuleError::MessageTooLong);
}

if payload.remaining() < payload.limit() {
return Err(CapsuleError::UnexpectedEnd);
}
if payload.remaining() < payload.limit() {
return Err(CapsuleError::UnexpectedEnd);
}

match typ.into_inner() {
CLOSE_WEBTRANSPORT_SESSION_TYPE => {
if payload.remaining() < 4 {
return Err(CapsuleError::UnexpectedEnd);
}
match typ.into_inner() {
CLOSE_WEBTRANSPORT_SESSION_TYPE => {
if payload.remaining() < 4 {
return Err(CapsuleError::UnexpectedEnd);
}

let error_code = payload.get_u32();
let error_code = payload.get_u32();

let message_len = payload.remaining();
if message_len > MAX_MESSAGE_SIZE {
return Err(CapsuleError::MessageTooLong);
}
let message_len = payload.remaining();
if message_len > MAX_MESSAGE_SIZE {
return Err(CapsuleError::MessageTooLong);
}

let mut message_bytes = vec![0u8; message_len];
payload.copy_to_slice(&mut message_bytes);
let mut message_bytes = vec![0u8; message_len];
payload.copy_to_slice(&mut message_bytes);

let error_message =
String::from_utf8(message_bytes).map_err(|_| CapsuleError::InvalidUtf8)?;
let error_message =
String::from_utf8(message_bytes).map_err(|_| CapsuleError::InvalidUtf8)?;

return Ok(Self::CloseWebTransportSession {
code: error_code,
reason: error_message,
});
}
t if is_grease(t) => continue,
_ => {
// Unknown capsule type - store it
let mut payload_bytes = vec![0u8; payload.remaining()];
payload.copy_to_slice(&mut payload_bytes);
return Ok(Self::Unknown {
typ,
payload: Bytes::from(payload_bytes),
});
}
Ok(Self::CloseWebTransportSession {
code: error_code,
reason: error_message,
})
}
t if is_grease(t) => {
payload.advance(payload.remaining());
Ok(Self::Grease)
}
_ => {
let mut payload_bytes = vec![0u8; payload.remaining()];
payload.copy_to_slice(&mut payload_bytes);
Ok(Self::Unknown {
typ,
payload: Bytes::from(payload_bytes),
})
}
}
}

pub async fn read<S: AsyncRead + Unpin>(stream: &mut S) -> Result<Self, CapsuleError> {
pub async fn read<S: AsyncRead + Unpin>(stream: &mut S) -> Result<Option<Self>, CapsuleError> {
let mut buf = Vec::new();
loop {
if stream.read_buf(&mut buf).await? == 0 {
if buf.is_empty() {
return Ok(None);
}
return Err(CapsuleError::UnexpectedEnd);
}

let mut limit = std::io::Cursor::new(&buf);
match Self::decode(&mut limit) {
Ok(capsule) => return Ok(capsule),
let mut cursor = std::io::Cursor::new(&buf);
match Self::decode(&mut cursor) {
Ok(capsule) => return Ok(Some(capsule)),
Err(CapsuleError::UnexpectedEnd) => continue,
Err(e) => return Err(e),
}
Expand Down Expand Up @@ -108,6 +112,7 @@ impl Capsule {
// Encode the error message
buf.put_slice(error_message.as_bytes());
}
Self::Grease => {}
Self::Unknown { typ, payload } => {
// Encode the capsule type
typ.encode(buf);
Expand All @@ -129,13 +134,14 @@ impl Capsule {
}
}

// RFC 9297 Section 5.4: Capsule types of the form 0x29 * N + 0x17
fn is_grease(val: u64) -> bool {
if val < 0x21 {
if val < 0x17 {
return false;
}
#[allow(unknown_lints, clippy::manual_is_multiple_of)]
{
(val - 0x21) % 0x1f == 0
(val - 0x17) % 0x29 == 0
}
}

Expand Down
77 changes: 65 additions & 12 deletions web-transport-quiche/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{ez, h3, ClientError, RecvStream, SendStream, SessionError};

use bytes::Buf;
use futures::{ready, stream::FuturesUnordered, Stream, StreamExt};
use web_transport_proto::{Frame, StreamUni, VarInt};

Expand Down Expand Up @@ -107,23 +108,75 @@ impl Connection {
}

// Keep reading from the control stream until it's closed.
// The CONNECT stream carries HTTP/3 DATA frames containing capsule protocol data.
async fn run_closed(self, connect: h3::Connect) {
let (_send, mut recv) = connect.into_inner();
let _session_id = connect.session_id();
let (_send, mut recv, mut buf) = connect.into_inner();

loop {
match web_transport_proto::Capsule::read(&mut recv).await {
Ok(web_transport_proto::Capsule::CloseWebTransportSession { code, reason }) => {
// TODO We shouldn't be closing the QUIC connection with the same error.
// Instead, we should return it to the application.
self.close(code, &reason);
return;
// Read HTTP/3 frames from the CONNECT stream.
// After the HEADERS exchange, the stream carries DATA frames with capsule data.
let capsule_data = loop {
// Try to parse an HTTP/3 frame from the buffer using a Cursor (peek, don't consume).
let mut cursor = std::io::Cursor::new(buf.as_slice());
match Frame::read(&mut cursor) {
Ok((typ, mut payload)) => {
let data = payload.chunk().to_vec();
// Advance past the payload so the cursor reflects the full frame.
payload.advance(payload.remaining());
drop(payload);
let consumed = cursor.position() as usize;
buf.drain(..consumed);

if typ == Frame::DATA {
break data;
} else if typ.is_grease() {
continue;
} else {
continue;
}
}
Err(_) => {
// Need more data, read from the stream.
}
}
Ok(web_transport_proto::Capsule::Unknown { typ, payload }) => {
tracing::warn!("unknown capsule: type={typ} size={}", payload.len());

match recv.read_buf(&mut buf).await {
Ok(None) => {
if !self.conn.is_closed() {
self.close(0, "connect stream finished");
}
return;
}
Ok(Some(_n)) => {}
Err(_e) => {
if !self.conn.is_closed() {
self.close(500, "connect stream read error");
}
return;
}
}
Err(_) => {
self.close(500, "capsule error");
return;
};

// Decode capsules from the DATA frame payload.
// A single DATA frame may contain multiple capsules (or only GREASE).
let mut payload = capsule_data.as_slice();
while payload.has_remaining() {
match web_transport_proto::Capsule::decode(&mut payload) {
Ok(web_transport_proto::Capsule::CloseWebTransportSession { code, reason }) => {
self.close(code, &reason);
return;
}
Ok(web_transport_proto::Capsule::Grease) => {}
Ok(web_transport_proto::Capsule::Unknown { typ, payload }) => {
tracing::warn!("unknown capsule: type={typ} size={}", payload.len());
}
Err(_e) => {
if !self.conn.is_closed() {
self.close(500, "capsule decode error");
}
return;
}
}
}
}
Expand Down
59 changes: 47 additions & 12 deletions web-transport-quiche/src/h3/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,44 @@ pub struct Connect {

#[allow(dead_code)]
recv: ez::RecvStream,

// Leftover bytes from reading the CONNECT request/response.
// These may contain capsule data that arrived with the HEADERS frame.
buf: Vec<u8>,
}

impl Connect {
/// Accept an HTTP/3 CONNECT request from the client.
///
/// This is called by the server to receive the CONNECT request.
pub async fn accept(conn: &ez::Connection) -> Result<Self, ConnectError> {
// Accept the stream that will be used to send the HTTP CONNECT request.
// If they try to send any other type of HTTP request, we will error out.
let (send, mut recv) = conn.accept_bi().await?;

let request = web_transport_proto::ConnectRequest::read(&mut recv).await?;
tracing::debug!(?request, "received CONNECT");
let mut buf = Vec::new();
let request = loop {
if recv.read_buf(&mut buf).await?.is_none() {
return Err(ConnectError::UnexpectedEnd);
}

let mut cursor = std::io::Cursor::new(buf.as_slice());
match ConnectRequest::decode(&mut cursor) {
Ok(request) => {
let consumed = cursor.position() as usize;
buf.drain(..consumed);
break request;
}
Err(web_transport_proto::ConnectError::UnexpectedEnd) => continue,
Err(e) => return Err(e.into()),
}
};

tracing::debug!(?request, leftover = buf.len(), "received CONNECT");

// The request was successfully decoded, so we can send a response.
Ok(Self {
request,
send,
recv,
buf,
})
}

Expand All @@ -79,19 +98,33 @@ impl Connect {
) -> Result<Self, ConnectError> {
tracing::debug!("opening bi");

// Create a new stream that will be used to send the CONNECT frame.
let (mut send, mut recv) = conn.open_bi().await?;

// Create a new CONNECT request that we'll send using HTTP/3
let request = request.into();

tracing::debug!(?request, "sending CONNECT");
request.write(&mut send).await?;

let response = web_transport_proto::ConnectResponse::read(&mut recv).await?;
tracing::debug!(?response, "received CONNECT");
let mut buf = Vec::new();
let response = loop {
if recv.read_buf(&mut buf).await?.is_none() {
return Err(ConnectError::UnexpectedEnd);
}

let mut cursor = std::io::Cursor::new(buf.as_slice());
match ConnectResponse::decode(&mut cursor) {
Ok(response) => {
let consumed = cursor.position() as usize;
buf.drain(..consumed);
break response;
}
Err(web_transport_proto::ConnectError::UnexpectedEnd) => continue,
Err(e) => return Err(e.into()),
}
};

tracing::debug!(?response, leftover = buf.len(), "received CONNECT");

// Throw an error if we didn't get a 200 OK.
if response.status != http::StatusCode::OK {
return Err(ConnectError::Status(response.status));
}
Expand All @@ -100,6 +133,7 @@ impl Connect {
request,
send,
recv,
buf,
})
}

Expand All @@ -113,7 +147,8 @@ impl Connect {
&self.request.url
}

pub fn into_inner(self) -> (ez::SendStream, ez::RecvStream) {
(self.send, self.recv)
/// Returns the inner streams and any leftover bytes from reading the CONNECT handshake.
pub fn into_inner(self) -> (ez::SendStream, ez::RecvStream, Vec<u8>) {
(self.send, self.recv, self.buf)
}
}
8 changes: 6 additions & 2 deletions web-transport-quinn/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,16 @@ impl Session {
async fn run_closed(&mut self, mut connect: ConnectComplete) -> (u32, String) {
loop {
match web_transport_proto::Capsule::read(&mut connect.recv).await {
Ok(web_transport_proto::Capsule::CloseWebTransportSession { code, reason }) => {
Ok(Some(web_transport_proto::Capsule::CloseWebTransportSession { code, reason })) => {
return (code, reason);
}
Ok(web_transport_proto::Capsule::Unknown { typ, payload }) => {
Ok(Some(web_transport_proto::Capsule::Grease)) => {}
Ok(Some(web_transport_proto::Capsule::Unknown { typ, payload })) => {
tracing::warn!(%typ, size = payload.len(), "unknown capsule");
}
Ok(None) => {
return (0, "stream closed".to_string());
}
Err(_) => {
return (1, "capsule error".to_string());
}
Expand Down