Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rs/moq-lite/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Client {
.iter()
.find(|v| coding::Version::from(**v) == server.version)
.copied()
.ok_or_else(|| Error::Version(client.versions.clone(), supported.clone().into()))?;
.ok_or(Error::Version)?;

match version {
Version::Lite(version) => {
Expand Down
83 changes: 40 additions & 43 deletions rs/moq-lite/src/coding/reader.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{cmp, fmt::Debug, io, sync::Arc};
use std::{cmp, fmt::Debug, io};

use bytes::{Buf, BufMut, Bytes, BytesMut};

Expand Down Expand Up @@ -34,18 +34,12 @@ impl<S: web_transport_trait::RecvStream, V> Reader<S, V> {
}
Err(DecodeError::Short) => {
// Try to read more data
if self
.stream
.read_buf(&mut self.buffer)
.await
.map_err(|e| Error::Transport(Arc::new(e)))?
.is_none()
{
if !self.read_more().await? {
// Stream closed while we still need more data
return Err(Error::Decode(DecodeError::Short));
return Err(Error::Decode);
}
}
Err(e) => return Err(Error::Decode(e)),
Err(e) => return Err(e.into()),
}
}
}
Expand All @@ -55,11 +49,11 @@ impl<S: web_transport_trait::RecvStream, V> Reader<S, V> {
where
V: Clone,
{
match self.closed().await {
Ok(()) => Ok(None),
Err(Error::Decode(DecodeError::ExpectedEnd)) => Ok(Some(self.decode().await?)),
Err(e) => Err(e),
if !self.has_more().await? {
return Ok(None);
}

Ok(Some(self.decode().await?))
}

/// Decode the next message from the stream without consuming it.
Expand All @@ -73,18 +67,12 @@ impl<S: web_transport_trait::RecvStream, V> Reader<S, V> {
Ok(msg) => return Ok(msg),
Err(DecodeError::Short) => {
// Try to read more data
if self
.stream
.read_buf(&mut self.buffer)
.await
.map_err(|e| Error::Transport(Arc::new(e)))?
.is_none()
{
if !self.read_more().await? {
// Stream closed while we still need more data
return Err(Error::Decode(DecodeError::Short));
return Err(Error::Decode);
}
}
Err(e) => return Err(Error::Decode(e)),
Err(e) => return Err(e.into()),
}
}
}
Expand All @@ -97,10 +85,7 @@ impl<S: web_transport_trait::RecvStream, V> Reader<S, V> {
return Ok(Some(data));
}

self.stream
.read_chunk(max)
.await
.map_err(|e| Error::Transport(Arc::new(e)))
self.stream.read_chunk(max).await.map_err(Error::from_transport)
}

/// Read exactly the given number of bytes from the stream.
Expand All @@ -118,10 +103,11 @@ impl<S: web_transport_trait::RecvStream, V> Reader<S, V> {
buf.put(data);

while buf.has_remaining_mut() {
self.stream
.read_buf(&mut buf)
.await
.map_err(|e| Error::Transport(Arc::new(e)))?;
match self.stream.read_buf(&mut buf).await {
Ok(Some(_)) => {}
Ok(None) => return Err(Error::Decode),
Err(e) => return Err(Error::from_transport(e)),
}
}

Ok(buf.into_inner().freeze())
Expand All @@ -138,8 +124,8 @@ impl<S: web_transport_trait::RecvStream, V> Reader<S, V> {
.stream
.read_chunk(size)
.await
.map_err(|e| Error::Transport(Arc::new(e)))?
.ok_or(Error::Decode(DecodeError::Short))?;
.map_err(Error::from_transport)?
.ok_or(Error::Decode)?;
size -= chunk.len();
}

Expand All @@ -148,18 +134,29 @@ impl<S: web_transport_trait::RecvStream, V> Reader<S, V> {

/// Wait until the stream is closed, erroring if there are any additional bytes.
pub async fn closed(&mut self) -> Result<(), Error> {
if self.buffer.is_empty()
&& self
.stream
.read_buf(&mut self.buffer)
.await
.map_err(|e| Error::Transport(Arc::new(e)))?
.is_none()
{
return Ok(());
if self.has_more().await? {
return Err(Error::Decode);
}

Ok(())
}

/// Returns true if there is more data available in the buffer or stream.
async fn has_more(&mut self) -> Result<bool, Error> {
if !self.buffer.is_empty() {
return Ok(true);
}

Err(DecodeError::ExpectedEnd.into())
self.read_more().await
}

/// Try to read more data from the stream. Returns true if data was read, false if stream closed.
async fn read_more(&mut self) -> Result<bool, Error> {
match self.stream.read_buf(&mut self.buffer).await {
Ok(Some(_)) => Ok(true),
Ok(None) => Ok(false),
Err(e) => Err(Error::from_transport(e)),
}
}

/// Abort the stream with the given error.
Expand Down
9 changes: 2 additions & 7 deletions rs/moq-lite/src/coding/stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use crate::Error;
use crate::coding::{Reader, Writer};

Expand All @@ -15,7 +13,7 @@ impl<S: web_transport_trait::Session, V> Stream<S, V> {
where
V: Clone,
{
let (send, recv) = session.open_bi().await.map_err(|err| Error::Transport(Arc::new(err)))?;
let (send, recv) = session.open_bi().await.map_err(Error::from_transport)?;

let writer = Writer::new(send, version.clone());
let reader = Reader::new(recv, version);
Expand All @@ -28,10 +26,7 @@ impl<S: web_transport_trait::Session, V> Stream<S, V> {
where
V: Clone,
{
let (send, recv) = session
.accept_bi()
.await
.map_err(|err| Error::Transport(Arc::new(err)))?;
let (send, recv) = session.accept_bi().await.map_err(Error::from_transport)?;

let writer = Writer::new(send, version.clone());
let reader = Reader::new(recv, version);
Expand Down
14 changes: 5 additions & 9 deletions rs/moq-lite/src/coding/writer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Debug, sync::Arc};
use std::fmt::Debug;

use crate::{Error, coding::*};

Expand Down Expand Up @@ -33,7 +33,7 @@ impl<S: web_transport_trait::SendStream, V> Writer<S, V> {
.unwrap()
.write_buf(&mut self.buffer)
.await
.map_err(|e| Error::Transport(Arc::new(e)))?;
.map_err(Error::from_transport)?;
}

Ok(())
Expand All @@ -46,7 +46,7 @@ impl<S: web_transport_trait::SendStream, V> Writer<S, V> {
.unwrap()
.write_buf(buf)
.await
.map_err(|e| Error::Transport(Arc::new(e)))
.map_err(Error::from_transport)
}

/// Write the entire [Buf] to the stream.
Expand All @@ -61,11 +61,7 @@ impl<S: web_transport_trait::SendStream, V> Writer<S, V> {

/// Mark the stream as finished.
pub fn finish(&mut self) -> Result<(), Error> {
self.stream
.as_mut()
.unwrap()
.finish()
.map_err(|e| Error::Transport(Arc::new(e)))
self.stream.as_mut().unwrap().finish().map_err(Error::from_transport)
}

/// Abort the stream with the given error.
Expand All @@ -80,7 +76,7 @@ impl<S: web_transport_trait::SendStream, V> Writer<S, V> {
.unwrap()
.closed()
.await
.map_err(|e| Error::Transport(Arc::new(e)))?;
.map_err(Error::from_transport)?;
Ok(())
}

Expand Down
94 changes: 70 additions & 24 deletions rs/moq-lite/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,30 @@
use std::sync::Arc;

use crate::coding;
use web_transport_trait::{MaybeSend, MaybeSync};

/// A trait that is Send+Sync except on WASM.
pub trait SendSyncError: std::error::Error + MaybeSend + MaybeSync {}

impl<T> SendSyncError for T where T: std::error::Error + MaybeSend + MaybeSync {}

/// A list of possible errors that can occur during the session.
#[derive(thiserror::Error, Debug, Clone)]
#[non_exhaustive]
pub enum Error {
#[error("transport error: {0}")]
Transport(Arc<dyn SendSyncError>),
#[error("transport error")]
Transport,

#[error("decode error: {0}")]
Decode(#[from] coding::DecodeError),
#[error("decode error")]
Decode,

// TODO move to a ConnectError
#[error("unsupported versions: client={0:?} server={1:?}")]
Version(coding::Versions, coding::Versions),
#[error("unsupported versions")]
Version,

/// A required extension was not present
#[error("extension required: {0}")]
RequiredExtension(u64),
#[error("extension required")]
RequiredExtension,

/// An unexpected stream type was received
#[error("unexpected stream type")]
UnexpectedStream,

/// Some VarInt was too large and we were too lazy to handle it
#[error("varint bounds exceeded")]
BoundsExceeded(#[from] coding::BoundsExceeded),
BoundsExceeded,

/// A duplicate ID was used
// The broadcast/track is a duplicate
Expand All @@ -53,7 +45,7 @@ pub enum Error {

// The application closes the stream with a code.
#[error("app code={0}")]
App(u32),
App(u16),

#[error("not found")]
NotFound,
Expand Down Expand Up @@ -91,15 +83,15 @@ impl Error {
pub fn to_code(&self) -> u32 {
match self {
Self::Cancel => 0,
Self::RequiredExtension(_) => 1,
Self::RequiredExtension => 1,
Self::Old => 2,
Self::Timeout => 3,
Self::Transport(_) => 4,
Self::Decode(_) => 5,
Self::Transport => 4,
Self::Decode => 5,
Self::Unauthorized => 6,
Self::Version(..) => 9,
Self::Version => 9,
Self::UnexpectedStream => 10,
Self::BoundsExceeded(_) => 11,
Self::BoundsExceeded => 11,
Self::Duplicate => 12,
Self::NotFound => 13,
Self::WrongSize => 14,
Expand All @@ -110,8 +102,62 @@ impl Error {
Self::TooManyParameters => 19,
Self::InvalidRole => 20,
Self::UnknownAlpn(_) => 21,
Self::App(app) => *app + 64,
Self::App(app) => *app as u32 + 64,
}
}

/// Decode an error from a wire code.
pub fn from_code(code: u32) -> Self {
match code {
0 => Self::Cancel,
1 => Self::RequiredExtension,
2 => Self::Old,
3 => Self::Timeout,
4 => Self::Transport,
5 => Self::Decode,
6 => Self::Unauthorized,
9 => Self::Version,
10 => Self::UnexpectedStream,
11 => Self::BoundsExceeded,
12 => Self::Duplicate,
13 => Self::NotFound,
14 => Self::WrongSize,
15 => Self::ProtocolViolation,
16 => Self::UnexpectedMessage,
17 => Self::Unsupported,
18 => Self::TooLarge,
19 => Self::TooManyParameters,
20 => Self::InvalidRole,
code if code >= 64 => match u16::try_from(code - 64) {
Ok(app) => Self::App(app),
Err(_) => Self::ProtocolViolation,
},
_ => Self::ProtocolViolation,
}
}

/// Convert a transport error into an [Error], decoding stream reset codes.
pub fn from_transport(err: impl web_transport_trait::Error) -> Self {
if let Some(code) = err.stream_error() {
return Self::from_code(code);
}

tracing::warn!(%err, "transport error");
Self::Transport
}
}

impl From<coding::DecodeError> for Error {
fn from(err: coding::DecodeError) -> Self {
tracing::warn!(%err, "decode error");
Error::Decode
}
}

impl From<coding::BoundsExceeded> for Error {
fn from(err: coding::BoundsExceeded) -> Self {
tracing::warn!(%err, "bounds exceeded");
Error::BoundsExceeded
}
}

Expand Down
2 changes: 1 addition & 1 deletion rs/moq-lite/src/ietf/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Control {

tracing::trace!(id = T::ID, size = buf.len(), hex = %hex::encode(&buf), "encoded control message");

self.tx.send(buf).map_err(|e| Error::Transport(Arc::new(e)))?;
self.tx.send(buf).map_err(|_| Error::Transport)?;
Ok(())
}

Expand Down
Loading
Loading