diff --git a/Cargo.lock b/Cargo.lock index ef4b4eb33..9c3657073 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2906,6 +2906,7 @@ name = "libmoq" version = "0.2.7" dependencies = [ "anyhow", + "bytes", "cbindgen", "hang", "moq-lite", diff --git a/rs/hang/examples/video.rs b/rs/hang/examples/video.rs index 220b161aa..020957727 100644 --- a/rs/hang/examples/video.rs +++ b/rs/hang/examples/video.rs @@ -1,5 +1,5 @@ // cargo run --example video -use moq_lite::coding::Bytes; +use bytes::Bytes; #[tokio::main] async fn main() -> anyhow::Result<()> { diff --git a/rs/hang/src/container/consumer.rs b/rs/hang/src/container/consumer.rs index 42c7ad14e..f35fe586e 100644 --- a/rs/hang/src/container/consumer.rs +++ b/rs/hang/src/container/consumer.rs @@ -2,7 +2,6 @@ use std::collections::VecDeque; use buf_list::BufList; use futures::{StreamExt, stream::FuturesUnordered}; -use moq_lite::coding::Decode; use super::{Frame, Timestamp}; use crate::Error; @@ -188,7 +187,7 @@ impl GroupReader { let mut payload = BufList::from_iter(payload); - let timestamp = Timestamp::decode(&mut payload, ())?; + let timestamp = Timestamp::decode(&mut payload)?; let frame = Frame { keyframe: (self.index == 0), diff --git a/rs/hang/src/container/frame.rs b/rs/hang/src/container/frame.rs index 26a31b148..83a7d114d 100644 --- a/rs/hang/src/container/frame.rs +++ b/rs/hang/src/container/frame.rs @@ -2,7 +2,6 @@ use bytes::{Buf, BytesMut}; use derive_more::Debug; pub use buf_list::BufList; -use moq_lite::{coding::Encode, lite}; use crate::Error; @@ -42,7 +41,7 @@ impl Frame { /// NOTE: The [Self::keyframe] flag is ignored for this method; you need to create a new group manually. pub fn encode(&self, group: &mut moq_lite::GroupProducer) -> Result<(), Error> { let mut header = BytesMut::new(); - self.timestamp.encode(&mut header, lite::Version::Draft02); + self.timestamp.encode(&mut header); let size = header.len() + self.payload.remaining(); diff --git a/rs/hang/src/error.rs b/rs/hang/src/error.rs index f5a228fd8..c6aecc4e0 100644 --- a/rs/hang/src/error.rs +++ b/rs/hang/src/error.rs @@ -10,10 +10,6 @@ pub enum Error { #[error("moq lite error: {0}")] Moq(#[from] moq_lite::Error), - /// Failed to decode a message at the MoQ transport layer. - #[error("decode error: {0}")] - Decode(#[from] moq_lite::coding::DecodeError), - /// JSON serialization/deserialization error. #[error("json error: {0}")] Json(Arc), diff --git a/rs/libmoq/Cargo.toml b/rs/libmoq/Cargo.toml index 4f7602434..60f6b9e73 100644 --- a/rs/libmoq/Cargo.toml +++ b/rs/libmoq/Cargo.toml @@ -18,6 +18,7 @@ crate-type = ["staticlib"] [dependencies] anyhow = { version = "1", features = ["backtrace"] } +bytes = "1" hang = { workspace = true } moq-lite = { workspace = true, features = ["serde"] } moq-mux = { workspace = true } diff --git a/rs/libmoq/src/consume.rs b/rs/libmoq/src/consume.rs index 37e34dfcd..676e96cd8 100644 --- a/rs/libmoq/src/consume.rs +++ b/rs/libmoq/src/consume.rs @@ -1,6 +1,6 @@ use std::ffi::c_char; -use moq_lite::coding::Buf; +use bytes::Buf; use tokio::sync::oneshot; use crate::ffi::OnStatus; diff --git a/rs/libmoq/src/publish.rs b/rs/libmoq/src/publish.rs index ca0b18761..e752daf4a 100644 --- a/rs/libmoq/src/publish.rs +++ b/rs/libmoq/src/publish.rs @@ -1,6 +1,6 @@ use std::{str::FromStr, sync::Arc}; -use moq_lite::coding::Buf; +use bytes::Buf; use moq_mux::import; use crate::{Error, Id, NonZeroSlab}; diff --git a/rs/moq-lite/src/coding/mod.rs b/rs/moq-lite/src/coding/mod.rs index c37a77a92..e04b66be3 100644 --- a/rs/moq-lite/src/coding/mod.rs +++ b/rs/moq-lite/src/coding/mod.rs @@ -17,6 +17,3 @@ pub use stream::*; pub use varint::*; pub use version::*; pub use writer::*; - -// Re-export the bytes crate -pub use bytes::*; diff --git a/rs/moq-lite/src/coding/reader.rs b/rs/moq-lite/src/coding/reader.rs index 1bacf6911..1531c49fc 100644 --- a/rs/moq-lite/src/coding/reader.rs +++ b/rs/moq-lite/src/coding/reader.rs @@ -1,6 +1,6 @@ use std::{cmp, fmt::Debug, io, sync::Arc}; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use crate::{Error, coding::*}; diff --git a/rs/moq-lite/src/coding/version.rs b/rs/moq-lite/src/coding/version.rs index f6583aa83..ca8103438 100644 --- a/rs/moq-lite/src/coding/version.rs +++ b/rs/moq-lite/src/coding/version.rs @@ -6,10 +6,6 @@ use std::{fmt, ops::Deref}; #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Version(pub u64); -/// A version number negotiated during the setup. -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Alpn(pub &'static str); - impl From for Version { fn from(v: u64) -> Self { Self(v) diff --git a/rs/moq-lite/src/error.rs b/rs/moq-lite/src/error.rs index a914f4eea..2ce9f990e 100644 --- a/rs/moq-lite/src/error.rs +++ b/rs/moq-lite/src/error.rs @@ -10,6 +10,7 @@ impl SendSyncError for T where T: std::error::Error + MaybeSend + MaybeSync { /// A list of possible errors that can occur during the session. #[derive(thiserror::Error, Debug, Clone)] +#[non_exhaustive] pub enum Error { #[error("transport error: {0}")] Transport(Arc), diff --git a/rs/moq-lite/src/ietf/fetch.rs b/rs/moq-lite/src/ietf/fetch.rs index ee6c73eed..bb71eef1a 100644 --- a/rs/moq-lite/src/ietf/fetch.rs +++ b/rs/moq-lite/src/ietf/fetch.rs @@ -312,23 +312,6 @@ impl Decode for FetchHeader { } } -/// Fetch object serialization (v14 format). -/// v15 adds SerializationFlags for delta encoding but we skip that for now. -pub struct FetchObject { - /* - v14: - Group ID (i), - Subgroup ID (i), - Object ID (i), - Publisher Priority (8), - Extension Headers Length (i), - [Extension headers (...)], - Object Payload Length (i), - [Object Status (i)], - Object Payload (..), - */ -} - #[cfg(test)] mod tests { use super::*; diff --git a/rs/moq-lite/src/ietf/mod.rs b/rs/moq-lite/src/ietf/mod.rs index 1d3a35d3a..8de6f9aed 100644 --- a/rs/moq-lite/src/ietf/mod.rs +++ b/rs/moq-lite/src/ietf/mod.rs @@ -17,7 +17,6 @@ mod publish_namespace; mod publisher; mod request; mod session; -mod setup; mod subscribe; mod subscribe_namespace; mod subscriber; @@ -36,7 +35,6 @@ pub use publish_namespace::*; use publisher::*; pub use request::*; pub(crate) use session::*; -pub use setup::*; pub use subscribe::*; pub use subscribe_namespace::*; use subscriber::*; diff --git a/rs/moq-lite/src/ietf/parameters.rs b/rs/moq-lite/src/ietf/parameters.rs index a2ef9b373..7b966eb4b 100644 --- a/rs/moq-lite/src/ietf/parameters.rs +++ b/rs/moq-lite/src/ietf/parameters.rs @@ -137,6 +137,7 @@ impl Parameters { self.vars.insert(kind, value); } + #[cfg(test)] pub fn get_bytes(&self, kind: ParameterBytes) -> Option<&[u8]> { self.bytes.get(&kind).map(|v| v.as_slice()) } @@ -246,10 +247,10 @@ impl Encode for MessageParameters { impl MessageParameters { // Varint parameter IDs (even) - const DELIVERY_TIMEOUT: u64 = 0x02; - const MAX_CACHE_DURATION: u64 = 0x04; - const EXPIRES: u64 = 0x08; - const PUBLISHER_PRIORITY: u64 = 0x0E; + //const DELIVERY_TIMEOUT: u64 = 0x02; + //const MAX_CACHE_DURATION: u64 = 0x04; + //const EXPIRES: u64 = 0x08; + //const PUBLISHER_PRIORITY: u64 = 0x0E; const FORWARD: u64 = 0x10; const SUBSCRIBER_PRIORITY: u64 = 0x20; const GROUP_ORDER: u64 = 0x22; @@ -262,6 +263,7 @@ impl MessageParameters { // --- Varint accessors --- + /* pub fn delivery_timeout(&self) -> Option { self.vars.get(&Self::DELIVERY_TIMEOUT).copied() } @@ -293,6 +295,7 @@ impl MessageParameters { pub fn set_publisher_priority(&mut self, v: u8) { self.vars.insert(Self::PUBLISHER_PRIORITY, v as u64); } + */ pub fn forward(&self) -> Option { self.vars.get(&Self::FORWARD).map(|v| *v != 0) diff --git a/rs/moq-lite/src/ietf/setup.rs b/rs/moq-lite/src/ietf/setup.rs deleted file mode 100644 index 24ef5505a..000000000 --- a/rs/moq-lite/src/ietf/setup.rs +++ /dev/null @@ -1,172 +0,0 @@ -use crate::{ - coding::*, - ietf::{Message, Parameters, Version as IetfVersion}, -}; - -/// Sent by the client to setup the session. -#[derive(Debug, Clone)] -pub struct ClientSetup { - /// The list of supported versions in preferred order. - pub versions: Versions, - - /// Extensions. - pub parameters: Parameters, -} - -impl Message for ClientSetup { - const ID: u64 = 0x20; - - /// Decode a client setup message. - fn decode_msg(r: &mut R, version: IetfVersion) -> Result { - match version { - IetfVersion::Draft14 => { - let versions = Versions::decode(r, version)?; - let parameters = Parameters::decode(r, version)?; - Ok(Self { versions, parameters }) - } - IetfVersion::Draft15 | IetfVersion::Draft16 => { - // Draft15+: no versions list, just parameters - let parameters = Parameters::decode(r, version)?; - Ok(Self { - versions: vec![Version(version as u64)].into(), - parameters, - }) - } - } - } - - /// Encode a client setup message. - fn encode_msg(&self, w: &mut W, version: IetfVersion) { - match version { - IetfVersion::Draft14 => { - self.versions.encode(w, version); - self.parameters.encode(w, version); - } - IetfVersion::Draft15 | IetfVersion::Draft16 => { - // Draft15+: no versions list, just parameters - self.parameters.encode(w, version); - } - } - } -} - -/// Sent by the server in response to a client setup. -#[derive(Debug, Clone)] -pub struct ServerSetup { - /// The selected version. - pub version: Version, - - /// Supported extensions. - pub parameters: Parameters, -} - -impl Message for ServerSetup { - const ID: u64 = 0x21; - - fn encode_msg(&self, w: &mut W, version: IetfVersion) { - match version { - IetfVersion::Draft14 => { - self.version.encode(w, version); - self.parameters.encode(w, version); - } - IetfVersion::Draft15 | IetfVersion::Draft16 => { - // Draft15+: no version field, just parameters - self.parameters.encode(w, version); - } - } - } - - fn decode_msg(r: &mut R, version: IetfVersion) -> Result { - match version { - IetfVersion::Draft14 => { - let selected = Version::decode(r, version)?; - let parameters = Parameters::decode(r, version)?; - Ok(Self { - version: selected, - parameters, - }) - } - IetfVersion::Draft15 | IetfVersion::Draft16 => { - // Draft15+: no version field, just parameters - let parameters = Parameters::decode(r, version)?; - Ok(Self { - version: Version(version as u64), - parameters, - }) - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use bytes::BytesMut; - - fn encode_message(msg: &M, version: IetfVersion) -> Vec { - let mut buf = BytesMut::new(); - msg.encode_msg(&mut buf, version); - buf.to_vec() - } - - fn decode_message(bytes: &[u8], version: IetfVersion) -> Result { - let mut buf = bytes::Bytes::from(bytes.to_vec()); - M::decode_msg(&mut buf, version) - } - - #[test] - fn test_client_setup_v14_round_trip() { - let msg = ClientSetup { - versions: vec![Version(IetfVersion::Draft14 as u64)].into(), - parameters: Parameters::default(), - }; - - let encoded = encode_message(&msg, IetfVersion::Draft14); - let decoded: ClientSetup = decode_message(&encoded, IetfVersion::Draft14).unwrap(); - - assert_eq!(decoded.versions.len(), 1); - assert_eq!(decoded.versions[0], Version(IetfVersion::Draft14 as u64)); - } - - #[test] - fn test_client_setup_v15_round_trip() { - let msg = ClientSetup { - versions: vec![Version(IetfVersion::Draft15 as u64)].into(), - parameters: Parameters::default(), - }; - - let encoded = encode_message(&msg, IetfVersion::Draft15); - let decoded: ClientSetup = decode_message(&encoded, IetfVersion::Draft15).unwrap(); - - // v15 doesn't encode versions, so decoded should have [Draft15] - assert_eq!(decoded.versions.len(), 1); - assert_eq!(decoded.versions[0], Version(IetfVersion::Draft15 as u64)); - } - - #[test] - fn test_server_setup_v14_round_trip() { - let msg = ServerSetup { - version: Version(IetfVersion::Draft14 as u64), - parameters: Parameters::default(), - }; - - let encoded = encode_message(&msg, IetfVersion::Draft14); - let decoded: ServerSetup = decode_message(&encoded, IetfVersion::Draft14).unwrap(); - - assert_eq!(decoded.version, Version(IetfVersion::Draft14 as u64)); - } - - #[test] - fn test_server_setup_v15_round_trip() { - let msg = ServerSetup { - version: Version(IetfVersion::Draft15 as u64), - parameters: Parameters::default(), - }; - - let encoded = encode_message(&msg, IetfVersion::Draft15); - let decoded: ServerSetup = decode_message(&encoded, IetfVersion::Draft15).unwrap(); - - // v15 doesn't encode version, so decoded should be Draft15 - assert_eq!(decoded.version, Version(IetfVersion::Draft15 as u64)); - } -} diff --git a/rs/moq-lite/src/lib.rs b/rs/moq-lite/src/lib.rs index e4ee5d383..179576808 100644 --- a/rs/moq-lite/src/lib.rs +++ b/rs/moq-lite/src/lib.rs @@ -3,8 +3,7 @@ //! `moq-lite` is designed for real-time live media delivery with sub-second latency at massive scale. //! This is a simplified subset of the *official* Media over QUIC (MoQ) transport, focusing on the practical features. //! -//! **NOTE**: While compatible with a subset of the IETF MoQ specification (see [ietf::Version]), many features are not supported on purpose. -//! Additionally, the IETF standard is immature and up to interpretation, so many implementations are not compatible anyway. +//! **NOTE**: While compatible with a subset of the IETF MoQ specification, many features are not supported on purpose. //! I highly highly highly recommend using `moq-lite` instead of the IETF standard until at least draft-30. //! //! ## API @@ -14,28 +13,13 @@ //! - [Broadcast]: A collection of [Track]s, produced by a single publisher. //! - [Track]: A collection of [Group]s, delivered out-of-order until expired. //! - [Group]: A collection of [Frame]s, delivered in order until cancelled. -//! -//! To publish media, create: -//! - [Origin::produce] to get an [OriginProducer] and [OriginConsumer] pair. -//! - [OriginProducer::create_broadcast] to create a [BroadcastProducer]. -//! - [BroadcastProducer::create_track] to create a [TrackProducer] for each track. -//! - [TrackProducer::append_group] for each Group of Pictures (GOP) or audio frames. -//! - [GroupProducer::write_frame] to write each encoded frame in the group. -//! -//! To consume media, create: -//! - [Origin::produce] to get an [OriginProducer] and [OriginConsumer] pair. -//! - [OriginConsumer::announced] to discover new [BroadcastConsumer]s as they're announced. -//! - [BroadcastConsumer::subscribe_track] to get a [TrackConsumer] for a specific track. -//! - [TrackConsumer::next_group] to receive the next available group. -//! - [GroupConsumer::read_frame] to read each frame in the group. -//! -//! ## Advanced Usage -//! -//! - Use [FrameProducer] and [FrameConsumer] for chunked frame writes/reads without allocating entire frames (useful for relaying). -//! - Use [TrackProducer::create_group] instead of [TrackProducer::append_group] to produce groups out-of-order. +//! - [Frame]: Chunks of data with an upfront size. mod client; +mod coding; mod error; +mod ietf; +mod lite; mod model; mod path; mod server; @@ -43,10 +27,6 @@ mod session; mod setup; mod version; -pub mod coding; -pub mod ietf; -pub mod lite; - pub use client::*; pub use error::*; pub use model::*; @@ -54,3 +34,6 @@ pub use path::*; pub use server::*; pub use session::*; pub use version::*; + +// Re-export the bytes crate +pub use bytes; diff --git a/rs/moq-lite/src/lite/mod.rs b/rs/moq-lite/src/lite/mod.rs index 1390ac283..37db57de1 100644 --- a/rs/moq-lite/src/lite/mod.rs +++ b/rs/moq-lite/src/lite/mod.rs @@ -12,7 +12,6 @@ mod parameters; mod priority; mod publisher; mod session; -mod setup; mod stream; mod subscribe; mod subscriber; @@ -25,7 +24,6 @@ pub use message::*; pub use parameters::*; use publisher::*; pub(super) use session::*; -pub use setup::*; pub use stream::*; pub use subscribe::*; use subscriber::*; diff --git a/rs/moq-lite/src/lite/setup.rs b/rs/moq-lite/src/lite/setup.rs deleted file mode 100644 index 2a649e63e..000000000 --- a/rs/moq-lite/src/lite/setup.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crate::{ - coding::*, - lite::{self, Message, Parameters}, -}; - -/// Sent by the client to setup the session. -#[derive(Debug, Clone)] -pub struct ClientSetup { - /// The list of supported versions in preferred order. - pub versions: Versions, - - /// Extensions. - pub parameters: Parameters, -} - -impl Message for ClientSetup { - /// Decode a client setup message. - fn decode_msg(r: &mut R, version: lite::Version) -> Result { - let versions = Versions::decode(r, version)?; - let parameters = Parameters::decode(r, version)?; - - Ok(Self { versions, parameters }) - } - - /// Encode a client setup message. - fn encode_msg(&self, w: &mut W, version: lite::Version) { - self.versions.encode(w, version); - self.parameters.encode(w, version); - } -} - -/// Sent by the server in response to a client setup. -#[derive(Debug, Clone)] -pub struct ServerSetup { - /// The list of supported versions in preferred order. - pub version: Version, - - /// Supported extensions. - pub parameters: Parameters, -} - -impl Message for ServerSetup { - fn encode_msg(&self, w: &mut W, version: lite::Version) { - self.version.encode(w, version); - self.parameters.encode(w, version); - } - - fn decode_msg(r: &mut R, version: lite::Version) -> Result { - let version = Version::decode(r, version)?; - let parameters = Parameters::decode(r, version)?; - - Ok(Self { version, parameters }) - } -} diff --git a/rs/moq-lite/src/model/frame.rs b/rs/moq-lite/src/model/frame.rs index 7d0cf8835..a126f54c3 100644 --- a/rs/moq-lite/src/model/frame.rs +++ b/rs/moq-lite/src/model/frame.rs @@ -6,6 +6,9 @@ use tokio::sync::watch; use crate::{Error, Result}; /// A chunk of data with an upfront size. +/// +/// Note that this is just the header. +/// You use [FrameProducer] and [FrameConsumer] to deal with the frame payload, potentially chunked. #[derive(Clone, Debug)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct Frame { diff --git a/rs/moq-lite/src/model/time.rs b/rs/moq-lite/src/model/time.rs index 442cabcf5..e9ca1709a 100644 --- a/rs/moq-lite/src/model/time.rs +++ b/rs/moq-lite/src/model/time.rs @@ -1,5 +1,6 @@ use rand::Rng; +use crate::Error; use crate::coding::{Decode, DecodeError, Encode, VarInt}; use std::sync::LazyLock; @@ -187,6 +188,15 @@ impl Timescale { None => Err(TimeOverflow), } } + + pub fn encode(&self, w: &mut W) { + self.0.encode(w, ()); + } + + pub fn decode(r: &mut R) -> Result { + let v = VarInt::decode(r, ())?; + Ok(Self(v)) + } } impl TryFrom for Timescale { diff --git a/rs/moq-lite/src/model/track.rs b/rs/moq-lite/src/model/track.rs index 7f0d715e4..14c18505b 100644 --- a/rs/moq-lite/src/model/track.rs +++ b/rs/moq-lite/src/model/track.rs @@ -22,6 +22,7 @@ use std::{collections::VecDeque, future::Future}; const MAX_CACHE: std::time::Duration = std::time::Duration::from_secs(30); +/// A track is a collection of groups, delivered out-of-order until expired. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct Track { diff --git a/rs/moq-lite/src/version.rs b/rs/moq-lite/src/version.rs index 3a9ffc421..aefc36034 100644 --- a/rs/moq-lite/src/version.rs +++ b/rs/moq-lite/src/version.rs @@ -9,32 +9,16 @@ pub(crate) const NEGOTIATED: [Version; 3] = [ Version::Ietf(ietf::Version::Draft14), ]; -/// The ALPN strings for supported versions. -const ALPNS: [&str; 4] = [lite::ALPN, ietf::ALPN_14, ietf::ALPN_15, ietf::ALPN_16]; - -// Return the ALPN strings for supported versions. -// This is a function so we can avoid semver bumps. -pub fn alpns() -> &'static [&'static str] { - &ALPNS -} +/// ALPN strings for supported versions. +pub const ALPNS: &[&str] = &[lite::ALPN, ietf::ALPN_14, ietf::ALPN_15, ietf::ALPN_16]; // A combination of ietf::Version and lite::Version. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum Version { +pub(crate) enum Version { Ietf(ietf::Version), Lite(lite::Version), } -impl Version { - pub fn is_ietf(self) -> bool { - matches!(self, Self::Ietf(_)) - } - - pub fn is_lite(self) -> bool { - matches!(self, Self::Lite(_)) - } -} - impl From for Version { fn from(value: ietf::Version) -> Self { Self::Ietf(value) diff --git a/rs/moq-native/src/client.rs b/rs/moq-native/src/client.rs index a68792fa5..fcbd3591a 100644 --- a/rs/moq-native/src/client.rs +++ b/rs/moq-native/src/client.rs @@ -4,8 +4,6 @@ use anyhow::Context; use std::path::PathBuf; use std::{net, sync::Arc}; use url::Url; -#[cfg(feature = "iroh")] -use web_transport_iroh::iroh; /// TLS configuration for the client. #[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)] @@ -90,18 +88,13 @@ pub struct Client { moq: moq_lite::Client, #[cfg(feature = "websocket")] websocket: super::ClientWebSocket, - inner: ClientInner, tls: rustls::ClientConfig, - #[cfg(feature = "iroh")] - iroh: Option, -} - -#[derive(Clone)] -enum ClientInner { #[cfg(feature = "quinn")] - Quinn(crate::quinn::QuinnClient), + quinn: Option, #[cfg(feature = "quiche")] - Quiche(crate::quiche::QuicheClient), + quiche: Option, + #[cfg(feature = "iroh")] + iroh: Option, } impl Client { @@ -114,11 +107,16 @@ impl Client { #[cfg(any(feature = "quinn", feature = "quiche"))] pub fn new(config: ClientConfig) -> anyhow::Result { let backend = config.backend.clone().unwrap_or({ - if cfg!(feature = "quinn") { + #[cfg(feature = "quinn")] + { QuicBackend::Quinn - } else { + } + #[cfg(all(feature = "quiche", not(feature = "quinn")))] + { QuicBackend::Quiche } + #[cfg(all(not(feature = "quiche"), not(feature = "quinn")))] + panic!("no QUIC backend compiled; enable quinn or quiche feature"); }); let provider = crypto::provider(); @@ -167,21 +165,16 @@ impl Client { tls.dangerous().set_certificate_verifier(Arc::new(noop)); } - let inner = match backend { - QuicBackend::Quinn => { - #[cfg(not(feature = "quinn"))] - anyhow::bail!("quinn backend not compiled; rebuild with --features quinn"); - - #[cfg(feature = "quinn")] - ClientInner::Quinn(crate::quinn::QuinnClient::new(&config)?) - } - QuicBackend::Quiche => { - #[cfg(not(feature = "quiche"))] - anyhow::bail!("quiche backend not compiled; rebuild with --features quiche"); + #[cfg(feature = "quinn")] + let quinn = match backend { + QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?), + _ => None, + }; - #[cfg(feature = "quiche")] - ClientInner::Quiche(crate::quiche::QuicheClient::new(&config)?) - } + #[cfg(feature = "quiche")] + let quiche = match backend { + QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?), + _ => None, }; Ok(Self { @@ -189,14 +182,17 @@ impl Client { #[cfg(feature = "websocket")] websocket: config.websocket, tls, - inner, + #[cfg(feature = "quinn")] + quinn, + #[cfg(feature = "quiche")] + quiche, #[cfg(feature = "iroh")] iroh: None, }) } #[cfg(feature = "iroh")] - pub fn with_iroh(mut self, iroh: Option) -> Self { + pub fn with_iroh(mut self, iroh: Option) -> Self { self.iroh = iroh; self } @@ -220,94 +216,72 @@ impl Client { pub async fn connect(&self, url: Url) -> anyhow::Result { #[cfg(feature = "iroh")] if crate::iroh::is_iroh_url(&url) { - let session = self.connect_iroh(url).await?; + let endpoint = self.iroh.as_ref().context("Iroh support is not enabled")?; + let session = crate::iroh::connect(endpoint, url).await?; let session = self.moq.connect(session).await?; return Ok(session); } - match self.inner { - #[cfg(feature = "quinn")] - ClientInner::Quinn(ref quinn) => { - let tls = self.tls.clone(); - let quic_url = url.clone(); - let quic_handle = async { - let res = quinn.connect(&tls, quic_url).await; - if let Err(err) = &res { - tracing::warn!(%err, "QUIC connection failed"); - } - res - }; - - #[cfg(feature = "websocket")] - { - let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url); - - Ok(tokio::select! { - Ok(quic) = quic_handle => self.moq.connect(quic).await?, - Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?, - else => anyhow::bail!("failed to connect to server"), - }) + #[cfg(feature = "quinn")] + if let Some(quinn) = self.quinn.as_ref() { + let tls = self.tls.clone(); + let quic_url = url.clone(); + let quic_handle = async { + let res = quinn.connect(&tls, quic_url).await; + if let Err(err) = &res { + tracing::warn!(%err, "QUIC connection failed"); } + res + }; - #[cfg(not(feature = "websocket"))] - { - let session = quic_handle.await?; - Ok(self.moq.connect(session).await?) - } + #[cfg(feature = "websocket")] + { + let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url); + + return Ok(tokio::select! { + Ok(quic) = quic_handle => self.moq.connect(quic).await?, + Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?, + else => anyhow::bail!("failed to connect to server"), + }); } - #[cfg(feature = "quiche")] - ClientInner::Quiche(ref quiche) => { - let quic_url = url.clone(); - let quic_handle = async { - let res = quiche.connect(quic_url).await; - if let Err(err) = &res { - tracing::warn!(%err, "QUIC connection failed"); - } - res - }; - - #[cfg(feature = "websocket")] - { - let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url); - - Ok(tokio::select! { - Ok(quic) = quic_handle => self.moq.connect(quic).await?, - Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?, - else => anyhow::bail!("failed to connect to server"), - }) - } - #[cfg(not(feature = "websocket"))] - { - let session = quic_handle.await?; - Ok(self.moq.connect(session).await?) - } + #[cfg(not(feature = "websocket"))] + { + let session = quic_handle.await?; + return Ok(self.moq.connect(session).await?); } } - } - #[cfg(feature = "iroh")] - async fn connect_iroh(&self, url: Url) -> anyhow::Result { - let endpoint = self.iroh.as_ref().context("Iroh support is not enabled")?; - // TODO Support multiple ALPNs - let alpn = match url.scheme() { - "moql+iroh" | "iroh" => moq_lite::lite::ALPN, - "moqt+iroh" => moq_lite::ietf::ALPN_14, - "moqt-15+iroh" => moq_lite::ietf::ALPN_15, - "h3+iroh" => web_transport_iroh::ALPN_H3, - _ => anyhow::bail!("Invalid URL: unknown scheme"), - }; - let host = url.host().context("Invalid URL: missing host")?.to_string(); - let endpoint_id: iroh::EndpointId = host.parse().context("Invalid URL: host is not an iroh endpoint id")?; - let conn = endpoint.connect(endpoint_id, alpn.as_bytes()).await?; - let session = match alpn { - web_transport_iroh::ALPN_H3 => { - let url = url_set_scheme(url, "https")?; - web_transport_iroh::Session::connect_h3(conn, url).await? + #[cfg(feature = "quiche")] + if let Some(quiche) = self.quiche.as_ref() { + let quic_url = url.clone(); + let quic_handle = async { + let res = quiche.connect(quic_url).await; + if let Err(err) = &res { + tracing::warn!(%err, "QUIC connection failed"); + } + res + }; + + #[cfg(feature = "websocket")] + { + let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url); + + return Ok(tokio::select! { + Ok(quic) = quic_handle => self.moq.connect(quic).await?, + Some(Ok(ws)) = ws_handle => self.moq.connect(ws).await?, + else => anyhow::bail!("failed to connect to server"), + }); } - _ => web_transport_iroh::Session::raw(conn), - }; - Ok(session) + + #[cfg(not(feature = "websocket"))] + { + let session = quic_handle.await?; + return Ok(self.moq.connect(session).await?); + } + } + + anyhow::bail!("no QUIC backend compiled; enable quinn or quiche feature"); } } @@ -351,23 +325,6 @@ impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification { } } -/// Returns a new URL with a changed scheme. -/// -/// [`Url::set_scheme`] returns an error if the scheme change is not valid according to -/// [the URL specification's section on legal scheme state overrides](https://url.spec.whatwg.org/#scheme-state). -/// -/// This function allows all scheme changes, as long as the resulting URL is valid. -#[cfg(feature = "iroh")] -fn url_set_scheme(url: Url, scheme: &str) -> anyhow::Result { - let url = format!( - "{}:{}", - scheme, - url.to_string().split_once(":").context("invalid URL")?.1 - ) - .parse()?; - Ok(url) -} - #[cfg(test)] mod tests { use super::*; diff --git a/rs/moq-native/src/iroh.rs b/rs/moq-native/src/iroh.rs index 234770b87..a51c40f44 100644 --- a/rs/moq-native/src/iroh.rs +++ b/rs/moq-native/src/iroh.rs @@ -67,7 +67,7 @@ impl IrohEndpointConfig { }; let mut alpns = vec![web_transport_iroh::ALPN_H3.as_bytes().to_vec()]; - for alpn in moq_lite::alpns() { + for alpn in moq_lite::ALPNS { alpns.push(alpn.as_bytes().to_vec()); } @@ -118,7 +118,7 @@ impl IrohRequest { request: Box::new(request), }) } - alpn if moq_lite::alpns().contains(&alpn) => Ok(Self::Quic { connection: conn }), + alpn if moq_lite::ALPNS.contains(&alpn) => Ok(Self::Quic { connection: conn }), _ => Err(anyhow::anyhow!("unsupported ALPN: {alpn}")), } } @@ -149,3 +149,49 @@ impl IrohRequest { } } } + +pub(crate) async fn connect(endpoint: &IrohEndpoint, url: Url) -> anyhow::Result { + let host = url.host().context("Invalid URL: missing host")?.to_string(); + let endpoint_id: iroh::EndpointId = host.parse().context("Invalid URL: host is not an iroh endpoint id")?; + + // We need to use this API to provide multiple ALPNs + let alpn = b"h3"; + let opts = iroh::endpoint::ConnectOptions::new() + .with_additional_alpns(moq_lite::ALPNS.iter().map(|alpn| alpn.as_bytes().to_vec()).collect()); + + let mut connecting = endpoint.connect_with_opts(endpoint_id, alpn, opts).await?; + let alpn = connecting.alpn().await?; + let alpn = String::from_utf8(alpn).context("failed to decode ALPN")?; + + let session = match alpn.as_str() { + web_transport_iroh::ALPN_H3 => { + let conn = connecting.await?; + let url = url_set_scheme(url, "https")?; + web_transport_iroh::Session::connect_h3(conn, url).await? + } + alpn if moq_lite::ALPNS.contains(&alpn) => { + let conn = connecting.await?; + // TODO: Add support for ALPNs. + web_transport_iroh::Session::raw(conn) + } + _ => anyhow::bail!("unsupported ALPN: {alpn}"), + }; + + Ok(session) +} + +/// Returns a new URL with a changed scheme. +/// +/// [`Url::set_scheme`] returns an error if the scheme change is not valid according to +/// [the URL specification's section on legal scheme state overrides](https://url.spec.whatwg.org/#scheme-state). +/// +/// This function allows all scheme changes, as long as the resulting URL is valid. +fn url_set_scheme(url: Url, scheme: &str) -> anyhow::Result { + let url = format!( + "{}:{}", + scheme, + url.to_string().split_once(":").context("invalid URL")?.1 + ) + .parse()?; + Ok(url) +} diff --git a/rs/moq-native/src/lib.rs b/rs/moq-native/src/lib.rs index d87e566f8..ee6e167b6 100644 --- a/rs/moq-native/src/lib.rs +++ b/rs/moq-native/src/lib.rs @@ -44,6 +44,11 @@ pub use iroh::*; #[derive(Clone, Debug, clap::ValueEnum, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "lowercase")] pub enum QuicBackend { + /// [web-transport-quinn](https://crates.io/crates/web-transport-quinn) + #[cfg(feature = "quinn")] Quinn, + + /// [web-transport-quiche](https://crates.io/crates/web-transport-quiche) + #[cfg(feature = "quiche")] Quiche, } diff --git a/rs/moq-native/src/quiche.rs b/rs/moq-native/src/quiche.rs index 83d561018..2157fff75 100644 --- a/rs/moq-native/src/quiche.rs +++ b/rs/moq-native/src/quiche.rs @@ -41,7 +41,7 @@ impl QuicheClient { let alpns = match url.scheme() { "https" => vec![web_transport_quiche::ALPN.as_bytes().to_vec()], - "moqt" | "moql" => moq_lite::alpns().iter().map(|alpn| alpn.as_bytes().to_vec()).collect(), + "moqt" | "moql" => moq_lite::ALPNS.iter().map(|alpn| alpn.as_bytes().to_vec()).collect(), _ => anyhow::bail!("url scheme must be 'https', 'moqt', or 'moql'"), }; @@ -56,7 +56,7 @@ impl QuicheClient { tracing::debug!(%url, "connecting via quiche"); let mut request = web_transport_quiche::proto::ConnectRequest::new(url.clone()); - for alpn in moq_lite::alpns() { + for alpn in moq_lite::ALPNS { request = request.with_protocol(alpn.to_string()); } @@ -135,7 +135,7 @@ impl QuicheServer { })); let mut alpns = vec![b"h3".to_vec()]; - for alpn in moq_lite::alpns() { + for alpn in moq_lite::ALPNS { alpns.push(alpn.as_bytes().to_vec()); } @@ -259,7 +259,7 @@ impl QuicheRequest { .context("failed to accept WebTransport request")?; Ok(Self::WebTransport { request }) } - alpn if moq_lite::alpns().contains(&alpn) => Ok(Self::Raw { + alpn if moq_lite::ALPNS.contains(&alpn) => Ok(Self::Raw { connection: conn, request: ConnectRequest::new("moqt://".to_string().parse::().unwrap()), response: web_transport_quiche::proto::ConnectResponse::OK.with_protocol(alpn), diff --git a/rs/moq-native/src/quinn.rs b/rs/moq-native/src/quinn.rs index 673ac41ee..e33ad5a61 100644 --- a/rs/moq-native/src/quinn.rs +++ b/rs/moq-native/src/quinn.rs @@ -81,7 +81,7 @@ impl QuinnClient { let alpns = match url.scheme() { "https" => vec![web_transport_quinn::ALPN.as_bytes().to_vec()], - "moqt" | "moql" => moq_lite::alpns().iter().map(|alpn| alpn.as_bytes().to_vec()).collect(), + "moqt" | "moql" => moq_lite::ALPNS.iter().map(|alpn| alpn.as_bytes().to_vec()).collect(), _ => anyhow::bail!("url scheme must be 'https', 'moqt', or 'moql'"), }; @@ -98,7 +98,7 @@ impl QuinnClient { tracing::Span::current().record("id", connection.stable_id()); let mut request = web_transport_quinn::proto::ConnectRequest::new(url.clone()); - for alpn in moq_lite::alpns() { + for alpn in moq_lite::ALPNS { request = request.with_protocol(alpn.to_string()); } @@ -210,7 +210,7 @@ impl QuinnServer { .with_cert_resolver(certs.clone()); let mut alpns = vec![web_transport_quinn::ALPN.as_bytes().to_vec()]; - for alpn in moq_lite::alpns() { + for alpn in moq_lite::ALPNS { alpns.push(alpn.as_bytes().to_vec()); } @@ -313,7 +313,7 @@ impl QuinnRequest { .context("failed to receive WebTransport request")?; Ok(Self::WebTransport { request }) } - alpn if moq_lite::alpns().contains(&alpn) => { + alpn if moq_lite::ALPNS.contains(&alpn) => { let url = format!("moqt://{}", host).parse::().unwrap(); let request = web_transport_quinn::proto::ConnectRequest::new(url); let response = web_transport_quinn::proto::ConnectResponse::OK.with_protocol(alpn); diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index 8d919a400..1132efac0 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -106,25 +106,28 @@ pub struct Server { impl Server { pub fn new(config: ServerConfig) -> anyhow::Result { let backend = config.backend.clone().unwrap_or({ - if cfg!(feature = "quinn") { + #[cfg(feature = "quinn")] + { QuicBackend::Quinn - } else if cfg!(feature = "quiche") { + } + #[cfg(all(feature = "quiche", not(feature = "quinn")))] + { QuicBackend::Quiche - } else { - anyhow::bail!("no QUIC backend compiled; enable quinn or quiche feature"); } + #[cfg(all(not(feature = "quiche"), not(feature = "quinn")))] + panic!("no QUIC backend compiled; enable quinn or quiche feature"); }); #[cfg(feature = "quinn")] let quinn = match backend { QuicBackend::Quinn => Some(crate::quinn::QuinnServer::new(config.clone())?), - QuicBackend::Quiche => None, + _ => None, }; #[cfg(feature = "quiche")] let quiche = match backend { QuicBackend::Quiche => Some(crate::quiche::QuicheServer::new(config)?), - QuicBackend::Quinn => None, + _ => None, }; Ok(Server { @@ -305,6 +308,10 @@ pub(crate) enum RequestKind { Iroh(crate::iroh::IrohRequest), } +/// An incoming MoQ session that can be accepted or rejected. +/// +/// [Self::with_publish] and [Self::with_consume] will configure what will be published and consumed from the session respectively. +/// Otherwise, the Server's configuration is used by default. pub struct Request { server: moq_lite::Server, kind: RequestKind, @@ -338,11 +345,13 @@ impl Request { } } + /// Publish the given origin to the session. pub fn with_publish(mut self, publish: impl Into>) -> Self { self.server = self.server.with_publish(publish); self } + /// Consume the given origin from the session. pub fn with_consume(mut self, consume: impl Into>) -> Self { self.server = self.server.with_consume(consume); self