From dd5211b1b76cbd67af8f613354bfb36efd191cf7 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 11 Feb 2026 19:00:25 -0800 Subject: [PATCH 1/2] Properly implement the SUBSCRIBE_NAMESPACE stream. --- js/lite/src/connection/connect.ts | 25 ++-- js/lite/src/ietf/connection.ts | 63 ++++++-- js/lite/src/ietf/control.ts | 36 ++++- js/lite/src/ietf/fetch.ts | 42 ++++-- js/lite/src/ietf/goaway.ts | 4 +- js/lite/src/ietf/ietf.test.ts | 106 ++++++++++---- js/lite/src/ietf/object.ts | 22 ++- js/lite/src/ietf/publish.ts | 55 ++++--- js/lite/src/ietf/publish_namespace.ts | 97 +++++++++---- js/lite/src/ietf/publisher.ts | 150 +++++++++++++++++--- js/lite/src/ietf/request.ts | 29 ++-- js/lite/src/ietf/setup.ts | 12 +- js/lite/src/ietf/subscribe.ts | 32 +++-- js/lite/src/ietf/subscribe_namespace.ts | 100 +++++++++++-- js/lite/src/ietf/subscriber.ts | 114 +++++++++++++-- js/lite/src/ietf/track.ts | 26 ++-- rs/moq-lite/src/ietf/publish_namespace.rs | 118 ++++++++++++--- rs/moq-lite/src/ietf/publisher.rs | 82 ++++++++++- rs/moq-lite/src/ietf/session.rs | 68 +++++++-- rs/moq-lite/src/ietf/subscribe_namespace.rs | 65 ++++++++- rs/moq-lite/src/ietf/subscriber.rs | 29 +++- 21 files changed, 1043 insertions(+), 232 deletions(-) diff --git a/js/lite/src/connection/connect.ts b/js/lite/src/connection/connect.ts index daba26430..f03f4f14f 100644 --- a/js/lite/src/connection/connect.ts +++ b/js/lite/src/connection/connect.ts @@ -97,16 +97,17 @@ export async function connect(url: URL, props?: ConnectProps): Promise { this.#control.close(); }); - this.#publisher = new Publisher(this.#quic, this.#control); - this.#subscriber = new Subscriber(this.#control); + this.#publisher = new Publisher({ quic: this.#quic, control: this.#control }); + this.#subscriber = new Subscriber({ control: this.#control, quic: this.#quic }); void this.#run(); } @@ -95,11 +107,15 @@ export class Connection implements Established { } async #run(): Promise { - const controlMessages = this.#runControlStream(); - const objectStreams = this.#runObjectStreams(); + const tasks: Promise[] = [this.#runControlStream(), this.#runObjectStreams()]; + + // v16: accept bidi streams for SUBSCRIBE_NAMESPACE + if (this.#control.version === Version.DRAFT_16) { + tasks.push(this.#runBidiStreams()); + } try { - await Promise.all([controlMessages, objectStreams]); + await Promise.all(tasks); } catch (err) { if (!this.#closed) { console.error("fatal error running connection", err); @@ -290,6 +306,35 @@ export class Connection implements Established { } } + /** + * Accepts bidirectional streams for v16 SUBSCRIBE_NAMESPACE. + */ + async #runBidiStreams() { + for (;;) { + const stream = await Stream.accept(this.#quic); + if (!stream) break; + + void this.#runBidiStream(stream).catch((err: unknown) => { + console.error("error processing bidi stream", err); + stream.abort(new Error("bidi stream error")); + }); + } + } + + /** + * Handles a single incoming bidi stream. + */ + async #runBidiStream(stream: Stream) { + const messageType = await stream.reader.u53(); + + if (messageType === SubscribeNamespace.id) { + await this.#publisher.handleSubscribeNamespaceStream(stream); + } else { + console.warn(`unexpected bidi stream type: ${messageType}`); + stream.abort(new Error("unexpected stream type")); + } + } + /** * Returns a promise that resolves when the connection is closed. * @returns A promise that resolves when closed diff --git a/js/lite/src/ietf/control.ts b/js/lite/src/ietf/control.ts index 619a1fd76..17c9c8444 100644 --- a/js/lite/src/ietf/control.ts +++ b/js/lite/src/ietf/control.ts @@ -80,8 +80,30 @@ const MessagesV15 = { [RequestsBlocked.id]: RequestsBlocked, } as const; -// v16 uses the same message map as v15 -const MessagesV16 = MessagesV15; +// v16 message map — SubscribeNamespace (0x11) and UnsubscribeNamespace (0x14) move to bidi streams +const MessagesV16 = { + [Setup.ClientSetup.id]: Setup.ClientSetup, + [Setup.ServerSetup.id]: Setup.ServerSetup, + [Subscribe.id]: Subscribe, + [SubscribeOk.id]: SubscribeOk, + [RequestError.id]: RequestError, // 0x05 → RequestError + [PublishNamespace.id]: PublishNamespace, + [RequestOk.id]: RequestOk, // 0x07 → RequestOk + [PublishNamespaceDone.id]: PublishNamespaceDone, + [Unsubscribe.id]: Unsubscribe, + [PublishDone.id]: PublishDone, + [PublishNamespaceCancel.id]: PublishNamespaceCancel, + [TrackStatusRequest.id]: TrackStatusRequest, + [GoAway.id]: GoAway, + [Fetch.id]: Fetch, + [FetchCancel.id]: FetchCancel, + [FetchOk.id]: FetchOk, + // SubscribeNamespace (0x11) removed — now on bidi stream + // UnsubscribeNamespace (0x14) removed — now use stream close + [Publish.id]: Publish, + [MaxRequestId.id]: MaxRequestId, + [RequestsBlocked.id]: RequestsBlocked, +} as const; type V14MessageType = (typeof MessagesV14)[keyof typeof MessagesV14]; type V15MessageType = (typeof MessagesV15)[keyof typeof MessagesV15]; @@ -106,7 +128,15 @@ export class Stream { #writeLock = new Mutex(); #readLock = new Mutex(); - constructor(stream: StreamInner, maxRequestId: bigint, version: IetfVersion = Version.DRAFT_14) { + constructor({ + stream, + maxRequestId, + version = Version.DRAFT_14, + }: { + stream: StreamInner; + maxRequestId: bigint; + version?: IetfVersion; + }) { this.stream = stream; this.version = version; this.#maxRequestId = maxRequestId; diff --git a/js/lite/src/ietf/fetch.ts b/js/lite/src/ietf/fetch.ts index 577aab63a..c329b6bb6 100644 --- a/js/lite/src/ietf/fetch.ts +++ b/js/lite/src/ietf/fetch.ts @@ -16,17 +16,27 @@ export class Fetch { endGroup: bigint; endObject: bigint; - constructor( - requestId: bigint, - trackNamespace: Path.Valid, - trackName: string, - subscriberPriority: number, - groupOrder: number, - startGroup: bigint, - startObject: bigint, - endGroup: bigint, - endObject: bigint, - ) { + constructor({ + requestId, + trackNamespace, + trackName, + subscriberPriority, + groupOrder, + startGroup, + startObject, + endGroup, + endObject, + }: { + requestId: bigint; + trackNamespace: Path.Valid; + trackName: string; + subscriberPriority: number; + groupOrder: number; + startGroup: bigint; + startObject: bigint; + endGroup: bigint; + endObject: bigint; + }) { this.requestId = requestId; this.trackNamespace = trackNamespace; this.trackName = trackName; @@ -60,7 +70,7 @@ export class FetchOk { requestId: bigint; - constructor(requestId: bigint) { + constructor({ requestId }: { requestId: bigint }) { this.requestId = requestId; } @@ -88,7 +98,11 @@ export class FetchError { errorCode: number; reasonPhrase: string; - constructor(requestId: bigint, errorCode: number, reasonPhrase: string) { + constructor({ + requestId, + errorCode, + reasonPhrase, + }: { requestId: bigint; errorCode: number; reasonPhrase: string }) { this.requestId = requestId; this.errorCode = errorCode; this.reasonPhrase = reasonPhrase; @@ -116,7 +130,7 @@ export class FetchCancel { requestId: bigint; - constructor(requestId: bigint) { + constructor({ requestId }: { requestId: bigint }) { this.requestId = requestId; } diff --git a/js/lite/src/ietf/goaway.ts b/js/lite/src/ietf/goaway.ts index e7fa3c78a..8f7040360 100644 --- a/js/lite/src/ietf/goaway.ts +++ b/js/lite/src/ietf/goaway.ts @@ -7,7 +7,7 @@ export class GoAway { newSessionUri: string; - constructor(newSessionUri: string) { + constructor({ newSessionUri }: { newSessionUri: string }) { this.newSessionUri = newSessionUri; } @@ -25,6 +25,6 @@ export class GoAway { static async #decode(r: Reader): Promise { const newSessionUri = await r.string(); - return new GoAway(newSessionUri); + return new GoAway({ newSessionUri }); } } diff --git a/js/lite/src/ietf/ietf.test.ts b/js/lite/src/ietf/ietf.test.ts index 063c494c3..74c753151 100644 --- a/js/lite/src/ietf/ietf.test.ts +++ b/js/lite/src/ietf/ietf.test.ts @@ -59,7 +59,12 @@ async function decodeVersioned( // Subscribe tests (v14) test("Subscribe v14: round trip", async () => { - const msg = new Subscribe.Subscribe(1n, Path.from("test"), "video", 128); + const msg = new Subscribe.Subscribe({ + requestId: 1n, + trackNamespace: Path.from("test"), + trackName: "video", + subscriberPriority: 128, + }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Subscribe.Subscribe.decode, Version.DRAFT_14); @@ -71,7 +76,12 @@ test("Subscribe v14: round trip", async () => { }); test("Subscribe v14: nested namespace", async () => { - const msg = new Subscribe.Subscribe(100n, Path.from("conference/room123"), "audio", 255); + const msg = new Subscribe.Subscribe({ + requestId: 100n, + trackNamespace: Path.from("conference/room123"), + trackName: "audio", + subscriberPriority: 255, + }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Subscribe.Subscribe.decode, Version.DRAFT_14); @@ -80,7 +90,7 @@ test("Subscribe v14: nested namespace", async () => { }); test("SubscribeOk v14: without largest", async () => { - const msg = new Subscribe.SubscribeOk(42n, 43n); + const msg = new Subscribe.SubscribeOk({ requestId: 42n, trackAlias: 43n }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Subscribe.SubscribeOk.decode, Version.DRAFT_14); @@ -91,7 +101,12 @@ test("SubscribeOk v14: without largest", async () => { // Subscribe tests (v15) test("Subscribe v15: round trip", async () => { - const msg = new Subscribe.Subscribe(1n, Path.from("test"), "video", 128); + const msg = new Subscribe.Subscribe({ + requestId: 1n, + trackNamespace: Path.from("test"), + trackName: "video", + subscriberPriority: 128, + }); const encoded = await encodeVersioned(msg, Version.DRAFT_15); const decoded = await decodeVersioned(encoded, Subscribe.Subscribe.decode, Version.DRAFT_15); @@ -103,7 +118,7 @@ test("Subscribe v15: round trip", async () => { }); test("SubscribeOk v15: round trip", async () => { - const msg = new Subscribe.SubscribeOk(42n, 43n); + const msg = new Subscribe.SubscribeOk({ requestId: 42n, trackAlias: 43n }); const encoded = await encodeVersioned(msg, Version.DRAFT_15); const decoded = await decodeVersioned(encoded, Subscribe.SubscribeOk.decode, Version.DRAFT_15); @@ -113,7 +128,7 @@ test("SubscribeOk v15: round trip", async () => { }); test("SubscribeError: round trip", async () => { - const msg = new Subscribe.SubscribeError(123n, 500, "Not found"); + const msg = new Subscribe.SubscribeError({ requestId: 123n, errorCode: 500, reasonPhrase: "Not found" }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Subscribe.SubscribeError.decode, Version.DRAFT_14); @@ -124,7 +139,7 @@ test("SubscribeError: round trip", async () => { }); test("Unsubscribe: round trip", async () => { - const msg = new Subscribe.Unsubscribe(999n); + const msg = new Subscribe.Unsubscribe({ requestId: 999n }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Subscribe.Unsubscribe.decode, Version.DRAFT_14); @@ -133,7 +148,7 @@ test("Unsubscribe: round trip", async () => { }); test("PublishDone: basic test", async () => { - const msg = new PublishDone(10n, 0, "complete"); + const msg = new PublishDone({ requestId: 10n, statusCode: 0, reasonPhrase: "complete" }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, PublishDone.decode, Version.DRAFT_14); @@ -144,7 +159,7 @@ test("PublishDone: basic test", async () => { }); test("PublishDone: with error", async () => { - const msg = new PublishDone(10n, 1, "error"); + const msg = new PublishDone({ requestId: 10n, statusCode: 1, reasonPhrase: "error" }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, PublishDone.decode, Version.DRAFT_14); @@ -156,7 +171,7 @@ test("PublishDone: with error", async () => { // Announce/PublishNamespace tests test("PublishNamespace: round trip", async () => { - const msg = new Announce.PublishNamespace(1n, Path.from("test/broadcast")); + const msg = new Announce.PublishNamespace({ requestId: 1n, trackNamespace: Path.from("test/broadcast") }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Announce.PublishNamespace.decode, Version.DRAFT_14); @@ -166,7 +181,7 @@ test("PublishNamespace: round trip", async () => { }); test("PublishNamespaceOk: round trip", async () => { - const msg = new Announce.PublishNamespaceOk(2n); + const msg = new Announce.PublishNamespaceOk({ requestId: 2n }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Announce.PublishNamespaceOk.decode, Version.DRAFT_14); @@ -175,7 +190,7 @@ test("PublishNamespaceOk: round trip", async () => { }); test("PublishNamespaceError: round trip", async () => { - const msg = new Announce.PublishNamespaceError(3n, 404, "Unauthorized"); + const msg = new Announce.PublishNamespaceError({ requestId: 3n, errorCode: 404, reasonPhrase: "Unauthorized" }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Announce.PublishNamespaceError.decode, Version.DRAFT_14); @@ -186,7 +201,7 @@ test("PublishNamespaceError: round trip", async () => { }); test("PublishNamespaceDone: round trip", async () => { - const msg = new Announce.PublishNamespaceDone(Path.from("old/stream")); + const msg = new Announce.PublishNamespaceDone({ trackNamespace: Path.from("old/stream") }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Announce.PublishNamespaceDone.decode, Version.DRAFT_14); @@ -195,7 +210,11 @@ test("PublishNamespaceDone: round trip", async () => { }); test("PublishNamespaceCancel: round trip", async () => { - const msg = new Announce.PublishNamespaceCancel(Path.from("canceled"), 1, "Shutdown"); + const msg = new Announce.PublishNamespaceCancel({ + trackNamespace: Path.from("canceled"), + errorCode: 1, + reasonPhrase: "Shutdown", + }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Announce.PublishNamespaceCancel.decode, Version.DRAFT_14); @@ -207,7 +226,7 @@ test("PublishNamespaceCancel: round trip", async () => { // GoAway tests test("GoAway: with URL", async () => { - const msg = new GoAway.GoAway("https://example.com/new"); + const msg = new GoAway.GoAway({ newSessionUri: "https://example.com/new" }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, GoAway.GoAway.decode, Version.DRAFT_14); @@ -216,7 +235,7 @@ test("GoAway: with URL", async () => { }); test("GoAway: empty", async () => { - const msg = new GoAway.GoAway(""); + const msg = new GoAway.GoAway({ newSessionUri: "" }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, GoAway.GoAway.decode, Version.DRAFT_14); @@ -226,7 +245,7 @@ test("GoAway: empty", async () => { // Track tests test("TrackStatusRequest: round trip", async () => { - const msg = new Track.TrackStatusRequest(Path.from("video/stream"), "main"); + const msg = new Track.TrackStatusRequest({ trackNamespace: Path.from("video/stream"), trackName: "main" }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Track.TrackStatusRequest.decode, Version.DRAFT_14); @@ -236,7 +255,13 @@ test("TrackStatusRequest: round trip", async () => { }); test("TrackStatus: round trip", async () => { - const msg = new Track.TrackStatus(Path.from("test"), "status", 200, 42n, 100n); + const msg = new Track.TrackStatus({ + trackNamespace: Path.from("test"), + trackName: "status", + statusCode: 200, + lastGroupId: 42n, + lastObjectId: 100n, + }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Track.TrackStatus.decode, Version.DRAFT_14); @@ -292,7 +317,7 @@ test("SubscribeOk v14: rejects non-zero expires", async () => { // Unicode tests test("SubscribeError: unicode strings", async () => { - const msg = new Subscribe.SubscribeError(1n, 400, "Error: 错误 🚫"); + const msg = new Subscribe.SubscribeError({ requestId: 1n, errorCode: 400, reasonPhrase: "Error: 错误 🚫" }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Subscribe.SubscribeError.decode, Version.DRAFT_14); @@ -303,7 +328,7 @@ test("SubscribeError: unicode strings", async () => { }); test("PublishNamespace: unicode namespace", async () => { - const msg = new Announce.PublishNamespace(1n, Path.from("会议/房间")); + const msg = new Announce.PublishNamespace({ requestId: 1n, trackNamespace: Path.from("会议/房间") }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Announce.PublishNamespace.decode, Version.DRAFT_14); @@ -314,7 +339,16 @@ test("PublishNamespace: unicode namespace", async () => { // Publish v15 tests test("Publish v15: round trip", async () => { - const msg = new Publish(1n, Path.from("test/ns"), "video", 42n, 0x02, false, undefined, true); + const msg = new Publish({ + requestId: 1n, + trackNamespace: Path.from("test/ns"), + trackName: "video", + trackAlias: 42n, + groupOrder: 0x02, + contentExists: false, + largest: undefined, + forward: true, + }); const encoded = await encodeVersioned(msg, Version.DRAFT_15); const decoded = await decodeVersioned(encoded, Publish.decode, Version.DRAFT_15); @@ -327,7 +361,16 @@ test("Publish v15: round trip", async () => { }); test("Publish v14: round trip", async () => { - const msg = new Publish(1n, Path.from("test/ns"), "video", 42n, 0x02, true, { groupId: 10n, objectId: 5n }, true); + const msg = new Publish({ + requestId: 1n, + trackNamespace: Path.from("test/ns"), + trackName: "video", + trackAlias: 42n, + groupOrder: 0x02, + contentExists: true, + largest: { groupId: 10n, objectId: 5n }, + forward: true, + }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Publish.decode, Version.DRAFT_14); @@ -344,7 +387,7 @@ test("Publish v14: round trip", async () => { // ClientSetup v15 tests test("ClientSetup v15: round trip", async () => { - const msg = new Setup.ClientSetup([Version.DRAFT_15]); + const msg = new Setup.ClientSetup({ versions: [Version.DRAFT_15] }); const encoded = await encodeVersioned(msg, Version.DRAFT_15); const decoded = await decodeVersioned(encoded, Setup.ClientSetup.decode, Version.DRAFT_15); @@ -354,7 +397,7 @@ test("ClientSetup v15: round trip", async () => { }); test("ClientSetup v14: round trip", async () => { - const msg = new Setup.ClientSetup([Version.DRAFT_14]); + const msg = new Setup.ClientSetup({ versions: [Version.DRAFT_14] }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Setup.ClientSetup.decode, Version.DRAFT_14); @@ -365,7 +408,7 @@ test("ClientSetup v14: round trip", async () => { // ServerSetup v15 tests test("ServerSetup v15: round trip", async () => { - const msg = new Setup.ServerSetup(Version.DRAFT_15); + const msg = new Setup.ServerSetup({ version: Version.DRAFT_15 }); const encoded = await encodeVersioned(msg, Version.DRAFT_15); const decoded = await decodeVersioned(encoded, Setup.ServerSetup.decode, Version.DRAFT_15); @@ -374,7 +417,7 @@ test("ServerSetup v15: round trip", async () => { }); test("ServerSetup v14: round trip", async () => { - const msg = new Setup.ServerSetup(Version.DRAFT_14); + const msg = new Setup.ServerSetup({ version: Version.DRAFT_14 }); const encoded = await encodeVersioned(msg, Version.DRAFT_14); const decoded = await decodeVersioned(encoded, Setup.ServerSetup.decode, Version.DRAFT_14); @@ -384,7 +427,7 @@ test("ServerSetup v14: round trip", async () => { // RequestOk / RequestError v15 tests test("RequestOk: round trip", async () => { - const msg = new RequestOk(42n); + const msg = new RequestOk({ requestId: 42n }); const encoded = await encodeVersioned(msg, Version.DRAFT_15); const decoded = await decodeVersioned(encoded, RequestOk.decode, Version.DRAFT_15); @@ -393,7 +436,7 @@ test("RequestOk: round trip", async () => { }); test("RequestError v15: round trip", async () => { - const msg = new RequestError(99n, 500, "Internal error"); + const msg = new RequestError({ requestId: 99n, errorCode: 500, reasonPhrase: "Internal error" }); const encoded = await encodeVersioned(msg, Version.DRAFT_15); const decoded = await decodeVersioned(encoded, RequestError.decode, Version.DRAFT_15); @@ -405,7 +448,12 @@ test("RequestError v15: round trip", async () => { }); test("RequestError v16: round trip with retryInterval", async () => { - const msg = new RequestError(99n, 500, "Internal error", 5000n); + const msg = new RequestError({ + requestId: 99n, + errorCode: 500, + reasonPhrase: "Internal error", + retryInterval: 5000n, + }); const encoded = await encodeVersioned(msg, Version.DRAFT_16); const decoded = await decodeVersioned(encoded, RequestError.decode, Version.DRAFT_16); diff --git a/js/lite/src/ietf/object.ts b/js/lite/src/ietf/object.ts index cb91bf03d..21619b38e 100644 --- a/js/lite/src/ietf/object.ts +++ b/js/lite/src/ietf/object.ts @@ -23,7 +23,19 @@ export class Group { subGroupId: number; publisherPriority: number; - constructor(trackAlias: bigint, groupId: number, subGroupId: number, publisherPriority: number, flags: GroupFlags) { + constructor({ + trackAlias, + groupId, + subGroupId, + publisherPriority, + flags, + }: { + trackAlias: bigint; + groupId: number; + subGroupId: number; + publisherPriority: number; + flags: GroupFlags; + }) { this.flags = flags; this.trackAlias = trackAlias; this.groupId = groupId; @@ -89,7 +101,7 @@ export class Group { const subGroupId = flags.hasSubgroup ? await r.u53() : 0; const publisherPriority = hasPriority ? await r.u8() : 128; // Default priority when absent - return new Group(trackAlias, groupId, subGroupId, publisherPriority, flags); + return new Group({ trackAlias, groupId, subGroupId, publisherPriority, flags }); } } @@ -97,7 +109,7 @@ export class Frame { // undefined means end of group payload?: Uint8Array; - constructor(payload?: Uint8Array) { + constructor({ payload }: { payload?: Uint8Array } = {}) { this.payload = payload; } @@ -138,14 +150,14 @@ export class Frame { if (payloadLength > 0) { const payload = await r.read(payloadLength); - return new Frame(payload); + return new Frame({ payload }); } const status = await r.u53(); if (flags.hasEnd) { // Empty frame - if (status === 0) return new Frame(new Uint8Array(0)); + if (status === 0) return new Frame({ payload: new Uint8Array(0) }); } else if (status === 0 || status === GROUP_END) { // TODO status === 0 should be an empty frame, but moq-rs seems to be sending it incorrectly on group end. return new Frame(); diff --git a/js/lite/src/ietf/publish.ts b/js/lite/src/ietf/publish.ts index 4a019a66f..994792a15 100644 --- a/js/lite/src/ietf/publish.ts +++ b/js/lite/src/ietf/publish.ts @@ -19,16 +19,25 @@ export class Publish { largest: { groupId: bigint; objectId: bigint } | undefined; forward: boolean; - constructor( - requestId: bigint, - trackNamespace: Path.Valid, - trackName: string, - trackAlias: bigint, - groupOrder: number, - contentExists: boolean, - largest: { groupId: bigint; objectId: bigint } | undefined, - forward: boolean, - ) { + constructor({ + requestId, + trackNamespace, + trackName, + trackAlias, + groupOrder, + contentExists, + largest, + forward, + }: { + requestId: bigint; + trackNamespace: Path.Valid; + trackName: string; + trackAlias: bigint; + groupOrder: number; + contentExists: boolean; + largest: { groupId: bigint; objectId: bigint } | undefined; + forward: boolean; + }) { this.requestId = requestId; this.trackNamespace = trackNamespace; this.trackName = trackName; @@ -91,23 +100,23 @@ export class Publish { const groupOrder = params.groupOrder ?? 0x02; const forward = params.forward ?? true; const largest = params.largest; - return new Publish( + return new Publish({ requestId, trackNamespace, trackName, trackAlias, groupOrder, - !!largest, + contentExists: !!largest, largest, forward, - ); + }); } else if (version === Version.DRAFT_14) { const groupOrder = await r.u8(); const contentExists = await r.bool(); const largest = contentExists ? { groupId: await r.u62(), objectId: await r.u62() } : undefined; const forward = await r.bool(); await Parameters.decode(r, version); // ignore parameters - return new Publish( + return new Publish({ requestId, trackNamespace, trackName, @@ -116,7 +125,7 @@ export class Publish { contentExists, largest, forward, - ); + }); } else { const _: never = version; throw new Error(`unsupported version: ${_}`); @@ -151,7 +160,11 @@ export class PublishError { errorCode: number; reasonPhrase: string; - constructor(requestId: bigint, errorCode: number, reasonPhrase: string) { + constructor({ + requestId, + errorCode, + reasonPhrase, + }: { requestId: bigint; errorCode: number; reasonPhrase: string }) { this.requestId = requestId; this.errorCode = errorCode; this.reasonPhrase = reasonPhrase; @@ -175,7 +188,7 @@ export class PublishError { const requestId = await r.u62(); const errorCode = Number(await r.u62()); const reasonPhrase = await r.string(); - return new PublishError(requestId, errorCode, reasonPhrase); + return new PublishError({ requestId, errorCode, reasonPhrase }); } } @@ -187,7 +200,11 @@ export class PublishDone { statusCode: number; reasonPhrase: string; - constructor(requestId: bigint, statusCode: number, reasonPhrase: string) { + constructor({ + requestId, + statusCode, + reasonPhrase, + }: { requestId: bigint; statusCode: number; reasonPhrase: string }) { this.requestId = requestId; this.statusCode = statusCode; this.reasonPhrase = reasonPhrase; @@ -214,6 +231,6 @@ export class PublishDone { await r.u62(); // ignore stream_count const reasonPhrase = await r.string(); - return new PublishDone(requestId, statusCode, reasonPhrase); + return new PublishDone({ requestId, statusCode, reasonPhrase }); } } diff --git a/js/lite/src/ietf/publish_namespace.ts b/js/lite/src/ietf/publish_namespace.ts index 10ee22fe7..868ed8343 100644 --- a/js/lite/src/ietf/publish_namespace.ts +++ b/js/lite/src/ietf/publish_namespace.ts @@ -3,7 +3,7 @@ import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; import * as Namespace from "./namespace.ts"; import { Parameters } from "./parameters.ts"; -import type { IetfVersion } from "./version.ts"; +import { type IetfVersion, Version } from "./version.ts"; // In draft-14, ANNOUNCE is renamed to PUBLISH_NAMESPACE export class PublishNamespace { @@ -12,7 +12,7 @@ export class PublishNamespace { requestId: bigint; trackNamespace: Path.Valid; - constructor(requestId: bigint, trackNamespace: Path.Valid) { + constructor({ requestId, trackNamespace }: { requestId: bigint; trackNamespace: Path.Valid }) { this.requestId = requestId; this.trackNamespace = trackNamespace; } @@ -35,7 +35,7 @@ export class PublishNamespace { const requestId = await r.u62(); const trackNamespace = await Namespace.decode(r); await Parameters.decode(r, version); // ignore parameters - return new PublishNamespace(requestId, trackNamespace); + return new PublishNamespace({ requestId, trackNamespace }); } } @@ -44,7 +44,7 @@ export class PublishNamespaceOk { requestId: bigint; - constructor(requestId: bigint) { + constructor({ requestId }: { requestId: bigint }) { this.requestId = requestId; } @@ -62,7 +62,7 @@ export class PublishNamespaceOk { static async #decode(r: Reader): Promise { const requestId = await r.u62(); - return new PublishNamespaceOk(requestId); + return new PublishNamespaceOk({ requestId }); } } @@ -73,7 +73,11 @@ export class PublishNamespaceError { errorCode: number; reasonPhrase: string; - constructor(requestId: bigint, errorCode: number, reasonPhrase: string) { + constructor({ + requestId, + errorCode, + reasonPhrase, + }: { requestId: bigint; errorCode: number; reasonPhrase: string }) { this.requestId = requestId; this.errorCode = errorCode; this.reasonPhrase = reasonPhrase; @@ -97,7 +101,7 @@ export class PublishNamespaceError { const requestId = await r.u62(); const errorCode = Number(await r.u62()); const reasonPhrase = await r.string(); - return new PublishNamespaceError(requestId, errorCode, reasonPhrase); + return new PublishNamespaceError({ requestId, errorCode, reasonPhrase }); } } @@ -105,61 +109,100 @@ export class PublishNamespaceCancel { static id = 0x0c; trackNamespace: Path.Valid; + requestId: bigint; // v16: uses request_id instead of track_namespace errorCode: number; reasonPhrase: string; - constructor(trackNamespace: Path.Valid, errorCode: number = 0, reasonPhrase: string = "") { + constructor({ + trackNamespace = "" as Path.Valid, + errorCode = 0, + reasonPhrase = "", + requestId = 0n, + }: { + trackNamespace?: Path.Valid; + errorCode?: number; + reasonPhrase?: string; + requestId?: bigint; + } = {}) { this.trackNamespace = trackNamespace; + this.requestId = requestId; this.errorCode = errorCode; this.reasonPhrase = reasonPhrase; } - async #encode(w: Writer): Promise { - await Namespace.encode(w, this.trackNamespace); + async #encode(w: Writer, version: IetfVersion): Promise { + if (version === Version.DRAFT_16) { + await w.u62(this.requestId); + } else { + await Namespace.encode(w, this.trackNamespace); + } await w.u62(BigInt(this.errorCode)); await w.string(this.reasonPhrase); } - async encode(w: Writer, _version: IetfVersion): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: IetfVersion): Promise { + return Message.encode(w, (wr) => this.#encode(wr, version)); } - static async decode(r: Reader, _version: IetfVersion): Promise { - return Message.decode(r, PublishNamespaceCancel.#decode); + static async decode(r: Reader, version: IetfVersion): Promise { + return Message.decode(r, (rd) => PublishNamespaceCancel.#decode(rd, version)); } - static async #decode(r: Reader): Promise { - const trackNamespace = await Namespace.decode(r); + static async #decode(r: Reader, version: IetfVersion): Promise { + let trackNamespace = "" as Path.Valid; + let requestId = 0n; + if (version === Version.DRAFT_16) { + requestId = await r.u62(); + } else { + trackNamespace = await Namespace.decode(r); + } const errorCode = Number(await r.u62()); const reasonPhrase = await r.string(); - return new PublishNamespaceCancel(trackNamespace, errorCode, reasonPhrase); + return new PublishNamespaceCancel({ trackNamespace, errorCode, reasonPhrase, requestId }); } } // In draft-14, UNANNOUNCE is renamed to PUBLISH_NAMESPACE_DONE +// In draft-16, uses request_id instead of track_namespace export class PublishNamespaceDone { static readonly id = 0x09; trackNamespace: Path.Valid; - - constructor(trackNamespace: Path.Valid) { + requestId: bigint; // v16: uses request_id instead of track_namespace + + constructor({ + trackNamespace = "" as Path.Valid, + requestId = 0n, + }: { + trackNamespace?: Path.Valid; + requestId?: bigint; + } = {}) { this.trackNamespace = trackNamespace; + this.requestId = requestId; } - async #encode(w: Writer): Promise { - await Namespace.encode(w, this.trackNamespace); + async #encode(w: Writer, version: IetfVersion): Promise { + if (version === Version.DRAFT_16) { + await w.u62(this.requestId); + } else { + await Namespace.encode(w, this.trackNamespace); + } } - async encode(w: Writer, _version: IetfVersion): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: IetfVersion): Promise { + return Message.encode(w, (wr) => this.#encode(wr, version)); } - static async decode(r: Reader, _version: IetfVersion): Promise { - return Message.decode(r, PublishNamespaceDone.#decode); + static async decode(r: Reader, version: IetfVersion): Promise { + return Message.decode(r, (rd) => PublishNamespaceDone.#decode(rd, version)); } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version: IetfVersion): Promise { + if (version === Version.DRAFT_16) { + const requestId = await r.u62(); + return new PublishNamespaceDone({ requestId }); + } const trackNamespace = await Namespace.decode(r); - return new PublishNamespaceDone(trackNamespace); + return new PublishNamespaceDone({ trackNamespace }); } } diff --git a/js/lite/src/ietf/publisher.ts b/js/lite/src/ietf/publisher.ts index d4d75e832..303956a55 100644 --- a/js/lite/src/ietf/publisher.ts +++ b/js/lite/src/ietf/publisher.ts @@ -1,7 +1,7 @@ import type { Broadcast } from "../broadcast.ts"; import type { Group } from "../group.ts"; -import type * as Path from "../path.ts"; -import { Writer } from "../stream.ts"; +import * as Path from "../path.ts"; +import { type Stream, Writer } from "../stream.ts"; import type { Track } from "../track.ts"; import { error } from "../util/error.ts"; import type * as Control from "./control.ts"; @@ -14,11 +14,16 @@ import { type PublishNamespaceError, type PublishNamespaceOk, } from "./publish_namespace.ts"; -import { RequestError, type RequestOk } from "./request.ts"; +import { RequestError, RequestOk } from "./request.ts"; import { type Subscribe, SubscribeError, SubscribeOk, type Unsubscribe } from "./subscribe.ts"; -import type { SubscribeNamespace, UnsubscribeNamespace } from "./subscribe_namespace.ts"; +import { + SubscribeNamespace, + SubscribeNamespaceEntry, + SubscribeNamespaceEntryDone, + type UnsubscribeNamespace, +} from "./subscribe_namespace.ts"; import { TrackStatus, type TrackStatusRequest } from "./track.ts"; -import { Version } from "./version.ts"; +import { type IetfVersion, Version } from "./version.ts"; /** * Handles publishing broadcasts using moq-transport protocol with lite-compatibility restrictions. @@ -39,7 +44,7 @@ export class Publisher { * * @internal */ - constructor(quic: WebTransport, control: Control.Stream) { + constructor({ quic, control }: { quic: WebTransport; control: Control.Stream }) { this.#quic = quic; this.#control = control; } @@ -58,13 +63,13 @@ export class Publisher { const requestId = await this.#control.nextRequestId(); if (requestId === undefined) return; - const announce = new PublishNamespace(requestId, path); + const announce = new PublishNamespace({ requestId, trackNamespace: path }); await this.#control.write(announce); // Wait until the broadcast is closed, then remove it from the lookup. await broadcast.closed; - const unannounce = new PublishNamespaceDone(path); + const unannounce = new PublishNamespaceDone({ trackNamespace: path }); await this.#control.write(unannounce); } catch (err: unknown) { const e = error(err); @@ -88,10 +93,18 @@ export class Publisher { if (!broadcast) { if (this.#control.version === Version.DRAFT_15 || this.#control.version === Version.DRAFT_16) { - const errorMsg = new RequestError(msg.requestId, 404, "Broadcast not found"); + const errorMsg = new RequestError({ + requestId: msg.requestId, + errorCode: 404, + reasonPhrase: "Broadcast not found", + }); await this.#control.write(errorMsg); } else if (this.#control.version === Version.DRAFT_14) { - const errorMsg = new SubscribeError(msg.requestId, 404, "Broadcast not found"); + const errorMsg = new SubscribeError({ + requestId: msg.requestId, + errorCode: 404, + reasonPhrase: "Broadcast not found", + }); await this.#control.write(errorMsg); } else { const version: never = this.#control.version; @@ -104,7 +117,7 @@ export class Publisher { const track = broadcast.subscribe(msg.trackName, msg.subscriberPriority); // Send SUBSCRIBE_OK response on control stream - const okMsg = new SubscribeOk(msg.requestId, msg.requestId); + const okMsg = new SubscribeOk({ requestId: msg.requestId, trackAlias: msg.requestId }); await this.#control.write(okMsg); console.debug(`publish ok: broadcast=${name} track=${track.name}`); @@ -129,12 +142,12 @@ export class Publisher { } console.debug(`publish done: broadcast=${broadcast} track=${track.name}`); - const msg = new PublishDone(requestId, 200, "OK"); + const msg = new PublishDone({ requestId, statusCode: 200, reasonPhrase: "OK" }); await this.#control.write(msg); } catch (err: unknown) { const e = error(err); console.warn(`publish error: broadcast=${broadcast} track=${track.name} error=${e.message}`); - const msg = new PublishDone(requestId, 500, e.message); + const msg = new PublishDone({ requestId, statusCode: 500, reasonPhrase: e.message }); await this.#control.write(msg); } finally { track.close(); @@ -154,13 +167,19 @@ export class Publisher { const stream = await Writer.open(this.#quic); // Write STREAM_HEADER_SUBGROUP - const header = new GroupMessage(requestId, group.sequence, 0, 0, { - hasExtensions: false, - hasSubgroup: false, - hasSubgroupObject: false, - // Automatically end the group on stream FIN - hasEnd: true, - hasPriority: true, + const header = new GroupMessage({ + trackAlias: requestId, + groupId: group.sequence, + subGroupId: 0, + publisherPriority: 0, + flags: { + hasExtensions: false, + hasSubgroup: false, + hasSubgroupObject: false, + // Automatically end the group on stream FIN + hasEnd: true, + hasPriority: true, + }, }); console.debug("sending group header", header); @@ -172,7 +191,7 @@ export class Publisher { if (!frame) break; // Write each frame as an object - const obj = new Frame(frame); + const obj = new Frame({ payload: frame }); await obj.encode(stream, header.flags); } @@ -191,7 +210,13 @@ export class Publisher { */ async handleTrackStatusRequest(msg: TrackStatusRequest) { // moq-lite doesn't support track status requests - const statusMsg = new TrackStatus(msg.trackNamespace, msg.trackName, TrackStatus.STATUS_NOT_FOUND, 0n, 0n); + const statusMsg = new TrackStatus({ + trackNamespace: msg.trackNamespace, + trackName: msg.trackName, + statusCode: TrackStatus.STATUS_NOT_FOUND, + lastGroupId: 0n, + lastObjectId: 0n, + }); await this.#control.write(statusMsg); } @@ -240,4 +265,85 @@ export class Publisher { async handleRequestError(_msg: RequestError) { // TODO: route by request_id to determine what kind of request it belongs to } + + /** + * Handle a v16 SUBSCRIBE_NAMESPACE on a bidirectional stream. + * Reads the request, sends REQUEST_OK, then streams NAMESPACE/NAMESPACE_DONE. + */ + async handleSubscribeNamespaceStream(stream: Stream) { + const version = this.#control.version; + + try { + // Read the SubscribeNamespace message (type ID already consumed by connection) + const msg = await SubscribeNamespace.decode(stream.reader, version); + const prefix = msg.namespace; + + console.debug(`subscribe_namespace stream: prefix=${prefix}`); + + // Send REQUEST_OK + await stream.writer.u53(RequestOk.id); + const ok = new RequestOk({ requestId: msg.requestId }); + await ok.encode(stream.writer, version); + + // Send NAMESPACE for each currently published broadcast matching prefix + const active = new Set(); + for (const name of this.#broadcasts.keys()) { + const suffix = Path.stripPrefix(prefix, name); + if (suffix === null) continue; + console.debug(`namespace: broadcast=${name} suffix=${suffix}`); + active.add(suffix); + + await stream.writer.u53(SubscribeNamespaceEntry.id); + const entry = new SubscribeNamespaceEntry({ suffix }); + await entry.encode(stream.writer, version); + } + + // Wait for broadcast changes and stream updates + // We poll by watching for stream close or broadcasts changing. + // This is a simplified version — we check periodically rather than using signals. + await Promise.race([stream.reader.closed, this.#watchBroadcasts(stream, prefix, active, version)]); + + stream.close(); + } catch (err: unknown) { + const e = error(err); + console.debug(`subscribe_namespace stream error: ${e.message}`); + stream.abort(e); + } + } + + async #watchBroadcasts(stream: Stream, prefix: Path.Valid, active: Set, version: IetfVersion) { + // Simple polling approach - check for changes every 100ms + // A proper implementation would use signals/events + for (;;) { + await new Promise((resolve) => setTimeout(resolve, 100)); + + const newActive = new Set(); + for (const name of this.#broadcasts.keys()) { + const suffix = Path.stripPrefix(prefix, name); + if (suffix === null) continue; + newActive.add(suffix); + } + + // Send NAMESPACE for new broadcasts + for (const added of newActive) { + if (active.has(added)) continue; + console.debug(`namespace: suffix=${added} active=true`); + await stream.writer.u53(SubscribeNamespaceEntry.id); + const entry = new SubscribeNamespaceEntry({ suffix: added }); + await entry.encode(stream.writer, version); + } + + // Send NAMESPACE_DONE for removed broadcasts + for (const removed of active) { + if (newActive.has(removed)) continue; + console.debug(`namespace: suffix=${removed} active=false`); + await stream.writer.u53(SubscribeNamespaceEntryDone.id); + const entry = new SubscribeNamespaceEntryDone({ suffix: removed }); + await entry.encode(stream.writer, version); + } + + active.clear(); + for (const s of newActive) active.add(s); + } + } } diff --git a/js/lite/src/ietf/request.ts b/js/lite/src/ietf/request.ts index be48c49da..1d4b35a50 100644 --- a/js/lite/src/ietf/request.ts +++ b/js/lite/src/ietf/request.ts @@ -8,7 +8,7 @@ export class MaxRequestId { requestId: bigint; - constructor(requestId: bigint) { + constructor({ requestId }: { requestId: bigint }) { this.requestId = requestId; } @@ -21,7 +21,7 @@ export class MaxRequestId { } static async #decode(r: Reader): Promise { - return new MaxRequestId(await r.u62()); + return new MaxRequestId({ requestId: await r.u62() }); } static async decode(r: Reader, _version: IetfVersion): Promise { @@ -34,7 +34,7 @@ export class RequestsBlocked { requestId: bigint; - constructor(requestId: bigint) { + constructor({ requestId }: { requestId: bigint }) { this.requestId = requestId; } @@ -47,7 +47,7 @@ export class RequestsBlocked { } static async #decode(r: Reader): Promise { - return new RequestsBlocked(await r.u62()); + return new RequestsBlocked({ requestId: await r.u62() }); } static async decode(r: Reader, _version: IetfVersion): Promise { @@ -63,7 +63,10 @@ export class RequestOk { requestId: bigint; parameters: MessageParameters; - constructor(requestId: bigint, parameters = new MessageParameters()) { + constructor({ + requestId, + parameters = new MessageParameters(), + }: { requestId: bigint; parameters?: MessageParameters }) { this.requestId = requestId; this.parameters = parameters; } @@ -80,7 +83,7 @@ export class RequestOk { static async #decode(r: Reader, version: IetfVersion): Promise { const requestId = await r.u62(); const parameters = await MessageParameters.decode(r, version); - return new RequestOk(requestId, parameters); + return new RequestOk({ requestId, parameters }); } static async decode(r: Reader, version: IetfVersion): Promise { @@ -98,7 +101,17 @@ export class RequestError { reasonPhrase: string; retryInterval: bigint; - constructor(requestId: bigint, errorCode: number, reasonPhrase: string, retryInterval = 0n) { + constructor({ + requestId, + errorCode, + reasonPhrase, + retryInterval = 0n, + }: { + requestId: bigint; + errorCode: number; + reasonPhrase: string; + retryInterval?: bigint; + }) { this.requestId = requestId; this.errorCode = errorCode; this.reasonPhrase = reasonPhrase; @@ -123,7 +136,7 @@ export class RequestError { const errorCode = Number(await r.u62()); const retryInterval = version === Version.DRAFT_16 ? await r.u62() : 0n; const reasonPhrase = await r.string(); - return new RequestError(requestId, errorCode, reasonPhrase, retryInterval); + return new RequestError({ requestId, errorCode, reasonPhrase, retryInterval }); } static async decode(r: Reader, version: IetfVersion): Promise { diff --git a/js/lite/src/ietf/setup.ts b/js/lite/src/ietf/setup.ts index 16baa1ded..23e27051d 100644 --- a/js/lite/src/ietf/setup.ts +++ b/js/lite/src/ietf/setup.ts @@ -11,7 +11,7 @@ export class ClientSetup { versions: number[]; parameters: Parameters; - constructor(versions: number[], parameters = new Parameters()) { + constructor({ versions, parameters = new Parameters() }: { versions: number[]; parameters?: Parameters }) { this.versions = versions; this.parameters = parameters; } @@ -40,7 +40,7 @@ export class ClientSetup { if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { // Draft15+: no versions list, just parameters const parameters = await Parameters.decode(r, version); - return new ClientSetup([version], parameters); + return new ClientSetup({ versions: [version], parameters }); } else if (version === Version.DRAFT_14) { // Number of supported versions const numVersions = await r.u53(); @@ -57,7 +57,7 @@ export class ClientSetup { const parameters = await Parameters.decode(r, version); - return new ClientSetup(supportedVersions, parameters); + return new ClientSetup({ versions: supportedVersions, parameters }); } else { const _: never = version; throw new Error(`unsupported version: ${_}`); @@ -75,7 +75,7 @@ export class ServerSetup { version: number; parameters: Parameters; - constructor(version: number, parameters = new Parameters()) { + constructor({ version, parameters = new Parameters() }: { version: number; parameters?: Parameters }) { this.version = version; this.parameters = parameters; } @@ -101,11 +101,11 @@ export class ServerSetup { if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { // Draft15+: no version field, just parameters const parameters = await Parameters.decode(r, version); - return new ServerSetup(version, parameters); + return new ServerSetup({ version, parameters }); } else if (version === Version.DRAFT_14) { const selectedVersion = await r.u53(); const parameters = await Parameters.decode(r, version); - return new ServerSetup(selectedVersion, parameters); + return new ServerSetup({ version: selectedVersion, parameters }); } else { const _: never = version; throw new Error(`unsupported version: ${_}`); diff --git a/js/lite/src/ietf/subscribe.ts b/js/lite/src/ietf/subscribe.ts index c03645b54..537337a70 100644 --- a/js/lite/src/ietf/subscribe.ts +++ b/js/lite/src/ietf/subscribe.ts @@ -16,7 +16,17 @@ export class Subscribe { trackName: string; subscriberPriority: number; - constructor(requestId: bigint, trackNamespace: Path.Valid, trackName: string, subscriberPriority: number) { + constructor({ + requestId, + trackNamespace, + trackName, + subscriberPriority, + }: { + requestId: bigint; + trackNamespace: Path.Valid; + trackName: string; + subscriberPriority: number; + }) { this.requestId = requestId; this.trackNamespace = trackNamespace; this.trackName = trackName; @@ -83,7 +93,7 @@ export class Subscribe { throw new Error(`unsupported filter type: ${filterType}`); } - return new Subscribe(requestId, trackNamespace, trackName, subscriberPriority); + return new Subscribe({ requestId, trackNamespace, trackName, subscriberPriority }); } else if (version === Version.DRAFT_14) { const subscriberPriority = await r.u8(); @@ -107,7 +117,7 @@ export class Subscribe { await Parameters.decode(r, version); // ignore parameters - return new Subscribe(requestId, trackNamespace, trackName, subscriberPriority); + return new Subscribe({ requestId, trackNamespace, trackName, subscriberPriority }); } else { const _: never = version; throw new Error(`unsupported version: ${_}`); @@ -121,7 +131,7 @@ export class SubscribeOk { requestId: bigint; trackAlias: bigint; - constructor(requestId: bigint, trackAlias: bigint) { + constructor({ requestId, trackAlias }: { requestId: bigint; trackAlias: bigint }) { this.requestId = requestId; this.trackAlias = trackAlias; } @@ -182,7 +192,7 @@ export class SubscribeOk { throw new Error(`unsupported version: ${_}`); } - return new SubscribeOk(requestId, trackAlias); + return new SubscribeOk({ requestId, trackAlias }); } } @@ -193,7 +203,11 @@ export class SubscribeError { errorCode: number; reasonPhrase: string; - constructor(requestId: bigint, errorCode: number, reasonPhrase: string) { + constructor({ + requestId, + errorCode, + reasonPhrase, + }: { requestId: bigint; errorCode: number; reasonPhrase: string }) { this.requestId = requestId; this.errorCode = errorCode; this.reasonPhrase = reasonPhrase; @@ -218,7 +232,7 @@ export class SubscribeError { const errorCode = Number(await r.u62()); const reasonPhrase = await r.string(); - return new SubscribeError(requestId, errorCode, reasonPhrase); + return new SubscribeError({ requestId, errorCode, reasonPhrase }); } } @@ -227,7 +241,7 @@ export class Unsubscribe { requestId: bigint; - constructor(requestId: bigint) { + constructor({ requestId }: { requestId: bigint }) { this.requestId = requestId; } @@ -245,6 +259,6 @@ export class Unsubscribe { static async #decode(r: Reader): Promise { const requestId = await r.u62(); - return new Unsubscribe(requestId); + return new Unsubscribe({ requestId }); } } diff --git a/js/lite/src/ietf/subscribe_namespace.ts b/js/lite/src/ietf/subscribe_namespace.ts index 48a2678e7..505e7f7c8 100644 --- a/js/lite/src/ietf/subscribe_namespace.ts +++ b/js/lite/src/ietf/subscribe_namespace.ts @@ -3,23 +3,37 @@ import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; import * as Namespace from "./namespace.ts"; import { Parameters } from "./parameters.ts"; -import type { IetfVersion } from "./version.ts"; +import { type IetfVersion, Version } from "./version.ts"; // In draft-14, SUBSCRIBE_ANNOUNCES is renamed to SUBSCRIBE_NAMESPACE +// In draft-16, this moves from the control stream to its own bidi stream export class SubscribeNamespace { static id = 0x11; namespace: Path.Valid; requestId: bigint; - - constructor(namespace: Path.Valid, requestId: bigint) { + subscribeOptions: number; // v16: default 0x01 (NAMESPACE only) + + constructor({ + namespace, + requestId, + subscribeOptions = 1, + }: { + namespace: Path.Valid; + requestId: bigint; + subscribeOptions?: number; + }) { this.namespace = namespace; this.requestId = requestId; + this.subscribeOptions = subscribeOptions; } - async #encode(w: Writer, _version: IetfVersion): Promise { + async #encode(w: Writer, version: IetfVersion): Promise { await w.u62(this.requestId); await Namespace.encode(w, this.namespace); + if (version === Version.DRAFT_16) { + await w.u53(this.subscribeOptions); + } await w.u53(0); // no parameters } @@ -34,9 +48,13 @@ export class SubscribeNamespace { static async #decode(r: Reader, version: IetfVersion): Promise { const requestId = await r.u62(); const namespace = await Namespace.decode(r); + let subscribeOptions = 1; + if (version === Version.DRAFT_16) { + subscribeOptions = await r.u53(); + } await Parameters.decode(r, version); - return new SubscribeNamespace(namespace, requestId); + return new SubscribeNamespace({ namespace, requestId, subscribeOptions }); } } @@ -45,7 +63,7 @@ export class SubscribeNamespaceOk { requestId: bigint; - constructor(requestId: bigint) { + constructor({ requestId }: { requestId: bigint }) { this.requestId = requestId; } @@ -63,7 +81,7 @@ export class SubscribeNamespaceOk { static async #decode(r: Reader): Promise { const requestId = await r.u62(); - return new SubscribeNamespaceOk(requestId); + return new SubscribeNamespaceOk({ requestId }); } } @@ -74,7 +92,11 @@ export class SubscribeNamespaceError { errorCode: number; reasonPhrase: string; - constructor(requestId: bigint, errorCode: number, reasonPhrase: string) { + constructor({ + requestId, + errorCode, + reasonPhrase, + }: { requestId: bigint; errorCode: number; reasonPhrase: string }) { this.requestId = requestId; this.errorCode = errorCode; this.reasonPhrase = reasonPhrase; @@ -99,7 +121,7 @@ export class SubscribeNamespaceError { const errorCode = Number(await r.u62()); const reasonPhrase = await r.string(); - return new SubscribeNamespaceError(requestId, errorCode, reasonPhrase); + return new SubscribeNamespaceError({ requestId, errorCode, reasonPhrase }); } } @@ -108,7 +130,7 @@ export class UnsubscribeNamespace { requestId: bigint; - constructor(requestId: bigint) { + constructor({ requestId }: { requestId: bigint }) { this.requestId = requestId; } @@ -126,7 +148,63 @@ export class UnsubscribeNamespace { static async #decode(r: Reader): Promise { const requestId = await r.u62(); - return new UnsubscribeNamespace(requestId); + return new UnsubscribeNamespace({ requestId }); + } +} + +/// NAMESPACE message (0x08) — v16 only, sent on SUBSCRIBE_NAMESPACE bidi stream +export class SubscribeNamespaceEntry { + static id = 0x08; + + suffix: Path.Valid; + + constructor({ suffix }: { suffix: Path.Valid }) { + this.suffix = suffix; + } + + async #encode(w: Writer): Promise { + await Namespace.encode(w, this.suffix); + } + + async encode(w: Writer, _version: IetfVersion): Promise { + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader, _version: IetfVersion): Promise { + return Message.decode(r, SubscribeNamespaceEntry.#decode); + } + + static async #decode(r: Reader): Promise { + const suffix = await Namespace.decode(r); + return new SubscribeNamespaceEntry({ suffix }); + } +} + +/// NAMESPACE_DONE message (0x0E) — v16 only, sent on SUBSCRIBE_NAMESPACE bidi stream +export class SubscribeNamespaceEntryDone { + static id = 0x0e; + + suffix: Path.Valid; + + constructor({ suffix }: { suffix: Path.Valid }) { + this.suffix = suffix; + } + + async #encode(w: Writer): Promise { + await Namespace.encode(w, this.suffix); + } + + async encode(w: Writer, _version: IetfVersion): Promise { + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader, _version: IetfVersion): Promise { + return Message.decode(r, SubscribeNamespaceEntryDone.#decode); + } + + static async #decode(r: Reader): Promise { + const suffix = await Namespace.decode(r); + return new SubscribeNamespaceEntryDone({ suffix }); } } diff --git a/js/lite/src/ietf/subscriber.ts b/js/lite/src/ietf/subscriber.ts index c9332cd56..fbeacede3 100644 --- a/js/lite/src/ietf/subscriber.ts +++ b/js/lite/src/ietf/subscriber.ts @@ -2,17 +2,19 @@ import { Announced } from "../announced.ts"; import { Broadcast, type TrackRequest } from "../broadcast.ts"; import { Group } from "../group.ts"; import * as Path from "../path.ts"; -import type { Reader } from "../stream.ts"; +import { type Reader, Stream } from "../stream.ts"; import type { Track } from "../track.ts"; import { error } from "../util/error.ts"; import type * as Control from "./control.ts"; import { Frame, type Group as GroupMessage } from "./object.ts"; import { type Publish, type PublishDone, PublishError } from "./publish.ts"; import type { PublishNamespace, PublishNamespaceDone } from "./publish_namespace.ts"; -import { RequestError, type RequestOk } from "./request.ts"; +import { RequestError, RequestOk } from "./request.ts"; import { Subscribe, type SubscribeError, type SubscribeOk, Unsubscribe } from "./subscribe.ts"; import { SubscribeNamespace, + SubscribeNamespaceEntry, + SubscribeNamespaceEntryDone, type SubscribeNamespaceError, type SubscribeNamespaceOk, UnsubscribeNamespace, @@ -49,15 +51,18 @@ export class Subscriber { } >(); + #quic: WebTransport; + /** * Creates a new Subscriber instance. - * @param quic - The WebTransport session to use * @param control - The control stream writer for sending control messages + * @param quic - The WebTransport session (needed for v16 bidi streams) * * @internal */ - constructor(control: Control.Stream) { + constructor({ control, quic }: { control: Control.Stream; quic: WebTransport }) { this.#control = control; + this.#quic = quic; } /** @@ -85,14 +90,90 @@ export class Subscriber { } async #runAnnounced(announced: Announced, prefix: Path.Valid) { + if (this.#control.version === Version.DRAFT_16) { + await this.#runAnnouncedV16(announced, prefix); + } else { + await this.#runAnnouncedLegacy(announced, prefix); + } + } + + async #runAnnouncedLegacy(announced: Announced, prefix: Path.Valid) { const requestId = await this.#control.nextRequestId(); if (requestId === undefined) return; try { - this.#control.write(new SubscribeNamespace(prefix, requestId)); + this.#control.write(new SubscribeNamespace({ namespace: prefix, requestId })); await announced.closed; } finally { - this.#control.write(new UnsubscribeNamespace(requestId)); + this.#control.write(new UnsubscribeNamespace({ requestId })); + } + } + + async #runAnnouncedV16(announced: Announced, prefix: Path.Valid) { + const requestId = await this.#control.nextRequestId(); + if (requestId === undefined) return; + + const version = this.#control.version; + + try { + // Open a bidi stream for SUBSCRIBE_NAMESPACE + const stream = await Stream.open(this.#quic); + + // Write message type + SUBSCRIBE_NAMESPACE + await stream.writer.u53(SubscribeNamespace.id); + const msg = new SubscribeNamespace({ namespace: prefix, requestId }); + await msg.encode(stream.writer, version); + + // Read REQUEST_OK or REQUEST_ERROR + const responseType = await stream.reader.u53(); + if (responseType === RequestOk.id) { + await RequestOk.decode(stream.reader, version); + } else if (responseType === RequestError.id) { + const err = await RequestError.decode(stream.reader, version); + throw new Error(`SUBSCRIBE_NAMESPACE error: code=${err.errorCode} reason=${err.reasonPhrase}`); + } else { + throw new Error(`unexpected response type: ${responseType}`); + } + + // Loop reading NAMESPACE / NAMESPACE_DONE messages + const readLoop = (async () => { + for (;;) { + const done = await stream.reader.done(); + if (done) break; + + const msgType = await stream.reader.u53(); + if (msgType === SubscribeNamespaceEntry.id) { + const entry = await SubscribeNamespaceEntry.decode(stream.reader, version); + const path = Path.join(prefix, entry.suffix); + console.debug(`announced: broadcast=${path} active=true`); + + this.#announced.add(path); + for (const consumer of this.#announcedConsumers) { + consumer.append({ path, active: true }); + } + } else if (msgType === SubscribeNamespaceEntryDone.id) { + const entry = await SubscribeNamespaceEntryDone.decode(stream.reader, version); + const path = Path.join(prefix, entry.suffix); + console.debug(`announced: broadcast=${path} active=false`); + + this.#announced.delete(path); + for (const consumer of this.#announcedConsumers) { + consumer.append({ path, active: false }); + } + } else { + console.warn(`unexpected message type on subscribe_namespace stream: ${msgType}`); + } + } + })(); + + // Wait for either the read loop to finish or the announced to close + await Promise.race([readLoop, announced.closed]); + + // Close the bidi stream (replaces UnsubscribeNamespace) + stream.close(); + } catch (err: unknown) { + const e = error(err); + console.warn(`subscribe_namespace error: ${e.message}`); } } @@ -124,7 +205,12 @@ export class Subscriber { console.debug(`subscribe start: id=${requestId} broadcast=${broadcast} track=${request.track.name}`); - const msg = new Subscribe(requestId, broadcast, request.track.name, request.priority); + const msg = new Subscribe({ + requestId, + trackNamespace: broadcast, + trackName: request.track.name, + subscriberPriority: request.priority, + }); // Send SUBSCRIBE message on control stream and wait for response const responsePromise = new Promise((resolve, reject) => { @@ -141,7 +227,7 @@ export class Subscriber { try { await request.track.closed; - const msg = new Unsubscribe(requestId); + const msg = new Unsubscribe({ requestId }); await this.#control.write(msg); console.debug(`unsubscribe: id=${requestId} broadcast=${broadcast} track=${request.track.name}`); } finally { @@ -247,10 +333,18 @@ export class Subscriber { // TODO technically, we should send PUBLISH_OK if we had a SUBSCRIBE in flight for the same track. // Otherwise, the peer will SUBSCRIBE_ERROR because duplicate subscriptions are not allowed :( if (this.#control.version === Version.DRAFT_15 || this.#control.version === Version.DRAFT_16) { - const err = new RequestError(msg.requestId, 500, "publish not supported"); + const err = new RequestError({ + requestId: msg.requestId, + errorCode: 500, + reasonPhrase: "publish not supported", + }); await this.#control.write(err); } else if (this.#control.version === Version.DRAFT_14) { - const err = new PublishError(msg.requestId, 500, "publish not supported"); + const err = new PublishError({ + requestId: msg.requestId, + errorCode: 500, + reasonPhrase: "publish not supported", + }); await this.#control.write(err); } else { const version: never = this.#control.version; diff --git a/js/lite/src/ietf/track.ts b/js/lite/src/ietf/track.ts index b5b6ab0d9..5b48273de 100644 --- a/js/lite/src/ietf/track.ts +++ b/js/lite/src/ietf/track.ts @@ -10,7 +10,7 @@ export class TrackStatusRequest { trackNamespace: Path.Valid; trackName: string; - constructor(trackNamespace: Path.Valid, trackName: string) { + constructor({ trackNamespace, trackName }: { trackNamespace: Path.Valid; trackName: string }) { this.trackNamespace = trackNamespace; this.trackName = trackName; } @@ -31,7 +31,7 @@ export class TrackStatusRequest { static async #decode(r: Reader): Promise { const trackNamespace = await Namespace.decode(r); const trackName = await r.string(); - return new TrackStatusRequest(trackNamespace, trackName); + return new TrackStatusRequest({ trackNamespace, trackName }); } } @@ -45,13 +45,19 @@ export class TrackStatus { lastGroupId: bigint; lastObjectId: bigint; - constructor( - trackNamespace: Path.Valid, - trackName: string, - statusCode: number, - lastGroupId: bigint, - lastObjectId: bigint, - ) { + constructor({ + trackNamespace, + trackName, + statusCode, + lastGroupId, + lastObjectId, + }: { + trackNamespace: Path.Valid; + trackName: string; + statusCode: number; + lastGroupId: bigint; + lastObjectId: bigint; + }) { this.trackNamespace = trackNamespace; this.trackName = trackName; this.statusCode = statusCode; @@ -82,7 +88,7 @@ export class TrackStatus { const lastGroupId = await r.u62(); const lastObjectId = await r.u62(); - return new TrackStatus(trackNamespace, trackName, statusCode, lastGroupId, lastObjectId); + return new TrackStatus({ trackNamespace, trackName, statusCode, lastGroupId, lastObjectId }); } // Track status codes diff --git a/rs/moq-lite/src/ietf/publish_namespace.rs b/rs/moq-lite/src/ietf/publish_namespace.rs index b002239cf..8c6c91f23 100644 --- a/rs/moq-lite/src/ietf/publish_namespace.rs +++ b/rs/moq-lite/src/ietf/publish_namespace.rs @@ -90,28 +90,57 @@ impl Message for PublishNamespaceError<'_> { } } /// PublishNamespaceDone message (0x09) +/// v14/v15: uses track_namespace. v16: uses request_id. #[derive(Clone, Debug)] pub struct PublishNamespaceDone<'a> { + /// v14/v15: the namespace being unannounced pub track_namespace: Path<'a>, + /// v16: the request ID of the original PublishNamespace + pub request_id: RequestId, } impl Message for PublishNamespaceDone<'_> { const ID: u64 = 0x09; fn encode_msg(&self, w: &mut W, version: Version) { - encode_namespace(w, &self.track_namespace, version); + match version { + Version::Draft14 | Version::Draft15 => { + encode_namespace(w, &self.track_namespace, version); + } + Version::Draft16 => { + self.request_id.encode(w, version); + } + } } fn decode_msg(r: &mut R, version: Version) -> Result { - let track_namespace = decode_namespace(r, version)?; - Ok(Self { track_namespace }) + match version { + Version::Draft14 | Version::Draft15 => { + let track_namespace = decode_namespace(r, version)?; + Ok(Self { + track_namespace, + request_id: RequestId(0), + }) + } + Version::Draft16 => { + let request_id = RequestId::decode(r, version)?; + Ok(Self { + track_namespace: Path::default(), + request_id, + }) + } + } } } /// PublishNamespaceCancel message (0x0c) +/// v14/v15: uses track_namespace. v16: uses request_id. #[derive(Clone, Debug)] pub struct PublishNamespaceCancel<'a> { + /// v14/v15: the namespace being cancelled pub track_namespace: Path<'a>, + /// v16: the request ID of the original PublishNamespace + pub request_id: RequestId, pub error_code: u64, pub reason_phrase: Cow<'a, str>, } @@ -120,17 +149,34 @@ impl Message for PublishNamespaceCancel<'_> { const ID: u64 = 0x0c; fn encode_msg(&self, w: &mut W, version: Version) { - encode_namespace(w, &self.track_namespace, version); + match version { + Version::Draft14 | Version::Draft15 => { + encode_namespace(w, &self.track_namespace, version); + } + Version::Draft16 => { + self.request_id.encode(w, version); + } + } self.error_code.encode(w, version); self.reason_phrase.encode(w, version); } fn decode_msg(r: &mut R, version: Version) -> Result { - let track_namespace = decode_namespace(r, version)?; + let (track_namespace, request_id) = match version { + Version::Draft14 | Version::Draft15 => { + let track_namespace = decode_namespace(r, version)?; + (track_namespace, RequestId(0)) + } + Version::Draft16 => { + let request_id = RequestId::decode(r, version)?; + (Path::default(), request_id) + } + }; let error_code = u64::decode(r, version)?; let reason_phrase = Cow::::decode(r, version)?; Ok(Self { track_namespace, + request_id, error_code, reason_phrase, }) @@ -142,15 +188,15 @@ mod tests { use super::*; use bytes::BytesMut; - fn encode_message(msg: &M) -> Vec { + fn encode_message(msg: &M, version: Version) -> Vec { let mut buf = BytesMut::new(); - msg.encode_msg(&mut buf, Version::Draft14); + msg.encode_msg(&mut buf, version); buf.to_vec() } - fn decode_message(bytes: &[u8]) -> Result { + fn decode_message(bytes: &[u8], version: Version) -> Result { let mut buf = bytes::Bytes::from(bytes.to_vec()); - M::decode_msg(&mut buf, Version::Draft14) + M::decode_msg(&mut buf, version) } #[test] @@ -160,8 +206,8 @@ mod tests { track_namespace: Path::new("test/broadcast"), }; - let encoded = encode_message(&msg); - let decoded: PublishNamespace = decode_message(&encoded).unwrap(); + let encoded = encode_message(&msg, Version::Draft14); + let decoded: PublishNamespace = decode_message(&encoded, Version::Draft14).unwrap(); assert_eq!(decoded.track_namespace.as_str(), "test/broadcast"); } @@ -174,41 +220,73 @@ mod tests { reason_phrase: "Unauthorized".into(), }; - let encoded = encode_message(&msg); - let decoded: PublishNamespaceError = decode_message(&encoded).unwrap(); + let encoded = encode_message(&msg, Version::Draft14); + let decoded: PublishNamespaceError = decode_message(&encoded, Version::Draft14).unwrap(); assert_eq!(decoded.error_code, 404); assert_eq!(decoded.reason_phrase, "Unauthorized"); } #[test] - fn test_unannounce() { + fn test_unannounce_v14() { let msg = PublishNamespaceDone { track_namespace: Path::new("old/stream"), + request_id: RequestId(0), }; - let encoded = encode_message(&msg); - let decoded: PublishNamespaceDone = decode_message(&encoded).unwrap(); + let encoded = encode_message(&msg, Version::Draft14); + let decoded: PublishNamespaceDone = decode_message(&encoded, Version::Draft14).unwrap(); assert_eq!(decoded.track_namespace.as_str(), "old/stream"); } #[test] - fn test_announce_cancel() { + fn test_unannounce_v16() { + let msg = PublishNamespaceDone { + track_namespace: Path::default(), + request_id: RequestId(42), + }; + + let encoded = encode_message(&msg, Version::Draft16); + let decoded: PublishNamespaceDone = decode_message(&encoded, Version::Draft16).unwrap(); + + assert_eq!(decoded.request_id, RequestId(42)); + } + + #[test] + fn test_announce_cancel_v14() { let msg = PublishNamespaceCancel { track_namespace: Path::new("canceled"), + request_id: RequestId(0), error_code: 1, reason_phrase: "Shutdown".into(), }; - let encoded = encode_message(&msg); - let decoded: PublishNamespaceCancel = decode_message(&encoded).unwrap(); + let encoded = encode_message(&msg, Version::Draft14); + let decoded: PublishNamespaceCancel = decode_message(&encoded, Version::Draft14).unwrap(); assert_eq!(decoded.track_namespace.as_str(), "canceled"); assert_eq!(decoded.error_code, 1); assert_eq!(decoded.reason_phrase, "Shutdown"); } + #[test] + fn test_announce_cancel_v16() { + let msg = PublishNamespaceCancel { + track_namespace: Path::default(), + request_id: RequestId(7), + error_code: 1, + reason_phrase: "Shutdown".into(), + }; + + let encoded = encode_message(&msg, Version::Draft16); + let decoded: PublishNamespaceCancel = decode_message(&encoded, Version::Draft16).unwrap(); + + assert_eq!(decoded.request_id, RequestId(7)); + assert_eq!(decoded.error_code, 1); + assert_eq!(decoded.reason_phrase, "Shutdown"); + } + #[test] fn test_announce_rejects_parameters() { #[rustfmt::skip] @@ -218,7 +296,7 @@ mod tests { 0x01, // INVALID: num_params = 1 ]; - let result: Result = decode_message(&invalid_bytes); + let result: Result = decode_message(&invalid_bytes, Version::Draft14); assert!(result.is_err()); } } diff --git a/rs/moq-lite/src/ietf/publisher.rs b/rs/moq-lite/src/ietf/publisher.rs index 551e446a5..239a328ce 100644 --- a/rs/moq-lite/src/ietf/publisher.rs +++ b/rs/moq-lite/src/ietf/publisher.rs @@ -5,10 +5,11 @@ use web_async::{FuturesExt, Lock}; use web_transport_trait::SendStream; use crate::{ - Error, Origin, OriginConsumer, Track, TrackConsumer, + AsPath, Error, Origin, OriginConsumer, Track, TrackConsumer, coding::Writer, ietf::{ - self, Control, FetchHeader, FetchType, FilterType, GroupOrder, Location, MessageParameters, RequestId, Version, + self, Control, FetchHeader, FetchType, FilterType, GroupOrder, Location, Message, MessageParameters, RequestId, + Version, }, model::GroupConsumer, }; @@ -39,6 +40,9 @@ impl Publisher { } pub async fn run(mut self) -> Result<(), Error> { + // Track request_id → namespace mapping for v16 PublishNamespaceDone + let mut namespace_requests: HashMap = HashMap::new(); + while let Some((path, active)) = self.origin.announced().await { let suffix = path.to_owned(); @@ -46,6 +50,7 @@ impl Publisher { tracing::debug!(broadcast = %self.origin.absolute(&path), "announce"); let request_id = self.control.next_request_id().await?; + namespace_requests.insert(suffix.clone(), request_id); self.control.send(ietf::PublishNamespace { request_id, @@ -53,8 +58,10 @@ impl Publisher { })?; } else { tracing::debug!(broadcast = %self.origin.absolute(&path), "unannounce"); + let request_id = namespace_requests.remove(&suffix).unwrap_or(RequestId(0)); self.control.send(ietf::PublishNamespaceDone { track_namespace: suffix, + request_id, })?; } } @@ -466,4 +473,75 @@ impl Publisher { tracing::warn!(?msg, "fetch cancel"); Ok(()) } + + /// Handle a SUBSCRIBE_NAMESPACE message received on a v16 bidirectional stream. + /// Reads the request, sends REQUEST_OK, then streams NAMESPACE/NAMESPACE_DONE messages. + pub async fn recv_subscribe_namespace_stream( + &mut self, + mut stream: crate::coding::Stream, + ) -> Result<(), Error> { + let msg: ietf::SubscribeNamespace = stream.reader.decode().await?; + let prefix = msg.namespace.to_owned(); + + tracing::debug!(prefix = %self.origin.absolute(&prefix), "subscribe_namespace stream"); + + // Create a filtered consumer for this prefix + let mut origin = self + .origin + .consume_only(&[prefix.as_path()]) + .ok_or(Error::Unauthorized)?; + + // Send REQUEST_OK + stream.writer.encode(&ietf::RequestOk::ID).await?; + stream + .writer + .encode(&ietf::RequestOk { + request_id: msg.request_id, + parameters: ietf::MessageParameters::default(), + }) + .await?; + + // Send initial NAMESPACE messages for currently active namespaces + while let Some((path, active)) = origin.try_announced() { + let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path"); + if active.is_some() { + tracing::debug!(broadcast = %origin.absolute(&path), "namespace"); + stream.writer.encode(&ietf::Namespace::ID).await?; + stream + .writer + .encode(&ietf::Namespace { + suffix: suffix.to_owned(), + }) + .await?; + } + } + + // Stream updates + loop { + tokio::select! { + biased; + res = stream.reader.closed() => return res, + announced = origin.announced() => { + match announced { + Some((path, active)) => { + let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned(); + if active.is_some() { + tracing::debug!(broadcast = %origin.absolute(&path), "namespace"); + stream.writer.encode(&ietf::Namespace::ID).await?; + stream.writer.encode(&ietf::Namespace { suffix }).await?; + } else { + tracing::debug!(broadcast = %origin.absolute(&path), "namespace_done"); + stream.writer.encode(&ietf::NamespaceDone::ID).await?; + stream.writer.encode(&ietf::NamespaceDone { suffix }).await?; + } + } + None => { + stream.writer.finish()?; + return stream.writer.closed().await; + } + } + } + } + } + } } diff --git a/rs/moq-lite/src/ietf/session.rs b/rs/moq-lite/src/ietf/session.rs index a163aed04..270c660f4 100644 --- a/rs/moq-lite/src/ietf/session.rs +++ b/rs/moq-lite/src/ietf/session.rs @@ -62,8 +62,9 @@ async fn run( tokio::select! { res = subscriber.clone().run() => res, res = publisher.clone().run() => res, - res = run_control_read(setup.reader, control, publisher, subscriber, version) => res, + res = run_control_read(setup.reader, control, publisher.clone(), subscriber, version) => res, res = Control::run::(setup.writer, rx) => res, + res = run_bidi_streams(session, publisher, version) => res, } } @@ -174,11 +175,15 @@ async fn run_control_read( tracing::debug!(message = ?msg, "received control message"); return Err(Error::Unsupported); } - ietf::SubscribeNamespace::ID => { - let msg = ietf::SubscribeNamespace::decode_msg(&mut data, version)?; - tracing::debug!(message = ?msg, "received control message"); - publisher.recv_subscribe_namespace(msg)?; - } + // 0x11: SubscribeNamespace — v14/v15: control stream, v16: bidi stream only + ietf::SubscribeNamespace::ID => match version { + Version::Draft14 | Version::Draft15 => { + let msg = ietf::SubscribeNamespace::decode_msg(&mut data, version)?; + tracing::debug!(message = ?msg, "received control message"); + publisher.recv_subscribe_namespace(msg)?; + } + Version::Draft16 => return Err(Error::UnexpectedMessage), + }, // 0x12: SubscribeNamespaceOk in v14, removed in v15+ ietf::SubscribeNamespaceOk::ID => match version { Version::Draft14 => { @@ -197,11 +202,15 @@ async fn run_control_read( } Version::Draft15 | Version::Draft16 => return Err(Error::UnexpectedMessage), }, - ietf::UnsubscribeNamespace::ID => { - let msg = ietf::UnsubscribeNamespace::decode_msg(&mut data, version)?; - tracing::debug!(message = ?msg, "received control message"); - publisher.recv_unsubscribe_namespace(msg)?; - } + // 0x14: UnsubscribeNamespace — v14/v15: control stream, v16: removed (use stream close) + ietf::UnsubscribeNamespace::ID => match version { + Version::Draft14 | Version::Draft15 => { + let msg = ietf::UnsubscribeNamespace::decode_msg(&mut data, version)?; + tracing::debug!(message = ?msg, "received control message"); + publisher.recv_unsubscribe_namespace(msg)?; + } + Version::Draft16 => return Err(Error::UnexpectedMessage), + }, ietf::MaxRequestId::ID => { let msg = ietf::MaxRequestId::decode_msg(&mut data, version)?; tracing::debug!(message = ?msg, "received control message"); @@ -257,3 +266,40 @@ async fn run_control_read( } } } + +/// Accept bidirectional streams for v16 SUBSCRIBE_NAMESPACE. +/// For v14/v15, no bidi streams are expected (other than the control stream). +async fn run_bidi_streams( + session: S, + publisher: Publisher, + version: Version, +) -> Result<(), Error> { + // Only v16 uses bidi streams for SUBSCRIBE_NAMESPACE + if version != Version::Draft16 { + // Park forever — we don't accept bidi streams for v14/v15. + std::future::pending::<()>().await; + return Ok(()); + } + + loop { + let mut stream = Stream::accept(&session, version).await?; + + // Read the first message type ID to determine the stream type + let id: u64 = stream.reader.decode().await?; + + match id { + ietf::SubscribeNamespace::ID => { + let mut pub_clone = publisher.clone(); + web_async::spawn(async move { + if let Err(err) = pub_clone.recv_subscribe_namespace_stream(stream).await { + tracing::debug!(%err, "subscribe_namespace stream error"); + } + }); + } + _ => { + tracing::warn!(id, "unexpected bidi stream type"); + return Err(Error::UnexpectedStream); + } + } + } +} diff --git a/rs/moq-lite/src/ietf/subscribe_namespace.rs b/rs/moq-lite/src/ietf/subscribe_namespace.rs index 9b955bfa3..53a504b9e 100644 --- a/rs/moq-lite/src/ietf/subscribe_namespace.rs +++ b/rs/moq-lite/src/ietf/subscribe_namespace.rs @@ -1,4 +1,4 @@ -//! IETF moq-transport-14 subscribe namespace messages +//! IETF moq-transport subscribe namespace messages use std::borrow::Cow; @@ -11,10 +11,13 @@ use crate::{ use super::namespace::{decode_namespace, encode_namespace}; /// SubscribeNamespace message (0x11) +/// In v16, this moves from the control stream to its own bidirectional stream. #[derive(Clone, Debug)] pub struct SubscribeNamespace<'a> { pub request_id: RequestId, pub namespace: Path<'a>, + /// v16: Subscribe Options (default 0x01 = NAMESPACE only) + pub subscribe_options: u64, } impl Message for SubscribeNamespace<'_> { @@ -23,21 +26,32 @@ impl Message for SubscribeNamespace<'_> { fn encode_msg(&self, w: &mut W, version: Version) { self.request_id.encode(w, version); encode_namespace(w, &self.namespace, version); + if version == Version::Draft16 { + self.subscribe_options.encode(w, version); + } 0u8.encode(w, version); // no parameters } fn decode_msg(r: &mut R, version: Version) -> Result { let request_id = RequestId::decode(r, version)?; let namespace = decode_namespace(r, version)?; + let subscribe_options = match version { + Version::Draft16 => u64::decode(r, version)?, + _ => 0x01, + }; // Ignore parameters, who cares. let _params = Parameters::decode(r, version)?; - Ok(Self { namespace, request_id }) + Ok(Self { + namespace, + request_id, + subscribe_options, + }) } } -/// SubscribeNamespaceOk message (0x12) +/// SubscribeNamespaceOk message (0x12) — v14 only #[derive(Clone, Debug)] pub struct SubscribeNamespaceOk { pub request_id: RequestId, @@ -55,7 +69,8 @@ impl Message for SubscribeNamespaceOk { Ok(Self { request_id }) } } -/// SubscribeNamespaceError message (0x13) + +/// SubscribeNamespaceError message (0x13) — v14 only #[derive(Clone, Debug)] pub struct SubscribeNamespaceError<'a> { pub request_id: RequestId, @@ -85,7 +100,7 @@ impl Message for SubscribeNamespaceError<'_> { } } -/// UnsubscribeNamespace message (0x14) +/// UnsubscribeNamespace message (0x14) — v14/v15 only (v16 uses stream close) #[derive(Clone, Debug)] pub struct UnsubscribeNamespace { pub request_id: RequestId, @@ -103,3 +118,43 @@ impl Message for UnsubscribeNamespace { Ok(Self { request_id }) } } + +/// NAMESPACE message (0x08) — v16 only, sent on SUBSCRIBE_NAMESPACE bidi stream +/// Indicates a namespace suffix matching the subscribed prefix is active. +#[derive(Clone, Debug)] +pub struct Namespace<'a> { + pub suffix: Path<'a>, +} + +impl Message for Namespace<'_> { + const ID: u64 = 0x08; + + fn encode_msg(&self, w: &mut W, version: Version) { + encode_namespace(w, &self.suffix, version); + } + + fn decode_msg(r: &mut R, version: Version) -> Result { + let suffix = decode_namespace(r, version)?; + Ok(Self { suffix }) + } +} + +/// NAMESPACE_DONE message (0x0E) — v16 only, sent on SUBSCRIBE_NAMESPACE bidi stream +/// Indicates a namespace suffix matching the subscribed prefix is no longer active. +#[derive(Clone, Debug)] +pub struct NamespaceDone<'a> { + pub suffix: Path<'a>, +} + +impl Message for NamespaceDone<'_> { + const ID: u64 = 0x0E; + + fn encode_msg(&self, w: &mut W, version: Version) { + encode_namespace(w, &self.suffix, version); + } + + fn decode_msg(r: &mut R, version: Version) -> Result { + let suffix = decode_namespace(r, version)?; + Ok(Self { suffix }) + } +} diff --git a/rs/moq-lite/src/ietf/subscriber.rs b/rs/moq-lite/src/ietf/subscriber.rs index 4e158bd19..c9ea159cd 100644 --- a/rs/moq-lite/src/ietf/subscriber.rs +++ b/rs/moq-lite/src/ietf/subscriber.rs @@ -26,6 +26,9 @@ struct State { // Each PUBLISH message that is implicitly causing a PUBLISH_NAMESPACE message. publishes: HashMap, + + // Maps PublishNamespace request_id → track_namespace (for v16 PublishNamespaceDone) + publish_namespace_ids: HashMap, } struct TrackState { @@ -65,6 +68,14 @@ impl Subscriber { pub fn recv_publish_namespace(&mut self, msg: ietf::PublishNamespace) -> Result<(), Error> { let request_id = msg.request_id; + // Track the request_id → namespace mapping for v16 PublishNamespaceDone + { + let mut state = self.state.lock(); + state + .publish_namespace_ids + .insert(request_id, msg.track_namespace.to_owned()); + } + match self.start_announce(msg.track_namespace.to_owned()) { Ok(_) => self.send_ok(request_id), Err(err) => self.send_error(request_id, 400, &err.to_string()), @@ -159,7 +170,23 @@ impl Subscriber { } pub fn recv_publish_namespace_done(&mut self, msg: ietf::PublishNamespaceDone) -> Result<(), Error> { - self.stop_announce(msg.track_namespace.to_owned()) + match self.version { + Version::Draft14 | Version::Draft15 => self.stop_announce(msg.track_namespace.to_owned()), + Version::Draft16 => { + // In v16, PublishNamespaceDone uses request_id instead of track_namespace + let state = self.state.lock(); + let path = state.publish_namespace_ids.get(&msg.request_id).cloned(); + drop(state); + + if let Some(path) = path { + self.state.lock().publish_namespace_ids.remove(&msg.request_id); + self.stop_announce(path) + } else { + tracing::warn!(request_id = %msg.request_id, "unknown publish_namespace request_id in done"); + Ok(()) + } + } + } } pub fn recv_subscribe_ok(&mut self, msg: ietf::SubscribeOk) -> Result<(), Error> { From fed2585925dba10836911a2ca015ca59d056e16b Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 11 Feb 2026 19:48:27 -0800 Subject: [PATCH 2/2] gooder announcements --- js/lite/src/ietf/publisher.ts | 91 ++++++++++++++++--------------- js/lite/src/ietf/subscriber.ts | 2 +- rs/moq-lite/src/ietf/publisher.rs | 13 +++-- 3 files changed, 55 insertions(+), 51 deletions(-) diff --git a/js/lite/src/ietf/publisher.ts b/js/lite/src/ietf/publisher.ts index 303956a55..dd3d24bbf 100644 --- a/js/lite/src/ietf/publisher.ts +++ b/js/lite/src/ietf/publisher.ts @@ -1,3 +1,4 @@ +import { Announced } from "../announced.ts"; import type { Broadcast } from "../broadcast.ts"; import type { Group } from "../group.ts"; import * as Path from "../path.ts"; @@ -23,7 +24,7 @@ import { type UnsubscribeNamespace, } from "./subscribe_namespace.ts"; import { TrackStatus, type TrackStatusRequest } from "./track.ts"; -import { type IetfVersion, Version } from "./version.ts"; +import { Version } from "./version.ts"; /** * Handles publishing broadcasts using moq-transport protocol with lite-compatibility restrictions. @@ -37,6 +38,9 @@ export class Publisher { // Our published broadcasts. #broadcasts: Map = new Map(); + // Any consumers that want each new announcement. + #announcedConsumers = new Set(); + /** * Creates a new Publisher instance. * @param quic - The WebTransport session to use @@ -55,6 +59,7 @@ export class Publisher { */ publish(path: Path.Valid, broadcast: Broadcast) { this.#broadcasts.set(path, broadcast); + this.#notifyConsumers(path, true); void this.#runPublish(path, broadcast); } @@ -77,6 +82,7 @@ export class Publisher { } finally { broadcast.close(); this.#broadcasts.delete(path); + this.#notifyConsumers(path, false); } } @@ -285,23 +291,42 @@ export class Publisher { const ok = new RequestOk({ requestId: msg.requestId }); await ok.encode(stream.writer, version); - // Send NAMESPACE for each currently published broadcast matching prefix - const active = new Set(); + // Create an Announced consumer and seed it with current broadcasts + const announced = new Announced(prefix); for (const name of this.#broadcasts.keys()) { const suffix = Path.stripPrefix(prefix, name); if (suffix === null) continue; - console.debug(`namespace: broadcast=${name} suffix=${suffix}`); - active.add(suffix); - - await stream.writer.u53(SubscribeNamespaceEntry.id); - const entry = new SubscribeNamespaceEntry({ suffix }); - await entry.encode(stream.writer, version); + announced.append({ path: suffix, active: true }); } + this.#announcedConsumers.add(announced); - // Wait for broadcast changes and stream updates - // We poll by watching for stream close or broadcasts changing. - // This is a simplified version — we check periodically rather than using signals. - await Promise.race([stream.reader.closed, this.#watchBroadcasts(stream, prefix, active, version)]); + // Close the consumer when the stream closes + stream.reader.closed.then( + () => announced.close(), + () => announced.close(), + ); + + try { + for (;;) { + const entry = await announced.next(); + if (!entry) break; + + if (entry.active) { + console.debug(`namespace: suffix=${entry.path} active=true`); + await stream.writer.u53(SubscribeNamespaceEntry.id); + const msg = new SubscribeNamespaceEntry({ suffix: entry.path }); + await msg.encode(stream.writer, version); + } else { + console.debug(`namespace: suffix=${entry.path} active=false`); + await stream.writer.u53(SubscribeNamespaceEntryDone.id); + const msg = new SubscribeNamespaceEntryDone({ suffix: entry.path }); + await msg.encode(stream.writer, version); + } + } + } finally { + announced.close(); + this.#announcedConsumers.delete(announced); + } stream.close(); } catch (err: unknown) { @@ -311,39 +336,15 @@ export class Publisher { } } - async #watchBroadcasts(stream: Stream, prefix: Path.Valid, active: Set, version: IetfVersion) { - // Simple polling approach - check for changes every 100ms - // A proper implementation would use signals/events - for (;;) { - await new Promise((resolve) => setTimeout(resolve, 100)); - - const newActive = new Set(); - for (const name of this.#broadcasts.keys()) { - const suffix = Path.stripPrefix(prefix, name); - if (suffix === null) continue; - newActive.add(suffix); - } - - // Send NAMESPACE for new broadcasts - for (const added of newActive) { - if (active.has(added)) continue; - console.debug(`namespace: suffix=${added} active=true`); - await stream.writer.u53(SubscribeNamespaceEntry.id); - const entry = new SubscribeNamespaceEntry({ suffix: added }); - await entry.encode(stream.writer, version); - } - - // Send NAMESPACE_DONE for removed broadcasts - for (const removed of active) { - if (newActive.has(removed)) continue; - console.debug(`namespace: suffix=${removed} active=false`); - await stream.writer.u53(SubscribeNamespaceEntryDone.id); - const entry = new SubscribeNamespaceEntryDone({ suffix: removed }); - await entry.encode(stream.writer, version); + #notifyConsumers(path: Path.Valid, active: boolean) { + for (const consumer of this.#announcedConsumers) { + const suffix = Path.stripPrefix(consumer.prefix, path); + if (suffix === null) continue; + try { + consumer.append({ path: suffix, active }); + } catch { + // Consumer already closed, will be cleaned up } - - active.clear(); - for (const s of newActive) active.add(s); } } } diff --git a/js/lite/src/ietf/subscriber.ts b/js/lite/src/ietf/subscriber.ts index fbeacede3..73577d89e 100644 --- a/js/lite/src/ietf/subscriber.ts +++ b/js/lite/src/ietf/subscriber.ts @@ -161,7 +161,7 @@ export class Subscriber { consumer.append({ path, active: false }); } } else { - console.warn(`unexpected message type on subscribe_namespace stream: ${msgType}`); + throw new Error(`unexpected message type on subscribe_namespace stream: ${msgType}`); } } })(); diff --git a/rs/moq-lite/src/ietf/publisher.rs b/rs/moq-lite/src/ietf/publisher.rs index 239a328ce..212c49dde 100644 --- a/rs/moq-lite/src/ietf/publisher.rs +++ b/rs/moq-lite/src/ietf/publisher.rs @@ -58,11 +58,14 @@ impl Publisher { })?; } else { tracing::debug!(broadcast = %self.origin.absolute(&path), "unannounce"); - let request_id = namespace_requests.remove(&suffix).unwrap_or(RequestId(0)); - self.control.send(ietf::PublishNamespaceDone { - track_namespace: suffix, - request_id, - })?; + if let Some(request_id) = namespace_requests.remove(&suffix) { + self.control.send(ietf::PublishNamespaceDone { + track_namespace: suffix, + request_id, + })?; + } else { + tracing::warn!(broadcast = %self.origin.absolute(&path), "unannounce for unknown namespace"); + } } }