Skip to content

Commit 1204339

Browse files
authored
Merge pull request #67 from nemethf/sub-catalog
moq-sub: Add cli argument --catalog
2 parents b0bff42 + 1d9175b commit 1204339

File tree

3 files changed

+77
-29
lines changed

3 files changed

+77
-29
lines changed

moq-sub/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ categories = ["multimedia", "network-programming", "web-programming"]
1616
[dependencies]
1717
moq-transport = { path = "../moq-transport", version = "0.11" }
1818
moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" }
19+
moq-catalog = { path = "../moq-catalog", version = "0.2" }
1920
url = "2"
2021

2122
# Async stuff
@@ -29,3 +30,4 @@ mp4 = "0.14"
2930
anyhow = { version = "1", features = ["backtrace"] }
3031
tracing = "0.1"
3132
tracing-subscriber = "0.3"
33+
serde_json = "1"

moq-sub/src/main.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ async fn main() -> anyhow::Result<()> {
3636
// Associate empty set of Tracks with provided namespace
3737
let tracks = Tracks::new(TrackNamespace::from_utf8_path(&config.name));
3838

39-
let mut media = Media::new(subscriber, tracks, out).await?;
39+
let mut media = Media::new(subscriber, tracks, out, config.catalog).await?;
4040

4141
tokio::select! {
4242
res = session.run() => res.context("session error")?,
@@ -63,6 +63,15 @@ pub struct Config {
6363
/// The TLS configuration.
6464
#[command(flatten)]
6565
pub tls: moq_native_ietf::tls::Args,
66+
67+
/// Request the catalog track (to get other track names)
68+
///
69+
/// First download the track named ".catalog" to find out the
70+
/// track names to subscribe to. Other parameters like video
71+
/// dimension are extracted from the tracks themselves. Default:
72+
/// "0.mp4" for the init track, "{track_id}.m4s" for the rest.
73+
#[arg(long)]
74+
pub catalog: bool,
6675
}
6776

6877
fn moq_url(s: &str) -> Result<Url, String> {

moq-sub/src/media.rs

Lines changed: 65 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,48 +19,46 @@ pub struct Media<O> {
1919
broadcast: TracksReader,
2020
tracks_writer: TracksWriter,
2121
output: Arc<Mutex<O>>,
22+
request_catalog: bool,
2223
}
2324

2425
impl<O: AsyncWrite + Send + Unpin + 'static> Media<O> {
25-
pub async fn new(subscriber: Subscriber, tracks: Tracks, output: O) -> anyhow::Result<Self> {
26+
pub async fn new(
27+
subscriber: Subscriber,
28+
tracks: Tracks,
29+
output: O,
30+
request_catalog: bool,
31+
) -> anyhow::Result<Self> {
2632
let (tracks_writer, _tracks_request, tracks_reader) = tracks.produce();
2733
let broadcast = tracks_reader; // breadcrumb for navigating API name changes
2834
Ok(Self {
2935
subscriber,
3036
broadcast,
3137
tracks_writer,
3238
output: Arc::new(Mutex::new(output)),
39+
request_catalog,
3340
})
3441
}
3542

3643
pub async fn run(&mut self) -> anyhow::Result<()> {
44+
let catalog = if self.request_catalog {
45+
// The catalog track has no standardized name, but
46+
// both moq-pub of moq-rs and gst-moq-pub uses ".catalog".
47+
let buf = self.download_first_object(".catalog", "catalog").await?;
48+
let s = std::str::from_utf8(&buf)?;
49+
let c: moq_catalog::Root = serde_json::from_str(s)?;
50+
info!("catalog: {c:#?}");
51+
anyhow::ensure!(c.version == 1, "Unknown catalog version");
52+
Some(c)
53+
} else {
54+
None
55+
};
3756
let moov = {
38-
let init_track_name = "0.mp4";
39-
let track = self
40-
.tracks_writer
41-
.create(init_track_name)
42-
.context("failed to create init track")?;
43-
44-
let mut subscriber = self.subscriber.clone();
45-
tokio::task::spawn(async move {
46-
subscriber.subscribe(track).await.unwrap_or_else(|err| {
47-
warn!("failed to subscribe to init track: {err:?}");
48-
});
49-
});
50-
51-
let track = self
52-
.broadcast
53-
.subscribe(init_track_name)
54-
.context("no init track")?;
55-
let mut group = match track.mode().await? {
56-
TrackReaderMode::Subgroups(mut groups) => {
57-
groups.next().await?.context("no init group")?
58-
}
59-
_ => anyhow::bail!("expected init segment"),
57+
let init_track_name: &str = match catalog {
58+
Some(ref c) => &c.tracks[0].init_track.clone().unwrap(),
59+
None => "0.mp4",
6060
};
61-
62-
let object = group.next().await?.context("no init fragment")?;
63-
let buf = Self::recv_object(object).await?;
61+
let buf = self.download_first_object(init_track_name, "init").await?;
6462
self.output.lock().await.write_all(&buf).await?;
6563
let mut reader = Cursor::new(&buf);
6664

@@ -78,9 +76,12 @@ impl<O: AsyncWrite + Send + Unpin + 'static> Media<O> {
7876
let mut has_video = false;
7977
let mut has_audio = false;
8078
let mut tracks = vec![];
81-
for trak in &moov.traks {
79+
for (idx, trak) in moov.traks.into_iter().enumerate() {
8280
let id = trak.tkhd.track_id;
83-
let name = format!("{id}.m4s");
81+
let name: String = match catalog {
82+
Some(ref c) => c.tracks[idx].name.clone(),
83+
None => format!("{id}.m4s"),
84+
};
8485
info!("found track {name}");
8586
let mut active = false;
8687
if !has_video && trak.mdia.minf.stbl.stsd.avc1.is_some() {
@@ -125,6 +126,42 @@ impl<O: AsyncWrite + Send + Unpin + 'static> Media<O> {
125126
Ok(())
126127
}
127128

129+
async fn download_first_object(
130+
&mut self,
131+
track_name: &str,
132+
alias: &'static str,
133+
) -> anyhow::Result<Vec<u8>> {
134+
let track = self
135+
.tracks_writer
136+
.create(track_name)
137+
.context(format!("failed to create {alias} track"))?;
138+
139+
let mut subscriber = self.subscriber.clone();
140+
tokio::task::spawn(async move {
141+
subscriber.subscribe(track).await.unwrap_or_else(|err| {
142+
warn!("failed to subscribe to {alias} track: {err:?}");
143+
});
144+
});
145+
146+
let track = self
147+
.broadcast
148+
.subscribe(track_name)
149+
.context(format!("no {alias} track"))?;
150+
let mut group = match track.mode().await? {
151+
TrackReaderMode::Subgroups(mut groups) => {
152+
groups.next().await?.context(format!("no {alias} group"))?
153+
}
154+
_ => anyhow::bail!("expected {alias} segment"),
155+
};
156+
157+
let object = group
158+
.next()
159+
.await?
160+
.context(format!("no {alias} fragment"))?;
161+
let buf = Self::recv_object(object).await?;
162+
Ok(buf)
163+
}
164+
128165
async fn recv_track(track: TrackReader, out: Arc<Mutex<O>>) -> anyhow::Result<()> {
129166
let name = track.name.clone();
130167
debug!("track {name}: start");

0 commit comments

Comments
 (0)