Skip to content

Commit c526384

Browse files
Merge pull request #106 from cloudflare/me/nested-namespace-support
Add support for nested namespaces
2 parents af6372a + ff950f9 commit c526384

File tree

5 files changed

+95
-27
lines changed

5 files changed

+95
-27
lines changed

moq-relay-ietf/src/local.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,32 @@ impl Locals {
4646
Ok(registration)
4747
}
4848

49-
/// Lookup local tracks by namespace.
49+
/// Lookup local tracks by namespace using hierarchical prefix matching.
50+
/// Returns the TracksReader for the longest matching namespace prefix.
5051
pub fn route(&self, namespace: &TrackNamespace) -> Option<TracksReader> {
51-
self.lookup.lock().unwrap().get(namespace).cloned()
52+
let lookup = self.lookup.lock().unwrap();
53+
54+
// Find the longest matching prefix
55+
let mut best_match: Option<TracksReader> = None;
56+
let mut best_len = 0;
57+
58+
for (registered_ns, tracks) in lookup.iter() {
59+
// Check if registered_ns is a prefix of namespace
60+
if namespace.fields.len() >= registered_ns.fields.len() {
61+
let is_prefix = registered_ns
62+
.fields
63+
.iter()
64+
.zip(namespace.fields.iter())
65+
.all(|(a, b)| a == b);
66+
67+
if is_prefix && registered_ns.fields.len() > best_len {
68+
best_match = Some(tracks.clone());
69+
best_len = registered_ns.fields.len();
70+
}
71+
}
72+
}
73+
74+
best_match
5275
}
5376
}
5477

moq-relay-ietf/src/producer.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ impl Producer {
7979
async fn serve_subscribe(self, subscribed: Subscribed) -> Result<(), anyhow::Error> {
8080
// Check local tracks first, and serve from local if possible
8181
if let Some(mut local) = self.locals.route(&subscribed.track_namespace) {
82-
if let Some(track) = local.subscribe(&subscribed.track_name) {
82+
// Pass the full requested namespace, not the announced prefix
83+
if let Some(track) =
84+
local.subscribe(subscribed.track_namespace.clone(), &subscribed.track_name)
85+
{
8386
log::info!("serving subscribe from local: {:?}", track.info);
8487
return Ok(subscribed.serve(track).await?);
8588
}
@@ -118,9 +121,10 @@ impl Producer {
118121
.locals
119122
.route(&track_status_requested.request_msg.track_namespace)
120123
{
121-
if let Some(track) =
122-
local_tracks.get_track_reader(&track_status_requested.request_msg.track_name)
123-
{
124+
if let Some(track) = local_tracks.get_track_reader(
125+
&track_status_requested.request_msg.track_namespace,
126+
&track_status_requested.request_msg.track_name,
127+
) {
124128
log::info!("serving track_status from local: {:?}", track.info);
125129
return Ok(track_status_requested.respond_ok(&track)?);
126130
}

moq-sub/src/media.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,11 @@ impl<O: AsyncWrite + Send + Unpin + 'static> Media<O> {
107107
});
108108
});
109109

110-
tracks.push(self.broadcast.subscribe(&name).context("no track")?);
110+
tracks.push(
111+
self.broadcast
112+
.subscribe(self.broadcast.namespace.clone(), &name)
113+
.context("no track")?,
114+
);
111115
}
112116
}
113117

@@ -145,7 +149,7 @@ impl<O: AsyncWrite + Send + Unpin + 'static> Media<O> {
145149

146150
let track = self
147151
.broadcast
148-
.subscribe(track_name)
152+
.subscribe(self.broadcast.namespace.clone(), track_name)
149153
.context(format!("no {alias} track"))?;
150154
let mut group = match track.mode().await? {
151155
TrackReaderMode::Subgroups(mut groups) => {

moq-transport/src/serve/tracks.rs

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ use super::{ServeError, Track, TrackReader, TrackWriter};
1616
use crate::coding::TrackNamespace;
1717
use crate::watch::{Queue, State};
1818

19+
/// Full track identifier: namespace + track name
20+
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
21+
pub struct FullTrackName {
22+
pub namespace: TrackNamespace,
23+
pub name: String,
24+
}
25+
1926
/// Static information about a broadcast.
2027
#[derive(Debug)]
2128
pub struct Tracks {
@@ -42,7 +49,7 @@ impl Tracks {
4249

4350
#[derive(Default)]
4451
pub struct TracksState {
45-
tracks: HashMap<String, TrackReader>,
52+
tracks: HashMap<FullTrackName, TrackReader>,
4653
}
4754

4855
/// Publish new tracks for a broadcast by name.
@@ -57,6 +64,7 @@ impl TracksWriter {
5764
}
5865

5966
/// Create a new track with the given name, inserting it into the broadcast.
67+
/// The track will use this writer's namespace.
6068
/// None is returned if all [TracksReader]s have been dropped.
6169
pub fn create(&mut self, track: &str) -> Option<TrackWriter> {
6270
let (writer, reader) = Track {
@@ -66,17 +74,22 @@ impl TracksWriter {
6674
.produce();
6775

6876
// NOTE: We overwrite the track if it already exists.
69-
self.state
70-
.lock_mut()?
71-
.tracks
72-
.insert(track.to_owned(), reader);
77+
let full_name = FullTrackName {
78+
namespace: self.namespace.clone(),
79+
name: track.to_owned(),
80+
};
81+
self.state.lock_mut()?.tracks.insert(full_name, reader);
7382

7483
Some(writer)
7584
}
7685

77-
/// Remove a track from the broadcast by name.
78-
pub fn remove(&mut self, track_name: &str) -> Option<TrackReader> {
79-
self.state.lock_mut()?.tracks.remove(track_name)
86+
/// Remove a track from the broadcast by full name.
87+
pub fn remove(&mut self, namespace: &TrackNamespace, track_name: &str) -> Option<TrackReader> {
88+
let full_name = FullTrackName {
89+
namespace: namespace.clone(),
90+
name: track_name.to_owned(),
91+
};
92+
self.state.lock_mut()?.tracks.remove(&full_name)
8093
}
8194
}
8295

@@ -143,28 +156,46 @@ impl TracksReader {
143156
Self { state, queue, info }
144157
}
145158

146-
/// Get a track from the broadcast by name, if it exists.
147-
pub fn get_track_reader(&mut self, track_name: &str) -> Option<TrackReader> {
159+
/// Get a track from the broadcast by full name, if it exists.
160+
pub fn get_track_reader(
161+
&mut self,
162+
namespace: &TrackNamespace,
163+
track_name: &str,
164+
) -> Option<TrackReader> {
148165
let state = self.state.lock();
166+
let full_name = FullTrackName {
167+
namespace: namespace.clone(),
168+
name: track_name.to_owned(),
169+
};
149170

150-
if let Some(track_reader) = state.tracks.get(track_name) {
171+
if let Some(track_reader) = state.tracks.get(&full_name) {
151172
return Some(track_reader.clone());
152173
}
153174
None
154175
}
155176

156-
/// Get or request a track from the broadcast by name.
177+
/// Get or request a track from the broadcast by full name.
178+
/// The namespace parameter should be the full requested namespace, not just the announced prefix.
157179
/// None is returned if [TracksWriter] or [TracksRequest] cannot fufill the request.
158-
pub fn subscribe(&mut self, track_name: &str) -> Option<TrackReader> {
180+
pub fn subscribe(
181+
&mut self,
182+
namespace: TrackNamespace,
183+
track_name: &str,
184+
) -> Option<TrackReader> {
159185
let state = self.state.lock();
186+
let full_name = FullTrackName {
187+
namespace: namespace.clone(),
188+
name: track_name.to_owned(),
189+
};
160190

161-
if let Some(track_reader) = state.tracks.get(track_name) {
191+
if let Some(track_reader) = state.tracks.get(&full_name) {
162192
return Some(track_reader.clone());
163193
}
164194

165195
let mut state = state.into_mut()?;
196+
// Use the full requested namespace, not self.namespace
166197
let track_writer_reader = Track {
167-
namespace: self.namespace.clone(),
198+
namespace: namespace.clone(),
168199
name: track_name.to_owned(),
169200
}
170201
.produce();
@@ -173,10 +204,10 @@ impl TracksReader {
173204
return None;
174205
}
175206

176-
// We requested the track sucessfully so we can deduplicate it.
207+
// We requested the track sucessfully so we can deduplicate it by full name.
177208
state
178209
.tracks
179-
.insert(track_name.to_owned(), track_writer_reader.1.clone());
210+
.insert(full_name, track_writer_reader.1.clone());
180211

181212
Some(track_writer_reader.1.clone())
182213
}

moq-transport/src/session/publisher.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,10 @@ impl Publisher {
167167
subscribed: Subscribed,
168168
mut tracks: TracksReader,
169169
) -> Result<(), SessionError> {
170-
if let Some(track) = tracks.subscribe(&subscribed.track_name) {
170+
if let Some(track) = tracks.subscribe(
171+
subscribed.info.track_namespace.clone(),
172+
&subscribed.info.track_name,
173+
) {
171174
subscribed.serve(track).await?;
172175
} else {
173176
subscribed.close(ServeError::NotFound)?;
@@ -181,7 +184,10 @@ impl Publisher {
181184
mut tracks: TracksReader,
182185
) -> Result<(), SessionError> {
183186
let track = tracks
184-
.subscribe(&track_status_request.request_msg.track_name.clone())
187+
.subscribe(
188+
track_status_request.request_msg.track_namespace.clone(),
189+
&track_status_request.request_msg.track_name,
190+
)
185191
.ok_or(ServeError::NotFound)?;
186192

187193
track_status_request.respond_ok(&track)?;

0 commit comments

Comments
 (0)