Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion js/moq/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,12 @@ export class Connection {

// Fetch the fingerprint from the server.
const fingerprint = await fetch(fingerprintUrl);
const fingerprintText = await fingerprint.text();

options.serverCertificateHashes = [
{
algorithm: "sha-256",
value: hexToBytes(await fingerprint.text()),
value: hexToBytes(fingerprintText),
},
];

Expand Down
48 changes: 38 additions & 10 deletions js/moq/src/wire/announce.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Valid } from "../path";
import * as Message from "./message";
import type { Reader, Writer } from "./stream";

export class Announce {
Expand All @@ -10,20 +11,29 @@ export class Announce {
this.active = active;
}

async encode(w: Writer) {
async encodeBody(w: Writer) {
await w.u8(this.active ? 1 : 0);
await w.path(this.suffix);
}

static async decode(r: Reader): Promise<Announce> {
static async decodeBody(r: Reader): Promise<Announce> {
const active = (await r.u8()) === 1;
const suffix = await r.path();
return new Announce(suffix, active);
}

// Wrapper methods that automatically handle size prefixing
async encode(w: Writer): Promise<void> {
return Message.encode(this, w);
}

static async decode(r: Reader): Promise<Announce> {
return Message.decode(Announce, r);
}

static async decode_maybe(r: Reader): Promise<Announce | undefined> {
if (await r.done()) return;
return await Announce.decode(r);
return Announce.decode(r);
}
}

Expand All @@ -35,14 +45,23 @@ export class AnnounceInterest {
this.prefix = prefix;
}

async encode(w: Writer) {
async encodeBody(w: Writer) {
await w.path(this.prefix);
}

static async decode(r: Reader): Promise<AnnounceInterest> {
static async decodeBody(r: Reader): Promise<AnnounceInterest> {
const prefix = await r.path();
return new AnnounceInterest(prefix);
}

// Wrapper methods that automatically handle size prefixing
async encode(w: Writer): Promise<void> {
return Message.encode(this, w);
}

static async decode(r: Reader): Promise<AnnounceInterest> {
return Message.decode(AnnounceInterest, r);
}
}

export class AnnounceInit {
Expand All @@ -52,19 +71,28 @@ export class AnnounceInit {
this.suffixes = paths;
}

async encode(w: Writer) {
async encodeBody(w: Writer) {
await w.u53(this.suffixes.length);
for (const path of this.suffixes) {
await w.path(path);
}
}

static async decode(r: Reader): Promise<AnnounceInit> {
static async decodeBody(r: Reader): Promise<AnnounceInit> {
const count = await r.u53();
const paths: Valid[] = [];
const suffixes: Valid[] = [];
for (let i = 0; i < count; i++) {
paths.push(await r.path());
suffixes.push(await r.path());
}
return new AnnounceInit(paths);
return new AnnounceInit(suffixes);
}

// Wrapper methods that automatically handle size prefixing
async encode(w: Writer): Promise<void> {
return Message.encode(this, w);
}

static async decode(r: Reader): Promise<AnnounceInit> {
return Message.decode(AnnounceInit, r);
}
}
40 changes: 34 additions & 6 deletions js/moq/src/wire/group.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as Message from "./message";
import type { Reader, Writer } from "./stream";

export class Group {
Expand All @@ -11,14 +12,23 @@ export class Group {
this.sequence = sequence;
}

async encode(w: Writer) {
async encodeBody(w: Writer) {
await w.u62(this.subscribe);
await w.u53(this.sequence);
}

static async decode(r: Reader): Promise<Group> {
static async decodeBody(r: Reader): Promise<Group> {
return new Group(await r.u62(), await r.u53());
}

// Wrapper methods that automatically handle size prefixing
async encode(w: Writer): Promise<void> {
return Message.encode(this, w);
}

static async decode(r: Reader): Promise<Group> {
return Message.decode(Group, r);
}
}

export class GroupDrop {
Expand All @@ -32,15 +42,24 @@ export class GroupDrop {
this.error = error;
}

async encode(w: Writer) {
async encodeBody(w: Writer) {
await w.u53(this.sequence);
await w.u53(this.count);
await w.u53(this.error);
}

static async decode(r: Reader): Promise<GroupDrop> {
static async decodeBody(r: Reader): Promise<GroupDrop> {
return new GroupDrop(await r.u53(), await r.u53(), await r.u53());
}

// Wrapper methods that automatically handle size prefixing
async encode(w: Writer): Promise<void> {
return Message.encode(this, w);
}

static async decode(r: Reader): Promise<GroupDrop> {
return Message.decode(GroupDrop, r);
}
}

export class Frame {
Expand All @@ -50,14 +69,23 @@ export class Frame {
this.payload = payload;
}

async encode(w: Writer) {
async encodeBody(w: Writer) {
await w.u53(this.payload.byteLength);
await w.write(this.payload);
}

static async decode(r: Reader): Promise<Frame> {
static async decodeBody(r: Reader): Promise<Frame> {
const size = await r.u53();
const payload = await r.read(size);
return new Frame(payload);
}

// Wrapper methods that automatically handle size prefixing
async encode(w: Writer): Promise<void> {
return Message.encode(this, w);
}

static async decode(r: Reader): Promise<Frame> {
return Message.decode(Frame, r);
}
}
1 change: 1 addition & 0 deletions js/moq/src/wire/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from "./announce";
export * from "./group";
export * from "./message";
export * from "./session";
export * from "./stream";
export * from "./subscribe";
89 changes: 89 additions & 0 deletions js/moq/src/wire/message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Reader, Writer } from "./stream";

/**
* Interface for messages that are automatically size-prefixed during encoding/decoding.
*/
export interface Encode {
encodeBody(w: Writer): Promise<void>;
}

export interface Decode<T extends Encode> {
decodeBody(r: Reader): Promise<T>;
}

/**
* Encode a message with a size prefix.
*/
export async function encode(message: Encode, writer: Writer): Promise<void> {
// Use a growing buffer to collect all the data
// Most messages are small, so start with a small buffer.
// We use data.byteLength as the length and data.buffer.byteLength as the capacity.
let data = new Uint8Array(new ArrayBuffer(64), 0, 0);

const temp = new Writer(
new WritableStream({
write(chunk: Uint8Array) {
const needed = data.byteLength + chunk.byteLength;
if (needed > data.buffer.byteLength) {
// Resize the buffer to the needed size.
const capacity = Math.max(needed, data.buffer.byteLength * 2);
const newBuffer = new ArrayBuffer(capacity);
const newData = new Uint8Array(newBuffer, 0, needed);

// Copy the old data into the new buffer.
newData.set(data);

// Copy the new chunk into the new buffer.
newData.set(chunk, data.byteLength);

data = newData;
} else {
// Copy chunk data into buffer
data = new Uint8Array(data.buffer, 0, needed);
data.set(chunk, needed - chunk.byteLength);
}
},
}),
);

await message.encodeBody(temp);
temp.close();
await temp.closed();

// Write size prefix
await writer.u53(data.byteLength);

// Write the contiguous buffer
await writer.write(data);
}

/**
* Decode a size-prefixed message, ensuring exact size consumption.
*/
export async function decode<T extends Encode>(MessageClass: Decode<T>, reader: Reader): Promise<T> {
const size = await reader.u53();
const messageData = await reader.read(size);

// Create a limited reader that contains exactly `size` bytes
const limitedStream = new ReadableStream({
start(controller) {
controller.enqueue(messageData);
controller.close();
},
});

const limitedReader = new Reader(limitedStream);
const result = await MessageClass.decodeBody(limitedReader);

// Check that we consumed exactly the right number of bytes
if (!(await limitedReader.done())) {
throw new Error("Message decoding consumed too few bytes");
}

return result;
}

export async function decodeMaybe<T extends Encode>(MessageClass: Decode<T>, reader: Reader): Promise<T | undefined> {
if (await reader.done()) return;
return await decode(MessageClass, reader);
}
40 changes: 34 additions & 6 deletions js/moq/src/wire/session.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as Message from "./message";
import type { Reader, Writer } from "./stream";

export const Version = {
Expand Down Expand Up @@ -77,7 +78,7 @@ export class SessionClient {
this.extensions = extensions;
}

async encode(w: Writer) {
async encodeBody(w: Writer) {
await w.u53(this.versions.length);
for (const v of this.versions) {
await w.u53(v);
Expand All @@ -86,7 +87,7 @@ export class SessionClient {
await this.extensions.encode(w);
}

static async decode(r: Reader): Promise<SessionClient> {
static async decodeBody(r: Reader): Promise<SessionClient> {
const versions: number[] = [];
const count = await r.u53();
for (let i = 0; i < count; i++) {
Expand All @@ -96,6 +97,15 @@ export class SessionClient {
const extensions = await Extensions.decode(r);
return new SessionClient(versions, extensions);
}

// Wrapper methods that automatically handle size prefixing
async encode(w: Writer): Promise<void> {
return Message.encode(this, w);
}

static async decode(r: Reader): Promise<SessionClient> {
return Message.decode(SessionClient, r);
}
}

export class SessionServer {
Expand All @@ -107,16 +117,25 @@ export class SessionServer {
this.extensions = extensions;
}

async encode(w: Writer) {
async encodeBody(w: Writer) {
await w.u53(this.version);
await this.extensions.encode(w);
}

static async decode(r: Reader): Promise<SessionServer> {
static async decodeBody(r: Reader): Promise<SessionServer> {
const version = await r.u53();
const extensions = await Extensions.decode(r);
return new SessionServer(version, extensions);
}

// Wrapper methods that automatically handle size prefixing
async encode(w: Writer): Promise<void> {
return Message.encode(this, w);
}

static async decode(r: Reader): Promise<SessionServer> {
return Message.decode(SessionServer, r);
}
}

export class SessionInfo {
Expand All @@ -126,15 +145,24 @@ export class SessionInfo {
this.bitrate = bitrate;
}

async encode(w: Writer) {
async encodeBody(w: Writer) {
await w.u53(this.bitrate);
}

static async decode(r: Reader): Promise<SessionInfo> {
static async decodeBody(r: Reader): Promise<SessionInfo> {
const bitrate = await r.u53();
return new SessionInfo(bitrate);
}

// Wrapper methods that automatically handle size prefixing
async encode(w: Writer): Promise<void> {
return Message.encode(this, w);
}

static async decode(r: Reader): Promise<SessionInfo> {
return Message.decode(SessionInfo, r);
}

static async decode_maybe(r: Reader): Promise<SessionInfo | undefined> {
if (await r.done()) return;
return await SessionInfo.decode(r);
Expand Down
Loading