-
Notifications
You must be signed in to change notification settings - Fork 143
Properly implement the draft-16 SUBSCRIBE_NAMESPACE stream. #940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughConstructor signatures across many IETF message classes were changed from positional parameters to single options objects in JS (js/lite) and corresponding Rust areas. Control/Stream and Connection constructors were updated to object-form parameters. Draft-16 protocol support was added: bidirectional SUBSCRIBE_NAMESPACE streams, new Namespace/NamespaceDone messages, and version-aware encode/decode for publish-namespace messages (including request_id handling). Publisher/Subscriber logic and tests were updated to match these API and protocol changes. 🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
No actionable comments were generated in the recent review. 🎉 Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
rs/moq-lite/src/ietf/subscriber.rs (1)
68-83:⚠️ Potential issue | 🟡 Minor
publish_namespace_idsentry not cleaned up whenrecv_publish_namespacesends an error response.On line 79, if
start_announcefails,send_erroris called but the mapping inserted at lines 72–77 remains inpublish_namespace_ids. If the remote peer never sends aPublishNamespaceDonefor thisrequest_id, the entry leaks. Consider removing the entry on the error path, or deferring the insert until afterstart_announcesucceeds.🛡️ Proposed fix: insert only on success
pub fn recv_publish_namespace(&mut self, msg: ietf::PublishNamespace) -> Result<(), Error> { let request_id = msg.request_id; - // Track the request_id → namespace mapping for v16 PublishNamespaceDone - { - let mut state = self.state.lock(); - state - .publish_namespace_ids - .insert(request_id, msg.track_namespace.to_owned()); - } - match self.start_announce(msg.track_namespace.to_owned()) { - Ok(_) => self.send_ok(request_id), + Ok(_) => { + // Track the request_id → namespace mapping for v16 PublishNamespaceDone + let mut state = self.state.lock(); + state + .publish_namespace_ids + .insert(request_id, msg.track_namespace.to_owned()); + drop(state); + self.send_ok(request_id) + } Err(err) => self.send_error(request_id, 400, &err.to_string()), } }js/lite/src/ietf/publisher.ts (1)
61-81:⚠️ Potential issue | 🔴 Critical
PublishNamespaceDonemissingrequestId— will encode0non v16.Line 72 constructs
PublishNamespaceDone({ trackNamespace: path })without passingrequestId. PerPublishNamespaceDone's v16 encode path (inpublish_namespace.ts), it writesthis.requestIdinstead ofthis.trackNamespace. SincerequestIddefaults to0n, this will send an incorrect identifier to the peer.Proposed fix
- const unannounce = new PublishNamespaceDone({ trackNamespace: path }); + const unannounce = new PublishNamespaceDone({ trackNamespace: path, requestId });
🤖 Fix all issues with AI agents
In `@js/lite/src/ietf/publisher.ts`:
- Around line 300-348: The polling loop in `#watchBroadcasts` can keep running
after Promise.race resolves and stream.close() is called (race between
stream.reader.closed and `#watchBroadcasts`), causing writes to a closed stream
and unhandled rejections; fix it by adding cancellation: create an
AbortController before calling Promise.race, pass its signal into
`#watchBroadcasts` (update the signature of `#watchBroadcasts` to accept a signal),
and when Promise.race returns (or right before calling stream.close()) call
controller.abort() so the loop sees signal.aborted and exits; inside
`#watchBroadcasts` check signal.aborted each iteration (and subscribe to
signal.onabort) to break/return cleanly instead of attempting writes to the
closed stream.
In `@js/lite/src/ietf/subscriber.ts`:
- Around line 138-167: The read loop inside readLoop that reads messages from
stream.reader currently warns on unknown msgType but does not consume the
message body, which corrupts subsequent reads; update the else branch (where
msgType !== SubscribeNamespaceEntry.id && !== SubscribeNamespaceEntryDone.id) to
read the u16 length prefix from stream.reader (e.g., call the reader method used
for u16 lengths) and then read and discard that many bytes from stream.reader
before continuing so the stream stays framed correctly.
In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 504-517: The code calls path.strip_prefix(&prefix).expect(...)
inside the loop using origin.try_announced(), which can panic if the prefix is
absent; change this to handle the Option gracefully by matching on
path.strip_prefix(&prefix) and either (a) log an error with context (use
tracing::error! with origin.absolute(&path) and skip this entry) or (b) return a
descriptive Err from the surrounding async function instead of panicking; apply
the same non-panicking check at the other occurrence that uses strip_prefix, and
ensure any early-return uses the same error type as the surrounding function
while keeping successful cases to call stream.writer.encode(...) with the valid
suffix and Namespace values.
- Around line 59-66: The unannounce path currently uses
namespace_requests.remove(&suffix).unwrap_or(RequestId(0)) which can send a
spurious RequestId(0) to the peer; instead, check the removal result and if it
is None, log a warning (include suffix and self.origin.absolute(&path)) and do
not call self.control.send(PublishNamespaceDone { .. }); if Some(request_id) was
returned, proceed to send the PublishNamespaceDone as before. Update the code
around the unannounce branch (reference: namespace_requests, suffix, RequestId,
PublishNamespaceDone, self.control.send, and tracing::debug!("unannounce")) to
implement this conditional behavior.
In `@rs/moq-lite/src/ietf/session.rs`:
- Around line 284-304: The match arm for unknown stream IDs currently returns
Err(Error::UnexpectedStream) which aborts run_bidi_streams and tears down the
whole session; change this to log the warning and continue the loop instead so a
single bad bidi stream doesn't kill the session: in the function where you
accept streams (the loop using Stream::accept, stream.reader.decode, matching on
ietf::SubscribeNamespace::ID), replace the Err(Error::UnexpectedStream) return
with logic that logs the unexpected id (tracing::warn!(id, "unexpected bidi
stream type")) and simply continues the loop (optionally ensure the stream is
dropped/closed) so only that stream is ignored while the session remains active.
🧹 Nitpick comments (6)
rs/moq-lite/src/ietf/publish_namespace.rs (1)
230-254: v14 and v16 round-trip tests look good, but the v16 test doesn't assert the defaulted field.
test_unannounce_v16assertsdecoded.request_idbut not thatdecoded.track_namespaceis the default empty path. Similarly,test_announce_cancel_v16doesn't checkdecoded.track_namespace. Adding these assertions would strengthen the tests by verifying the defaulting behavior.Optional: add assertions on defaulted fields
assert_eq!(decoded.request_id, RequestId(42)); + assert!(decoded.track_namespace.as_str().is_empty(), "v16 should default track_namespace"); }assert_eq!(decoded.request_id, RequestId(7)); assert_eq!(decoded.error_code, 1); assert_eq!(decoded.reason_phrase, "Shutdown"); + assert!(decoded.track_namespace.as_str().is_empty(), "v16 should default track_namespace"); }rs/moq-lite/src/ietf/subscriber.rs (1)
172-190: Double lock acquisition inrecv_publish_namespace_done— consolidate into a single lock scope.Lines 177–183 acquire the lock twice: once to look up the path, drop it, then again to remove. Since
&mut selfprevents concurrent calls to this method, there's no deadlock risk, but it's needlessly complex. A single lock scope is cleaner and avoids any future footguns if the locking model changes.♻️ Consolidate into a single lock
Version::Draft16 => { // In v16, PublishNamespaceDone uses request_id instead of track_namespace - let state = self.state.lock(); - let path = state.publish_namespace_ids.get(&msg.request_id).cloned(); - drop(state); - - if let Some(path) = path { - self.state.lock().publish_namespace_ids.remove(&msg.request_id); - self.stop_announce(path) + let path = { + let mut state = self.state.lock(); + state.publish_namespace_ids.remove(&msg.request_id) + }; + if let Some(path) = path { + self.stop_announce(path) } else { tracing::warn!(request_id = %msg.request_id, "unknown publish_namespace request_id in done"); Ok(()) } }rs/moq-lite/src/ietf/session.rs (1)
139-147: Message ID 0x08 collision:PublishNamespaceError::IDvsNamespace::ID.
PublishNamespaceError(line 140) uses ID0x08, and the newNamespacemessage insubscribe_namespace.rs(line 130) also uses ID0x08. For v16 on the control stream, receiving0x08returnsErr(Error::UnexpectedMessage)(line 146), so there's no runtime ambiguity on the control stream. However,Namespaceis only used on bidi streams and isn't dispatched through this match. This is fine as long as message IDs are scoped per-stream-type, but worth a brief doc comment to avoid confusion.rs/moq-lite/src/ietf/subscribe_namespace.rs (1)
1-160: Missing tests for new v16 message types andsubscribe_options.This file introduces
subscribe_optionsonSubscribeNamespace, plus two new message types (Namespace,NamespaceDone), but has no#[cfg(test)]module. Round-trip encode/decode tests (similar to those inpublish_namespace.rs) would catch framing regressions. As per coding guidelines, "Rust tests should be integrated within source files".Would you like me to generate the test module for this file?
js/lite/src/ietf/publish_namespace.ts (1)
116-131:"" as Path.Validbypasses branded type safety.The default values use
"" as Path.Validto cast an empty string into the branded type. This works because the field is unused whenrequestIdis active (v16), but it circumvents the type system's protection. Consider makingtrackNamespacetruly optional (i.e.,Path.Valid | undefinedwith defaultundefined) so the brand isn't weakened when the field is semantically absent.Also applies to: 173-182
js/lite/src/ietf/ietf.test.ts (1)
1-464: Missing test coverage for new v16 message types.The new
SubscribeNamespaceEntry(0x08),SubscribeNamespaceEntryDone(0x0e), andSubscribeNamespacev16 encoding (withsubscribeOptions) are not tested. Since these are new wire-format additions, round-trip encode/decode tests would help catch serialization issues. As per coding guidelines, "Write unit tests for critical functionality".Would you like me to generate the missing test cases for these new v16 message types?
| // Loop reading NAMESPACE / NAMESPACE_DONE messages | ||
| const readLoop = (async () => { | ||
| for (;;) { | ||
| const done = await stream.reader.done(); | ||
| if (done) break; | ||
|
|
||
| const msgType = await stream.reader.u53(); | ||
| if (msgType === SubscribeNamespaceEntry.id) { | ||
| const entry = await SubscribeNamespaceEntry.decode(stream.reader, version); | ||
| const path = Path.join(prefix, entry.suffix); | ||
| console.debug(`announced: broadcast=${path} active=true`); | ||
|
|
||
| this.#announced.add(path); | ||
| for (const consumer of this.#announcedConsumers) { | ||
| consumer.append({ path, active: true }); | ||
| } | ||
| } else if (msgType === SubscribeNamespaceEntryDone.id) { | ||
| const entry = await SubscribeNamespaceEntryDone.decode(stream.reader, version); | ||
| const path = Path.join(prefix, entry.suffix); | ||
| console.debug(`announced: broadcast=${path} active=false`); | ||
|
|
||
| this.#announced.delete(path); | ||
| for (const consumer of this.#announcedConsumers) { | ||
| consumer.append({ path, active: false }); | ||
| } | ||
| } else { | ||
| console.warn(`unexpected message type on subscribe_namespace stream: ${msgType}`); | ||
| } | ||
| } | ||
| })(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# Search for Message class definition to understand encoding format
fd 'message.ts' -t f | head -20Repository: moq-dev/moq
Length of output: 180
🏁 Script executed:
# Check the Message.encode and Message.decode implementations
rg -n 'class Message|function Message|export.*Message' --type=ts | head -20Repository: moq-dev/moq
Length of output: 540
🏁 Script executed:
# Look at SubscribeNamespaceEntry and related message definitions
rg -n 'SubscribeNamespaceEntry|SubscribeNamespaceEntryDone' --type=ts -B2 -A5 | head -50Repository: moq-dev/moq
Length of output: 3414
🏁 Script executed:
# Verify the actual code in subscriber.ts around the read loop
sed -n '138,175p' js/lite/src/ietf/subscriber.tsRepository: moq-dev/moq
Length of output: 1473
🏁 Script executed:
cat -n js/lite/src/ietf/message.tsRepository: moq-dev/moq
Length of output: 2339
🏁 Script executed:
# Check if there are other places that handle unknown message types similarly
rg -n "unexpected message type|unknown.*message" --type=ts -B2 -A2Repository: moq-dev/moq
Length of output: 1064
🏁 Script executed:
# Verify the stream.reader.u16 method exists
rg -n "\.u16\(\)" js/lite/src/ietf/subscriber.tsRepository: moq-dev/moq
Length of output: 37
🏁 Script executed:
# Check what Reader methods are available
rg -n "reader\.(u16|u53|read|done)" js/lite/src/ietf/subscriber.ts | head -20Repository: moq-dev/moq
Length of output: 211
🏁 Script executed:
# Examine the Reader interface/class definition
rg -n "class Reader|interface Reader|\.u16\(|\.u53\(" js/lite/src/stream.ts -A2Repository: moq-dev/moq
Length of output: 515
🏁 Script executed:
# Get full Reader class definition to see u16 and u53 methods
sed -n '47,200p' js/lite/src/stream.tsRepository: moq-dev/moq
Length of output: 4210
Unknown message types on the bidi stream will corrupt the read loop.
When msgType doesn't match SubscribeNamespaceEntry.id or SubscribeNamespaceEntryDone.id (line 164), the code logs a warning and continues. However, since messages are framed with a u16 length prefix, the unknown message's body is never consumed — the next iteration's stream.reader.u53() will read body bytes as a message type, corrupting all subsequent reads.
You should skip the unknown message body by reading and discarding its u16-prefixed payload:
Proposed fix
} else if (msgType === SubscribeNamespaceEntryDone.id) {
const entry = await SubscribeNamespaceEntryDone.decode(stream.reader, version);
const path = Path.join(prefix, entry.suffix);
console.debug(`announced: broadcast=${path} active=false`);
this.#announced.delete(path);
for (const consumer of this.#announcedConsumers) {
consumer.append({ path, active: false });
}
} else {
console.warn(`unexpected message type on subscribe_namespace stream: ${msgType}`);
+ // Skip unknown message body (u16-prefixed length)
+ const len = await stream.reader.u16();
+ if (len > 0) await stream.reader.read(len);
}🤖 Prompt for AI Agents
In `@js/lite/src/ietf/subscriber.ts` around lines 138 - 167, The read loop inside
readLoop that reads messages from stream.reader currently warns on unknown
msgType but does not consume the message body, which corrupts subsequent reads;
update the else branch (where msgType !== SubscribeNamespaceEntry.id && !==
SubscribeNamespaceEntryDone.id) to read the u16 length prefix from stream.reader
(e.g., call the reader method used for u16 lengths) and then read and discard
that many bytes from stream.reader before continuing so the stream stays framed
correctly.
| // Send initial NAMESPACE messages for currently active namespaces | ||
| while let Some((path, active)) = origin.try_announced() { | ||
| let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path"); | ||
| if active.is_some() { | ||
| tracing::debug!(broadcast = %origin.absolute(&path), "namespace"); | ||
| stream.writer.encode(&ietf::Namespace::ID).await?; | ||
| stream | ||
| .writer | ||
| .encode(&ietf::Namespace { | ||
| suffix: suffix.to_owned(), | ||
| }) | ||
| .await?; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expect() on strip_prefix could panic in production.
Lines 506 and 527 call .strip_prefix(&prefix).expect(...). While consume_only should guarantee the prefix, a panic here would crash the task. Prefer returning an error or logging and skipping.
🛡️ Proposed fix: handle gracefully instead of panicking
- let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path");
+ let Some(suffix) = path.strip_prefix(&prefix) else {
+ tracing::warn!(path = %path.as_str(), prefix = %prefix.as_str(), "origin returned path outside prefix, skipping");
+ continue;
+ };Apply similarly at line 527.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Send initial NAMESPACE messages for currently active namespaces | |
| while let Some((path, active)) = origin.try_announced() { | |
| let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path"); | |
| if active.is_some() { | |
| tracing::debug!(broadcast = %origin.absolute(&path), "namespace"); | |
| stream.writer.encode(&ietf::Namespace::ID).await?; | |
| stream | |
| .writer | |
| .encode(&ietf::Namespace { | |
| suffix: suffix.to_owned(), | |
| }) | |
| .await?; | |
| } | |
| } | |
| // Send initial NAMESPACE messages for currently active namespaces | |
| while let Some((path, active)) = origin.try_announced() { | |
| let Some(suffix) = path.strip_prefix(&prefix) else { | |
| tracing::warn!(path = %path.as_str(), prefix = %prefix.as_str(), "origin returned path outside prefix, skipping"); | |
| continue; | |
| }; | |
| if active.is_some() { | |
| tracing::debug!(broadcast = %origin.absolute(&path), "namespace"); | |
| stream.writer.encode(&ietf::Namespace::ID).await?; | |
| stream | |
| .writer | |
| .encode(&ietf::Namespace { | |
| suffix: suffix.to_owned(), | |
| }) | |
| .await?; | |
| } | |
| } |
🤖 Prompt for AI Agents
In `@rs/moq-lite/src/ietf/publisher.rs` around lines 504 - 517, The code calls
path.strip_prefix(&prefix).expect(...) inside the loop using
origin.try_announced(), which can panic if the prefix is absent; change this to
handle the Option gracefully by matching on path.strip_prefix(&prefix) and
either (a) log an error with context (use tracing::error! with
origin.absolute(&path) and skip this entry) or (b) return a descriptive Err from
the surrounding async function instead of panicking; apply the same
non-panicking check at the other occurrence that uses strip_prefix, and ensure
any early-return uses the same error type as the surrounding function while
keeping successful cases to call stream.writer.encode(...) with the valid suffix
and Namespace values.
| loop { | ||
| let mut stream = Stream::accept(&session, version).await?; | ||
|
|
||
| // Read the first message type ID to determine the stream type | ||
| let id: u64 = stream.reader.decode().await?; | ||
|
|
||
| match id { | ||
| ietf::SubscribeNamespace::ID => { | ||
| let mut pub_clone = publisher.clone(); | ||
| web_async::spawn(async move { | ||
| if let Err(err) = pub_clone.recv_subscribe_namespace_stream(stream).await { | ||
| tracing::debug!(%err, "subscribe_namespace stream error"); | ||
| } | ||
| }); | ||
| } | ||
| _ => { | ||
| tracing::warn!(id, "unexpected bidi stream type"); | ||
| return Err(Error::UnexpectedStream); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unexpected bidi stream type terminates the entire session.
When an unrecognized bidi stream type arrives (line 299–301), run_bidi_streams returns Err(Error::UnexpectedStream), which propagates through the tokio::select! in run() and tears down the entire session. A single malformed or unknown bidi stream shouldn't be fatal — consider logging and continuing the loop instead.
🛡️ Proposed fix: log and continue on unknown stream types
match id {
ietf::SubscribeNamespace::ID => {
let mut pub_clone = publisher.clone();
web_async::spawn(async move {
if let Err(err) = pub_clone.recv_subscribe_namespace_stream(stream).await {
tracing::debug!(%err, "subscribe_namespace stream error");
}
});
}
_ => {
tracing::warn!(id, "unexpected bidi stream type");
- return Err(Error::UnexpectedStream);
+ // Don't kill the session; just drop the stream.
+ continue;
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| loop { | |
| let mut stream = Stream::accept(&session, version).await?; | |
| // Read the first message type ID to determine the stream type | |
| let id: u64 = stream.reader.decode().await?; | |
| match id { | |
| ietf::SubscribeNamespace::ID => { | |
| let mut pub_clone = publisher.clone(); | |
| web_async::spawn(async move { | |
| if let Err(err) = pub_clone.recv_subscribe_namespace_stream(stream).await { | |
| tracing::debug!(%err, "subscribe_namespace stream error"); | |
| } | |
| }); | |
| } | |
| _ => { | |
| tracing::warn!(id, "unexpected bidi stream type"); | |
| return Err(Error::UnexpectedStream); | |
| } | |
| } | |
| } | |
| loop { | |
| let mut stream = Stream::accept(&session, version).await?; | |
| // Read the first message type ID to determine the stream type | |
| let id: u64 = stream.reader.decode().await?; | |
| match id { | |
| ietf::SubscribeNamespace::ID => { | |
| let mut pub_clone = publisher.clone(); | |
| web_async::spawn(async move { | |
| if let Err(err) = pub_clone.recv_subscribe_namespace_stream(stream).await { | |
| tracing::debug!(%err, "subscribe_namespace stream error"); | |
| } | |
| }); | |
| } | |
| _ => { | |
| tracing::warn!(id, "unexpected bidi stream type"); | |
| // Don't kill the session; just drop the stream. | |
| continue; | |
| } | |
| } | |
| } |
🤖 Prompt for AI Agents
In `@rs/moq-lite/src/ietf/session.rs` around lines 284 - 304, The match arm for
unknown stream IDs currently returns Err(Error::UnexpectedStream) which aborts
run_bidi_streams and tears down the whole session; change this to log the
warning and continue the loop instead so a single bad bidi stream doesn't kill
the session: in the function where you accept streams (the loop using
Stream::accept, stream.reader.decode, matching on ietf::SubscribeNamespace::ID),
replace the Err(Error::UnexpectedStream) return with logic that logs the
unexpected id (tracing::warn!(id, "unexpected bidi stream type")) and simply
continues the loop (optionally ensure the stream is dropped/closed) so only that
stream is ignored while the session remains active.
No description provided.