Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/claude-code-review.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
# Direct prompt for automated review (no @claude mention needed)
direct_prompt: |
Please review this pull request and look for bugs and security issues.
Only report on bugs and potential vulnerabilities you find. Be concise.
Only report issues you find, otherwise give a thumbs up. Be concise!

# Optional: Use sticky comments to make Claude reuse the same comment on subsequent pushes to the same PR
# use_sticky_comment: true
Expand Down
8 changes: 8 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,11 @@ Key architectural rule: The CDN/relay must not know about application logic, med

- Run `just check` to execute all tests and linting
- Rust tests are integrated within source files

## Contributing

For first-time contributors looking for tasks to work on:

- Check [TODO.md](./TODO.md) for security and performance issues that need to be addressed
- Many of these issues are well-scoped and great for getting familiar with the codebase
- Security-related tasks help improve the robustness of the MoQ protocol implementation
72 changes: 72 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# TODO - Security & Performance Issues

This file contains security and performance issues that need to be addressed. These are great tasks for first-time contributors to the MoQ project.

## Security Issues

### 🔒 DoS Protection & Rate Limiting

- [ ] **Enforce maximum size for paths** - Add configurable limits for path string lengths to prevent memory exhaustion attacks
- [ ] **Enforce maximum number of active announcements** - Add configurable limit per session/connection to prevent announcement flooding
- [ ] **Enforce maximum number of subscriptions** - Currently implicit via MAX_STREAMS, make it configurable and explicit
- [ ] **Enforce maximum size for each frame** - Add configurable frame size limits to prevent large frame DoS attacks
- [ ] **Enforce maximum count of frames per group** - Limit frames per group to prevent unbounded memory allocation
- [ ] **Enforce cumulative maximums per session/IP/user** - Add aggregate limits across all connections from the same source

### 🛡️ Input Validation & Bounds Checking

- [ ] **Fix AnnounceInit decode DoS vector (Rust)** - Add hard limit check before processing count in `rs/moq/src/message/announce.rs:108-113`
- [ ] **Fix missing DoS protection (TypeScript)** - Add count limits in `js/moq/src/wire/announce.ts:62-67`
- [ ] **Fix prefix suffix handling bug** - Correct logic in `js/moq/src/publisher.ts:92-94` for proper hierarchical path handling
- [ ] **Add timeout protection for session initialization** - Prevent indefinite hangs in `rs/moq/src/session/mod.rs:64-66`

### 🔍 Protocol Security

- [ ] **Validate message sequence numbers** - Ensure monotonic ordering and detect replay attacks
- [ ] **Add authentication to sensitive operations** - Require proper auth for publish/announce operations
- [ ] **Implement proper error boundaries** - Prevent cascading failures from malformed messages
- [ ] **Add message rate limiting per connection** - Prevent control message flooding

## Performance Issues

### ⚡ Memory Management

- [ ] **Implement bounded collections** - Replace unbounded Vec/Array usage with size-limited collections
- [ ] **Add memory pool for frequent allocations** - Reduce GC pressure in TypeScript and allocator pressure in Rust
- [ ] **Optimize string handling** - Use string interning for frequently used path names
- [ ] **Add configurable buffer sizes** - Make frame/group buffers configurable based on use case

### 📊 Metrics & Observability

- [ ] **Add connection health metrics** - Track bandwidth, latency, error rates per connection
- [ ] **Implement graceful degradation** - Reduce quality/features under resource pressure
- [ ] **Add resource usage monitoring** - Track memory, CPU, network usage per session
- [ ] **Log security events** - Audit log for rate limit violations, auth failures, etc.

## Implementation Guidelines

When working on these issues:

1. **Security First**: Always validate inputs and add appropriate bounds checking
2. **Configurable Limits**: Make all limits configurable via environment variables or config files
3. **Backwards Compatibility**: Ensure changes don't break existing protocol compatibility
4. **Test Coverage**: Add tests for both normal operation and edge cases/attack scenarios
5. **Documentation**: Update protocol documentation and API docs for any changes
6. **Performance Testing**: Benchmark changes to ensure they don't introduce performance regressions

## Getting Started

New contributors should:

1. Read the main [CLAUDE.md](./CLAUDE.md) for project setup and development guidelines
2. Run `just setup` to install dependencies
3. Run `just check` to ensure tests pass before making changes
4. Pick a single TODO item to work on
5. Create a PR with tests and documentation for your changes

## Questions?

For questions about these issues or implementation guidance, please:
- Open a GitHub issue with the `question` label
- Reference the specific TODO item you're asking about
- Include your proposed approach for discussion
3 changes: 3 additions & 0 deletions js/moq/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ export class Connection {

const conn = new Connection(adjustedUrl, quic, stream);

// The connection is now ready to use
// Note: ANNOUNCE_INIT will be handled when announce streams are actually requested

const cleanup = () => {
conn.close();
};
Expand Down
14 changes: 14 additions & 0 deletions js/moq/src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@ export class Publisher {
async runAnnounce(msg: Wire.AnnounceInterest, stream: Wire.Stream) {
const consumer = this.#announced.consume(msg.prefix);

// Send ANNOUNCE_INIT as the first message with all currently active paths
const activePaths: string[] = [];
for (const [name] of this.#broadcasts) {
if (name.startsWith(msg.prefix)) {
// Return suffix relative to prefix
const suffix = msg.prefix ? name.slice(msg.prefix.length + 1) : name;
activePaths.push(suffix);
}
}

const init = new Wire.AnnounceInit(activePaths);
await init.encode(stream.writer);

// Then send updates as they occur
for (;;) {
const announcement = await consumer.next();
if (!announcement) break;
Expand Down
12 changes: 12 additions & 0 deletions js/moq/src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ export class Subscriber {
try {
const stream = await Wire.Stream.open(this.#quic, msg);

// First, receive ANNOUNCE_INIT
const init = await Wire.AnnounceInit.decode(stream.reader);

// Process initial announcements
for (const path of init.paths) {
const full = prefix.concat(path);
console.debug(`announced: broadcast=${full} active=true`);
producer.write({ name: full, active: true });
active.add(full);
}

// Then receive updates
for (;;) {
const announce = await Wire.Announce.decode_maybe(stream.reader);
if (!announce) {
Expand Down
24 changes: 24 additions & 0 deletions js/moq/src/wire/announce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,27 @@ export class AnnounceInterest {
return new AnnounceInterest(prefix);
}
}

export class AnnounceInit {
paths: string[];

constructor(paths: string[]) {
this.paths = paths;
}

async encode(w: Writer) {
await w.u53(this.paths.length);
for (const path of this.paths) {
await w.string(path);
}
}

static async decode(r: Reader): Promise<AnnounceInit> {
const count = await r.u53();
const paths: string[] = [];
for (let i = 0; i < count; i++) {
paths.push(await r.string());
}
return new AnnounceInit(paths);
}
}
3 changes: 2 additions & 1 deletion js/moq/src/wire/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ export const Version = {
FORK_03: 0xff0bad03,
FORK_04: 0xff0bad04,
LITE_00: 0xff0dad00,
LITE_01: 0xff0dad01,
} as const;

export const CURRENT_VERSION = Version.LITE_00;
export const CURRENT_VERSION = Version.LITE_01;

export class Extensions {
entries: Map<bigint, Uint8Array>;
Expand Down
8 changes: 4 additions & 4 deletions rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions rs/hang-cli/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ async fn connect(

// Create an origin producer to publish to the broadcast.
let mut publisher = moq_lite::OriginProducer::default();
publisher.publish(&name, consumer.inner.clone());

// Establish the connection, not providing a subscriber.
let session = moq_lite::Session::connect(session, publisher.consume_all(), None).await?;

// Publish the broadcast using the origin producer directly.
publisher.publish(&name, consumer.inner.clone());

tokio::select! {
// On ctrl-c, close the session and exit.
_ = tokio::signal::ctrl_c() => {
Expand Down
3 changes: 1 addition & 2 deletions rs/hang-cli/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,14 @@ async fn run_session(

// Create an origin producer to publish to the broadcast.
let mut publisher = moq_lite::OriginProducer::default();
publisher.publish(&name, consumer.inner.clone());

let session = moq_lite::Session::accept(session, publisher.consume_all(), None)
.await
.context("failed to accept session")?;

tracing::info!(?id, "accepted session");

publisher.publish(&name, consumer.inner.clone());

Err(session.closed().await.into())
}

Expand Down
7 changes: 4 additions & 3 deletions rs/hang-gst/src/sink/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,15 @@ impl HangSink {
let session = client.connect(url.clone()).await.expect("failed to connect");

let mut publisher = moq_lite::OriginProducer::default();
let _session = moq_lite::Session::connect(session, publisher.consume_all(), None)
.await
.expect("failed to connect");

let broadcast = hang::BroadcastProducer::new();
let name = settings.broadcast.as_ref().expect("broadcast is required");
publisher.publish(name, broadcast.consume().inner);

let _session = moq_lite::Session::connect(session, publisher.consume_all(), None)
.await
.expect("failed to connect");

let media = hang::cmaf::Import::new(broadcast);

let mut state = self.state.lock().unwrap();
Expand Down
17 changes: 9 additions & 8 deletions rs/hang-gst/src/source/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ impl ElementImpl for HangSrc {
gst::error!(CAT, obj = self.obj(), "Failed to setup: {:?}", e);
return Err(gst::StateChangeError);
}
// Chain up first to let the bin handle the state change
let result = self.parent_change_state(transition);
result?;
// This is a live source - no preroll needed
return Ok(gst::StateChangeSuccess::NoPreroll);
}

gst::StateChange::PausedToReady => {
Expand All @@ -146,7 +151,7 @@ impl ElementImpl for HangSrc {
_ => (),
}

// Chain up
// Chain up for other transitions
self.parent_change_state(transition)
}
}
Expand Down Expand Up @@ -175,18 +180,14 @@ impl HangSrc {
let origin = moq_lite::OriginProducer::default();
let _session = moq_lite::Session::connect(session, None, origin.clone()).await?;

// TODO giant hack to avoid a race condition with how announcements are now populated.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// Wait for the broadcast to be announced (race condition workaround)
let broadcast = origin.consume(&name).expect("broadcast not found");
let broadcast = origin
.consume(&name)
.ok_or_else(|| anyhow::anyhow!("Broadcast '{}' not found", name))?;
let mut broadcast = hang::BroadcastConsumer::new(broadcast);

// TODO handle catalog updates
let catalog = broadcast.catalog.next().await?.context("no catalog found")?.clone();

gst::info!(CAT, "catalog: {:?}", catalog);

for video in catalog.video {
let mut track = broadcast.subscribe(&video.track);

Expand Down
2 changes: 2 additions & 0 deletions rs/justfile
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub-gst name url:

# Run gstreamer and pipe the output to our plugin
GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" \
GST_DEBUG="hangsink:4" \
gst-launch-1.0 -v -e multifilesrc location="dev/{{name}}.fmp4" loop=true ! qtdemux name=demux \
demux.video_0 ! h264parse ! queue ! identity sync=true ! isofmp4mux name=mux chunk-duration=1 fragment-duration=1 ! \
hangsink url="{{url}}" tls-disable-verify=true broadcast="{{name}}" \
Expand All @@ -124,6 +125,7 @@ sub name url:
# Run gstreamer and pipe the output to our plugin
# This will render the video to the screen
GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" \
GST_DEBUG="hangsrc:4" \
gst-launch-1.0 -v -e hangsrc url="{{url}}" broadcast="{{name}}" tls-disable-verify=true ! decodebin ! videoconvert ! autovideosink

# Publish a video using ffmpeg directly from hang to the localhost
Expand Down
5 changes: 2 additions & 3 deletions rs/moq-clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,10 @@ async fn main() -> anyhow::Result<()> {
let clock = clock::Publisher::new(track);

let mut publisher = moq_lite::OriginProducer::default();
let session = moq_lite::Session::connect(session, publisher.consume_all(), None).await?;

// Publish the broadcast - the broadcast name is empty because the URL contains the name
publisher.publish(&config.broadcast, broadcast.consume());

let session = moq_lite::Session::connect(session, publisher.consume_all(), None).await?;

tokio::select! {
res = session.closed() => Err(res.into()),
_ = clock.run() => Ok(()),
Expand Down
4 changes: 0 additions & 4 deletions rs/moq/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ pub enum Error {

#[error("protocol violation")]
ProtocolViolation,

#[error("unauthorized")]
Unauthorized,
}

impl Error {
Expand All @@ -76,7 +73,6 @@ impl Error {
Self::NotFound => 13,
Self::WrongSize => 14,
Self::ProtocolViolation => 15,
Self::Unauthorized => 16,
Self::App(app) => *app + 64,
}
}
Expand Down
Loading
Loading