From 28a7384f38fbc446d7c7f30359f87eb84ed4b01a Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 21 Jul 2025 21:07:15 -0700 Subject: [PATCH 1/6] Use a size prefix for messages. Matches moq-transport and is better at catching encoding bugs. --- js/moq/src/connection.ts | 3 +- js/moq/src/wire/announce.ts | 42 +++++++++++--- js/moq/src/wire/group.ts | 40 ++++++++++++-- js/moq/src/wire/index.ts | 1 + js/moq/src/wire/message.ts | 95 ++++++++++++++++++++++++++++++++ js/moq/src/wire/session.ts | 40 ++++++++++++-- js/moq/src/wire/subscribe.ts | 46 +++++++++++++--- rs/hang/src/model/track.rs | 2 +- rs/justfile | 1 + rs/moq/src/coding/decode.rs | 3 + rs/moq/src/coding/encode.rs | 26 --------- rs/moq/src/coding/message.rs | 40 ++++++++++++++ rs/moq/src/coding/mod.rs | 2 + rs/moq/src/coding/size.rs | 18 +++++- rs/moq/src/coding/varint.rs | 18 ++---- rs/moq/src/message/announce.rs | 12 +--- rs/moq/src/message/frame.rs | 18 ------ rs/moq/src/message/group.rs | 4 +- rs/moq/src/message/mod.rs | 2 - rs/moq/src/message/session.rs | 4 +- rs/moq/src/message/setup.rs | 19 +++---- rs/moq/src/message/subscribe.rs | 10 +--- rs/moq/src/session/publisher.rs | 3 +- rs/moq/src/session/reader.rs | 26 ++++----- rs/moq/src/session/subscriber.rs | 4 +- rs/moq/src/session/writer.rs | 4 +- 26 files changed, 335 insertions(+), 148 deletions(-) create mode 100644 js/moq/src/wire/message.ts create mode 100644 rs/moq/src/coding/message.rs delete mode 100644 rs/moq/src/message/frame.rs diff --git a/js/moq/src/connection.ts b/js/moq/src/connection.ts index c69bb5458..95f64b633 100644 --- a/js/moq/src/connection.ts +++ b/js/moq/src/connection.ts @@ -83,11 +83,12 @@ export class Connection { // Fetch the fingerprint from the server. const fingerprint = await fetch(fingerprintUrl); + const fingerprintText = await fingerprint.text(); options.serverCertificateHashes = [ { algorithm: "sha-256", - value: hexToBytes(await fingerprint.text()), + value: hexToBytes(fingerprintText), }, ]; diff --git a/js/moq/src/wire/announce.ts b/js/moq/src/wire/announce.ts index 01bc7e023..95491863a 100644 --- a/js/moq/src/wire/announce.ts +++ b/js/moq/src/wire/announce.ts @@ -1,3 +1,4 @@ +import { decodeMessage, encodeMessage } from "./message"; import type { Reader, Writer } from "./stream"; export class Announce { @@ -9,20 +10,29 @@ export class Announce { this.active = active; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.u53(this.active ? 1 : 0); await w.string(this.suffix); } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { const active = (await r.u53()) === 1; const suffix = await r.string(); return new Announce(suffix, active); } + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(Announce, r); + } + static async decode_maybe(r: Reader): Promise { if (await r.done()) return; - return await Announce.decode(r); + return Announce.decode(r); } } @@ -34,14 +44,23 @@ export class AnnounceInterest { this.prefix = prefix; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.string(this.prefix); } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { const prefix = await r.string(); return new AnnounceInterest(prefix); } + + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(AnnounceInterest, r); + } } export class AnnounceInit { @@ -51,14 +70,14 @@ export class AnnounceInit { this.paths = paths; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.u53(this.paths.length); for (const path of this.paths) { await w.string(path); } } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { const count = await r.u53(); const paths: string[] = []; for (let i = 0; i < count; i++) { @@ -66,4 +85,13 @@ export class AnnounceInit { } return new AnnounceInit(paths); } + + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(AnnounceInit, r); + } } diff --git a/js/moq/src/wire/group.ts b/js/moq/src/wire/group.ts index 79d98e0ac..6bf357e11 100644 --- a/js/moq/src/wire/group.ts +++ b/js/moq/src/wire/group.ts @@ -1,3 +1,4 @@ +import { decodeMessage, encodeMessage } from "./message"; import type { Reader, Writer } from "./stream"; export class Group { @@ -11,14 +12,23 @@ export class Group { this.sequence = sequence; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.u62(this.subscribe); await w.u53(this.sequence); } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { return new Group(await r.u62(), await r.u53()); } + + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(Group, r); + } } export class GroupDrop { @@ -32,15 +42,24 @@ export class GroupDrop { this.error = error; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.u53(this.sequence); await w.u53(this.count); await w.u53(this.error); } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { return new GroupDrop(await r.u53(), await r.u53(), await r.u53()); } + + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(GroupDrop, r); + } } export class Frame { @@ -50,14 +69,23 @@ export class Frame { this.payload = payload; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.u53(this.payload.byteLength); await w.write(this.payload); } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { const size = await r.u53(); const payload = await r.read(size); return new Frame(payload); } + + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(Frame, r); + } } diff --git a/js/moq/src/wire/index.ts b/js/moq/src/wire/index.ts index b801b898c..b9b778434 100644 --- a/js/moq/src/wire/index.ts +++ b/js/moq/src/wire/index.ts @@ -1,5 +1,6 @@ export * from "./announce"; export * from "./group"; +export * from "./message"; export * from "./session"; export * from "./stream"; export * from "./subscribe"; diff --git a/js/moq/src/wire/message.ts b/js/moq/src/wire/message.ts new file mode 100644 index 000000000..dc1d93979 --- /dev/null +++ b/js/moq/src/wire/message.ts @@ -0,0 +1,95 @@ +import { Reader, Writer } from "./stream"; + +/** + * Interface for messages that are automatically size-prefixed during encoding/decoding. + */ +export interface Message { + encodeBody(w: Writer): Promise; +} + +export interface MessageConstructor { + decodeBody(r: Reader): Promise; +} + +/** + * Encode a message with a size prefix. + */ +export async function encodeMessage(message: Message, writer: Writer): Promise { + // Use a growing buffer to collect all the data + // Most messages are small, so start with a small buffer. + // We use data.byteLength as the length and data.buffer.byteLength as the capacity. + let data = new Uint8Array(new ArrayBuffer(64), 0, 0); + + const temp = new Writer( + new WritableStream({ + write(chunk: Uint8Array) { + const needed = data.byteLength + chunk.byteLength; + if (needed > data.buffer.byteLength) { + // Resize the buffer to the needed size. + const capacity = Math.max(needed, data.buffer.byteLength * 2); + const newBuffer = new ArrayBuffer(capacity); + const newData = new Uint8Array(newBuffer, 0, needed); + + // Copy the old data into the new buffer. + newData.set(data); + + // Copy the new chunk into the new buffer. + newData.set(chunk, data.byteLength); + + data = newData; + } else { + // Copy chunk data into buffer + data = new Uint8Array(data.buffer, 0, needed); + data.set(chunk, needed - chunk.byteLength); + } + }, + }), + ); + + await message.encodeBody(temp); + temp.close(); + await temp.closed(); + + // Write size prefix + await writer.u53(data.byteLength); + + // Write the contiguous buffer + await writer.write(data); +} + +/** + * Decode a size-prefixed message, ensuring exact size consumption. + */ +export async function decodeMessage( + MessageClass: MessageConstructor, + reader: Reader, +): Promise { + const size = await reader.u53(); + const messageData = await reader.read(size); + + // Create a limited reader that contains exactly `size` bytes + const limitedStream = new ReadableStream({ + start(controller) { + controller.enqueue(messageData); + controller.close(); + }, + }); + + const limitedReader = new Reader(limitedStream); + const result = await MessageClass.decodeBody(limitedReader); + + // Check that we consumed exactly the right number of bytes + if (!(await limitedReader.done())) { + throw new Error("Message decoding consumed too few bytes"); + } + + return result; +} + +export async function decodeMaybe( + MessageClass: MessageConstructor, + reader: Reader, +): Promise { + if (await reader.done()) return; + return await decodeMessage(MessageClass, reader); +} diff --git a/js/moq/src/wire/session.ts b/js/moq/src/wire/session.ts index 36687240e..bfe65b4cd 100644 --- a/js/moq/src/wire/session.ts +++ b/js/moq/src/wire/session.ts @@ -1,3 +1,4 @@ +import { decodeMessage, encodeMessage } from "./message"; import type { Reader, Writer } from "./stream"; export const Version = { @@ -77,7 +78,7 @@ export class SessionClient { this.extensions = extensions; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.u53(this.versions.length); for (const v of this.versions) { await w.u53(v); @@ -86,7 +87,7 @@ export class SessionClient { await this.extensions.encode(w); } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { const versions: number[] = []; const count = await r.u53(); for (let i = 0; i < count; i++) { @@ -96,6 +97,15 @@ export class SessionClient { const extensions = await Extensions.decode(r); return new SessionClient(versions, extensions); } + + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(SessionClient, r); + } } export class SessionServer { @@ -107,16 +117,25 @@ export class SessionServer { this.extensions = extensions; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.u53(this.version); await this.extensions.encode(w); } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { const version = await r.u53(); const extensions = await Extensions.decode(r); return new SessionServer(version, extensions); } + + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(SessionServer, r); + } } export class SessionInfo { @@ -126,15 +145,24 @@ export class SessionInfo { this.bitrate = bitrate; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.u53(this.bitrate); } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { const bitrate = await r.u53(); return new SessionInfo(bitrate); } + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(SessionInfo, r); + } + static async decode_maybe(r: Reader): Promise { if (await r.done()) return; return await SessionInfo.decode(r); diff --git a/js/moq/src/wire/subscribe.ts b/js/moq/src/wire/subscribe.ts index c7edc793e..d7f0d2128 100644 --- a/js/moq/src/wire/subscribe.ts +++ b/js/moq/src/wire/subscribe.ts @@ -1,3 +1,4 @@ +import { decodeMessage, encodeMessage } from "./message"; import type { Reader, Writer } from "./stream"; export class SubscribeUpdate { @@ -7,18 +8,27 @@ export class SubscribeUpdate { this.priority = priority; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.u8(this.priority); } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { const priority = await r.u8(); return new SubscribeUpdate(priority); } + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(SubscribeUpdate, r); + } + static async decode_maybe(r: Reader): Promise { if (await r.done()) return; - return await SubscribeUpdate.decode(r); + return SubscribeUpdate.decode(r); } } @@ -36,20 +46,29 @@ export class Subscribe extends SubscribeUpdate { this.track = track; } - override async encode(w: Writer) { + override async encodeBody(w: Writer) { await w.u62(this.id); await w.string(this.broadcast); await w.string(this.track); - await super.encode(w); + await super.encodeBody(w); } - static override async decode(r: Reader): Promise { + static override async decodeBody(r: Reader): Promise { const id = await r.u62(); const broadcast = await r.string(); const track = await r.string(); - const update = await SubscribeUpdate.decode(r); + const update = await SubscribeUpdate.decodeBody(r); return new Subscribe(id, broadcast, track, update.priority); } + + // Wrapper methods that automatically handle size prefixing + override async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static override async decode(r: Reader): Promise { + return decodeMessage(Subscribe, r); + } } export class SubscribeOk { @@ -59,12 +78,21 @@ export class SubscribeOk { this.priority = priority; } - async encode(w: Writer) { + async encodeBody(w: Writer) { await w.u53(this.priority); } - static async decode(r: Reader): Promise { + static async decodeBody(r: Reader): Promise { const priority = await r.u53(); return new SubscribeOk(priority); } + + // Wrapper methods that automatically handle size prefixing + async encode(w: Writer): Promise { + return encodeMessage(this, w); + } + + static async decode(r: Reader): Promise { + return decodeMessage(SubscribeOk, r); + } } diff --git a/rs/hang/src/model/track.rs b/rs/hang/src/model/track.rs index 476b814f4..c9f47ca3b 100644 --- a/rs/hang/src/model/track.rs +++ b/rs/hang/src/model/track.rs @@ -40,7 +40,7 @@ impl TrackProducer { /// For example, H.264 B-frames will introduce jitter and reordering. pub fn write(&mut self, frame: Frame) { let timestamp = frame.timestamp.as_micros() as u64; - let mut header = BytesMut::with_capacity(timestamp.encode_size()); + let mut header = BytesMut::new(); timestamp.encode(&mut header); if frame.keyframe { diff --git a/rs/justfile b/rs/justfile index 8427cb05e..f68968218 100644 --- a/rs/justfile +++ b/rs/justfile @@ -24,6 +24,7 @@ leaf: # By default, this uses HMAC-SHA256, so it's symmetric. # If some one wants to contribute, public/private key pairs would be nice. root: + @mkdir -p dev @if [ ! -f "dev/root.jwk" ]; then \ rm -f dev/*.jwt; \ cargo run --bin moq-token -- --key "dev/root.jwk" generate; \ diff --git a/rs/moq/src/coding/decode.rs b/rs/moq/src/coding/decode.rs index 2df89dc8c..adea4396a 100644 --- a/rs/moq/src/coding/decode.rs +++ b/rs/moq/src/coding/decode.rs @@ -35,6 +35,9 @@ pub enum DecodeError { #[error("expected data")] ExpectedData, + #[error("too many bytes")] + TooManyBytes, + // TODO move these to ParamError #[error("duplicate parameter")] DupliateParameter, diff --git a/rs/moq/src/coding/encode.rs b/rs/moq/src/coding/encode.rs index 2f0bc2443..f4bbd6bd7 100644 --- a/rs/moq/src/coding/encode.rs +++ b/rs/moq/src/coding/encode.rs @@ -1,29 +1,15 @@ use std::sync::Arc; -use super::Sizer; - pub trait Encode: Sized { // Encode the value to the given writer. // This will panic if the Buf is not large enough; use a Vec or encode_size() to check. fn encode(&self, w: &mut W); - - // Return the size of the encoded value - // Implementations can override this to provide a more efficient implementation - fn encode_size(&self) -> usize { - let mut sizer = Sizer::default(); - self.encode(&mut sizer); - sizer.size - } } impl Encode for u8 { fn encode(&self, w: &mut W) { w.put_u8(*self); } - - fn encode_size(&self) -> usize { - 1 - } } impl Encode for String { @@ -53,10 +39,6 @@ impl Encode for i8 { // A default of 0 is more ergonomic for the user than a default of 128. w.put_u8(((*self as i16) + 128) as u8); } - - fn encode_size(&self) -> usize { - 1 - } } impl Encode for &[T] { @@ -82,18 +64,10 @@ impl Encode for bytes::Bytes { self.len().encode(w); w.put_slice(self); } - - fn encode_size(&self) -> usize { - self.len().encode_size() + self.len() - } } impl Encode for Arc { fn encode(&self, w: &mut W) { (**self).encode(w); } - - fn encode_size(&self) -> usize { - (**self).encode_size() - } } diff --git a/rs/moq/src/coding/message.rs b/rs/moq/src/coding/message.rs new file mode 100644 index 000000000..b5a3b08cc --- /dev/null +++ b/rs/moq/src/coding/message.rs @@ -0,0 +1,40 @@ +use bytes::{Buf, BufMut}; + +use super::{Decode, DecodeError, Encode, Sizer}; + +/// A trait for messages that are automatically size-prefixed during encoding/decoding. +/// +/// This trait wraps the existing Encode/Decode traits and automatically handles: +/// - Prefixing messages with their encoded size during encoding +/// - Reading the size prefix and validating exact consumption during decoding +/// - Ensuring no bytes are left over or missing after decoding +pub trait Message: Sized { + /// Encode this message with a size prefix. + fn encode(&self, w: &mut W); + + /// Decode a size-prefixed message, ensuring exact size consumption. + fn decode(buf: &mut B) -> Result; +} + +// Blanket implementation for all types that implement Encode + Decode +impl Encode for T { + fn encode(&self, w: &mut W) { + let mut sizer = Sizer::default(); + Message::encode(self, &mut sizer); + sizer.size.encode(w); + Message::encode(self, w); + } +} + +impl Decode for T { + fn decode(buf: &mut B) -> Result { + let size = usize::decode(buf)?; + let mut limited = buf.take(size); + let result = Message::decode(&mut limited)?; + if limited.remaining() > 0 { + return Err(DecodeError::TooManyBytes); + } + + Ok(result) + } +} diff --git a/rs/moq/src/coding/mod.rs b/rs/moq/src/coding/mod.rs index d59f72aac..1da43266e 100644 --- a/rs/moq/src/coding/mod.rs +++ b/rs/moq/src/coding/mod.rs @@ -2,11 +2,13 @@ mod decode; mod encode; +mod message; mod size; mod varint; pub use decode::*; pub use encode::*; +pub use message::*; pub use size::*; pub use varint::*; diff --git a/rs/moq/src/coding/size.rs b/rs/moq/src/coding/size.rs index af22e7eb3..fdf4526c7 100644 --- a/rs/moq/src/coding/size.rs +++ b/rs/moq/src/coding/size.rs @@ -1,10 +1,11 @@ +use std::mem::MaybeUninit; + use bytes::{buf::UninitSlice, Buf, BufMut}; // A BufMut implementation that only counts the size of the buffer #[derive(Default)] pub struct Sizer { pub size: usize, - buf: [u8; 16], } unsafe impl BufMut for Sizer { @@ -13,7 +14,20 @@ unsafe impl BufMut for Sizer { } fn chunk_mut(&mut self) -> &mut UninitSlice { - UninitSlice::new(&mut self.buf) + // We need to return a valid slice, but it won't actually be written to + // Use a thread-local static buffer to avoid safety issues + thread_local! { + static BUFFER: std::cell::UnsafeCell<[MaybeUninit; 8192]> = + const { std::cell::UnsafeCell::new(unsafe { MaybeUninit::uninit().assume_init() }) }; + } + + BUFFER.with(|buf| { + let ptr = buf.get(); + unsafe { + let slice = (*ptr).as_mut_ptr(); + bytes::buf::UninitSlice::from_raw_parts_mut(slice as *mut u8, 8192) + } + }) } fn remaining_mut(&self) -> usize { diff --git a/rs/moq/src/coding/varint.rs b/rs/moq/src/coding/varint.rs index 768bfad1b..1c034a198 100644 --- a/rs/moq/src/coding/varint.rs +++ b/rs/moq/src/coding/varint.rs @@ -203,24 +203,14 @@ impl Decode for VarInt { impl Encode for VarInt { /// Encode a varint to the given writer. fn encode(&self, w: &mut W) { - match self.encode_size() { - 1 => w.put_u8(self.0 as u8), - 2 => w.put_u16((0b01 << 14) | self.0 as u16), - 4 => w.put_u32((0b10 << 30) | self.0 as u32), - 8 => w.put_u64((0b11 << 62) | self.0), - _ => unreachable!(), - } - } - - fn encode_size(&self) -> usize { if self.0 < 2u64.pow(6) { - 1 + w.put_u8(self.0 as u8); } else if self.0 < 2u64.pow(14) { - 2 + w.put_u16((0b01 << 14) | self.0 as u16); } else if self.0 < 2u64.pow(30) { - 4 + w.put_u32((0b10 << 30) | self.0 as u32); } else { - 8 + w.put_u64((0b11 << 62) | self.0); } } } diff --git a/rs/moq/src/message/announce.rs b/rs/moq/src/message/announce.rs index b9096e97f..a7006770f 100644 --- a/rs/moq/src/message/announce.rs +++ b/rs/moq/src/message/announce.rs @@ -20,7 +20,7 @@ impl Announce { } } -impl Decode for Announce { +impl Message for Announce { fn decode(r: &mut R) -> Result { Ok(match AnnounceStatus::decode(r)? { AnnounceStatus::Active => Self::Active { @@ -31,9 +31,7 @@ impl Decode for Announce { }, }) } -} -impl Encode for Announce { fn encode(&self, w: &mut W) { match self { Self::Active { suffix } => { @@ -55,14 +53,12 @@ pub struct AnnouncePlease { pub prefix: Path, } -impl Decode for AnnouncePlease { +impl Message for AnnouncePlease { fn decode(r: &mut R) -> Result { let prefix = Path::decode(r)?; Ok(Self { prefix }) } -} -impl Encode for AnnouncePlease { fn encode(&self, w: &mut W) { self.prefix.encode(w) } @@ -101,7 +97,7 @@ pub struct AnnounceInit { pub suffixes: Vec, } -impl Decode for AnnounceInit { +impl Message for AnnounceInit { fn decode(r: &mut R) -> Result { let count = u64::decode(r)?; @@ -114,9 +110,7 @@ impl Decode for AnnounceInit { Ok(Self { suffixes: paths }) } -} -impl Encode for AnnounceInit { fn encode(&self, w: &mut W) { (self.suffixes.len() as u64).encode(w); for path in &self.suffixes { diff --git a/rs/moq/src/message/frame.rs b/rs/moq/src/message/frame.rs deleted file mode 100644 index 17fcf8e3f..000000000 --- a/rs/moq/src/message/frame.rs +++ /dev/null @@ -1,18 +0,0 @@ -use crate::coding::*; - -#[derive(Clone, Debug)] -pub struct Frame { - pub size: u64, -} - -impl Decode for Frame { - fn decode(r: &mut R) -> Result { - Ok(Self { size: u64::decode(r)? }) - } -} - -impl Encode for Frame { - fn encode(&self, w: &mut W) { - self.size.encode(w); - } -} diff --git a/rs/moq/src/message/group.rs b/rs/moq/src/message/group.rs index efb4930f0..e90e745f1 100644 --- a/rs/moq/src/message/group.rs +++ b/rs/moq/src/message/group.rs @@ -9,16 +9,14 @@ pub struct Group { pub sequence: u64, } -impl Decode for Group { +impl Message for Group { fn decode(r: &mut R) -> Result { Ok(Self { subscribe: u64::decode(r)?, sequence: u64::decode(r)?, }) } -} -impl Encode for Group { fn encode(&self, w: &mut W) { self.subscribe.encode(w); self.sequence.encode(w); diff --git a/rs/moq/src/message/mod.rs b/rs/moq/src/message/mod.rs index e11c0a79d..c5c3b9e1c 100644 --- a/rs/moq/src/message/mod.rs +++ b/rs/moq/src/message/mod.rs @@ -3,7 +3,6 @@ //! This module could be used directly but 99% of the time you should use the higher-level [crate::Session] API. mod announce; mod extensions; -mod frame; mod group; mod session; mod setup; @@ -13,7 +12,6 @@ mod versions; pub use announce::*; pub use extensions::*; -pub use frame::*; pub use group::*; pub use session::*; pub use setup::*; diff --git a/rs/moq/src/message/session.rs b/rs/moq/src/message/session.rs index 3f76688f8..b50b609b9 100644 --- a/rs/moq/src/message/session.rs +++ b/rs/moq/src/message/session.rs @@ -5,7 +5,7 @@ pub struct SessionInfo { pub bitrate: Option, } -impl Decode for SessionInfo { +impl Message for SessionInfo { fn decode(r: &mut R) -> Result { let bitrate = match u64::decode(r)? { 0 => None, @@ -14,9 +14,7 @@ impl Decode for SessionInfo { Ok(Self { bitrate }) } -} -impl Encode for SessionInfo { fn encode(&self, w: &mut W) { self.bitrate.unwrap_or(0).encode(w); } diff --git a/rs/moq/src/message/setup.rs b/rs/moq/src/message/setup.rs index 62f659d79..774131448 100644 --- a/rs/moq/src/message/setup.rs +++ b/rs/moq/src/message/setup.rs @@ -11,7 +11,7 @@ pub struct ClientSetup { pub extensions: Extensions, } -impl Decode for ClientSetup { +impl Message for ClientSetup { /// Decode a client setup message. fn decode(r: &mut R) -> Result { let versions = Versions::decode(r)?; @@ -19,9 +19,7 @@ impl Decode for ClientSetup { Ok(Self { versions, extensions }) } -} -impl Encode for ClientSetup { /// Encode a server setup message. fn encode(&self, w: &mut W) { self.versions.encode(w); @@ -39,8 +37,12 @@ pub struct ServerSetup { pub extensions: Extensions, } -impl Decode for ServerSetup { - /// Decode the server setup. +impl Message for ServerSetup { + fn encode(&self, w: &mut W) { + self.version.encode(w); + self.extensions.encode(w); + } + fn decode(r: &mut R) -> Result { let version = Version::decode(r)?; let extensions = Extensions::decode(r)?; @@ -48,10 +50,3 @@ impl Decode for ServerSetup { Ok(Self { version, extensions }) } } - -impl Encode for ServerSetup { - fn encode(&self, w: &mut W) { - self.version.encode(w); - self.extensions.encode(w); - } -} diff --git a/rs/moq/src/message/subscribe.rs b/rs/moq/src/message/subscribe.rs index 20ac0f5eb..6ef88602e 100644 --- a/rs/moq/src/message/subscribe.rs +++ b/rs/moq/src/message/subscribe.rs @@ -1,5 +1,5 @@ use crate::{ - coding::{Decode, DecodeError, Encode}, + coding::{Decode, DecodeError, Encode, Message}, Path, }; @@ -14,7 +14,7 @@ pub struct Subscribe { pub priority: u8, } -impl Decode for Subscribe { +impl Message for Subscribe { fn decode(r: &mut R) -> Result { let id = u64::decode(r)?; let broadcast = Path::decode(r)?; @@ -28,9 +28,7 @@ impl Decode for Subscribe { priority, }) } -} -impl Encode for Subscribe { fn encode(&self, w: &mut W) { self.id.encode(w); self.broadcast.encode(w); @@ -44,13 +42,11 @@ pub struct SubscribeOk { pub priority: u8, } -impl Encode for SubscribeOk { +impl Message for SubscribeOk { fn encode(&self, w: &mut W) { self.priority.encode(w); } -} -impl Decode for SubscribeOk { fn decode(r: &mut R) -> Result { let priority = u8::decode(r)?; Ok(Self { priority }) diff --git a/rs/moq/src/session/publisher.rs b/rs/moq/src/session/publisher.rs index a316fe352..14cd777c3 100644 --- a/rs/moq/src/session/publisher.rs +++ b/rs/moq/src/session/publisher.rs @@ -256,8 +256,7 @@ impl Publisher { None => break, }; - let header = message::Frame { size: frame.info.size }; - stream.encode(&header).await?; + stream.encode(&frame.info.size).await?; loop { let chunk = tokio::select! { diff --git a/rs/moq/src/session/reader.rs b/rs/moq/src/session/reader.rs index daa5f4787..fdb82150e 100644 --- a/rs/moq/src/session/reader.rs +++ b/rs/moq/src/session/reader.rs @@ -1,4 +1,4 @@ -use std::{cmp, fmt, io}; +use std::{cmp, io}; use bytes::{Buf, Bytes, BytesMut}; @@ -22,32 +22,28 @@ impl Reader { Ok(Self::new(stream)) } - pub async fn decode(&mut self) -> Result { + pub async fn decode(&mut self) -> Result { loop { let mut cursor = io::Cursor::new(&self.buffer); - - // Try to decode with the current buffer. match T::decode(&mut cursor) { Ok(msg) => { self.buffer.advance(cursor.position() as usize); return Ok(msg); } - Err(DecodeError::Short) => (), // Try again with more data - Err(err) => return Err(err.into()), - }; - - if !self.buffer.is_empty() { - tracing::trace!(buffer = ?self.buffer, "more data needed"); - } - - if self.stream.read_buf(&mut self.buffer).await?.is_none() { - return Err(DecodeError::Short.into()); + Err(DecodeError::Short) => { + // Try to read more data + if self.stream.read_buf(&mut self.buffer).await?.is_none() { + // Stream closed while we still need more data + return Err(Error::Decode(DecodeError::Short)); + } + } + Err(e) => return Err(Error::Decode(e)), } } } // Decode optional messages at the end of a stream - pub async fn decode_maybe(&mut self) -> Result, Error> { + pub async fn decode_maybe(&mut self) -> Result, Error> { match self.finished().await { Ok(()) => Ok(None), Err(Error::Decode(DecodeError::ExpectedEnd)) => Ok(Some(self.decode().await?)), diff --git a/rs/moq/src/session/subscriber.rs b/rs/moq/src/session/subscriber.rs index e82719039..19a8e4dc4 100644 --- a/rs/moq/src/session/subscriber.rs +++ b/rs/moq/src/session/subscriber.rs @@ -247,8 +247,8 @@ impl Subscriber { } async fn run_group(&mut self, stream: &mut Reader, mut group: GroupProducer) -> Result<(), Error> { - while let Some(frame) = stream.decode_maybe::().await? { - let frame = group.create_frame(Frame { size: frame.size }); + while let Some(size) = stream.decode_maybe::().await? { + let frame = group.create_frame(Frame { size }); let res = tokio::select! { _ = frame.unused() => Err(Error::Cancel), diff --git a/rs/moq/src/session/writer.rs b/rs/moq/src/session/writer.rs index a3cf015b1..2b8d8cd9f 100644 --- a/rs/moq/src/session/writer.rs +++ b/rs/moq/src/session/writer.rs @@ -1,5 +1,3 @@ -use std::fmt; - use crate::{coding::*, message, Error}; // A wrapper around a web_transport::SendStream that will reset on Drop @@ -25,7 +23,7 @@ impl Writer { Ok(writer) } - pub async fn encode(&mut self, msg: &T) -> Result<(), Error> { + pub async fn encode(&mut self, msg: &T) -> Result<(), Error> { self.buffer.clear(); msg.encode(&mut self.buffer); From 9762b93bc1accb5858513bde1edc265fc0950882 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 21 Jul 2025 21:34:04 -0700 Subject: [PATCH 2/6] Cooler imports. --- js/moq/src/wire/announce.ts | 14 +++++++------- js/moq/src/wire/group.ts | 14 +++++++------- js/moq/src/wire/message.ts | 18 ++++++------------ js/moq/src/wire/session.ts | 14 +++++++------- js/moq/src/wire/subscribe.ts | 14 +++++++------- 5 files changed, 34 insertions(+), 40 deletions(-) diff --git a/js/moq/src/wire/announce.ts b/js/moq/src/wire/announce.ts index 95491863a..1ecc3ac5d 100644 --- a/js/moq/src/wire/announce.ts +++ b/js/moq/src/wire/announce.ts @@ -1,4 +1,4 @@ -import { decodeMessage, encodeMessage } from "./message"; +import * as Message from "./message"; import type { Reader, Writer } from "./stream"; export class Announce { @@ -23,11 +23,11 @@ export class Announce { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(Announce, r); + return Message.decode(Announce, r); } static async decode_maybe(r: Reader): Promise { @@ -55,11 +55,11 @@ export class AnnounceInterest { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(AnnounceInterest, r); + return Message.decode(AnnounceInterest, r); } } @@ -88,10 +88,10 @@ export class AnnounceInit { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(AnnounceInit, r); + return Message.decode(AnnounceInit, r); } } diff --git a/js/moq/src/wire/group.ts b/js/moq/src/wire/group.ts index 6bf357e11..38d1a8c37 100644 --- a/js/moq/src/wire/group.ts +++ b/js/moq/src/wire/group.ts @@ -1,4 +1,4 @@ -import { decodeMessage, encodeMessage } from "./message"; +import * as Message from "./message"; import type { Reader, Writer } from "./stream"; export class Group { @@ -23,11 +23,11 @@ export class Group { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(Group, r); + return Message.decode(Group, r); } } @@ -54,11 +54,11 @@ export class GroupDrop { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(GroupDrop, r); + return Message.decode(GroupDrop, r); } } @@ -82,10 +82,10 @@ export class Frame { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(Frame, r); + return Message.decode(Frame, r); } } diff --git a/js/moq/src/wire/message.ts b/js/moq/src/wire/message.ts index dc1d93979..d5e0bbb80 100644 --- a/js/moq/src/wire/message.ts +++ b/js/moq/src/wire/message.ts @@ -3,18 +3,18 @@ import { Reader, Writer } from "./stream"; /** * Interface for messages that are automatically size-prefixed during encoding/decoding. */ -export interface Message { +export interface Encode { encodeBody(w: Writer): Promise; } -export interface MessageConstructor { +export interface Decode { decodeBody(r: Reader): Promise; } /** * Encode a message with a size prefix. */ -export async function encodeMessage(message: Message, writer: Writer): Promise { +export async function encode(message: Encode, writer: Writer): Promise { // Use a growing buffer to collect all the data // Most messages are small, so start with a small buffer. // We use data.byteLength as the length and data.buffer.byteLength as the capacity. @@ -60,10 +60,7 @@ export async function encodeMessage(message: Message, writer: Writer): Promise( - MessageClass: MessageConstructor, - reader: Reader, -): Promise { +export async function decode(MessageClass: Decode, reader: Reader): Promise { const size = await reader.u53(); const messageData = await reader.read(size); @@ -86,10 +83,7 @@ export async function decodeMessage( return result; } -export async function decodeMaybe( - MessageClass: MessageConstructor, - reader: Reader, -): Promise { +export async function decodeMaybe(MessageClass: Decode, reader: Reader): Promise { if (await reader.done()) return; - return await decodeMessage(MessageClass, reader); + return await decode(MessageClass, reader); } diff --git a/js/moq/src/wire/session.ts b/js/moq/src/wire/session.ts index bfe65b4cd..3594520ae 100644 --- a/js/moq/src/wire/session.ts +++ b/js/moq/src/wire/session.ts @@ -1,4 +1,4 @@ -import { decodeMessage, encodeMessage } from "./message"; +import * as Message from "./message"; import type { Reader, Writer } from "./stream"; export const Version = { @@ -100,11 +100,11 @@ export class SessionClient { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(SessionClient, r); + return Message.decode(SessionClient, r); } } @@ -130,11 +130,11 @@ export class SessionServer { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(SessionServer, r); + return Message.decode(SessionServer, r); } } @@ -156,11 +156,11 @@ export class SessionInfo { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(SessionInfo, r); + return Message.decode(SessionInfo, r); } static async decode_maybe(r: Reader): Promise { diff --git a/js/moq/src/wire/subscribe.ts b/js/moq/src/wire/subscribe.ts index d7f0d2128..e88732d15 100644 --- a/js/moq/src/wire/subscribe.ts +++ b/js/moq/src/wire/subscribe.ts @@ -1,4 +1,4 @@ -import { decodeMessage, encodeMessage } from "./message"; +import * as Message from "./message"; import type { Reader, Writer } from "./stream"; export class SubscribeUpdate { @@ -19,11 +19,11 @@ export class SubscribeUpdate { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(SubscribeUpdate, r); + return Message.decode(SubscribeUpdate, r); } static async decode_maybe(r: Reader): Promise { @@ -63,11 +63,11 @@ export class Subscribe extends SubscribeUpdate { // Wrapper methods that automatically handle size prefixing override async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static override async decode(r: Reader): Promise { - return decodeMessage(Subscribe, r); + return Message.decode(Subscribe, r); } } @@ -89,10 +89,10 @@ export class SubscribeOk { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(SubscribeOk, r); + return Message.decode(SubscribeOk, r); } } From f345b05ca69301d8afc348f31b4fffdeba379815 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 21 Jul 2025 21:39:33 -0700 Subject: [PATCH 3/6] better merge --- js/moq/src/wire/announce.ts | 14 +++++++------- js/moq/src/wire/subscribe.ts | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/js/moq/src/wire/announce.ts b/js/moq/src/wire/announce.ts index afdc5cc4c..f3bdc2c45 100644 --- a/js/moq/src/wire/announce.ts +++ b/js/moq/src/wire/announce.ts @@ -1,4 +1,4 @@ -import { decodeMessage, encodeMessage } from "./message"; +import * as Message from "./message"; import type { Valid } from "../path"; import type { Reader, Writer } from "./stream"; @@ -24,11 +24,11 @@ export class Announce { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(Announce, r); + return Message.decode(Announce, r); } static async decode_maybe(r: Reader): Promise { @@ -56,11 +56,11 @@ export class AnnounceInterest { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(AnnounceInterest, r); + return Message.decode(AnnounceInterest, r); } } @@ -89,10 +89,10 @@ export class AnnounceInit { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(AnnounceInit, r); + return Message.decode(AnnounceInit, r); } } diff --git a/js/moq/src/wire/subscribe.ts b/js/moq/src/wire/subscribe.ts index c6644a5e7..57437d5bb 100644 --- a/js/moq/src/wire/subscribe.ts +++ b/js/moq/src/wire/subscribe.ts @@ -1,4 +1,4 @@ -import { decodeMessage, encodeMessage } from "./message"; +import * as Message from "./message"; import type { Valid } from "../path"; import type { Reader, Writer } from "./stream"; @@ -20,11 +20,11 @@ export class SubscribeUpdate { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(SubscribeUpdate, r); + return Message.decode(SubscribeUpdate, r); } static async decode_maybe(r: Reader): Promise { @@ -64,11 +64,11 @@ export class Subscribe extends SubscribeUpdate { // Wrapper methods that automatically handle size prefixing override async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static override async decode(r: Reader): Promise { - return decodeMessage(Subscribe, r); + return Message.decode(Subscribe, r); } } @@ -90,10 +90,10 @@ export class SubscribeOk { // Wrapper methods that automatically handle size prefixing async encode(w: Writer): Promise { - return encodeMessage(this, w); + return Message.encode(this, w); } static async decode(r: Reader): Promise { - return decodeMessage(SubscribeOk, r); + return Message.decode(SubscribeOk, r); } } From b1d867dbb7395900085b54acc33d82dcd41e7893 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Tue, 22 Jul 2025 05:13:54 +0000 Subject: [PATCH 4/6] Fix MaybeUninit bug and optimize Vec encoding/decoding - Fix undefined behavior in size.rs MaybeUninit initialization - Replace generic Vec with specialized Vec implementations - Use efficient byte operations (copy_to_bytes, put_slice) instead of per-byte loops - Prevents accidental inefficient usage and improves performance Co-authored-by: kixelated --- rs/moq/src/coding/decode.rs | 16 +++++++--------- rs/moq/src/coding/encode.rs | 6 ++---- rs/moq/src/coding/size.rs | 2 +- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/rs/moq/src/coding/decode.rs b/rs/moq/src/coding/decode.rs index adea4396a..683e49cc4 100644 --- a/rs/moq/src/coding/decode.rs +++ b/rs/moq/src/coding/decode.rs @@ -68,18 +68,16 @@ impl Decode for String { } } -impl Decode for Vec { +impl Decode for Vec { fn decode(buf: &mut B) -> Result { let size = usize::decode(buf)?; - - // Don't allocate more than 1024 elements upfront - let mut v = Vec::with_capacity(size.min(1024)); - - for _ in 0..size { - v.push(T::decode(buf)?); + + if buf.remaining() < size { + return Err(DecodeError::Short); } - - Ok(v) + + let bytes = buf.copy_to_bytes(size); + Ok(bytes.to_vec()) } } diff --git a/rs/moq/src/coding/encode.rs b/rs/moq/src/coding/encode.rs index f4bbd6bd7..e4ec6c6b6 100644 --- a/rs/moq/src/coding/encode.rs +++ b/rs/moq/src/coding/encode.rs @@ -50,12 +50,10 @@ impl Encode for &[T] { } } -impl Encode for Vec { +impl Encode for Vec { fn encode(&self, w: &mut W) { self.len().encode(w); - for item in self.iter() { - item.encode(w); - } + w.put_slice(self); } } diff --git a/rs/moq/src/coding/size.rs b/rs/moq/src/coding/size.rs index fdf4526c7..c38bf6be9 100644 --- a/rs/moq/src/coding/size.rs +++ b/rs/moq/src/coding/size.rs @@ -18,7 +18,7 @@ unsafe impl BufMut for Sizer { // Use a thread-local static buffer to avoid safety issues thread_local! { static BUFFER: std::cell::UnsafeCell<[MaybeUninit; 8192]> = - const { std::cell::UnsafeCell::new(unsafe { MaybeUninit::uninit().assume_init() }) }; + std::cell::UnsafeCell::new([MaybeUninit::uninit(); 8192]); } BUFFER.with(|buf| { From 163b3dbfead7e792ca476054f120bc8331bb55c0 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Tue, 22 Jul 2025 11:29:06 +0000 Subject: [PATCH 5/6] Fix clippy warning by using const in thread_local! Co-authored-by: kixelated --- rs/moq/src/coding/size.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rs/moq/src/coding/size.rs b/rs/moq/src/coding/size.rs index c38bf6be9..198d4bf1c 100644 --- a/rs/moq/src/coding/size.rs +++ b/rs/moq/src/coding/size.rs @@ -18,7 +18,7 @@ unsafe impl BufMut for Sizer { // Use a thread-local static buffer to avoid safety issues thread_local! { static BUFFER: std::cell::UnsafeCell<[MaybeUninit; 8192]> = - std::cell::UnsafeCell::new([MaybeUninit::uninit(); 8192]); + const { std::cell::UnsafeCell::new([MaybeUninit::uninit(); 8192]) }; } BUFFER.with(|buf| { From fec2f2a04b51d9190824d539c5fa08fe649a3a0e Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 22 Jul 2025 06:26:10 -0700 Subject: [PATCH 6/6] just fix --- js/moq/src/wire/announce.ts | 2 +- js/moq/src/wire/subscribe.ts | 2 +- rs/Cargo.lock | 4 ++-- rs/moq/src/coding/decode.rs | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/js/moq/src/wire/announce.ts b/js/moq/src/wire/announce.ts index f3bdc2c45..3dea6ba79 100644 --- a/js/moq/src/wire/announce.ts +++ b/js/moq/src/wire/announce.ts @@ -1,5 +1,5 @@ -import * as Message from "./message"; import type { Valid } from "../path"; +import * as Message from "./message"; import type { Reader, Writer } from "./stream"; export class Announce { diff --git a/js/moq/src/wire/subscribe.ts b/js/moq/src/wire/subscribe.ts index 57437d5bb..fab071882 100644 --- a/js/moq/src/wire/subscribe.ts +++ b/js/moq/src/wire/subscribe.ts @@ -1,5 +1,5 @@ -import * as Message from "./message"; import type { Valid } from "../path"; +import * as Message from "./message"; import type { Reader, Writer } from "./stream"; export class SubscribeUpdate { diff --git a/rs/Cargo.lock b/rs/Cargo.lock index ac40f0ddc..4279d64a3 100644 --- a/rs/Cargo.lock +++ b/rs/Cargo.lock @@ -1284,9 +1284,9 @@ dependencies = [ [[package]] name = "io-uring" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013" +checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" dependencies = [ "bitflags", "cfg-if", diff --git a/rs/moq/src/coding/decode.rs b/rs/moq/src/coding/decode.rs index 683e49cc4..1185a068e 100644 --- a/rs/moq/src/coding/decode.rs +++ b/rs/moq/src/coding/decode.rs @@ -71,11 +71,11 @@ impl Decode for String { impl Decode for Vec { fn decode(buf: &mut B) -> Result { let size = usize::decode(buf)?; - + if buf.remaining() < size { return Err(DecodeError::Short); } - + let bytes = buf.copy_to_bytes(size); Ok(bytes.to_vec()) }