diff --git a/js/lite/src/connection/connect.ts b/js/lite/src/connection/connect.ts index f03f4f14f..15e6ea41e 100644 --- a/js/lite/src/connection/connect.ts +++ b/js/lite/src/connection/connect.ts @@ -74,7 +74,7 @@ export async function connect(url: URL, props?: ConnectProps): Promise { + console.error("error processing object stream", err); stream.stop(err); }); } @@ -297,13 +298,9 @@ export class Connection implements Established { * Handles a single object stream. */ async #runObjectStream(stream: Reader) { - try { - // we don't support other stream types yet - const header = await Group.decode(stream); - await this.#subscriber.handleGroup(header, stream); - } catch (err) { - console.error("error processing object stream", err); - } + // we don't support other stream types yet + const header = await Group.decode(stream); + await this.#subscriber.handleGroup(header, stream); } /** diff --git a/js/lite/src/ietf/object.ts b/js/lite/src/ietf/object.ts index 21619b38e..abe7284db 100644 --- a/js/lite/src/ietf/object.ts +++ b/js/lite/src/ietf/object.ts @@ -137,7 +137,7 @@ export class Frame { static async decode(r: Reader, flags: GroupFlags): Promise { const delta = await r.u53(); if (delta !== 0) { - console.warn(`object ID delta is not supported, ignoring: ${delta}`); + throw new Error(`object ID delta is not supported: ${delta}`); } if (flags.hasExtensions) { diff --git a/js/lite/src/ietf/subscriber.ts b/js/lite/src/ietf/subscriber.ts index 73577d89e..db64345db 100644 --- a/js/lite/src/ietf/subscriber.ts +++ b/js/lite/src/ietf/subscriber.ts @@ -287,7 +287,7 @@ export class Subscriber { const producer = new Group(group.groupId); if (group.subGroupId !== 0) { - console.warn("subgroup ID is not supported, ignoring"); + throw new Error("subgroups are not supported"); } try { diff --git a/rs/moq-lite/src/ietf/subscriber.rs b/rs/moq-lite/src/ietf/subscriber.rs index c9ea159cd..f6ab97484 100644 --- a/rs/moq-lite/src/ietf/subscriber.rs +++ b/rs/moq-lite/src/ietf/subscriber.rs @@ -358,7 +358,8 @@ impl Subscriber { tracing::trace!(?group, "received group header"); if group.sub_group_id != 0 { - tracing::warn!(sub_group_id = %group.sub_group_id, "subgroup ID is not supported, stripping"); + tracing::warn!(sub_group_id = %group.sub_group_id, "subgroup ID is not supported, dropping stream"); + return Err(Error::Unsupported); } let producer = { @@ -409,7 +410,8 @@ impl Subscriber { ) -> Result<(), Error> { while let Some(id_delta) = stream.decode_maybe::().await? { if id_delta != 0 { - tracing::warn!(id_delta = %id_delta, "object ID gaps not supported, ignoring"); + tracing::warn!(id_delta = %id_delta, "object ID delta is not supported, dropping stream"); + return Err(Error::Unsupported); } if group.flags.has_extensions {