Skip to content

Commit f488b5c

Browse files
authored
Merge pull request #96 from sgodin/fixup-subscribe-alias-handling
2 parents 9a51cac + f5816cc commit f488b5c

File tree

9 files changed

+258
-158
lines changed

9 files changed

+258
-158
lines changed

moq-transport/src/data/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
mod datagram;
22
mod fetch;
33
mod header;
4-
mod object;
4+
mod object_status;
55
mod subgroup;
66

77
pub use datagram::*;
88
pub use fetch::*;
99
pub use header::*;
10-
pub use object::*;
10+
pub use object_status::*;
1111
pub use subgroup::*;

moq-transport/src/data/object.rs

Lines changed: 0 additions & 77 deletions
This file was deleted.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
2+
3+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
4+
pub enum ObjectStatus {
5+
NormalObject = 0x0,
6+
ObjectDoesNotExist = 0x1,
7+
EndOfGroup = 0x3,
8+
EndOfTrack = 0x4,
9+
}
10+
11+
impl Decode for ObjectStatus {
12+
fn decode<B: bytes::Buf>(r: &mut B) -> Result<Self, DecodeError> {
13+
match u64::decode(r)? {
14+
0x0 => Ok(Self::NormalObject),
15+
0x1 => Ok(Self::ObjectDoesNotExist),
16+
0x3 => Ok(Self::EndOfGroup),
17+
0x4 => Ok(Self::EndOfTrack),
18+
_ => Err(DecodeError::InvalidObjectStatus),
19+
}
20+
}
21+
}
22+
23+
impl Encode for ObjectStatus {
24+
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
25+
let val = *self as u64;
26+
val.encode(w)?;
27+
Ok(())
28+
}
29+
}

moq-transport/src/serve/track.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ impl TrackReader {
173173
}
174174

175175
// Returns the largest group/sequence
176-
pub fn latest(&self) -> Option<Location> {
176+
pub fn largest(&self) -> Option<Location> {
177177
// We don't even know the mode yet.
178178
// TODO populate from SUBSCRIBE_OK
179179
None

moq-transport/src/session/publisher.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl Publisher {
180180
.ok_or(ServeError::NotFound)?;
181181
let response;
182182

183-
if let Some(latest) = track.latest() {
183+
if let Some(latest) = track.largest() {
184184
response = message::TrackStatusOk {
185185
id: track_status_request.request_msg.id,
186186
track_alias: 0, // TODO SLG - wire up track alias logic
@@ -363,20 +363,41 @@ impl Publisher {
363363
Ok(())
364364
}
365365

366-
pub(super) fn send_message<T: Into<message::Publisher> + Into<Message>>(&mut self, msg: T) {
366+
/// Process a message before sending it, performing any necessary internal actions.
367+
fn act_on_message_to_send<T: Into<message::Publisher>>(
368+
&mut self,
369+
msg: T,
370+
) -> message::Publisher {
367371
let msg = msg.into();
368372
match &msg {
369-
message::Publisher::PublishDone(msg) => self.drop_subscribe(msg.id),
370-
message::Publisher::SubscribeError(msg) => self.drop_subscribe(msg.id),
371-
message::Publisher::PublishNamespaceDone(msg) => {
372-
self.drop_publish_namespace(&msg.track_namespace)
373+
message::Publisher::PublishDone(m) => self.drop_subscribe(m.id),
374+
message::Publisher::SubscribeError(m) => self.drop_subscribe(m.id),
375+
message::Publisher::PublishNamespaceDone(m) => {
376+
self.drop_publish_namespace(&m.track_namespace);
373377
}
374-
_ => (),
375-
};
378+
_ => {}
379+
}
380+
msg
381+
}
376382

383+
/// Send a message without waiting for it to be sent.
384+
pub(super) fn send_message<T: Into<message::Publisher> + Into<Message>>(&mut self, msg: T) {
385+
let msg = self.act_on_message_to_send(msg);
377386
self.outgoing.push(msg.into()).ok();
378387
}
379388

389+
/// Send a message and wait until it is sent (or at least popped off the outgoing control message queue)
390+
pub(super) async fn send_message_and_wait<T: Into<message::Publisher> + Into<Message>>(
391+
&mut self,
392+
msg: T,
393+
) {
394+
let msg = self.act_on_message_to_send(msg);
395+
self.outgoing
396+
.push_and_wait_until_popped(msg.into())
397+
.await
398+
.ok();
399+
}
400+
380401
fn drop_subscribe(&mut self, id: u64) {
381402
self.subscribeds.lock().unwrap().remove(&id);
382403
}

moq-transport/src/session/subscribe.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ pub struct SubscribeInfo {
1919

2020
struct SubscribeState {
2121
ok: bool,
22+
track_alias: Option<u64>,
2223
closed: Result<(), ServeError>,
2324
}
2425

2526
impl Default for SubscribeState {
2627
fn default() -> Self {
2728
Self {
2829
ok: Default::default(),
30+
track_alias: None,
2931
closed: Ok(()),
3032
}
3133
}
@@ -121,19 +123,25 @@ pub(super) struct SubscribeRecv {
121123
}
122124

123125
impl SubscribeRecv {
124-
pub fn ok(&mut self) -> Result<(), ServeError> {
126+
pub fn ok(&mut self, alias: u64) -> Result<(), ServeError> {
125127
let state = self.state.lock();
126128
if state.ok {
127129
return Err(ServeError::Duplicate);
128130
}
129131

130132
if let Some(mut state) = state.into_mut() {
131133
state.ok = true;
134+
state.track_alias = Some(alias);
132135
}
133136

134137
Ok(())
135138
}
136139

140+
pub fn track_alias(&self) -> Option<u64> {
141+
let state = self.state.lock();
142+
state.track_alias
143+
}
144+
137145
pub fn error(mut self, err: ServeError) -> Result<(), ServeError> {
138146
if let Some(writer) = self.writer.take() {
139147
writer.close(err.clone())?;
@@ -163,7 +171,8 @@ impl SubscribeRecv {
163171

164172
let writer = subgroups.create(serve::Subgroup {
165173
group_id: header.group_id,
166-
subgroup_id: header.subgroup_id.unwrap(), // TODO SLG - subgroup_id may not be present
174+
// When subgroup_id is not present in the header type, it implicitly means subgroup 0
175+
subgroup_id: header.subgroup_id.unwrap_or(0),
167176
priority: header.publisher_priority,
168177
})?;
169178

@@ -176,17 +185,20 @@ impl SubscribeRecv {
176185
let writer = self.writer.take().ok_or(ServeError::Done)?;
177186

178187
let mut datagrams = match writer {
179-
TrackWriterMode::Track(init) => init.datagrams()?, // TODO SLG - is this needed?
188+
TrackWriterMode::Track(track) => track.datagrams()?, // TODO SLG - is this needed?
180189
TrackWriterMode::Datagrams(datagrams) => datagrams,
181190
_ => return Err(ServeError::Mode),
182191
};
183192

184193
// TODO SLG - update with new datagram fields
185194
datagrams.write(serve::Datagram {
186195
group_id: datagram.group_id,
187-
object_id: datagram.object_id.unwrap(), // TODO SLG - make safe
196+
// When object_id is not present in the datagram type, it implicitly means object 0
197+
object_id: datagram.object_id.unwrap_or(0),
188198
priority: datagram.publisher_priority,
189-
payload: datagram.payload.unwrap(), // TODO SLG - datagram.payload is an Option
199+
// TODO: Handle status datagrams separately - they don't have payload
200+
// For now, use empty bytes as fallback (shouldn't happen in practice for payload datagrams)
201+
payload: datagram.payload.unwrap_or_default(),
190202
})?;
191203

192204
Ok(())

moq-transport/src/session/subscribed.rs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -102,24 +102,31 @@ impl Subscribed {
102102
}
103103

104104
async fn serve_inner(&mut self, track: serve::TrackReader) -> Result<(), SessionError> {
105-
let latest = track.latest();
105+
// Update largest location before sending SubscribeOk
106+
let largest_location = track.largest();
106107
self.state
107108
.lock_mut()
108109
.ok_or(ServeError::Cancel)?
109-
.largest_location = latest;
110-
111-
self.publisher.send_message(message::SubscribeOk {
112-
id: self.msg.id,
113-
track_alias: self.msg.id, // TODO SLG - use subscription id for now, needs fixing
114-
expires: 0, // TODO SLG
115-
group_order: message::GroupOrder::Descending, // TODO: resolve correct value from publisher / subscriber prefs
116-
content_exists: latest.is_some(),
117-
largest_location: latest,
118-
params: Default::default(),
119-
});
110+
.largest_location = largest_location;
111+
112+
// Send SubscribeOk using send_message_and_wait to ensure it is sent at least to the QUIC stack before
113+
// we start serving the track. If a subscriber gets the stream before SubscribeOk
114+
// then they won't recognize the track_alias in the stream header.
115+
self.publisher
116+
.send_message_and_wait(message::SubscribeOk {
117+
id: self.msg.id,
118+
track_alias: self.msg.id, // use subscription id as track alias
119+
expires: 0, // TODO SLG
120+
group_order: message::GroupOrder::Descending, // TODO: resolve correct value from publisher / subscriber prefs
121+
content_exists: largest_location.is_some(),
122+
largest_location,
123+
params: Default::default(),
124+
})
125+
.await;
120126

121127
self.ok = true; // So we send SubscribeDone on drop
122128

129+
// Serve based on track mode
123130
match track.mode().await? {
124131
// TODO cancel track/datagrams on closed
125132
TrackReaderMode::Stream(_stream) => panic!("deprecated"),
@@ -204,7 +211,7 @@ impl Subscribed {
204211
Ok(Some(subgroup)) => {
205212
let header = data::SubgroupHeader {
206213
header_type: data::StreamHeaderType::SubgroupIdExt, // SubGroupId = Yes, Extensions = Yes, ContainsEndOfGroup = No
207-
track_alias: self.msg.id, // TODO SLG - use subscription id for now, needs fixing
214+
track_alias: self.msg.id, // use subscription id as track_alias
208215
group_id: subgroup.group_id,
209216
subgroup_id: Some(subgroup.subgroup_id),
210217
publisher_priority: subgroup.priority,
@@ -367,7 +374,7 @@ impl Subscribed {
367374
while let Some(datagram) = datagrams.read().await? {
368375
let encoded_datagram = data::Datagram {
369376
datagram_type: data::DatagramType::ObjectIdPayload, // TODO SLG
370-
track_alias: self.msg.id, // TODO SLG - use subscription id for now
377+
track_alias: self.msg.id, // use subscription id as track_alias
371378
group_id: datagram.group_id,
372379
object_id: Some(datagram.object_id),
373380
publisher_priority: datagram.priority,
@@ -403,7 +410,6 @@ impl Subscribed {
403410
encoded_datagram.group_id,
404411
encoded_datagram.object_id.unwrap(),
405412
)?;
406-
// TODO SLG - fix up safety of unwrap()
407413

408414
datagram_count += 1;
409415
}

0 commit comments

Comments
 (0)