Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions js/lite/src/connection/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,17 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish
params.setVarint(Ietf.Parameter.MaxRequestId, 42069n); // Allow a ton of request IDs.
params.setBytes(Ietf.Parameter.Implementation, encoder.encode("moq-lite-js")); // Put the implementation name in the parameters.

const client = new Ietf.ClientSetup(
const client = new Ietf.ClientSetup({
// NOTE: draft 15 onwards does not use CLIENT_SETUP to negotiate the version.
// We still echo it just to make sure we're not accidentally trying to negotiate the version.
setupVersion === Ietf.Version.DRAFT_16
? [Ietf.Version.DRAFT_16]
: setupVersion === Ietf.Version.DRAFT_15
? [Ietf.Version.DRAFT_15]
: [Lite.Version.DRAFT_02, Lite.Version.DRAFT_01, Ietf.Version.DRAFT_14],
params,
);
versions:
setupVersion === Ietf.Version.DRAFT_16
? [Ietf.Version.DRAFT_16]
: setupVersion === Ietf.Version.DRAFT_15
? [Ietf.Version.DRAFT_15]
: [Lite.Version.DRAFT_02, Lite.Version.DRAFT_01, Ietf.Version.DRAFT_14],
parameters: params,
});
console.debug(url.toString(), "sending client setup", client);
await client.encode(stream.writer, setupVersion);

Expand All @@ -126,7 +127,13 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish
} else if (Object.values(Ietf.Version).includes(server.version as Ietf.Version)) {
const maxRequestId = server.parameters.getVarint(Ietf.Parameter.MaxRequestId) ?? 0n;
console.debug(url.toString(), "moq-ietf session established, version:", server.version.toString(16));
return new Ietf.Connection(url, session, stream, maxRequestId, server.version as Ietf.IetfVersion);
return new Ietf.Connection({
url,
quic: session,
control: stream,
maxRequestId,
version: server.version as Ietf.IetfVersion,
});
} else {
throw new Error(`unsupported server version: ${server.version.toString()}`);
}
Expand Down
63 changes: 54 additions & 9 deletions js/lite/src/ietf/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { Announced } from "../announced.ts";
import type { Broadcast } from "../broadcast.ts";
import type { Established } from "../connection/established.ts";
import * as Path from "../path.ts";
import { type Reader, Readers, type Stream } from "../stream.ts";
import { type Reader, Readers, Stream } from "../stream.ts";
import { unreachable } from "../util/index.ts";
import * as Control from "./control.ts";
import { Fetch, FetchCancel, FetchError, FetchOk } from "./fetch.ts";
Expand All @@ -28,7 +28,7 @@ import {
} from "./subscribe_namespace.ts";
import { Subscriber } from "./subscriber.ts";
import { TrackStatus, TrackStatusRequest } from "./track.ts";
import type { IetfVersion } from "./version.ts";
import { type IetfVersion, Version } from "./version.ts";

/**
* Represents a connection to a MoQ server using moq-transport protocol.
Expand Down Expand Up @@ -64,17 +64,29 @@ export class Connection implements Established {
*
* @internal
*/
constructor(url: URL, quic: WebTransport, control: Stream, maxRequestId: bigint, version: IetfVersion) {
constructor({
url,
quic,
control,
maxRequestId,
version,
}: {
url: URL;
quic: WebTransport;
control: Stream;
maxRequestId: bigint;
version: IetfVersion;
}) {
this.url = url;
this.#quic = quic;
this.#control = new Control.Stream(control, maxRequestId, version);
this.#control = new Control.Stream({ stream: control, maxRequestId, version });

this.#quic.closed.finally(() => {
this.#control.close();
});

this.#publisher = new Publisher(this.#quic, this.#control);
this.#subscriber = new Subscriber(this.#control);
this.#publisher = new Publisher({ quic: this.#quic, control: this.#control });
this.#subscriber = new Subscriber({ control: this.#control, quic: this.#quic });

void this.#run();
}
Expand All @@ -95,11 +107,15 @@ export class Connection implements Established {
}

async #run(): Promise<void> {
const controlMessages = this.#runControlStream();
const objectStreams = this.#runObjectStreams();
const tasks: Promise<void>[] = [this.#runControlStream(), this.#runObjectStreams()];

// v16: accept bidi streams for SUBSCRIBE_NAMESPACE
if (this.#control.version === Version.DRAFT_16) {
tasks.push(this.#runBidiStreams());
}

try {
await Promise.all([controlMessages, objectStreams]);
await Promise.all(tasks);
} catch (err) {
if (!this.#closed) {
console.error("fatal error running connection", err);
Expand Down Expand Up @@ -290,6 +306,35 @@ export class Connection implements Established {
}
}

/**
* Accepts bidirectional streams for v16 SUBSCRIBE_NAMESPACE.
*/
async #runBidiStreams() {
for (;;) {
const stream = await Stream.accept(this.#quic);
if (!stream) break;

void this.#runBidiStream(stream).catch((err: unknown) => {
console.error("error processing bidi stream", err);
stream.abort(new Error("bidi stream error"));
});
}
}

/**
* Handles a single incoming bidi stream.
*/
async #runBidiStream(stream: Stream) {
const messageType = await stream.reader.u53();

if (messageType === SubscribeNamespace.id) {
await this.#publisher.handleSubscribeNamespaceStream(stream);
} else {
console.warn(`unexpected bidi stream type: ${messageType}`);
stream.abort(new Error("unexpected stream type"));
}
}

/**
* Returns a promise that resolves when the connection is closed.
* @returns A promise that resolves when closed
Expand Down
36 changes: 33 additions & 3 deletions js/lite/src/ietf/control.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,30 @@ const MessagesV15 = {
[RequestsBlocked.id]: RequestsBlocked,
} as const;

// v16 uses the same message map as v15
const MessagesV16 = MessagesV15;
// v16 message map — SubscribeNamespace (0x11) and UnsubscribeNamespace (0x14) move to bidi streams
const MessagesV16 = {
[Setup.ClientSetup.id]: Setup.ClientSetup,
[Setup.ServerSetup.id]: Setup.ServerSetup,
[Subscribe.id]: Subscribe,
[SubscribeOk.id]: SubscribeOk,
[RequestError.id]: RequestError, // 0x05 → RequestError
[PublishNamespace.id]: PublishNamespace,
[RequestOk.id]: RequestOk, // 0x07 → RequestOk
[PublishNamespaceDone.id]: PublishNamespaceDone,
[Unsubscribe.id]: Unsubscribe,
[PublishDone.id]: PublishDone,
[PublishNamespaceCancel.id]: PublishNamespaceCancel,
[TrackStatusRequest.id]: TrackStatusRequest,
[GoAway.id]: GoAway,
[Fetch.id]: Fetch,
[FetchCancel.id]: FetchCancel,
[FetchOk.id]: FetchOk,
// SubscribeNamespace (0x11) removed — now on bidi stream
// UnsubscribeNamespace (0x14) removed — now use stream close
[Publish.id]: Publish,
[MaxRequestId.id]: MaxRequestId,
[RequestsBlocked.id]: RequestsBlocked,
} as const;

type V14MessageType = (typeof MessagesV14)[keyof typeof MessagesV14];
type V15MessageType = (typeof MessagesV15)[keyof typeof MessagesV15];
Expand All @@ -106,7 +128,15 @@ export class Stream {
#writeLock = new Mutex();
#readLock = new Mutex();

constructor(stream: StreamInner, maxRequestId: bigint, version: IetfVersion = Version.DRAFT_14) {
constructor({
stream,
maxRequestId,
version = Version.DRAFT_14,
}: {
stream: StreamInner;
maxRequestId: bigint;
version?: IetfVersion;
}) {
this.stream = stream;
this.version = version;
this.#maxRequestId = maxRequestId;
Expand Down
42 changes: 28 additions & 14 deletions js/lite/src/ietf/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,27 @@ export class Fetch {
endGroup: bigint;
endObject: bigint;

constructor(
requestId: bigint,
trackNamespace: Path.Valid,
trackName: string,
subscriberPriority: number,
groupOrder: number,
startGroup: bigint,
startObject: bigint,
endGroup: bigint,
endObject: bigint,
) {
constructor({
requestId,
trackNamespace,
trackName,
subscriberPriority,
groupOrder,
startGroup,
startObject,
endGroup,
endObject,
}: {
requestId: bigint;
trackNamespace: Path.Valid;
trackName: string;
subscriberPriority: number;
groupOrder: number;
startGroup: bigint;
startObject: bigint;
endGroup: bigint;
endObject: bigint;
}) {
this.requestId = requestId;
this.trackNamespace = trackNamespace;
this.trackName = trackName;
Expand Down Expand Up @@ -60,7 +70,7 @@ export class FetchOk {

requestId: bigint;

constructor(requestId: bigint) {
constructor({ requestId }: { requestId: bigint }) {
this.requestId = requestId;
}

Expand Down Expand Up @@ -88,7 +98,11 @@ export class FetchError {
errorCode: number;
reasonPhrase: string;

constructor(requestId: bigint, errorCode: number, reasonPhrase: string) {
constructor({
requestId,
errorCode,
reasonPhrase,
}: { requestId: bigint; errorCode: number; reasonPhrase: string }) {
this.requestId = requestId;
this.errorCode = errorCode;
this.reasonPhrase = reasonPhrase;
Expand Down Expand Up @@ -116,7 +130,7 @@ export class FetchCancel {

requestId: bigint;

constructor(requestId: bigint) {
constructor({ requestId }: { requestId: bigint }) {
this.requestId = requestId;
}

Expand Down
4 changes: 2 additions & 2 deletions js/lite/src/ietf/goaway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export class GoAway {

newSessionUri: string;

constructor(newSessionUri: string) {
constructor({ newSessionUri }: { newSessionUri: string }) {
this.newSessionUri = newSessionUri;
}

Expand All @@ -25,6 +25,6 @@ export class GoAway {

static async #decode(r: Reader): Promise<GoAway> {
const newSessionUri = await r.string();
return new GoAway(newSessionUri);
return new GoAway({ newSessionUri });
}
}
Loading
Loading