diff --git a/js/hang/src/watch/broadcast.ts b/js/hang/src/watch/broadcast.ts index 791570716..a6b6252c2 100644 --- a/js/hang/src/watch/broadcast.ts +++ b/js/hang/src/watch/broadcast.ts @@ -12,7 +12,8 @@ export interface BroadcastProps { // The broadcast name. path?: Moq.Path.Valid | Signal; - // You can disable reloading if you don't want to wait for an announcement. + // Whether to reload the broadcast when it goes offline. + // Defaults to false; pass true to wait for an announcement before subscribing. reload?: boolean | Signal; } @@ -40,7 +41,7 @@ export class Broadcast { this.connection = Signal.from(props?.connection); this.path = Signal.from(props?.path); this.enabled = Signal.from(props?.enabled ?? false); - this.reload = Signal.from(props?.reload ?? true); + this.reload = Signal.from(props?.reload ?? false); this.signals.effect(this.#runReload.bind(this)); this.signals.effect(this.#runBroadcast.bind(this)); diff --git a/js/lite/src/connection/connect.ts b/js/lite/src/connection/connect.ts index b320afbfa..daba26430 100644 --- a/js/lite/src/connection/connect.ts +++ b/js/lite/src/connection/connect.ts @@ -78,7 +78,9 @@ export async function connect(url: URL, props?: ConnectProps): Promise; @@ -123,8 +127,7 @@ export class Stream { await this.stream.writer.u53((message.constructor as MessageType).id); // Write message payload with u16 size prefix - // Extra version arg is silently ignored by messages that don't need it - await (message.encode as (w: Writer, v?: IetfVersion) => Promise)(this.stream.writer, this.version); + await (message.encode as (w: Writer, v: IetfVersion) => Promise)(this.stream.writer, this.version); }); } @@ -136,21 +139,19 @@ export class Stream { return await this.#readLock.runExclusive(async () => { const messageType = await this.stream.reader.u53(); - const messages = this.version === Version.DRAFT_15 ? MessagesV15 : MessagesV14; + const messages = + this.version === Version.DRAFT_16 + ? MessagesV16 + : this.version === Version.DRAFT_15 + ? MessagesV15 + : MessagesV14; if (!(messageType in messages)) { throw new Error(`Unknown control message type: ${messageType}`); } try { const msgClass = messages[messageType as keyof typeof messages]; - - // Extra version arg is silently ignored by messages that don't need it - const msg = await (msgClass as { decode: (r: Reader, v?: IetfVersion) => Promise }).decode( - this.stream.reader, - this.version, - ); - - console.debug("message read", msg); + const msg = await msgClass.decode(this.stream.reader, this.version); return msg; } catch (err) { console.error("failed to decode message", messageType, err); diff --git a/js/lite/src/ietf/fetch.ts b/js/lite/src/ietf/fetch.ts index f1c875a90..577aab63a 100644 --- a/js/lite/src/ietf/fetch.ts +++ b/js/lite/src/ietf/fetch.ts @@ -1,6 +1,7 @@ import type * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; +import type { IetfVersion } from "./version.ts"; export class Fetch { static id = 0x16; @@ -41,11 +42,11 @@ export class Fetch { throw new Error("FETCH messages are not supported"); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, Fetch.#decode); } @@ -67,11 +68,11 @@ export class FetchOk { throw new Error("FETCH_OK messages are not supported"); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, FetchOk.#decode); } @@ -97,11 +98,11 @@ export class FetchError { throw new Error("FETCH_ERROR messages are not supported"); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, FetchError.#decode); } @@ -123,11 +124,11 @@ export class FetchCancel { throw new Error("FETCH_CANCEL messages are not supported"); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, FetchCancel.#decode); } diff --git a/js/lite/src/ietf/goaway.ts b/js/lite/src/ietf/goaway.ts index e9deb09c9..e7fa3c78a 100644 --- a/js/lite/src/ietf/goaway.ts +++ b/js/lite/src/ietf/goaway.ts @@ -1,5 +1,6 @@ import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; +import type { IetfVersion } from "./version.ts"; export class GoAway { static id = 0x10; @@ -14,11 +15,11 @@ export class GoAway { await w.string(this.newSessionUri); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, GoAway.#decode); } diff --git a/js/lite/src/ietf/ietf.test.ts b/js/lite/src/ietf/ietf.test.ts index f49886394..063c494c3 100644 --- a/js/lite/src/ietf/ietf.test.ts +++ b/js/lite/src/ietf/ietf.test.ts @@ -34,16 +34,6 @@ function concatChunks(chunks: Uint8Array[]): Uint8Array { return result; } -// Helper to encode a message (no version) -async function encodeMessage }>(message: T): Promise { - const { stream, written } = createTestWritableStream(); - const writer = new Writer(stream); - await message.encode(writer); - writer.close(); - await writer.closed; - return concatChunks(written); -} - // Helper to encode a versioned message async function encodeVersioned }>( message: T, @@ -57,12 +47,6 @@ async function encodeVersioned(bytes: Uint8Array, decoder: (r: Reader) => Promise): Promise { - const reader = new Reader(undefined, bytes); - return await decoder(reader); -} - // Helper to decode a versioned message async function decodeVersioned( bytes: Uint8Array, @@ -131,8 +115,8 @@ test("SubscribeOk v15: round trip", async () => { test("SubscribeError: round trip", async () => { const msg = new Subscribe.SubscribeError(123n, 500, "Not found"); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Subscribe.SubscribeError.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Subscribe.SubscribeError.decode, Version.DRAFT_14); assert.strictEqual(decoded.requestId, 123n); assert.strictEqual(decoded.errorCode, 500); @@ -142,8 +126,8 @@ test("SubscribeError: round trip", async () => { test("Unsubscribe: round trip", async () => { const msg = new Subscribe.Unsubscribe(999n); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Subscribe.Unsubscribe.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Subscribe.Unsubscribe.decode, Version.DRAFT_14); assert.strictEqual(decoded.requestId, 999n); }); @@ -151,8 +135,8 @@ test("Unsubscribe: round trip", async () => { test("PublishDone: basic test", async () => { const msg = new PublishDone(10n, 0, "complete"); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, PublishDone.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, PublishDone.decode, Version.DRAFT_14); assert.strictEqual(decoded.requestId, 10n); assert.strictEqual(decoded.statusCode, 0); @@ -162,8 +146,8 @@ test("PublishDone: basic test", async () => { test("PublishDone: with error", async () => { const msg = new PublishDone(10n, 1, "error"); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, PublishDone.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, PublishDone.decode, Version.DRAFT_14); assert.strictEqual(decoded.requestId, 10n); assert.strictEqual(decoded.statusCode, 1); @@ -174,8 +158,8 @@ test("PublishDone: with error", async () => { test("PublishNamespace: round trip", async () => { const msg = new Announce.PublishNamespace(1n, Path.from("test/broadcast")); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Announce.PublishNamespace.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Announce.PublishNamespace.decode, Version.DRAFT_14); assert.strictEqual(decoded.requestId, 1n); assert.strictEqual(decoded.trackNamespace, "test/broadcast"); @@ -184,8 +168,8 @@ test("PublishNamespace: round trip", async () => { test("PublishNamespaceOk: round trip", async () => { const msg = new Announce.PublishNamespaceOk(2n); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Announce.PublishNamespaceOk.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Announce.PublishNamespaceOk.decode, Version.DRAFT_14); assert.strictEqual(decoded.requestId, 2n); }); @@ -193,8 +177,8 @@ test("PublishNamespaceOk: round trip", async () => { test("PublishNamespaceError: round trip", async () => { const msg = new Announce.PublishNamespaceError(3n, 404, "Unauthorized"); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Announce.PublishNamespaceError.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Announce.PublishNamespaceError.decode, Version.DRAFT_14); assert.strictEqual(decoded.requestId, 3n); assert.strictEqual(decoded.errorCode, 404); @@ -204,8 +188,8 @@ test("PublishNamespaceError: round trip", async () => { test("PublishNamespaceDone: round trip", async () => { const msg = new Announce.PublishNamespaceDone(Path.from("old/stream")); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Announce.PublishNamespaceDone.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Announce.PublishNamespaceDone.decode, Version.DRAFT_14); assert.strictEqual(decoded.trackNamespace, "old/stream"); }); @@ -213,8 +197,8 @@ test("PublishNamespaceDone: round trip", async () => { test("PublishNamespaceCancel: round trip", async () => { const msg = new Announce.PublishNamespaceCancel(Path.from("canceled"), 1, "Shutdown"); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Announce.PublishNamespaceCancel.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Announce.PublishNamespaceCancel.decode, Version.DRAFT_14); assert.strictEqual(decoded.trackNamespace, "canceled"); assert.strictEqual(decoded.errorCode, 1); @@ -225,8 +209,8 @@ test("PublishNamespaceCancel: round trip", async () => { test("GoAway: with URL", async () => { const msg = new GoAway.GoAway("https://example.com/new"); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, GoAway.GoAway.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, GoAway.GoAway.decode, Version.DRAFT_14); assert.strictEqual(decoded.newSessionUri, "https://example.com/new"); }); @@ -234,8 +218,8 @@ test("GoAway: with URL", async () => { test("GoAway: empty", async () => { const msg = new GoAway.GoAway(""); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, GoAway.GoAway.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, GoAway.GoAway.decode, Version.DRAFT_14); assert.strictEqual(decoded.newSessionUri, ""); }); @@ -244,8 +228,8 @@ test("GoAway: empty", async () => { test("TrackStatusRequest: round trip", async () => { const msg = new Track.TrackStatusRequest(Path.from("video/stream"), "main"); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Track.TrackStatusRequest.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Track.TrackStatusRequest.decode, Version.DRAFT_14); assert.strictEqual(decoded.trackNamespace, "video/stream"); assert.strictEqual(decoded.trackName, "main"); @@ -254,8 +238,8 @@ test("TrackStatusRequest: round trip", async () => { test("TrackStatus: round trip", async () => { const msg = new Track.TrackStatus(Path.from("test"), "status", 200, 42n, 100n); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Track.TrackStatus.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Track.TrackStatus.decode, Version.DRAFT_14); assert.strictEqual(decoded.trackNamespace, "test"); assert.strictEqual(decoded.trackName, "status"); @@ -310,8 +294,8 @@ test("SubscribeOk v14: rejects non-zero expires", async () => { test("SubscribeError: unicode strings", async () => { const msg = new Subscribe.SubscribeError(1n, 400, "Error: 错误 🚫"); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Subscribe.SubscribeError.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Subscribe.SubscribeError.decode, Version.DRAFT_14); assert.strictEqual(decoded.requestId, 1n); assert.strictEqual(decoded.errorCode, 400); @@ -321,8 +305,8 @@ test("SubscribeError: unicode strings", async () => { test("PublishNamespace: unicode namespace", async () => { const msg = new Announce.PublishNamespace(1n, Path.from("会议/房间")); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, Announce.PublishNamespace.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_14); + const decoded = await decodeVersioned(encoded, Announce.PublishNamespace.decode, Version.DRAFT_14); assert.strictEqual(decoded.requestId, 1n); assert.strictEqual(decoded.trackNamespace, "会议/房间"); @@ -402,19 +386,32 @@ test("ServerSetup v14: round trip", async () => { test("RequestOk: round trip", async () => { const msg = new RequestOk(42n); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, RequestOk.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_15); + const decoded = await decodeVersioned(encoded, RequestOk.decode, Version.DRAFT_15); assert.strictEqual(decoded.requestId, 42n); }); -test("RequestError: round trip", async () => { +test("RequestError v15: round trip", async () => { const msg = new RequestError(99n, 500, "Internal error"); - const encoded = await encodeMessage(msg); - const decoded = await decodeMessage(encoded, RequestError.decode); + const encoded = await encodeVersioned(msg, Version.DRAFT_15); + const decoded = await decodeVersioned(encoded, RequestError.decode, Version.DRAFT_15); + + assert.strictEqual(decoded.requestId, 99n); + assert.strictEqual(decoded.errorCode, 500); + assert.strictEqual(decoded.reasonPhrase, "Internal error"); + assert.strictEqual(decoded.retryInterval, 0n); +}); + +test("RequestError v16: round trip with retryInterval", async () => { + const msg = new RequestError(99n, 500, "Internal error", 5000n); + + const encoded = await encodeVersioned(msg, Version.DRAFT_16); + const decoded = await decodeVersioned(encoded, RequestError.decode, Version.DRAFT_16); assert.strictEqual(decoded.requestId, 99n); assert.strictEqual(decoded.errorCode, 500); assert.strictEqual(decoded.reasonPhrase, "Internal error"); + assert.strictEqual(decoded.retryInterval, 5000n); }); diff --git a/js/lite/src/ietf/parameters.ts b/js/lite/src/ietf/parameters.ts index 27fd4a75d..beb0b49ba 100644 --- a/js/lite/src/ietf/parameters.ts +++ b/js/lite/src/ietf/parameters.ts @@ -1,5 +1,6 @@ import type { Reader, Writer } from "../stream.ts"; import * as Varint from "../varint.ts"; +import { type IetfVersion, Version } from "./version.ts"; export const Parameter = { MaxRequestId: 2n, @@ -61,44 +62,73 @@ export class Parameters { return this.vars.delete(id); } - async encode(w: Writer) { + async encode(w: Writer, version: IetfVersion) { await w.u53(this.vars.size + this.bytes.size); - for (const [id, value] of this.vars) { - await w.u62(id); - await w.u62(value); - } - for (const [id, value] of this.bytes) { - await w.u62(id); - await w.u53(value.length); - await w.write(value); + if (version === Version.DRAFT_16) { + // Delta encoding: collect all keys, sort, encode deltas + const all: { key: bigint; isVar: boolean }[] = []; + for (const id of this.vars.keys()) all.push({ key: id, isVar: true }); + for (const id of this.bytes.keys()) all.push({ key: id, isVar: false }); + all.sort((a, b) => (a.key < b.key ? -1 : a.key > b.key ? 1 : 0)); + + let prevId = 0n; + for (let i = 0; i < all.length; i++) { + const { key, isVar } = all[i]; + const delta = i === 0 ? key : key - prevId; + prevId = key; + await w.u62(delta); + + if (isVar) { + // biome-ignore lint/style/noNonNullAssertion: key is guaranteed to exist in vars map + await w.u62(this.vars.get(key)!); + } else { + // biome-ignore lint/style/noNonNullAssertion: key is guaranteed to exist in bytes map + const value = this.bytes.get(key)!; + await w.u53(value.length); + await w.write(value); + } + } + } else { + for (const [id, value] of this.vars) { + await w.u62(id); + await w.u62(value); + } + + for (const [id, value] of this.bytes) { + await w.u62(id); + await w.u53(value.length); + await w.write(value); + } } } - static async decode(r: Reader): Promise { + static async decode(r: Reader, version: IetfVersion): Promise { const count = await r.u53(); const params = new Parameters(); + let prevType = 0n; + for (let i = 0; i < count; i++) { - const id = await r.u62(); + let id: bigint; + if (version === Version.DRAFT_16) { + const delta = await r.u62(); + id = i === 0 ? delta : prevType + delta; + prevType = id; + } else { + id = await r.u62(); + } - // Per draft-ietf-moq-transport-14 Section 1.4.2: - // - If Type is even, Value is a single varint (no length prefix) - // - If Type is odd, Value has a length prefix followed by bytes if (id % 2n === 0n) { if (params.vars.has(id)) { throw new Error(`duplicate parameter id: ${id.toString()}`); } - - // Even: read varint and store as encoded bytes const varint = await r.u62(); params.setVarint(id, varint); } else { if (params.bytes.has(id)) { throw new Error(`duplicate parameter id: ${id.toString()}`); } - - // Odd: read length-prefixed bytes const size = await r.u53(); const bytes = await r.read(size); params.setBytes(id, bytes); @@ -226,27 +256,62 @@ export class MessageParameters { this.bytes.set(MSG_PARAM_SUBSCRIPTION_FILTER, new Uint8Array([v])); } - async encode(w: Writer) { + async encode(w: Writer, version: IetfVersion) { await w.u53(this.vars.size + this.bytes.size); - for (const [id, value] of this.vars) { - await w.u62(id); - await w.u62(value); - } + if (version === Version.DRAFT_16) { + // Delta encoding: merge vars and bytes, sort by key + const all: { key: bigint; isVar: boolean }[] = []; + for (const id of this.vars.keys()) all.push({ key: id, isVar: true }); + for (const id of this.bytes.keys()) all.push({ key: id, isVar: false }); + all.sort((a, b) => (a.key < b.key ? -1 : a.key > b.key ? 1 : 0)); + + let prevId = 0n; + for (let i = 0; i < all.length; i++) { + const { key, isVar } = all[i]; + const delta = i === 0 ? key : key - prevId; + prevId = key; + await w.u62(delta); + + if (isVar) { + // biome-ignore lint/style/noNonNullAssertion: key is guaranteed to exist in vars map + await w.u62(this.vars.get(key)!); + } else { + // biome-ignore lint/style/noNonNullAssertion: key is guaranteed to exist in bytes map + const value = this.bytes.get(key)!; + await w.u53(value.length); + await w.write(value); + } + } + } else { + for (const [id, value] of this.vars) { + await w.u62(id); + await w.u62(value); + } - for (const [id, value] of this.bytes) { - await w.u62(id); - await w.u53(value.length); - await w.write(value); + for (const [id, value] of this.bytes) { + await w.u62(id); + await w.u53(value.length); + await w.write(value); + } } } - static async decode(r: Reader): Promise { + static async decode(r: Reader, version: IetfVersion): Promise { const count = await r.u53(); const params = new MessageParameters(); + let prevType = 0n; + for (let i = 0; i < count; i++) { - const id = await r.u62(); + let id: bigint; + if (version === Version.DRAFT_16) { + const delta = await r.u62(); + id = i === 0 ? delta : prevType + delta; + prevType = id; + } else { + id = await r.u62(); + } if (id % 2n === 0n) { if (params.vars.has(id)) { diff --git a/js/lite/src/ietf/publish.ts b/js/lite/src/ietf/publish.ts index cca41ac33..4a019a66f 100644 --- a/js/lite/src/ietf/publish.ts +++ b/js/lite/src/ietf/publish.ts @@ -45,7 +45,7 @@ export class Publish { await w.string(this.trackName); await w.u62(this.trackAlias); - if (version === Version.DRAFT_15) { + if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { // v15: fields in parameters const params = new MessageParameters(); params.groupOrder = this.groupOrder; @@ -53,7 +53,7 @@ export class Publish { if (this.largest) { params.largest = this.largest; } - await params.encode(w); + await params.encode(w, version); } else if (version === Version.DRAFT_14) { await w.u8(this.groupOrder); await w.bool(this.contentExists); @@ -86,8 +86,8 @@ export class Publish { const trackName = await r.string(); const trackAlias = await r.u62(); - if (version === Version.DRAFT_15) { - const params = await MessageParameters.decode(r); + if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { + const params = await MessageParameters.decode(r, version); const groupOrder = params.groupOrder ?? 0x02; const forward = params.forward ?? true; const largest = params.largest; @@ -106,7 +106,7 @@ export class Publish { const contentExists = await r.bool(); const largest = contentExists ? { groupId: await r.u62(), objectId: await r.u62() } : undefined; const forward = await r.bool(); - await Parameters.decode(r); // ignore parameters + await Parameters.decode(r, version); // ignore parameters return new Publish( requestId, trackNamespace, @@ -131,11 +131,11 @@ export class PublishOk { throw new Error("PUBLISH_OK messages are not supported"); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, PublishOk.#decode); } @@ -163,11 +163,11 @@ export class PublishError { await w.string(this.reasonPhrase); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, PublishError.#decode); } @@ -200,11 +200,11 @@ export class PublishDone { await w.string(this.reasonPhrase); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, PublishDone.#decode); } diff --git a/js/lite/src/ietf/publish_namespace.ts b/js/lite/src/ietf/publish_namespace.ts index 708519e22..10ee22fe7 100644 --- a/js/lite/src/ietf/publish_namespace.ts +++ b/js/lite/src/ietf/publish_namespace.ts @@ -3,6 +3,7 @@ import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; import * as Namespace from "./namespace.ts"; import { Parameters } from "./parameters.ts"; +import type { IetfVersion } from "./version.ts"; // In draft-14, ANNOUNCE is renamed to PUBLISH_NAMESPACE export class PublishNamespace { @@ -16,24 +17,24 @@ export class PublishNamespace { this.trackNamespace = trackNamespace; } - async #encode(w: Writer): Promise { + async #encode(w: Writer, _version: IetfVersion): Promise { await w.u62(this.requestId); await Namespace.encode(w, this.trackNamespace); await w.u53(0); // size of parameters } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: IetfVersion): Promise { + return Message.encode(w, (wr) => this.#encode(wr, version)); } - static async decode(r: Reader): Promise { - return Message.decode(r, PublishNamespace.#decode); + static async decode(r: Reader, version: IetfVersion): Promise { + return Message.decode(r, (rd) => PublishNamespace.#decode(rd, version)); } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version: IetfVersion): Promise { const requestId = await r.u62(); const trackNamespace = await Namespace.decode(r); - await Parameters.decode(r); // ignore parameters + await Parameters.decode(r, version); // ignore parameters return new PublishNamespace(requestId, trackNamespace); } } @@ -51,11 +52,11 @@ export class PublishNamespaceOk { await w.u62(this.requestId); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, PublishNamespaceOk.#decode); } @@ -84,11 +85,11 @@ export class PublishNamespaceError { await w.string(this.reasonPhrase); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, PublishNamespaceError.#decode); } @@ -119,11 +120,11 @@ export class PublishNamespaceCancel { await w.string(this.reasonPhrase); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, PublishNamespaceCancel.#decode); } @@ -149,11 +150,11 @@ export class PublishNamespaceDone { await Namespace.encode(w, this.trackNamespace); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, PublishNamespaceDone.#decode); } diff --git a/js/lite/src/ietf/publisher.ts b/js/lite/src/ietf/publisher.ts index dddcf9072..d4d75e832 100644 --- a/js/lite/src/ietf/publisher.ts +++ b/js/lite/src/ietf/publisher.ts @@ -87,7 +87,7 @@ export class Publisher { const broadcast = this.#broadcasts.get(name); if (!broadcast) { - if (this.#control.version === Version.DRAFT_15) { + if (this.#control.version === Version.DRAFT_15 || this.#control.version === Version.DRAFT_16) { const errorMsg = new RequestError(msg.requestId, 404, "Broadcast not found"); await this.#control.write(errorMsg); } else if (this.#control.version === Version.DRAFT_14) { diff --git a/js/lite/src/ietf/request.ts b/js/lite/src/ietf/request.ts index bdeed7446..be48c49da 100644 --- a/js/lite/src/ietf/request.ts +++ b/js/lite/src/ietf/request.ts @@ -1,6 +1,7 @@ import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; import { MessageParameters } from "./parameters.ts"; +import { type IetfVersion, Version } from "./version.ts"; export class MaxRequestId { static id = 0x15; @@ -15,7 +16,7 @@ export class MaxRequestId { await w.u62(this.requestId); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } @@ -23,7 +24,7 @@ export class MaxRequestId { return new MaxRequestId(await r.u62()); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, MaxRequestId.#decode); } } @@ -41,7 +42,7 @@ export class RequestsBlocked { await w.u62(this.requestId); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } @@ -49,7 +50,7 @@ export class RequestsBlocked { return new RequestsBlocked(await r.u62()); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, RequestsBlocked.#decode); } } @@ -67,23 +68,23 @@ export class RequestOk { this.parameters = parameters; } - async #encode(w: Writer): Promise { + async #encode(w: Writer, version: IetfVersion): Promise { await w.u62(this.requestId); - await this.parameters.encode(w); + await this.parameters.encode(w, version); } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: IetfVersion): Promise { + return Message.encode(w, (wr) => this.#encode(wr, version)); } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version: IetfVersion): Promise { const requestId = await r.u62(); - const parameters = await MessageParameters.decode(r); + const parameters = await MessageParameters.decode(r, version); return new RequestOk(requestId, parameters); } - static async decode(r: Reader): Promise { - return Message.decode(r, RequestOk.#decode); + static async decode(r: Reader, version: IetfVersion): Promise { + return Message.decode(r, (rd) => RequestOk.#decode(rd, version)); } } @@ -95,31 +96,37 @@ export class RequestError { requestId: bigint; errorCode: number; reasonPhrase: string; + retryInterval: bigint; - constructor(requestId: bigint, errorCode: number, reasonPhrase: string) { + constructor(requestId: bigint, errorCode: number, reasonPhrase: string, retryInterval = 0n) { this.requestId = requestId; this.errorCode = errorCode; this.reasonPhrase = reasonPhrase; + this.retryInterval = retryInterval; } - async #encode(w: Writer): Promise { + async #encode(w: Writer, version: IetfVersion): Promise { await w.u62(this.requestId); await w.u62(BigInt(this.errorCode)); + if (version === Version.DRAFT_16) { + await w.u62(this.retryInterval); + } await w.string(this.reasonPhrase); } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: IetfVersion): Promise { + return Message.encode(w, (wr) => this.#encode(wr, version)); } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version: IetfVersion): Promise { const requestId = await r.u62(); const errorCode = Number(await r.u62()); + const retryInterval = version === Version.DRAFT_16 ? await r.u62() : 0n; const reasonPhrase = await r.string(); - return new RequestError(requestId, errorCode, reasonPhrase); + return new RequestError(requestId, errorCode, reasonPhrase, retryInterval); } - static async decode(r: Reader): Promise { - return Message.decode(r, RequestError.#decode); + static async decode(r: Reader, version: IetfVersion): Promise { + return Message.decode(r, (rd) => RequestError.#decode(rd, version)); } } diff --git a/js/lite/src/ietf/setup.ts b/js/lite/src/ietf/setup.ts index 86a29a7a0..16baa1ded 100644 --- a/js/lite/src/ietf/setup.ts +++ b/js/lite/src/ietf/setup.ts @@ -17,20 +17,15 @@ export class ClientSetup { } async #encode(w: Writer, version: IetfVersion): Promise { - if (version === Version.DRAFT_15) { - // Draft15: no versions list, just parameters - // Make sure versions is draft 15 only. - if (this.versions.length !== 1 || this.versions[0] !== Version.DRAFT_15) { - throw new Error("versions must be draft 15 only"); - } - - await this.parameters.encode(w); + if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { + // Draft15+: no versions list, just parameters + await this.parameters.encode(w, version); } else if (version === Version.DRAFT_14) { await w.u53(this.versions.length); for (const v of this.versions) { await w.u53(v); } - await this.parameters.encode(w); + await this.parameters.encode(w, version); } else { const _: never = version; throw new Error(`unsupported version: ${_}`); @@ -42,10 +37,10 @@ export class ClientSetup { } static async #decode(r: Reader, version: IetfVersion): Promise { - if (version === Version.DRAFT_15) { - // Draft15: no versions list, just parameters - const parameters = await Parameters.decode(r); - return new ClientSetup([Version.DRAFT_15], parameters); + if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { + // Draft15+: no versions list, just parameters + const parameters = await Parameters.decode(r, version); + return new ClientSetup([version], parameters); } else if (version === Version.DRAFT_14) { // Number of supported versions const numVersions = await r.u53(); @@ -60,7 +55,7 @@ export class ClientSetup { supportedVersions.push(v); } - const parameters = await Parameters.decode(r); + const parameters = await Parameters.decode(r, version); return new ClientSetup(supportedVersions, parameters); } else { @@ -86,12 +81,12 @@ export class ServerSetup { } async #encode(w: Writer, version: IetfVersion): Promise { - if (version === Version.DRAFT_15) { - // Draft15: no version field, just parameters - await this.parameters.encode(w); + if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { + // Draft15+: no version field, just parameters + await this.parameters.encode(w, version); } else if (version === Version.DRAFT_14) { await w.u53(this.version); - await this.parameters.encode(w); + await this.parameters.encode(w, version); } else { const _: never = version; throw new Error(`unsupported version: ${_}`); @@ -103,13 +98,13 @@ export class ServerSetup { } static async #decode(r: Reader, version: IetfVersion): Promise { - if (version === Version.DRAFT_15) { - // Draft15: no version field, just parameters - const parameters = await Parameters.decode(r); - return new ServerSetup(Version.DRAFT_15, parameters); + if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { + // Draft15+: no version field, just parameters + const parameters = await Parameters.decode(r, version); + return new ServerSetup(version, parameters); } else if (version === Version.DRAFT_14) { const selectedVersion = await r.u53(); - const parameters = await Parameters.decode(r); + const parameters = await Parameters.decode(r, version); return new ServerSetup(selectedVersion, parameters); } else { const _: never = version; diff --git a/js/lite/src/ietf/subscribe.ts b/js/lite/src/ietf/subscribe.ts index 38f42329e..c03645b54 100644 --- a/js/lite/src/ietf/subscribe.ts +++ b/js/lite/src/ietf/subscribe.ts @@ -28,14 +28,14 @@ export class Subscribe { await Namespace.encode(w, this.trackNamespace); await w.string(this.trackName); - if (version === Version.DRAFT_15) { + if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { // v15: fields moved into parameters const params = new MessageParameters(); params.subscriberPriority = this.subscriberPriority; params.groupOrder = GROUP_ORDER; params.forward = true; params.subscriptionFilter = 0x2; // LargestObject - await params.encode(w); + await params.encode(w, version); } else if (version === Version.DRAFT_14) { await w.u8(this.subscriberPriority); await w.u8(GROUP_ORDER); @@ -61,9 +61,9 @@ export class Subscribe { const trackNamespace = await Namespace.decode(r); const trackName = await r.string(); - if (version === Version.DRAFT_15) { + if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { // v15: fields are in parameters - const params = await MessageParameters.decode(r); + const params = await MessageParameters.decode(r, version); const subscriberPriority = params.subscriberPriority ?? 128; let groupOrder = params.groupOrder ?? GROUP_ORDER; if (groupOrder > 2) { @@ -105,7 +105,7 @@ export class Subscribe { throw new Error(`unsupported filter type: ${filterType}`); } - await Parameters.decode(r); // ignore parameters + await Parameters.decode(r, version); // ignore parameters return new Subscribe(requestId, trackNamespace, trackName, subscriberPriority); } else { @@ -130,11 +130,11 @@ export class SubscribeOk { await w.u62(this.requestId); await w.u62(this.trackAlias); - if (version === Version.DRAFT_15) { + if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { // v15: just parameters after track_alias const params = new MessageParameters(); params.groupOrder = GROUP_ORDER; - await params.encode(w); + await params.encode(w, version); } else if (version === Version.DRAFT_14) { await w.u62(0n); // expires = 0 await w.u8(GROUP_ORDER); @@ -158,9 +158,9 @@ export class SubscribeOk { const requestId = await r.u62(); const trackAlias = await r.u62(); - if (version === Version.DRAFT_15) { + if (version === Version.DRAFT_15 || version === Version.DRAFT_16) { // v15: just parameters - await MessageParameters.decode(r); + await MessageParameters.decode(r, version); } else if (version === Version.DRAFT_14) { const expires = await r.u62(); if (expires !== BigInt(0)) { @@ -176,7 +176,7 @@ export class SubscribeOk { await r.u62(); } - await Parameters.decode(r); // ignore parameters + await Parameters.decode(r, version); // ignore parameters } else { const _: never = version; throw new Error(`unsupported version: ${_}`); @@ -205,11 +205,11 @@ export class SubscribeError { await w.string(this.reasonPhrase); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, SubscribeError.#decode); } @@ -235,11 +235,11 @@ export class Unsubscribe { await w.u62(this.requestId); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, Unsubscribe.#decode); } diff --git a/js/lite/src/ietf/subscribe_namespace.ts b/js/lite/src/ietf/subscribe_namespace.ts index 96ddc7699..48a2678e7 100644 --- a/js/lite/src/ietf/subscribe_namespace.ts +++ b/js/lite/src/ietf/subscribe_namespace.ts @@ -3,6 +3,7 @@ import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; import * as Namespace from "./namespace.ts"; import { Parameters } from "./parameters.ts"; +import type { IetfVersion } from "./version.ts"; // In draft-14, SUBSCRIBE_ANNOUNCES is renamed to SUBSCRIBE_NAMESPACE export class SubscribeNamespace { @@ -16,24 +17,24 @@ export class SubscribeNamespace { this.requestId = requestId; } - async #encode(w: Writer): Promise { + async #encode(w: Writer, _version: IetfVersion): Promise { await w.u62(this.requestId); await Namespace.encode(w, this.namespace); await w.u53(0); // no parameters } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: IetfVersion): Promise { + return Message.encode(w, (wr) => this.#encode(wr, version)); } - static async decode(r: Reader): Promise { - return Message.decode(r, SubscribeNamespace.#decode); + static async decode(r: Reader, version: IetfVersion): Promise { + return Message.decode(r, (rd) => SubscribeNamespace.#decode(rd, version)); } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version: IetfVersion): Promise { const requestId = await r.u62(); const namespace = await Namespace.decode(r); - await Parameters.decode(r); + await Parameters.decode(r, version); return new SubscribeNamespace(namespace, requestId); } @@ -52,11 +53,11 @@ export class SubscribeNamespaceOk { await w.u62(this.requestId); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, SubscribeNamespaceOk.#decode); } @@ -85,11 +86,11 @@ export class SubscribeNamespaceError { await w.string(this.reasonPhrase); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, SubscribeNamespaceError.#decode); } @@ -115,11 +116,11 @@ export class UnsubscribeNamespace { await w.u62(this.requestId); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, UnsubscribeNamespace.#decode); } diff --git a/js/lite/src/ietf/subscriber.ts b/js/lite/src/ietf/subscriber.ts index 0c0a9b589..c9332cd56 100644 --- a/js/lite/src/ietf/subscriber.ts +++ b/js/lite/src/ietf/subscriber.ts @@ -246,7 +246,7 @@ export class Subscriber { async handlePublish(msg: Publish) { // TODO technically, we should send PUBLISH_OK if we had a SUBSCRIBE in flight for the same track. // Otherwise, the peer will SUBSCRIBE_ERROR because duplicate subscriptions are not allowed :( - if (this.#control.version === Version.DRAFT_15) { + if (this.#control.version === Version.DRAFT_15 || this.#control.version === Version.DRAFT_16) { const err = new RequestError(msg.requestId, 500, "publish not supported"); await this.#control.write(err); } else if (this.#control.version === Version.DRAFT_14) { diff --git a/js/lite/src/ietf/track.ts b/js/lite/src/ietf/track.ts index 96eb7a9ca..b5b6ab0d9 100644 --- a/js/lite/src/ietf/track.ts +++ b/js/lite/src/ietf/track.ts @@ -2,6 +2,7 @@ import type * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; import * as Message from "./message.ts"; import * as Namespace from "./namespace.ts"; +import type { IetfVersion } from "./version.ts"; export class TrackStatusRequest { static id = 0x0d; @@ -19,11 +20,11 @@ export class TrackStatusRequest { await w.string(this.trackName); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, TrackStatusRequest.#decode); } @@ -66,11 +67,11 @@ export class TrackStatus { await w.u62(this.lastObjectId); } - async encode(w: Writer): Promise { + async encode(w: Writer, _version: IetfVersion): Promise { return Message.encode(w, this.#encode.bind(this)); } - static async decode(r: Reader): Promise { + static async decode(r: Reader, _version: IetfVersion): Promise { return Message.decode(r, TrackStatus.#decode); } diff --git a/js/lite/src/ietf/version.ts b/js/lite/src/ietf/version.ts index a3badb842..bb6b99080 100644 --- a/js/lite/src/ietf/version.ts +++ b/js/lite/src/ietf/version.ts @@ -19,6 +19,12 @@ export const Version = { * https://www.ietf.org/archive/id/draft-ietf-moq-transport-15.txt */ DRAFT_15: 0xff00000f, + + /** + * draft-ietf-moq-transport-16 + * https://www.ietf.org/archive/id/draft-ietf-moq-transport-16.txt + */ + DRAFT_16: 0xff000010, } as const; export type Version = (typeof Version)[keyof typeof Version]; @@ -27,10 +33,11 @@ export type Version = (typeof Version)[keyof typeof Version]; export const ALPN = { DRAFT_14: "moq-00", DRAFT_15: "moqt-15", + DRAFT_16: "moqt-16", } as const; /** * IETF protocol versions used by the ietf/ module. * Use this narrower type for version-branched encode/decode to get exhaustive matching. */ -export type IetfVersion = typeof Version.DRAFT_14 | typeof Version.DRAFT_15; +export type IetfVersion = typeof Version.DRAFT_14 | typeof Version.DRAFT_15 | typeof Version.DRAFT_16; diff --git a/rs/moq-lite/src/client.rs b/rs/moq-lite/src/client.rs index 744f44deb..7f5a4d3f1 100644 --- a/rs/moq-lite/src/client.rs +++ b/rs/moq-lite/src/client.rs @@ -46,6 +46,10 @@ impl Client { // If ALPN was used to negotiate the version, use the appropriate encoding. // Default to IETF 14 if no ALPN was used and we'll negotiate the version later. let (encoding, supported) = match session.protocol() { + Some(p) if p == ietf::ALPN_16 => ( + Version::Ietf(ietf::Version::Draft16), + vec![ietf::Version::Draft16.into()], + ), Some(p) if p == ietf::ALPN_15 => ( Version::Ietf(ietf::Version::Draft15), vec![ietf::Version::Draft15.into()], @@ -61,10 +65,14 @@ impl Client { let mut stream = Stream::open(&session, encoding).await?; + let ietf_version = match encoding { + Version::Ietf(v) => v, + _ => ietf::Version::Draft14, + }; let mut parameters = ietf::Parameters::default(); parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64); parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec()); - let parameters = parameters.encode_bytes(()); + let parameters = parameters.encode_bytes(ietf_version); let client = setup::Client { versions: supported.clone().into(), diff --git a/rs/moq-lite/src/ietf/control.rs b/rs/moq-lite/src/ietf/control.rs index dbd6208ed..c231d3f28 100644 --- a/rs/moq-lite/src/ietf/control.rs +++ b/rs/moq-lite/src/ietf/control.rs @@ -71,6 +71,9 @@ impl Control { } pub async fn next_request_id(&self) -> Result { + let timeout = tokio::time::sleep(std::time::Duration::from_secs(10)); + tokio::pin!(timeout); + loop { let notify = { let mut state = self.state.lock().unwrap(); @@ -85,6 +88,10 @@ impl Control { tokio::select! { _ = notify => continue, _ = self.tx.closed() => return Err(Error::Cancel), + _ = &mut timeout => { + tracing::warn!("timed out waiting for MAX_REQUEST_ID"); + return Err(Error::Cancel); + } } } } diff --git a/rs/moq-lite/src/ietf/fetch.rs b/rs/moq-lite/src/ietf/fetch.rs index 4230e2a1d..ee6c73eed 100644 --- a/rs/moq-lite/src/ietf/fetch.rs +++ b/rs/moq-lite/src/ietf/fetch.rs @@ -121,7 +121,7 @@ impl Message for Fetch<'_> { self.fetch_type.encode(w, version); 0u8.encode(w, version); // no parameters } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { // v15: request_id, fetch_type, parameters (with subscriber_priority, group_order) self.fetch_type.encode(w, version); let mut params = MessageParameters::default(); @@ -148,7 +148,7 @@ impl Message for Fetch<'_> { fetch_type, }) } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let fetch_type = FetchType::decode(buf, version)?; let params = MessageParameters::decode(buf, version)?; @@ -193,7 +193,7 @@ impl Message for FetchOk { self.end_location.encode(w, version); 0u8.encode(w, version); // no parameters } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { // v15: request_id, end_of_track(8), end_location, parameters self.end_of_track.encode(w, version); self.end_location.encode(w, version); @@ -220,7 +220,7 @@ impl Message for FetchOk { end_location, }) } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let end_of_track = bool::decode(buf, version)?; let end_location = Location::decode(buf, version)?; let params = MessageParameters::decode(buf, version)?; diff --git a/rs/moq-lite/src/ietf/parameters.rs b/rs/moq-lite/src/ietf/parameters.rs index b8efe4b1b..a2ef9b373 100644 --- a/rs/moq-lite/src/ietf/parameters.rs +++ b/rs/moq-lite/src/ietf/parameters.rs @@ -4,6 +4,8 @@ use num_enum::{FromPrimitive, IntoPrimitive}; use crate::coding::*; +use super::Version; + const MAX_PARAMS: u64 = 64; // ---- Setup Parameters (used in CLIENT_SETUP/SERVER_SETUP) ---- @@ -34,32 +36,41 @@ pub struct Parameters { bytes: HashMap>, } -impl Decode for Parameters { - fn decode(mut r: &mut R, version: V) -> Result { +impl Decode for Parameters { + fn decode(mut r: &mut R, version: Version) -> Result { let mut vars = HashMap::new(); let mut bytes = HashMap::new(); - // I hate this encoding so much; let me encode my role and get on with my life. - let count = u64::decode(r, version.clone())?; + let count = u64::decode(r, version)?; if count > MAX_PARAMS { return Err(DecodeError::TooMany); } - for _ in 0..count { - let kind = u64::decode(r, version.clone())?; + let mut prev_type: u64 = 0; + + for i in 0..count { + let kind = match version { + Version::Draft16 => { + let delta = u64::decode(r, version)?; + let abs = if i == 0 { delta } else { prev_type + delta }; + prev_type = abs; + abs + } + _ => u64::decode(r, version)?, + }; if kind % 2 == 0 { let kind = ParameterVarInt::from(kind); match vars.entry(kind) { hash_map::Entry::Occupied(_) => return Err(DecodeError::Duplicate), - hash_map::Entry::Vacant(entry) => entry.insert(u64::decode(&mut r, version.clone())?), + hash_map::Entry::Vacant(entry) => entry.insert(u64::decode(&mut r, version)?), }; } else { let kind = ParameterBytes::from(kind); match bytes.entry(kind) { hash_map::Entry::Occupied(_) => return Err(DecodeError::Duplicate), - hash_map::Entry::Vacant(entry) => entry.insert(Vec::::decode(&mut r, version.clone())?), + hash_map::Entry::Vacant(entry) => entry.insert(Vec::::decode(&mut r, version)?), }; } } @@ -68,18 +79,51 @@ impl Decode for Parameters { } } -impl Encode for Parameters { - fn encode(&self, w: &mut W, version: V) { - (self.vars.len() + self.bytes.len()).encode(w, version.clone()); - - for (kind, value) in self.vars.iter() { - u64::from(*kind).encode(w, version.clone()); - value.encode(w, version.clone()); - } - - for (kind, value) in self.bytes.iter() { - u64::from(*kind).encode(w, version.clone()); - value.encode(w, version.clone()); +impl Encode for Parameters { + fn encode(&self, w: &mut W, version: Version) { + (self.vars.len() + self.bytes.len()).encode(w, version); + + match version { + Version::Draft16 => { + // Delta encoding: collect all keys, sort, encode deltas + let mut all: Vec<(u64, bool, usize)> = Vec::new(); // (key, is_var, index) + let var_keys: Vec<_> = self.vars.keys().collect(); + let byte_keys: Vec<_> = self.bytes.keys().collect(); + for (i, k) in var_keys.iter().enumerate() { + all.push((u64::from(**k), true, i)); + } + for (i, k) in byte_keys.iter().enumerate() { + all.push((u64::from(**k), false, i)); + } + all.sort_by_key(|(k, _, _)| *k); + + let var_vals: Vec<_> = self.vars.values().collect(); + let byte_vals: Vec<_> = self.bytes.values().collect(); + + let mut prev_type: u64 = 0; + for (idx, (kind, is_var, orig_idx)) in all.iter().enumerate() { + let delta = if idx == 0 { *kind } else { kind - prev_type }; + prev_type = *kind; + delta.encode(w, version); + + if *is_var { + var_vals[*orig_idx].encode(w, version); + } else { + byte_vals[*orig_idx].encode(w, version); + } + } + } + _ => { + for (kind, value) in self.vars.iter() { + u64::from(*kind).encode(w, version); + value.encode(w, version); + } + + for (kind, value) in self.bytes.iter() { + u64::from(*kind).encode(w, version); + value.encode(w, version); + } + } } } } @@ -112,29 +156,39 @@ pub struct MessageParameters { bytes: BTreeMap>, } -impl Decode for MessageParameters { - fn decode(mut r: &mut R, version: V) -> Result { +impl Decode for MessageParameters { + fn decode(mut r: &mut R, version: Version) -> Result { let mut vars = BTreeMap::new(); let mut bytes = BTreeMap::new(); - let count = u64::decode(r, version.clone())?; + let count = u64::decode(r, version)?; if count > MAX_PARAMS { return Err(DecodeError::TooMany); } - for _ in 0..count { - let kind = u64::decode(r, version.clone())?; + let mut prev_type: u64 = 0; + + for i in 0..count { + let kind = match version { + Version::Draft16 => { + let delta = u64::decode(r, version)?; + let abs = if i == 0 { delta } else { prev_type + delta }; + prev_type = abs; + abs + } + _ => u64::decode(r, version)?, + }; if kind % 2 == 0 { match vars.entry(kind) { btree_map::Entry::Occupied(_) => return Err(DecodeError::Duplicate), - btree_map::Entry::Vacant(entry) => entry.insert(u64::decode(&mut r, version.clone())?), + btree_map::Entry::Vacant(entry) => entry.insert(u64::decode(&mut r, version)?), }; } else { match bytes.entry(kind) { btree_map::Entry::Occupied(_) => return Err(DecodeError::Duplicate), - btree_map::Entry::Vacant(entry) => entry.insert(Vec::::decode(&mut r, version.clone())?), + btree_map::Entry::Vacant(entry) => entry.insert(Vec::::decode(&mut r, version)?), }; } } @@ -143,18 +197,49 @@ impl Decode for MessageParameters { } } -impl Encode for MessageParameters { - fn encode(&self, w: &mut W, version: V) { - (self.vars.len() + self.bytes.len()).encode(w, version.clone()); - - for (kind, value) in self.vars.iter() { - kind.encode(w, version.clone()); - value.encode(w, version.clone()); - } - - for (kind, value) in self.bytes.iter() { - kind.encode(w, version.clone()); - value.encode(w, version.clone()); +impl Encode for MessageParameters { + fn encode(&self, w: &mut W, version: Version) { + (self.vars.len() + self.bytes.len()).encode(w, version); + + match version { + Version::Draft16 => { + // Delta encoding: BTreeMap is already sorted, merge and sort by key + enum ParamValue<'a> { + Var(&'a u64), + Bytes(&'a Vec), + } + let mut all: Vec<(u64, ParamValue)> = Vec::new(); + for (k, v) in self.vars.iter() { + all.push((*k, ParamValue::Var(v))); + } + for (k, v) in self.bytes.iter() { + all.push((*k, ParamValue::Bytes(v))); + } + all.sort_by_key(|(k, _)| *k); + + let mut prev_type: u64 = 0; + for (idx, (kind, val)) in all.iter().enumerate() { + let delta = if idx == 0 { *kind } else { kind - prev_type }; + prev_type = *kind; + delta.encode(w, version); + + match val { + ParamValue::Var(v) => v.encode(w, version), + ParamValue::Bytes(v) => v.encode(w, version), + } + } + } + _ => { + for (kind, value) in self.vars.iter() { + kind.encode(w, version); + value.encode(w, version); + } + + for (kind, value) in self.bytes.iter() { + kind.encode(w, version); + value.encode(w, version); + } + } } } } @@ -264,3 +349,93 @@ impl MessageParameters { self.bytes.insert(Self::SUBSCRIPTION_FILTER, buf); } } + +#[cfg(test)] +mod tests { + use super::*; + use bytes::BytesMut; + + #[test] + fn test_parameters_v16_delta_round_trip() { + let mut params = Parameters::default(); + params.set_bytes(ParameterBytes::Path, b"/test".to_vec()); + params.set_varint(ParameterVarInt::MaxRequestId, 100); + params.set_bytes(ParameterBytes::Implementation, b"test-impl".to_vec()); + + let mut buf = BytesMut::new(); + params.encode(&mut buf, Version::Draft16); + + let mut bytes = buf.freeze(); + let decoded = Parameters::decode(&mut bytes, Version::Draft16).unwrap(); + + assert_eq!(decoded.get_bytes(ParameterBytes::Path), Some(b"/test".as_ref())); + assert_eq!(decoded.get_varint(ParameterVarInt::MaxRequestId), Some(100)); + assert_eq!( + decoded.get_bytes(ParameterBytes::Implementation), + Some(b"test-impl".as_ref()) + ); + } + + #[test] + fn test_parameters_v15_round_trip() { + let mut params = Parameters::default(); + params.set_bytes(ParameterBytes::Path, b"/test".to_vec()); + params.set_varint(ParameterVarInt::MaxRequestId, 100); + + let mut buf = BytesMut::new(); + params.encode(&mut buf, Version::Draft15); + + let mut bytes = buf.freeze(); + let decoded = Parameters::decode(&mut bytes, Version::Draft15).unwrap(); + + assert_eq!(decoded.get_bytes(ParameterBytes::Path), Some(b"/test".as_ref())); + assert_eq!(decoded.get_varint(ParameterVarInt::MaxRequestId), Some(100)); + } + + #[test] + fn test_message_parameters_v16_delta_round_trip() { + let mut params = MessageParameters::default(); + params.set_subscriber_priority(200); + params.set_group_order(2); + params.set_forward(true); + + let mut buf = BytesMut::new(); + params.encode(&mut buf, Version::Draft16); + + let mut bytes = buf.freeze(); + let decoded = MessageParameters::decode(&mut bytes, Version::Draft16).unwrap(); + + assert_eq!(decoded.subscriber_priority(), Some(200)); + assert_eq!(decoded.group_order(), Some(2)); + assert_eq!(decoded.forward(), Some(true)); + } + + #[test] + fn test_message_parameters_v15_round_trip() { + let mut params = MessageParameters::default(); + params.set_subscriber_priority(128); + params.set_group_order(2); + + let mut buf = BytesMut::new(); + params.encode(&mut buf, Version::Draft15); + + let mut bytes = buf.freeze(); + let decoded = MessageParameters::decode(&mut bytes, Version::Draft15).unwrap(); + + assert_eq!(decoded.subscriber_priority(), Some(128)); + assert_eq!(decoded.group_order(), Some(2)); + } + + #[test] + fn test_message_parameters_empty_v16() { + let params = MessageParameters::default(); + + let mut buf = BytesMut::new(); + params.encode(&mut buf, Version::Draft16); + + let mut bytes = buf.freeze(); + let decoded = MessageParameters::decode(&mut bytes, Version::Draft16).unwrap(); + + assert_eq!(decoded.subscriber_priority(), None); + } +} diff --git a/rs/moq-lite/src/ietf/publish.rs b/rs/moq-lite/src/ietf/publish.rs index 264bf9246..a4a2bf8ab 100644 --- a/rs/moq-lite/src/ietf/publish.rs +++ b/rs/moq-lite/src/ietf/publish.rs @@ -184,7 +184,7 @@ impl Message for Publish<'_> { // parameters 0u8.encode(w, version); } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let mut params = MessageParameters::default(); params.set_group_order(u8::from(self.group_order) as u64); if let Some(location) = &self.largest_location { @@ -224,7 +224,7 @@ impl Message for Publish<'_> { forward, }) } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let params = MessageParameters::decode(r, version)?; let group_order = match params.group_order() { @@ -281,7 +281,7 @@ impl Message for PublishOk { // no parameters 0u8.encode(w, version); } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let mut params = MessageParameters::default(); params.set_forward(self.forward); params.set_subscriber_priority(self.subscriber_priority); @@ -323,7 +323,7 @@ impl Message for PublishOk { filter_type, }) } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let params = MessageParameters::decode(r, version)?; let forward = params.forward().unwrap_or(true); diff --git a/rs/moq-lite/src/ietf/publisher.rs b/rs/moq-lite/src/ietf/publisher.rs index 985779afc..551e446a5 100644 --- a/rs/moq-lite/src/ietf/publisher.rs +++ b/rs/moq-lite/src/ietf/publisher.rs @@ -134,7 +134,7 @@ impl Publisher { Ok(()) } - /// Send a subscribe error, using RequestError for v15. + /// Send a subscribe error, using RequestError for v15+. fn send_subscribe_error(&self, request_id: RequestId, error_code: u64, reason: &str) -> Result<(), Error> { match self.version { Version::Draft14 => self.control.send(ietf::SubscribeError { @@ -142,10 +142,11 @@ impl Publisher { error_code, reason_phrase: reason.into(), }), - Version::Draft15 => self.control.send(ietf::RequestError { + Version::Draft15 | Version::Draft16 => self.control.send(ietf::RequestError { request_id, error_code, reason_phrase: reason.into(), + retry_interval: 0, }), } } @@ -302,7 +303,9 @@ impl Publisher { }; match chunk? { - Some(mut chunk) => stream.write_all(&mut chunk).await?, + Some(mut chunk) => { + stream.write_all(&mut chunk).await?; + } None => break, } } @@ -407,7 +410,7 @@ impl Publisher { Ok(()) } - /// Send a fetch OK, using RequestOk for v15. + /// Send a fetch OK, using RequestOk for v15+. fn send_fetch_ok(&self, request_id: RequestId) -> Result<(), Error> { match self.version { Version::Draft14 => self.control.send(ietf::FetchOk { @@ -416,14 +419,14 @@ impl Publisher { end_of_track: false, end_location: Location { group: 0, object: 0 }, }), - Version::Draft15 => self.control.send(ietf::RequestOk { + Version::Draft15 | Version::Draft16 => self.control.send(ietf::RequestOk { request_id, parameters: MessageParameters::default(), }), } } - /// Send a fetch error, using RequestError for v15. + /// Send a fetch error, using RequestError for v15+. fn send_fetch_error(&self, request_id: RequestId, error_code: u64, reason: &str) -> Result<(), Error> { match self.version { Version::Draft14 => self.control.send(ietf::FetchError { @@ -431,10 +434,11 @@ impl Publisher { error_code, reason_phrase: reason.into(), }), - Version::Draft15 => self.control.send(ietf::RequestError { + Version::Draft15 | Version::Draft16 => self.control.send(ietf::RequestError { request_id, error_code, reason_phrase: reason.into(), + retry_interval: 0, }), } } diff --git a/rs/moq-lite/src/ietf/request.rs b/rs/moq-lite/src/ietf/request.rs index 1e6951f48..c8fe59fb9 100644 --- a/rs/moq-lite/src/ietf/request.rs +++ b/rs/moq-lite/src/ietf/request.rs @@ -103,6 +103,8 @@ pub struct RequestError<'a> { pub request_id: RequestId, pub error_code: u64, pub reason_phrase: Cow<'a, str>, + /// v16+: retry interval in milliseconds + pub retry_interval: u64, } impl Message for RequestError<'_> { @@ -111,17 +113,25 @@ impl Message for RequestError<'_> { fn encode_msg(&self, w: &mut W, version: Version) { self.request_id.encode(w, version); self.error_code.encode(w, version); + if version == Version::Draft16 { + self.retry_interval.encode(w, version); + } self.reason_phrase.encode(w, version); } fn decode_msg(r: &mut R, version: Version) -> Result { let request_id = RequestId::decode(r, version)?; let error_code = u64::decode(r, version)?; + let retry_interval = match version { + Version::Draft16 => u64::decode(r, version)?, + _ => 0, + }; let reason_phrase = Cow::::decode(r, version)?; Ok(Self { request_id, error_code, reason_phrase, + retry_interval, }) } } @@ -161,6 +171,7 @@ mod tests { request_id: RequestId(99), error_code: 500, reason_phrase: "Internal error".into(), + retry_interval: 0, }; let encoded = encode_message(&msg, Version::Draft15); @@ -169,5 +180,24 @@ mod tests { assert_eq!(decoded.request_id, RequestId(99)); assert_eq!(decoded.error_code, 500); assert_eq!(decoded.reason_phrase, "Internal error"); + assert_eq!(decoded.retry_interval, 0); + } + + #[test] + fn test_request_error_v16_retry_interval() { + let msg = RequestError { + request_id: RequestId(99), + error_code: 500, + reason_phrase: "Internal error".into(), + retry_interval: 5000, + }; + + let encoded = encode_message(&msg, Version::Draft16); + let decoded: RequestError = decode_message(&encoded, Version::Draft16).unwrap(); + + assert_eq!(decoded.request_id, RequestId(99)); + assert_eq!(decoded.error_code, 500); + assert_eq!(decoded.reason_phrase, "Internal error"); + assert_eq!(decoded.retry_interval, 5000); } } diff --git a/rs/moq-lite/src/ietf/session.rs b/rs/moq-lite/src/ietf/session.rs index 42fd94d16..a163aed04 100644 --- a/rs/moq-lite/src/ietf/session.rs +++ b/rs/moq-lite/src/ietf/session.rs @@ -102,14 +102,14 @@ async fn run_control_read( tracing::debug!(message = ?msg, "received control message"); subscriber.recv_subscribe_ok(msg)?; } - // 0x05: SubscribeError in v14, REQUEST_ERROR in v15 + // 0x05: SubscribeError in v14, REQUEST_ERROR in v15+ ietf::SubscribeError::ID => match version { Version::Draft14 => { let msg = ietf::SubscribeError::decode_msg(&mut data, version)?; tracing::debug!(message = ?msg, "received control message"); subscriber.recv_subscribe_error(msg)?; } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let msg = ietf::RequestError::decode_msg(&mut data, version)?; tracing::debug!(message = ?msg, "received control message"); subscriber.recv_request_error(&msg)?; @@ -121,28 +121,28 @@ async fn run_control_read( tracing::debug!(message = ?msg, "received control message"); subscriber.recv_publish_namespace(msg)?; } - // 0x07: PublishNamespaceOk in v14, REQUEST_OK in v15 + // 0x07: PublishNamespaceOk in v14, REQUEST_OK in v15+ ietf::PublishNamespaceOk::ID => match version { Version::Draft14 => { let msg = ietf::PublishNamespaceOk::decode_msg(&mut data, version)?; tracing::debug!(message = ?msg, "received control message"); publisher.recv_publish_namespace_ok(msg)?; } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let msg = ietf::RequestOk::decode_msg(&mut data, version)?; tracing::debug!(message = ?msg, "received control message"); subscriber.recv_request_ok(&msg)?; publisher.recv_request_ok(&msg)?; } }, - // 0x08: PublishNamespaceError in v14, removed in v15 (replaced by RequestError 0x05) + // 0x08: PublishNamespaceError in v14, NAMESPACE in v16, removed in v15 ietf::PublishNamespaceError::ID => match version { Version::Draft14 => { let msg = ietf::PublishNamespaceError::decode_msg(&mut data, version)?; tracing::debug!(message = ?msg, "received control message"); publisher.recv_publish_namespace_error(msg)?; } - Version::Draft15 => return Err(Error::UnexpectedMessage), + Version::Draft15 | Version::Draft16 => return Err(Error::UnexpectedMessage), }, ietf::PublishNamespaceDone::ID => { let msg = ietf::PublishNamespaceDone::decode_msg(&mut data, version)?; @@ -179,23 +179,23 @@ async fn run_control_read( tracing::debug!(message = ?msg, "received control message"); publisher.recv_subscribe_namespace(msg)?; } - // 0x12: SubscribeNamespaceOk in v14, removed in v15 (replaced by RequestOk 0x07) + // 0x12: SubscribeNamespaceOk in v14, removed in v15+ ietf::SubscribeNamespaceOk::ID => match version { Version::Draft14 => { let msg = ietf::SubscribeNamespaceOk::decode_msg(&mut data, version)?; tracing::debug!(message = ?msg, "received control message"); subscriber.recv_subscribe_namespace_ok(msg)?; } - Version::Draft15 => return Err(Error::UnexpectedMessage), + Version::Draft15 | Version::Draft16 => return Err(Error::UnexpectedMessage), }, - // 0x13: SubscribeNamespaceError in v14, removed in v15 (replaced by RequestError 0x05) + // 0x13: SubscribeNamespaceError in v14, removed in v15+ ietf::SubscribeNamespaceError::ID => match version { Version::Draft14 => { let msg = ietf::SubscribeNamespaceError::decode_msg(&mut data, version)?; tracing::debug!(message = ?msg, "received control message"); subscriber.recv_subscribe_namespace_error(msg)?; } - Version::Draft15 => return Err(Error::UnexpectedMessage), + Version::Draft15 | Version::Draft16 => return Err(Error::UnexpectedMessage), }, ietf::UnsubscribeNamespace::ID => { let msg = ietf::UnsubscribeNamespace::decode_msg(&mut data, version)?; @@ -227,27 +227,27 @@ async fn run_control_read( tracing::debug!(message = ?msg, "received control message"); subscriber.recv_fetch_ok(msg)?; } - // 0x19: FetchError in v14, removed in v15 (replaced by RequestError 0x05) + // 0x19: FetchError in v14, removed in v15+ ietf::FetchError::ID => match version { Version::Draft14 => { let msg = ietf::FetchError::decode_msg(&mut data, version)?; tracing::debug!(message = ?msg, "received control message"); subscriber.recv_fetch_error(msg)?; } - Version::Draft15 => return Err(Error::UnexpectedMessage), + Version::Draft15 | Version::Draft16 => return Err(Error::UnexpectedMessage), }, ietf::Publish::ID => { let msg = ietf::Publish::decode_msg(&mut data, version)?; tracing::debug!(message = ?msg, "received control message"); subscriber.recv_publish(msg)?; } - // 0x1E: PublishOk — v14: unsupported, v15: removed (replaced by RequestOk 0x07) + // 0x1E: PublishOk — v14: unsupported, v15+: removed (replaced by RequestOk 0x07) ietf::PublishOk::ID => match version { - Version::Draft14 | Version::Draft15 => return Err(Error::UnexpectedMessage), + Version::Draft14 | Version::Draft15 | Version::Draft16 => return Err(Error::UnexpectedMessage), }, - // 0x1F: PublishError — v14: unsupported, v15: removed (replaced by RequestError 0x05) + // 0x1F: PublishError — v14: unsupported, v15+: removed (replaced by RequestError 0x05) ietf::PublishError::ID => match version { - Version::Draft14 | Version::Draft15 => return Err(Error::UnexpectedMessage), + Version::Draft14 | Version::Draft15 | Version::Draft16 => return Err(Error::UnexpectedMessage), }, _ => return Err(Error::UnexpectedMessage), } diff --git a/rs/moq-lite/src/ietf/setup.rs b/rs/moq-lite/src/ietf/setup.rs index 4f0fa69fd..24ef5505a 100644 --- a/rs/moq-lite/src/ietf/setup.rs +++ b/rs/moq-lite/src/ietf/setup.rs @@ -24,11 +24,11 @@ impl Message for ClientSetup { let parameters = Parameters::decode(r, version)?; Ok(Self { versions, parameters }) } - IetfVersion::Draft15 => { - // Draft15: no versions list, just parameters + IetfVersion::Draft15 | IetfVersion::Draft16 => { + // Draft15+: no versions list, just parameters let parameters = Parameters::decode(r, version)?; Ok(Self { - versions: vec![Version(IetfVersion::Draft15 as u64)].into(), + versions: vec![Version(version as u64)].into(), parameters, }) } @@ -42,8 +42,8 @@ impl Message for ClientSetup { self.versions.encode(w, version); self.parameters.encode(w, version); } - IetfVersion::Draft15 => { - // Draft15: no versions list, just parameters + IetfVersion::Draft15 | IetfVersion::Draft16 => { + // Draft15+: no versions list, just parameters self.parameters.encode(w, version); } } @@ -69,8 +69,8 @@ impl Message for ServerSetup { self.version.encode(w, version); self.parameters.encode(w, version); } - IetfVersion::Draft15 => { - // Draft15: no version field, just parameters + IetfVersion::Draft15 | IetfVersion::Draft16 => { + // Draft15+: no version field, just parameters self.parameters.encode(w, version); } } @@ -79,15 +79,18 @@ impl Message for ServerSetup { fn decode_msg(r: &mut R, version: IetfVersion) -> Result { match version { IetfVersion::Draft14 => { - let version = Version::decode(r, version)?; + let selected = Version::decode(r, version)?; let parameters = Parameters::decode(r, version)?; - Ok(Self { version, parameters }) + Ok(Self { + version: selected, + parameters, + }) } - IetfVersion::Draft15 => { - // Draft15: no version field, just parameters + IetfVersion::Draft15 | IetfVersion::Draft16 => { + // Draft15+: no version field, just parameters let parameters = Parameters::decode(r, version)?; Ok(Self { - version: Version(IetfVersion::Draft15 as u64), + version: Version(version as u64), parameters, }) } diff --git a/rs/moq-lite/src/ietf/subscribe.rs b/rs/moq-lite/src/ietf/subscribe.rs index b404fd045..9d93d1297 100644 --- a/rs/moq-lite/src/ietf/subscribe.rs +++ b/rs/moq-lite/src/ietf/subscribe.rs @@ -86,7 +86,7 @@ impl Message for Subscribe<'_> { filter_type, }) } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { // v15: fields moved into parameters let params = MessageParameters::decode(r, version)?; @@ -132,7 +132,7 @@ impl Message for Subscribe<'_> { self.filter_type.encode(w, version); 0u8.encode(w, version); // no parameters } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let mut params = MessageParameters::default(); params.set_subscriber_priority(self.subscriber_priority); params.set_group_order(u8::from(self.group_order) as u64); @@ -165,7 +165,7 @@ impl Message for SubscribeOk { false.encode(w, version); // no content 0u8.encode(w, version); // no parameters } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { // v15: just parameters after track_alias let mut params = MessageParameters::default(); params.set_group_order(u8::from(GroupOrder::Descending) as u64); @@ -194,7 +194,7 @@ impl Message for SubscribeOk { let _params = Parameters::decode(r, version)?; } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let _params = MessageParameters::decode(r, version)?; } } @@ -279,7 +279,7 @@ impl Message for SubscribeUpdate { self.forward.encode(w, version); 0u8.encode(w, version); // no parameters } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { self.request_id.encode(w, version); self.subscription_request_id.encode(w, version); let mut params = MessageParameters::default(); @@ -311,7 +311,7 @@ impl Message for SubscribeUpdate { forward, }) } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let request_id = RequestId::decode(r, version)?; let subscription_request_id = RequestId::decode(r, version)?; let params = MessageParameters::decode(r, version)?; diff --git a/rs/moq-lite/src/ietf/subscriber.rs b/rs/moq-lite/src/ietf/subscriber.rs index 4f25acc21..4e158bd19 100644 --- a/rs/moq-lite/src/ietf/subscriber.rs +++ b/rs/moq-lite/src/ietf/subscriber.rs @@ -75,7 +75,7 @@ impl Subscriber { fn send_ok(&self, request_id: RequestId) -> Result<(), Error> { match self.version { Version::Draft14 => self.control.send(ietf::PublishNamespaceOk { request_id }), - Version::Draft15 => self.control.send(ietf::RequestOk { + Version::Draft15 | Version::Draft16 => self.control.send(ietf::RequestOk { request_id, parameters: MessageParameters::default(), }), @@ -90,10 +90,11 @@ impl Subscriber { error_code, reason_phrase: reason.into(), }), - Version::Draft15 => self.control.send(ietf::RequestError { + Version::Draft15 | Version::Draft16 => self.control.send(ietf::RequestError { request_id, error_code, reason_phrase: reason.into(), + retry_interval: 0, }), } } @@ -471,11 +472,12 @@ impl Subscriber { reason_phrase: err.to_string().into(), })?; } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { self.control.send(ietf::RequestError { request_id: msg.request_id, error_code: 400, reason_phrase: err.to_string().into(), + retry_interval: 0, })?; } } @@ -490,7 +492,7 @@ impl Subscriber { filter_type: FilterType::LargestObject, })?; } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { self.control.send(ietf::RequestOk { request_id: msg.request_id, parameters: MessageParameters::default(), diff --git a/rs/moq-lite/src/ietf/track.rs b/rs/moq-lite/src/ietf/track.rs index f2b749a2d..8626b7f87 100644 --- a/rs/moq-lite/src/ietf/track.rs +++ b/rs/moq-lite/src/ietf/track.rs @@ -38,7 +38,7 @@ impl Message for TrackStatus<'_> { FilterType::LargestObject.encode(w, version); // filter type 0u8.encode(w, version); // no parameters } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { // v15: same format as Subscribe - fields in parameters let params = MessageParameters::default(); params.encode(w, version); @@ -59,7 +59,7 @@ impl Message for TrackStatus<'_> { let _filter_type = u64::decode(r, version)?; let _params = Parameters::decode(r, version)?; } - Version::Draft15 => { + Version::Draft15 | Version::Draft16 => { let _params = MessageParameters::decode(r, version)?; } } diff --git a/rs/moq-lite/src/ietf/version.rs b/rs/moq-lite/src/ietf/version.rs index 315232264..20cec4dd3 100644 --- a/rs/moq-lite/src/ietf/version.rs +++ b/rs/moq-lite/src/ietf/version.rs @@ -2,12 +2,14 @@ use crate::coding; pub const ALPN_14: &str = "moq-00"; pub const ALPN_15: &str = "moqt-15"; +pub const ALPN_16: &str = "moqt-16"; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] #[repr(u64)] pub enum Version { Draft14 = 0xff00000e, Draft15 = 0xff00000f, + Draft16 = 0xff000010, } impl TryFrom for Version { @@ -18,6 +20,8 @@ impl TryFrom for Version { Ok(Self::Draft14) } else if value == Self::Draft15.into() { Ok(Self::Draft15) + } else if value == Self::Draft16.into() { + Ok(Self::Draft16) } else { Err(()) } diff --git a/rs/moq-lite/src/server.rs b/rs/moq-lite/src/server.rs index ae28bb475..fde5ab204 100644 --- a/rs/moq-lite/src/server.rs +++ b/rs/moq-lite/src/server.rs @@ -44,6 +44,10 @@ impl Server { } let (encoding, supported) = match session.protocol() { + Some(p) if p == ietf::ALPN_16 => ( + Version::Ietf(ietf::Version::Draft16), + vec![ietf::Version::Draft16.into()], + ), Some(p) if p == ietf::ALPN_15 => ( Version::Ietf(ietf::Version::Draft15), vec![ietf::Version::Draft15.into()], @@ -71,13 +75,14 @@ impl Server { .ok_or_else(|| Error::Version(client.versions.clone(), supported.into()))?; // Only encode parameters if we're using the IETF draft because it has max_request_id - let parameters = if version.is_ietf() { - let mut parameters = ietf::Parameters::default(); - parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64); - parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec()); - parameters.encode_bytes(()) - } else { - lite::Parameters::default().encode_bytes(()) + let parameters = match version { + Version::Ietf(ietf_version) => { + let mut parameters = ietf::Parameters::default(); + parameters.set_varint(ietf::ParameterVarInt::MaxRequestId, u32::MAX as u64); + parameters.set_bytes(ietf::ParameterBytes::Implementation, b"moq-lite-rs".to_vec()); + parameters.encode_bytes(ietf_version) + } + Version::Lite(_) => lite::Parameters::default().encode_bytes(()), }; let server = setup::Server { diff --git a/rs/moq-lite/src/setup.rs b/rs/moq-lite/src/setup.rs index ff022af68..a0f4109d8 100644 --- a/rs/moq-lite/src/setup.rs +++ b/rs/moq-lite/src/setup.rs @@ -22,9 +22,8 @@ pub struct Client { impl Client { fn encode_inner(&self, w: &mut W, v: Version) { match v { - Version::Ietf(ietf::Version::Draft15) => { - // Draft15: no versions list, parameters only. - assert_eq!(self.versions, coding::Versions::from([ietf::Version::Draft15.into()])); + Version::Ietf(ietf::Version::Draft15 | ietf::Version::Draft16) => { + // Draft15+: no versions list, parameters only. } Version::Ietf(ietf::Version::Draft14) | Version::Lite(lite::Version::Draft02) @@ -43,7 +42,9 @@ impl Decode for Client { } let size = match v { - Version::Ietf(ietf::Version::Draft15 | ietf::Version::Draft14) => u16::decode(r, v)? as usize, + Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { + u16::decode(r, v)? as usize + } Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => u64::decode(r, v)? as usize, }; @@ -54,9 +55,9 @@ impl Decode for Client { let mut msg = r.copy_to_bytes(size); let versions = match v { - Version::Ietf(ietf::Version::Draft15) => { - // Draft15: no versions list, parameters only. - coding::Versions::from([ietf::Version::Draft15.into()]) + Version::Ietf(ietf::Version::Draft15 | ietf::Version::Draft16) => { + // Draft15+: no versions list, parameters only. + coding::Versions::from([v.into()]) } Version::Ietf(ietf::Version::Draft14) | Version::Lite(lite::Version::Draft02) @@ -80,7 +81,7 @@ impl Encode for Client { let size = sizer.size; match v { - Version::Ietf(ietf::Version::Draft15 | ietf::Version::Draft14) => { + Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { u16::try_from(size).expect("message too large for u16").encode(w, v) } Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => (size as u64).encode(w, v), @@ -102,9 +103,8 @@ pub struct Server { impl Server { fn encode_inner(&self, w: &mut W, v: Version) { match v { - Version::Ietf(ietf::Version::Draft15) => { - // Draft15: No version field, parameters only. - assert_eq!(self.version, ietf::Version::Draft15.into()); + Version::Ietf(ietf::Version::Draft15 | ietf::Version::Draft16) => { + // Draft15+: No version field, parameters only. } Version::Ietf(ietf::Version::Draft14) | Version::Lite(lite::Version::Draft02) @@ -123,7 +123,7 @@ impl Encode for Server { let size = sizer.size; match v { - Version::Ietf(ietf::Version::Draft15 | ietf::Version::Draft14) => { + Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { u16::try_from(size).expect("message too large for u16").encode(w, v) } Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => (size as u64).encode(w, v), @@ -141,7 +141,9 @@ impl Decode for Server { } let size = match v { - Version::Ietf(ietf::Version::Draft15 | ietf::Version::Draft14) => u16::decode(r, v)? as usize, + Version::Ietf(ietf::Version::Draft14 | ietf::Version::Draft15 | ietf::Version::Draft16) => { + u16::decode(r, v)? as usize + } Version::Lite(lite::Version::Draft02 | lite::Version::Draft01) => u64::decode(r, v)? as usize, }; @@ -151,7 +153,7 @@ impl Decode for Server { let mut msg = r.copy_to_bytes(size); let version = match v { - Version::Ietf(ietf::Version::Draft15) => v.into(), + Version::Ietf(ietf::Version::Draft15 | ietf::Version::Draft16) => v.into(), Version::Ietf(ietf::Version::Draft14) | Version::Lite(lite::Version::Draft02) | Version::Lite(lite::Version::Draft01) => coding::Version::decode(&mut msg, v)?, diff --git a/rs/moq-lite/src/version.rs b/rs/moq-lite/src/version.rs index 7bf2d56a6..3a9ffc421 100644 --- a/rs/moq-lite/src/version.rs +++ b/rs/moq-lite/src/version.rs @@ -10,7 +10,7 @@ pub(crate) const NEGOTIATED: [Version; 3] = [ ]; /// The ALPN strings for supported versions. -const ALPNS: [&str; 3] = [lite::ALPN, ietf::ALPN_14, ietf::ALPN_15]; +const ALPNS: [&str; 4] = [lite::ALPN, ietf::ALPN_14, ietf::ALPN_15, ietf::ALPN_16]; // Return the ALPN strings for supported versions. // This is a function so we can avoid semver bumps.