Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions js/lite/src/connection/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish
const stream = await Stream.open(session);

// @ts-expect-error - TODO: add protocol to WebTransport
const protocol = session instanceof WebTransport ? session.protocol : undefined;
const protocol: string | undefined = session instanceof WebTransport ? session.protocol : undefined;

// Choose setup encoding based on negotiated WebTransport protocol (if any).
let setupVersion: Ietf.Version;
if (protocol === Ietf.ALPN.DRAFT_16) {
setupVersion = Ietf.Version.DRAFT_16;
} else if (protocol === Ietf.ALPN.DRAFT_15) {
setupVersion = Ietf.Version.DRAFT_15;
} else if (protocol === undefined) {
} else if (protocol === "" || protocol === undefined) {
setupVersion = Ietf.Version.DRAFT_14;
} else {
throw new Error(`unsupported WebTransport protocol: ${protocol}`);
Expand Down
11 changes: 4 additions & 7 deletions js/lite/src/ietf/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ export class Connection implements Established {
stream.stop(new Error("cancel"));
})
.catch((err: unknown) => {
console.error("error processing object stream", err);
stream.stop(err);
});
}
Expand All @@ -297,13 +298,9 @@ export class Connection implements Established {
* Handles a single object stream.
*/
async #runObjectStream(stream: Reader) {
try {
// we don't support other stream types yet
const header = await Group.decode(stream);
await this.#subscriber.handleGroup(header, stream);
} catch (err) {
console.error("error processing object stream", err);
}
// we don't support other stream types yet
const header = await Group.decode(stream);
await this.#subscriber.handleGroup(header, stream);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion js/lite/src/ietf/object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export class Frame {
static async decode(r: Reader, flags: GroupFlags): Promise<Frame> {
const delta = await r.u53();
if (delta !== 0) {
console.warn(`object ID delta is not supported, ignoring: ${delta}`);
throw new Error(`object ID delta is not supported: ${delta}`);
}

if (flags.hasExtensions) {
Expand Down
2 changes: 1 addition & 1 deletion js/lite/src/ietf/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ export class Subscriber {
const producer = new Group(group.groupId);

if (group.subGroupId !== 0) {
console.warn("subgroup ID is not supported, ignoring");
throw new Error("subgroups are not supported");
}

try {
Expand Down
6 changes: 4 additions & 2 deletions rs/moq-lite/src/ietf/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
tracing::trace!(?group, "received group header");

if group.sub_group_id != 0 {
tracing::warn!(sub_group_id = %group.sub_group_id, "subgroup ID is not supported, stripping");
tracing::warn!(sub_group_id = %group.sub_group_id, "subgroup ID is not supported, dropping stream");
return Err(Error::Unsupported);
}

let producer = {
Expand Down Expand Up @@ -409,7 +410,8 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
) -> Result<(), Error> {
while let Some(id_delta) = stream.decode_maybe::<u64>().await? {
if id_delta != 0 {
tracing::warn!(id_delta = %id_delta, "object ID gaps not supported, ignoring");
tracing::warn!(id_delta = %id_delta, "object ID delta is not supported, dropping stream");
return Err(Error::Unsupported);
}

if group.flags.has_extensions {
Expand Down
Loading