Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions js/hang/src/watch/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ export interface BroadcastProps {
// The broadcast name.
path?: Moq.Path.Valid | Signal<Moq.Path.Valid | undefined>;

// You can disable reloading if you don't want to wait for an announcement.
// Whether to reload the broadcast when it goes offline.
// Defaults to false; pass true to wait for an announcement before subscribing.
reload?: boolean | Signal<boolean>;
}

Expand Down Expand Up @@ -40,7 +41,7 @@ export class Broadcast {
this.connection = Signal.from(props?.connection);
this.path = Signal.from(props?.path);
this.enabled = Signal.from(props?.enabled ?? false);
this.reload = Signal.from(props?.reload ?? true);
this.reload = Signal.from(props?.reload ?? false);

this.signals.effect(this.#runReload.bind(this));
this.signals.effect(this.#runBroadcast.bind(this));
Expand Down
14 changes: 9 additions & 5 deletions js/lite/src/connection/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish

// Choose setup encoding based on negotiated WebTransport protocol (if any).
let setupVersion: Ietf.Version;
if (protocol === Ietf.ALPN.DRAFT_15) {
if (protocol === Ietf.ALPN.DRAFT_16) {
setupVersion = Ietf.Version.DRAFT_16;
} else if (protocol === Ietf.ALPN.DRAFT_15) {
setupVersion = Ietf.Version.DRAFT_15;
} else if (protocol === undefined) {
setupVersion = Ietf.Version.DRAFT_14;
Expand All @@ -98,9 +100,11 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish
const client = new Ietf.ClientSetup(
// NOTE: draft 15 onwards does not use CLIENT_SETUP to negotiate the version.
// We still echo it just to make sure we're not accidentally trying to negotiate the version.
setupVersion === Ietf.Version.DRAFT_15
? [Ietf.Version.DRAFT_15]
: [Lite.Version.DRAFT_02, Lite.Version.DRAFT_01, Ietf.Version.DRAFT_14],
setupVersion === Ietf.Version.DRAFT_16
? [Ietf.Version.DRAFT_16]
: setupVersion === Ietf.Version.DRAFT_15
? [Ietf.Version.DRAFT_15]
: [Lite.Version.DRAFT_02, Lite.Version.DRAFT_01, Ietf.Version.DRAFT_14],
params,
);
console.debug(url.toString(), "sending client setup", client);
Expand Down Expand Up @@ -139,7 +143,7 @@ async function connectWebTransport(
allowPooling: false,
congestionControl: "low-latency",
// @ts-expect-error - TODO: add protocols to WebTransportOptions
protocols: [Ietf.ALPN.DRAFT_15],
protocols: [Ietf.ALPN.DRAFT_16, Ietf.ALPN.DRAFT_15],
...options,
};

Expand Down
3 changes: 1 addition & 2 deletions js/lite/src/ietf/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class Connection implements Established {
*
* @internal
*/
constructor(url: URL, quic: WebTransport, control: Stream, maxRequestId: bigint, version?: IetfVersion) {
constructor(url: URL, quic: WebTransport, control: Stream, maxRequestId: bigint, version: IetfVersion) {
this.url = url;
this.#quic = quic;
this.#control = new Control.Stream(control, maxRequestId, version);
Expand Down Expand Up @@ -284,7 +284,6 @@ export class Connection implements Established {
try {
// we don't support other stream types yet
const header = await Group.decode(stream);
console.debug("received group header", header);
await this.#subscriber.handleGroup(header, stream);
} catch (err) {
console.error("error processing object stream", err);
Expand Down
27 changes: 14 additions & 13 deletions js/lite/src/ietf/control.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Mutex } from "async-mutex";
import type { Reader, Stream as StreamInner, Writer } from "../stream.ts";
import type { Stream as StreamInner, Writer } from "../stream.ts";
import { Fetch, FetchCancel, FetchError, FetchOk } from "./fetch.ts";
import { GoAway } from "./goaway.ts";
import { Publish, PublishDone, PublishError, PublishOk } from "./publish.ts";
Expand Down Expand Up @@ -80,9 +80,13 @@ const MessagesV15 = {
[RequestsBlocked.id]: RequestsBlocked,
} as const;

// v16 uses the same message map as v15
const MessagesV16 = MessagesV15;

type V14MessageType = (typeof MessagesV14)[keyof typeof MessagesV14];
type V15MessageType = (typeof MessagesV15)[keyof typeof MessagesV15];
type MessageType = V14MessageType | V15MessageType;
type V16MessageType = (typeof MessagesV16)[keyof typeof MessagesV16];
type MessageType = V14MessageType | V15MessageType | V16MessageType;

// Type for control message instances (not constructors)
export type Message = InstanceType<MessageType>;
Expand Down Expand Up @@ -123,8 +127,7 @@ export class Stream {
await this.stream.writer.u53((message.constructor as MessageType).id);

// Write message payload with u16 size prefix
// Extra version arg is silently ignored by messages that don't need it
await (message.encode as (w: Writer, v?: IetfVersion) => Promise<void>)(this.stream.writer, this.version);
await (message.encode as (w: Writer, v: IetfVersion) => Promise<void>)(this.stream.writer, this.version);
});
}

Expand All @@ -136,21 +139,19 @@ export class Stream {
return await this.#readLock.runExclusive(async () => {
const messageType = await this.stream.reader.u53();

const messages = this.version === Version.DRAFT_15 ? MessagesV15 : MessagesV14;
const messages =
this.version === Version.DRAFT_16
? MessagesV16
: this.version === Version.DRAFT_15
? MessagesV15
: MessagesV14;
if (!(messageType in messages)) {
throw new Error(`Unknown control message type: ${messageType}`);
}

try {
const msgClass = messages[messageType as keyof typeof messages];

// Extra version arg is silently ignored by messages that don't need it
const msg = await (msgClass as { decode: (r: Reader, v?: IetfVersion) => Promise<Message> }).decode(
this.stream.reader,
this.version,
);

console.debug("message read", msg);
const msg = await msgClass.decode(this.stream.reader, this.version);
return msg;
} catch (err) {
console.error("failed to decode message", messageType, err);
Expand Down
17 changes: 9 additions & 8 deletions js/lite/src/ietf/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type * as Path from "../path.ts";
import type { Reader, Writer } from "../stream.ts";
import * as Message from "./message.ts";
import type { IetfVersion } from "./version.ts";

export class Fetch {
static id = 0x16;
Expand Down Expand Up @@ -41,11 +42,11 @@ export class Fetch {
throw new Error("FETCH messages are not supported");
}

async encode(w: Writer): Promise<void> {
async encode(w: Writer, _version: IetfVersion): Promise<void> {
return Message.encode(w, this.#encode.bind(this));
}

static async decode(r: Reader): Promise<Fetch> {
static async decode(r: Reader, _version: IetfVersion): Promise<Fetch> {
return Message.decode(r, Fetch.#decode);
}

Expand All @@ -67,11 +68,11 @@ export class FetchOk {
throw new Error("FETCH_OK messages are not supported");
}

async encode(w: Writer): Promise<void> {
async encode(w: Writer, _version: IetfVersion): Promise<void> {
return Message.encode(w, this.#encode.bind(this));
}

static async decode(r: Reader): Promise<FetchOk> {
static async decode(r: Reader, _version: IetfVersion): Promise<FetchOk> {
return Message.decode(r, FetchOk.#decode);
}

Expand All @@ -97,11 +98,11 @@ export class FetchError {
throw new Error("FETCH_ERROR messages are not supported");
}

async encode(w: Writer): Promise<void> {
async encode(w: Writer, _version: IetfVersion): Promise<void> {
return Message.encode(w, this.#encode.bind(this));
}

static async decode(r: Reader): Promise<FetchError> {
static async decode(r: Reader, _version: IetfVersion): Promise<FetchError> {
return Message.decode(r, FetchError.#decode);
}

Expand All @@ -123,11 +124,11 @@ export class FetchCancel {
throw new Error("FETCH_CANCEL messages are not supported");
}

async encode(w: Writer): Promise<void> {
async encode(w: Writer, _version: IetfVersion): Promise<void> {
return Message.encode(w, this.#encode.bind(this));
}

static async decode(r: Reader): Promise<FetchCancel> {
static async decode(r: Reader, _version: IetfVersion): Promise<FetchCancel> {
return Message.decode(r, FetchCancel.#decode);
}

Expand Down
5 changes: 3 additions & 2 deletions js/lite/src/ietf/goaway.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Reader, Writer } from "../stream.ts";
import * as Message from "./message.ts";
import type { IetfVersion } from "./version.ts";

export class GoAway {
static id = 0x10;
Expand All @@ -14,11 +15,11 @@ export class GoAway {
await w.string(this.newSessionUri);
}

async encode(w: Writer): Promise<void> {
async encode(w: Writer, _version: IetfVersion): Promise<void> {
return Message.encode(w, this.#encode.bind(this));
}

static async decode(r: Reader): Promise<GoAway> {
static async decode(r: Reader, _version: IetfVersion): Promise<GoAway> {
return Message.decode(r, GoAway.#decode);
}

Expand Down
Loading
Loading