Skip to content

Commit 9a51cac

Browse files
authored
Merge pull request #94 from sgodin/renames-and-comments-only
2 parents c79511c + 0d5abb0 commit 9a51cac

File tree

17 files changed

+268
-129
lines changed

17 files changed

+268
-129
lines changed

moq-clock-ietf/src/clock.rs

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,64 +7,72 @@ use moq_transport::serve::{
77
use chrono::prelude::*;
88
use tokio::task;
99

10+
/// Publishes the current time every second in the format "YYYY-MM-DD HH:MM:SS"
1011
pub struct Publisher {
11-
track: SubgroupsWriter,
12+
track_subgroups_writer: SubgroupsWriter,
1213
}
1314

1415
impl Publisher {
15-
pub fn new(track: SubgroupsWriter) -> Self {
16-
Self { track }
16+
pub fn new(track_subgroups_writer: SubgroupsWriter) -> Self {
17+
Self {
18+
track_subgroups_writer,
19+
}
1720
}
1821

22+
/// Runs the publisher, sending the current time every second. Creates a new group for each minute.
1923
pub async fn run(mut self) -> anyhow::Result<()> {
2024
let start = Utc::now();
2125
let mut now = start;
2226

2327
// Just for fun, don't start at zero.
24-
let mut sequence = start.minute();
28+
let mut next_group_id = start.minute();
2529

30+
// Create a new group for each minute.
2631
loop {
27-
let segment = self
28-
.track
32+
let subgroup_writer = self
33+
.track_subgroups_writer
2934
.create(Subgroup {
30-
group_id: sequence as u64,
35+
group_id: next_group_id as u64,
3136
subgroup_id: 0,
3237
priority: 0,
3338
})
3439
.context("failed to create minute segment")?;
3540

36-
sequence += 1;
41+
next_group_id += 1;
3742

43+
// Spawn a new task to handle sending the object every second
3844
tokio::spawn(async move {
39-
if let Err(err) = Self::send_segment(segment, now).await {
45+
if let Err(err) = Self::send_subgroup_objects(subgroup_writer, now).await {
4046
log::warn!("failed to send minute: {:?}", err);
4147
}
4248
});
4349

4450
let next = now + chrono::Duration::try_minutes(1).unwrap();
4551
let next = next.with_second(0).unwrap().with_nanosecond(0).unwrap();
4652

53+
// Sleep until the start of the next minute
4754
let delay = (next - now).to_std().unwrap();
4855
tokio::time::sleep(delay).await;
4956

5057
now = next; // just assume we didn't undersleep
5158
}
5259
}
5360

54-
async fn send_segment(
55-
mut segment: SubgroupWriter,
61+
/// Sends the current time every second within a minute group.
62+
async fn send_subgroup_objects(
63+
mut subgroup_writer: SubgroupWriter,
5664
mut now: DateTime<Utc>,
5765
) -> anyhow::Result<()> {
5866
// Everything but the second.
5967
let base = now.format("%Y-%m-%d %H:%M:").to_string();
6068

61-
segment
69+
subgroup_writer
6270
.write(base.clone().into())
6371
.context("failed to write base")?;
6472

6573
loop {
6674
let delta = now.format("%S").to_string();
67-
segment
75+
subgroup_writer
6876
.write(delta.clone().into())
6977
.context("failed to write delta")?;
7078

@@ -73,6 +81,7 @@ impl Publisher {
7381
let next = now + chrono::Duration::try_seconds(1).unwrap();
7482
let next = next.with_nanosecond(0).unwrap();
7583

84+
// Sleep until the next second
7685
let delay = (next - now).to_std().unwrap();
7786
tokio::time::sleep(delay).await;
7887

@@ -86,26 +95,35 @@ impl Publisher {
8695
}
8796
}
8897
}
98+
99+
/// Subscribes to the clock and prints received time updates to stdout.
89100
pub struct Subscriber {
90-
track: TrackReader,
101+
track_reader: TrackReader,
91102
}
92103

93104
impl Subscriber {
94-
pub fn new(track: TrackReader) -> Self {
95-
Self { track }
105+
pub fn new(track_reader: TrackReader) -> Self {
106+
Self { track_reader }
96107
}
97108

109+
/// Runs the subscriber, receiving time updates and printing them to stdout.
98110
pub async fn run(self) -> anyhow::Result<()> {
99-
match self.track.mode().await.context("failed to get mode")? {
111+
match self
112+
.track_reader
113+
.mode()
114+
.await
115+
.context("failed to get mode")?
116+
{
100117
TrackReaderMode::Stream(stream) => Self::recv_stream(stream).await,
101118
TrackReaderMode::Subgroups(subgroups) => Self::recv_subgroups(subgroups).await,
102119
TrackReaderMode::Datagrams(datagrams) => Self::recv_datagrams(datagrams).await,
103120
}
104121
}
105122

106-
async fn recv_stream(mut track: StreamReader) -> anyhow::Result<()> {
107-
while let Some(mut subgroup) = track.next().await? {
108-
while let Some(object) = subgroup.read_next().await? {
123+
/// Receives time updates from a stream and prints them to stdout.
124+
async fn recv_stream(mut stream_reader: StreamReader) -> anyhow::Result<()> {
125+
while let Some(mut stream_group_reader) = stream_reader.next().await? {
126+
while let Some(object) = stream_group_reader.read_next().await? {
109127
let str = String::from_utf8_lossy(&object);
110128
println!("{str}");
111129
}
@@ -114,20 +132,22 @@ impl Subscriber {
114132
Ok(())
115133
}
116134

117-
async fn recv_subgroups(mut subgroups: SubgroupsReader) -> anyhow::Result<()> {
118-
while let Some(mut subgroup) = subgroups.next().await? {
119-
// Spawn a new task to handle the subgroup concurrently
135+
/// Receives time updates from subgroups and prints them to stdout.
136+
async fn recv_subgroups(mut subgroups_reader: SubgroupsReader) -> anyhow::Result<()> {
137+
while let Some(mut subgroup_reader) = subgroups_reader.next().await? {
138+
// Spawn a new task to handle the subgroup concurrently, so we
139+
// don't rely on the publisher ending the previous stream before starting a new one.
120140
task::spawn(async move {
121141
if let Err(e) = async {
122-
let base = subgroup
142+
let base = subgroup_reader
123143
.read_next()
124144
.await
125145
.context("failed to get first object")?
126146
.context("empty subgroup")?;
127147

128148
let base = String::from_utf8_lossy(&base);
129149

130-
while let Some(object) = subgroup.read_next().await? {
150+
while let Some(object) = subgroup_reader.read_next().await? {
131151
let str = String::from_utf8_lossy(&object);
132152
println!("{base}{str}");
133153
}
@@ -144,8 +164,9 @@ impl Subscriber {
144164
Ok(())
145165
}
146166

147-
async fn recv_datagrams(mut datagrams: DatagramsReader) -> anyhow::Result<()> {
148-
while let Some(datagram) = datagrams.read().await? {
167+
/// Receives time updates from datagrams and prints them to stdout.
168+
async fn recv_datagrams(mut datagrams_reader: DatagramsReader) -> anyhow::Result<()> {
169+
while let Some(datagram) = datagrams_reader.read().await? {
149170
let str = String::from_utf8_lossy(&datagram.payload);
150171
println!("{str}");
151172
}

moq-clock-ietf/src/main.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub struct Cli {
4040
pub track: String,
4141
}
4242

43+
/// The main entry point for the MoQ Clock IETF example.
4344
#[tokio::main]
4445
async fn main() -> anyhow::Result<()> {
4546
env_logger::init();
@@ -53,6 +54,7 @@ async fn main() -> anyhow::Result<()> {
5354
let config = Cli::parse();
5455
let tls = config.tls.load()?;
5556

57+
// Create the QUIC endpoint
5658
let quic = quic::Endpoint::new(quic::Config {
5759
bind: config.bind,
5860
qlog_dir: None,
@@ -61,48 +63,52 @@ async fn main() -> anyhow::Result<()> {
6163

6264
log::info!("connecting to server: url={}", config.url);
6365

66+
// Connect to the server
6467
let (session, connection_id) = quic.client.connect(&config.url).await?;
6568

6669
log::info!(
6770
"connected with CID: {} (use this to look up qlog/mlog on server)",
6871
connection_id
6972
);
7073

74+
// Depending on whether we are publishing or subscribing, create the appropriate session
7175
if config.publish {
76+
// Create the publisher session
7277
let (session, mut publisher) = Publisher::connect(session)
7378
.await
7479
.context("failed to create MoQ Transport session")?;
7580

76-
let (mut writer, _, reader) = serve::Tracks {
81+
let (mut tracks_writer, _, tracks_reader) = serve::Tracks {
7782
namespace: TrackNamespace::from_utf8_path(&config.namespace),
7883
}
7984
.produce();
8085

81-
let track = writer.create(&config.track).unwrap();
82-
let clock = clock::Publisher::new(track.groups()?);
86+
let track_writer = tracks_writer.create(&config.track).unwrap();
87+
let clock_publisher = clock::Publisher::new(track_writer.subgroups()?);
8388

8489
tokio::select! {
8590
res = session.run() => res.context("session error")?,
86-
res = clock.run() => res.context("clock error")?,
87-
res = publisher.announce(reader) => res.context("failed to serve tracks")?,
91+
res = clock_publisher.run() => res.context("clock error")?,
92+
res = publisher.announce(tracks_reader) => res.context("failed to serve tracks")?,
8893
}
8994
} else {
95+
// Create the subscriber session
9096
let (session, mut subscriber) = Subscriber::connect(session)
9197
.await
9298
.context("failed to create MoQ Transport session")?;
9399

94-
let (prod, sub) = serve::Track::new(
100+
let (track_writer, track_reader) = serve::Track::new(
95101
TrackNamespace::from_utf8_path(&config.namespace),
96102
config.track,
97103
)
98104
.produce();
99105

100-
let clock = clock::Subscriber::new(sub);
106+
let clock_subscriber = clock::Subscriber::new(track_reader);
101107

102108
tokio::select! {
103109
res = session.run() => res.context("session error")?,
104-
res = clock.run() => res.context("clock error")?,
105-
res = subscriber.subscribe(prod) => res.context("failed to subscribe to track")?,
110+
res = clock_subscriber.run() => res.context("clock error")?,
111+
res = subscriber.subscribe(track_writer) => res.context("failed to subscribe to track")?,
106112
}
107113
}
108114

moq-pub/src/media.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ impl Media {
3131
let catalog = broadcast
3232
.create(".catalog")
3333
.context("broadcast closed")?
34-
.groups()?;
34+
.subgroups()?;
3535
let init = broadcast
3636
.create("0.mp4")
3737
.context("broadcast closed")?
38-
.groups()?;
38+
.subgroups()?;
3939

4040
Ok(Media {
4141
tracks: Default::default(),
@@ -333,7 +333,7 @@ struct Track {
333333
impl Track {
334334
fn new(track: TrackWriter, handler: TrackType, timescale: u64) -> Self {
335335
Self {
336-
track: track.groups().unwrap(),
336+
track: track.subgroups().unwrap(),
337337
current: None,
338338
timescale,
339339
handler,

moq-relay-ietf/src/api.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use url::Url;
22

3+
/// API client for moq-api.
34
#[derive(Clone)]
45
pub struct Api {
56
client: moq_api::Client,
@@ -14,12 +15,14 @@ impl Api {
1415
Self { client, origin }
1516
}
1617

18+
/// Set the origin for a given namespace, returning a refresher.
1719
pub async fn set_origin(&self, namespace: String) -> Result<Refresh, moq_api::ApiError> {
1820
let refresh = Refresh::new(self.client.clone(), self.origin.clone(), namespace);
1921
refresh.update().await?;
2022
Ok(refresh)
2123
}
2224

25+
/// Get the origin for a given namespace.
2326
pub async fn get_origin(
2427
&self,
2528
namespace: &str,
@@ -28,6 +31,7 @@ impl Api {
2831
}
2932
}
3033

34+
/// Periodically refreshes the origin registration in moq-api.
3135
pub struct Refresh {
3236
client: moq_api::Client,
3337
origin: moq_api::Origin,
@@ -37,6 +41,7 @@ pub struct Refresh {
3741

3842
impl Refresh {
3943
fn new(client: moq_api::Client, origin: moq_api::Origin, namespace: String) -> Self {
44+
// Refresh every 5 minutes
4045
let duration = tokio::time::Duration::from_secs(300);
4146
let mut refresh = tokio::time::interval(tokio::time::Duration::from_secs(300));
4247
refresh.reset_after(duration); // skip the first tick
@@ -49,18 +54,20 @@ impl Refresh {
4954
}
5055
}
5156

57+
/// Update the origin registration in moq-api.
5258
async fn update(&self) -> Result<(), moq_api::ApiError> {
53-
// Register the origin in moq-api.
5459
log::debug!(
5560
"registering origin: namespace={} url={}",
5661
self.namespace,
5762
self.origin.url
5863
);
64+
// Register the origin in moq-api.
5965
self.client
6066
.set_origin(&self.namespace, self.origin.clone())
6167
.await
6268
}
6369

70+
/// Run the refresher loop.
6471
pub async fn run(&mut self) -> anyhow::Result<()> {
6572
loop {
6673
self.refresh.tick().await;
@@ -69,6 +76,7 @@ impl Refresh {
6976
}
7077
}
7178

79+
/// Unregister the origin on drop.
7280
impl Drop for Refresh {
7381
fn drop(&mut self) {
7482
// TODO this is really lazy

0 commit comments

Comments
 (0)