diff --git a/Cargo.lock b/Cargo.lock index 872280d..2a36338 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1288,6 +1288,7 @@ name = "moq-relay-ietf" version = "0.7.5" dependencies = [ "anyhow", + "async-trait", "axum", "clap", "env_logger", diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index cd3e4d8..8eb693a 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -11,6 +11,14 @@ edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] +[lib] +name = "moq_relay_ietf" +path = "src/lib.rs" + +[[bin]] +name = "moq-relay-ietf" +path = "src/main.rs" + [dependencies] moq-transport = { path = "../moq-transport", version = "0.11" } moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } @@ -22,6 +30,7 @@ url = "2" # Async stuff tokio = { version = "1", features = ["full"] } futures = "0.3" +async-trait = "0.1" # Web server to serve the fingerprint axum = { version = "0.7", features = ["tokio"] } diff --git a/moq-relay-ietf/src/api.rs b/moq-relay-ietf/src/api.rs deleted file mode 100644 index ddafa44..0000000 --- a/moq-relay-ietf/src/api.rs +++ /dev/null @@ -1,88 +0,0 @@ -use url::Url; - -/// API client for moq-api. -#[derive(Clone)] -pub struct Api { - client: moq_api::Client, - origin: moq_api::Origin, -} - -impl Api { - pub fn new(url: Url, node: Url) -> Self { - let origin = moq_api::Origin { url: node }; - let client = moq_api::Client::new(url); - - Self { client, origin } - } - - /// Set the origin for a given namespace, returning a refresher. - pub async fn set_origin(&self, namespace: String) -> Result { - let refresh = Refresh::new(self.client.clone(), self.origin.clone(), namespace); - refresh.update().await?; - Ok(refresh) - } - - /// Get the origin for a given namespace. - pub async fn get_origin( - &self, - namespace: &str, - ) -> Result, moq_api::ApiError> { - self.client.get_origin(namespace).await - } -} - -/// Periodically refreshes the origin registration in moq-api. -pub struct Refresh { - client: moq_api::Client, - origin: moq_api::Origin, - namespace: String, - refresh: tokio::time::Interval, -} - -impl Refresh { - fn new(client: moq_api::Client, origin: moq_api::Origin, namespace: String) -> Self { - // Refresh every 5 minutes - let duration = tokio::time::Duration::from_secs(300); - let mut refresh = tokio::time::interval(tokio::time::Duration::from_secs(300)); - refresh.reset_after(duration); // skip the first tick - - Self { - client, - origin, - namespace, - refresh, - } - } - - /// Update the origin registration in moq-api. - async fn update(&self) -> Result<(), moq_api::ApiError> { - log::debug!( - "registering origin: namespace={} url={}", - self.namespace, - self.origin.url - ); - // Register the origin in moq-api. - self.client - .set_origin(&self.namespace, self.origin.clone()) - .await - } - - /// Run the refresher loop. - pub async fn run(&mut self) -> anyhow::Result<()> { - loop { - self.refresh.tick().await; - self.update().await?; - } - } -} - -/// Unregister the origin on drop. -impl Drop for Refresh { - fn drop(&mut self) { - // TODO this is really lazy - let namespace = self.namespace.clone(); - let client = self.client.clone(); - log::debug!("removing origin: namespace={}", namespace,); - tokio::spawn(async move { client.delete_origin(&namespace).await }); - } -} diff --git a/moq-relay-ietf/src/consumer.rs b/moq-relay-ietf/src/consumer.rs index 85242d0..5de0296 100644 --- a/moq-relay-ietf/src/consumer.rs +++ b/moq-relay-ietf/src/consumer.rs @@ -5,28 +5,33 @@ use moq_transport::{ session::{Announced, SessionError, Subscriber}, }; -use crate::{Api, Locals, Producer}; +use crate::control_plane::{ControlPlane, Origin}; +use crate::{Locals, Producer}; +use url::Url; /// Consumer of tracks from a remote Publisher #[derive(Clone)] -pub struct Consumer { +pub struct Consumer { remote: Subscriber, locals: Locals, - api: Option, - forward: Option, // Forward all announcements to this subscriber + control_plane: Option, + node_url: Option, + forward: Option>, // Forward all announcements to this subscriber } -impl Consumer { +impl Consumer { pub fn new( remote: Subscriber, locals: Locals, - api: Option, - forward: Option, + control_plane: Option, + node_url: Option, + forward: Option>, ) -> Self { Self { remote, locals, - api, + control_plane, + node_url, forward, } } @@ -64,12 +69,23 @@ impl Consumer { // Produce the tracks for this announce and return the reader let (_, mut request, reader) = Tracks::new(announce.namespace.clone()).produce(); - // Start refreshing the API origin, if any - if let Some(api) = self.api.as_ref() { - let mut refresh = api.set_origin(reader.namespace.to_utf8_path()).await?; - tasks.push( - async move { refresh.run().await.context("failed refreshing origin") }.boxed(), - ); + // Start refreshing the control plane origin, if any + if let Some(control_plane) = self.control_plane.as_ref() { + if let Some(node_url) = &self.node_url { + let origin = Origin { + url: node_url.clone(), + }; + let namespace = reader.namespace.to_utf8_path(); + + // Set the origin initially + control_plane.set_origin(&namespace, origin.clone()).await?; + + // Create and spawn refresher task + let mut refresh = control_plane.create_refresher(namespace, origin); + tasks.push( + async move { refresh.run().await.context("failed refreshing origin") }.boxed(), + ); + } } // Register the local tracks, unregister on drop diff --git a/moq-relay-ietf/src/control_plane.rs b/moq-relay-ietf/src/control_plane.rs new file mode 100644 index 0000000..9019667 --- /dev/null +++ b/moq-relay-ietf/src/control_plane.rs @@ -0,0 +1,34 @@ +use anyhow::Result; +use async_trait::async_trait; +use url::Url; + +/// Origin information for routing +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Origin { + pub url: Url, +} + +/// Trait for control plane operations that enable cross-relay routing and state sharing +#[async_trait] +pub trait ControlPlane: Send + Sync + Clone + 'static { + /// Get the origin URL for a given namespace + async fn get_origin(&self, namespace: &str) -> Result>; + + /// Set/register the origin for a given namespace + async fn set_origin(&self, namespace: &str, origin: Origin) -> Result<()>; + + /// Delete/unregister the origin for a given namespace + async fn delete_origin(&self, namespace: &str) -> Result<()>; + + /// Create a refresher that periodically updates the origin registration + /// Returns a future that runs the refresh loop + fn create_refresher(&self, namespace: String, origin: Origin) + -> Box; +} + +/// Trait for periodically refreshing origin registrations +#[async_trait] +pub trait ControlPlaneRefresher: Send + 'static { + /// Run the refresh loop (should run indefinitely until dropped) + async fn run(&mut self) -> Result<()>; +} diff --git a/moq-relay-ietf/src/control_plane_http.rs b/moq-relay-ietf/src/control_plane_http.rs new file mode 100644 index 0000000..5be94bb --- /dev/null +++ b/moq-relay-ietf/src/control_plane_http.rs @@ -0,0 +1,113 @@ +use anyhow::Result; +use async_trait::async_trait; +use url::Url; + +use crate::control_plane::{ControlPlane, ControlPlaneRefresher, Origin}; + +/// HTTP-based control plane implementation using moq-api +#[derive(Clone)] +pub struct HttpControlPlane { + client: moq_api::Client, + node: Url, +} + +impl HttpControlPlane { + pub fn new(api_url: Url, node_url: Url) -> Self { + let client = moq_api::Client::new(api_url); + Self { + client, + node: node_url, + } + } + + pub fn node_url(&self) -> &Url { + &self.node + } +} + +#[async_trait] +impl ControlPlane for HttpControlPlane { + async fn get_origin(&self, namespace: &str) -> Result> { + match self.client.get_origin(namespace).await? { + Some(origin) => Ok(Some(Origin { url: origin.url })), + None => Ok(None), + } + } + + async fn set_origin(&self, namespace: &str, origin: Origin) -> Result<()> { + let moq_origin = moq_api::Origin { url: origin.url }; + self.client.set_origin(namespace, moq_origin).await?; + Ok(()) + } + + async fn delete_origin(&self, namespace: &str) -> Result<()> { + self.client.delete_origin(namespace).await?; + Ok(()) + } + + fn create_refresher( + &self, + namespace: String, + origin: Origin, + ) -> Box { + Box::new(HttpRefresher::new(self.client.clone(), namespace, origin)) + } +} + +/// Periodically refreshes the origin registration via HTTP +pub struct HttpRefresher { + client: moq_api::Client, + namespace: String, + origin: Origin, + refresh: tokio::time::Interval, +} + +impl HttpRefresher { + fn new(client: moq_api::Client, namespace: String, origin: Origin) -> Self { + // Refresh every 5 minutes + let duration = tokio::time::Duration::from_secs(300); + let mut refresh = tokio::time::interval(duration); + refresh.reset_after(duration); // skip the first tick + + Self { + client, + namespace, + origin, + refresh, + } + } + + async fn update(&self) -> Result<()> { + log::debug!( + "registering origin: namespace={} url={}", + self.namespace, + self.origin.url + ); + let moq_origin = moq_api::Origin { + url: self.origin.url.clone(), + }; + self.client.set_origin(&self.namespace, moq_origin).await?; + Ok(()) + } +} + +#[async_trait] +impl ControlPlaneRefresher for HttpRefresher { + async fn run(&mut self) -> Result<()> { + loop { + self.refresh.tick().await; + self.update().await?; + } + } +} + +impl Drop for HttpRefresher { + fn drop(&mut self) { + let namespace = self.namespace.clone(); + let client = self.client.clone(); + log::debug!("removing origin: namespace={}", namespace); + tokio::spawn(async move { + let _ = client.delete_origin(&namespace).await; + }); + } +} diff --git a/moq-relay-ietf/src/lib.rs b/moq-relay-ietf/src/lib.rs new file mode 100644 index 0000000..dd18bb0 --- /dev/null +++ b/moq-relay-ietf/src/lib.rs @@ -0,0 +1,118 @@ +// Core modules +mod consumer; +mod local; +mod producer; +mod relay; +mod remote; +mod session; +mod web; + +// Control plane abstraction +pub mod control_plane; +pub mod control_plane_http; + +// Re-export key types for library users +pub use consumer::*; +pub use local::*; +pub use producer::*; +pub use relay::*; +pub use remote::*; +pub use session::*; +pub use web::*; + +pub use control_plane::{ControlPlane, ControlPlaneRefresher, Origin as ControlPlaneOrigin}; +pub use control_plane_http::HttpControlPlane; + +use std::{net, path::PathBuf}; +use url::Url; + +/// Configuration for the relay server +#[derive(Clone)] +pub struct RelayServerConfig { + /// Listen on this address + pub bind: net::SocketAddr, + + /// The TLS configuration. + pub tls: moq_native_ietf::tls::Config, + + /// Directory to write qlog files (one per connection) + pub qlog_dir: Option, + + /// Directory to write mlog files (one per connection) + pub mlog_dir: Option, + + /// Forward all announcements to the (optional) URL. + pub announce: Option, + + /// Enable development mode web server + pub enable_dev_web: bool, + + /// Serve qlog files over HTTPS at /qlog/:cid (requires enable_dev_web) + pub qlog_serve: bool, + + /// Serve mlog files over HTTPS at /mlog/:cid (requires enable_dev_web) + pub mlog_serve: bool, +} + +/// Main relay server that can work with any ControlPlane implementation +pub struct RelayServer { + relay: Relay, + web: Option, +} + +impl RelayServer { + /// Create a new relay server with the given control plane implementation + pub fn new( + config: RelayServerConfig, + control_plane: Option, + node_url: Option, + ) -> anyhow::Result { + let relay = Relay::new(RelayConfig { + tls: config.tls.clone(), + bind: config.bind, + qlog_dir: config.qlog_dir.clone(), + mlog_dir: config.mlog_dir.clone(), + announce: config.announce, + control_plane, + node: node_url, + })?; + + let web = if config.enable_dev_web { + let qlog_dir = if config.qlog_serve { + config.qlog_dir + } else { + None + }; + + let mlog_dir = if config.mlog_serve { + config.mlog_dir + } else { + None + }; + + Some(Web::new(WebConfig { + bind: config.bind, + tls: config.tls, + qlog_dir, + mlog_dir, + })) + } else { + None + }; + + Ok(Self { relay, web }) + } + + /// Run the relay server + pub async fn run(self) -> anyhow::Result<()> { + if let Some(web) = self.web { + tokio::spawn(async move { + if let Err(e) = web.run().await { + log::error!("web server failed: {}", e); + } + }); + } + + self.relay.run().await + } +} diff --git a/moq-relay-ietf/src/main.rs b/moq-relay-ietf/src/main.rs index 995b28c..19d519f 100644 --- a/moq-relay-ietf/src/main.rs +++ b/moq-relay-ietf/src/main.rs @@ -1,23 +1,5 @@ use clap::Parser; - -mod api; -mod consumer; -mod local; -mod producer; -mod relay; -mod remote; -mod session; -mod web; - -pub use api::*; -pub use consumer::*; -pub use local::*; -pub use producer::*; -pub use relay::*; -pub use remote::*; -pub use session::*; -pub use web::*; - +use moq_relay_ietf::{HttpControlPlane, RelayServer, RelayServerConfig}; use std::{net, path::PathBuf}; use url::Url; @@ -87,47 +69,28 @@ async fn main() -> anyhow::Result<()> { anyhow::bail!("missing TLS certificates"); } - // Determine qlog directory for both relay and web server - let qlog_dir_for_relay = cli.qlog_dir.clone(); - let qlog_dir_for_web = if cli.qlog_serve { - cli.qlog_dir.clone() - } else { - None - }; - - // Determine mlog directory for both relay and web server - let mlog_dir_for_relay = cli.mlog_dir.clone(); - let mlog_dir_for_web = if cli.mlog_serve { - cli.mlog_dir.clone() + // Create control plane if both API and node URLs are provided + let control_plane = if let (Some(ref api_url), Some(ref node_url)) = (&cli.api, &cli.node) { + Some(HttpControlPlane::new( + (*api_url).clone(), + (*node_url).clone(), + )) } else { None }; - // Create a QUIC server for media. - let relay = Relay::new(RelayConfig { - tls: tls.clone(), + // Build relay server config + let config = RelayServerConfig { bind: cli.bind, - qlog_dir: qlog_dir_for_relay, - mlog_dir: mlog_dir_for_relay, - node: cli.node, - api: cli.api, + tls, + qlog_dir: cli.qlog_dir, + mlog_dir: cli.mlog_dir, announce: cli.announce, - })?; - - if cli.dev { - // Create a web server too. - // Currently this only contains the certificate fingerprint (for development only). - let web = Web::new(WebConfig { - bind: cli.bind, - tls, - qlog_dir: qlog_dir_for_web, - mlog_dir: mlog_dir_for_web, - }); - - tokio::spawn(async move { - web.run().await.expect("failed to run web server"); - }); - } + enable_dev_web: cli.dev, + qlog_serve: cli.qlog_serve, + mlog_serve: cli.mlog_serve, + }; - relay.run().await + let server = RelayServer::new(config, control_plane, cli.node)?; + server.run().await } diff --git a/moq-relay-ietf/src/producer.rs b/moq-relay-ietf/src/producer.rs index e3ddeb9..c9e38dd 100644 --- a/moq-relay-ietf/src/producer.rs +++ b/moq-relay-ietf/src/producer.rs @@ -4,18 +4,19 @@ use moq_transport::{ session::{Publisher, SessionError, Subscribed, TrackStatusRequested}, }; +use crate::control_plane::ControlPlane; use crate::{Locals, RemotesConsumer}; /// Producer of tracks to a remote Subscriber #[derive(Clone)] -pub struct Producer { +pub struct Producer { remote_publisher: Publisher, locals: Locals, - remotes: Option, + remotes: Option>, } -impl Producer { - pub fn new(remote: Publisher, locals: Locals, remotes: Option) -> Self { +impl Producer { + pub fn new(remote: Publisher, locals: Locals, remotes: Option>) -> Self { Self { remote_publisher: remote, locals, diff --git a/moq-relay-ietf/src/relay.rs b/moq-relay-ietf/src/relay.rs index 41a6823..bf14085 100644 --- a/moq-relay-ietf/src/relay.rs +++ b/moq-relay-ietf/src/relay.rs @@ -6,10 +6,11 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use moq_native_ietf::quic; use url::Url; -use crate::{Api, Consumer, Locals, Producer, Remotes, RemotesConsumer, RemotesProducer, Session}; +use crate::control_plane::ControlPlane; +use crate::{Consumer, Locals, Producer, Remotes, RemotesConsumer, RemotesProducer, Session}; /// Configuration for the relay. -pub struct RelayConfig { +pub struct RelayConfig { /// Listen on this address pub bind: net::SocketAddr, @@ -25,8 +26,8 @@ pub struct RelayConfig { /// Forward all announcements to the (optional) URL. pub announce: Option, - /// Connect to the HTTP moq-api at this URL. - pub api: Option, + /// Control plane implementation for routing and state sharing + pub control_plane: Option, /// Our hostname which we advertise to other origins. /// We use QUIC, so the certificate must be valid for this address. @@ -34,17 +35,18 @@ pub struct RelayConfig { } /// MoQ Relay server. -pub struct Relay { +pub struct Relay { quic: quic::Endpoint, announce_url: Option, mlog_dir: Option, locals: Locals, - api: Option, - remotes: Option<(RemotesProducer, RemotesConsumer)>, + control_plane: Option, + node_url: Option, + remotes: Option<(RemotesProducer, RemotesConsumer)>, } -impl Relay { - pub fn new(config: RelayConfig) -> anyhow::Result { +impl Relay { + pub fn new(config: RelayConfig) -> anyhow::Result { // Create a QUIC endpoint that can be used for both clients and servers. let quic = quic::Endpoint::new(quic::Config { bind: config.bind, @@ -63,20 +65,17 @@ impl Relay { log::info!("mlog output enabled: {}", mlog_dir.display()); } - // Create an API client if we have the necessary configuration - let api = if let (Some(url), Some(node)) = (config.api, config.node) { - log::info!("using moq-api: url={} node={}", url, node); - Some(Api::new(url, node)) - } else { - None - }; + // Log control plane usage + if config.control_plane.is_some() { + log::info!("using control plane for routing, node={:?}", config.node); + } let locals = Locals::new(); - // Create remotes if we have an API client - let remotes = api.clone().map(|api| { + // Create remotes if we have a control plane + let remotes = config.control_plane.clone().map(|control_plane| { Remotes { - api, + control_plane, quic: quic.client.clone(), } .produce() @@ -86,7 +85,8 @@ impl Relay { quic, announce_url: config.announce, mlog_dir: config.mlog_dir, - api, + control_plane: config.control_plane, + node_url: config.node, locals, remotes, }) @@ -128,7 +128,13 @@ impl Relay { self.locals.clone(), remotes.clone(), )), - consumer: Some(Consumer::new(subscriber, self.locals.clone(), None, None)), + consumer: Some(Consumer::new( + subscriber, + self.locals.clone(), + None, + None, + None, + )), }; let forward_producer = session.producer.clone(); @@ -157,7 +163,8 @@ impl Relay { let locals = self.locals.clone(); let remotes = remotes.clone(); let forward = forward_producer.clone(); - let api = self.api.clone(); + let control_plane = self.control_plane.clone(); + let node_url = self.node_url.clone(); // Spawn a new task to handle the connection tasks.push(async move { @@ -175,7 +182,7 @@ impl Relay { let session = Session { session, producer: publisher.map(|publisher| Producer::new(publisher, locals.clone(), remotes)), - consumer: subscriber.map(|subscriber| Consumer::new(subscriber, locals, api, forward)), + consumer: subscriber.map(|subscriber| Consumer::new(subscriber, locals, control_plane, node_url, forward)), }; if let Err(err) = session.run().await { diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index 1b412d0..0eebfc9 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -15,19 +15,19 @@ use moq_transport::serve::{Track, TrackReader, TrackWriter}; use moq_transport::watch::State; use url::Url; -use crate::Api; +use crate::control_plane::ControlPlane; /// Information about remote origins. -pub struct Remotes { - /// The client we use to fetch/store origin information. - pub api: Api, +pub struct Remotes { + /// The control plane we use to fetch/store origin information. + pub control_plane: CP, // A QUIC endpoint we'll use to fetch from other origins. pub quic: quic::Client, } -impl Remotes { - pub fn produce(self) -> (RemotesProducer, RemotesConsumer) { +impl Remotes { + pub fn produce(self) -> (RemotesProducer, RemotesConsumer) { let (send, recv) = State::default().split(); let info = Arc::new(self); @@ -38,26 +38,34 @@ impl Remotes { } } -#[derive(Default)] -struct RemotesState { - lookup: HashMap, - requested: VecDeque, +struct RemotesState { + lookup: HashMap>, + requested: VecDeque>, +} + +impl Default for RemotesState { + fn default() -> Self { + Self { + lookup: HashMap::default(), + requested: VecDeque::default(), + } + } } // Clone for convenience, but there should only be one instance of this #[derive(Clone)] -pub struct RemotesProducer { - info: Arc, - state: State, +pub struct RemotesProducer { + info: Arc>, + state: State>, } -impl RemotesProducer { - fn new(info: Arc, state: State) -> Self { +impl RemotesProducer { + fn new(info: Arc>, state: State>) -> Self { Self { info, state } } /// Block until the next remote requested by a consumer. - async fn next(&mut self) -> Option { + async fn next(&mut self) -> Option> { loop { { let state = self.state.lock(); @@ -109,8 +117,8 @@ impl RemotesProducer { } } -impl ops::Deref for RemotesProducer { - type Target = Remotes; +impl ops::Deref for RemotesProducer { + type Target = Remotes; fn deref(&self) -> &Self::Target { &self.info @@ -118,13 +126,13 @@ impl ops::Deref for RemotesProducer { } #[derive(Clone)] -pub struct RemotesConsumer { - pub info: Arc, - state: State, +pub struct RemotesConsumer { + pub info: Arc>, + state: State>, } -impl RemotesConsumer { - fn new(info: Arc, state: State) -> Self { +impl RemotesConsumer { + fn new(info: Arc>, state: State>) -> Self { Self { info, state } } @@ -132,9 +140,13 @@ impl RemotesConsumer { pub async fn route( &self, namespace: &TrackNamespace, - ) -> anyhow::Result> { + ) -> anyhow::Result>> { // Always fetch the origin instead of using the (potentially invalid) cache. - let origin = match self.api.get_origin(&namespace.to_utf8_path()).await? { + let origin = match self + .control_plane + .get_origin(&namespace.to_utf8_path()) + .await? + { None => return Ok(None), Some(origin) => origin, }; @@ -167,20 +179,20 @@ impl RemotesConsumer { } } -impl ops::Deref for RemotesConsumer { - type Target = Remotes; +impl ops::Deref for RemotesConsumer { + type Target = Remotes; fn deref(&self) -> &Self::Target { &self.info } } -pub struct Remote { - pub remotes: Arc, +pub struct Remote { + pub remotes: Arc>, pub url: Url, } -impl fmt::Debug for Remote { +impl fmt::Debug for Remote { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Remote") .field("url", &self.url.to_string()) @@ -188,17 +200,17 @@ impl fmt::Debug for Remote { } } -impl ops::Deref for Remote { - type Target = Remotes; +impl ops::Deref for Remote { + type Target = Remotes; fn deref(&self) -> &Self::Target { &self.remotes } } -impl Remote { +impl Remote { /// Create a new broadcast. - pub fn produce(self) -> (RemoteProducer, RemoteConsumer) { + pub fn produce(self) -> (RemoteProducer, RemoteConsumer) { let (send, recv) = State::default().split(); let info = Arc::new(self); @@ -215,13 +227,13 @@ struct RemoteState { requested: VecDeque, } -pub struct RemoteProducer { - pub info: Arc, +pub struct RemoteProducer { + pub info: Arc>, state: State, } -impl RemoteProducer { - fn new(info: Arc, state: State) -> Self { +impl RemoteProducer { + fn new(info: Arc>, state: State) -> Self { Self { info, state } } @@ -289,8 +301,8 @@ impl RemoteProducer { } } -impl ops::Deref for RemoteProducer { - type Target = Remote; +impl ops::Deref for RemoteProducer { + type Target = Remote; fn deref(&self) -> &Self::Target { &self.info @@ -298,13 +310,13 @@ impl ops::Deref for RemoteProducer { } #[derive(Clone)] -pub struct RemoteConsumer { - pub info: Arc, +pub struct RemoteConsumer { + pub info: Arc>, state: State, } -impl RemoteConsumer { - fn new(info: Arc, state: State) -> Self { +impl RemoteConsumer { + fn new(info: Arc>, state: State) -> Self { Self { info, state } } @@ -338,8 +350,8 @@ impl RemoteConsumer { } } -impl ops::Deref for RemoteConsumer { - type Target = Remote; +impl ops::Deref for RemoteConsumer { + type Target = Remote; fn deref(&self) -> &Self::Target { &self.info diff --git a/moq-relay-ietf/src/session.rs b/moq-relay-ietf/src/session.rs index b55748b..e659cbf 100644 --- a/moq-relay-ietf/src/session.rs +++ b/moq-relay-ietf/src/session.rs @@ -1,15 +1,16 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use moq_transport::session::SessionError; +use crate::control_plane::ControlPlane; use crate::{Consumer, Producer}; -pub struct Session { +pub struct Session { pub session: moq_transport::session::Session, - pub producer: Option, - pub consumer: Option, + pub producer: Option>, + pub consumer: Option>, } -impl Session { +impl Session { /// Run the session, producer, and consumer as necessary. pub async fn run(self) -> Result<(), SessionError> { let mut tasks = FuturesUnordered::new();