Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion rs/moq-lite/src/model/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ impl BroadcastConsumer {
let mut state = self.state.lock();

if let Some(producer) = state.producers.get(&track.name) {
return producer.consume();
if !producer.is_closed() {
return producer.consume();
}
// Remove the stale entry
state.producers.remove(&track.name);
}
Comment on lines 195 to 201
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, read the relevant section with context
cat -n rs/moq-lite/src/model/broadcast.rs | sed -n '180,240p'

Repository: moq-dev/moq

Length of output: 2265


🏁 Script executed:

# Check if is_clone() method exists on the producer type
rg -n "is_clone|fn is_clone" rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 291


🏁 Script executed:

# Check the producer type definition and its methods
rg -n "impl.*Producer|fn unused" rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 317


🏁 Script executed:

# Look for the full file structure to understand the type
git ls-files rs/moq-lite/src/model/ | head -20

Repository: moq-dev/moq

Length of output: 273


🏁 Script executed:

# Check if there are other places where producers.remove is used that might have same issue
rg -n "producers\.remove" rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 243


🏁 Script executed:

# Examine the actual unused() implementation to confirm it waits for consumers
sed -n '112,125p' rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 407


🏁 Script executed:

# Check the test coverage for this scenario
sed -n '280,310p' rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 831


🏁 Script executed:

# Check if there are existing tests for concurrent subscribe_track scenarios
rg -n "#\[tokio::test\]|#\[test\]" rs/moq-lite/src/model/broadcast.rs | head -20

Repository: moq-dev/moq

Length of output: 194


🏁 Script executed:

# Verify the fix is syntactically correct by checking usage of is_clone elsewhere
rg -B3 -A3 "is_clone" rs/moq-lite/src/model/broadcast.rs | head -40

Repository: moq-dev/moq

Length of output: 1139


Stale cleanup task can remove the new producer from the map due to async race condition.

When a producer is first inserted, a cleanup task is spawned (lines 224–227) that removes the entry by track name once unused() resolves (all consumers dropped). If the same track is subscribed again while the old producer's cleanup task is still pending, the code removes the stale entry (line 200) and inserts a fresh producer at the same key (line 220). The old cleanup task will eventually call state.lock().producers.remove(&producer.info.name), which removes the new producer from the map by name, breaking deduplication for subsequent subscribers.

Guard the removal with an identity check to ensure only the original producer instance is evicted:

Proposed fix
		let state = self.state.clone();
+		let producer_ref = producer.clone();
 		web_async::spawn(async move {
-			producer.unused().await;
-			state.lock().producers.remove(&producer.info.name);
+			producer_ref.unused().await;
+			let mut s = state.lock();
+			if let Some(existing) = s.producers.get(&producer_ref.info.name) {
+				if existing.is_clone(&producer_ref) {
+					s.producers.remove(&producer_ref.info.name);
+				}
+			}
 		});
🤖 Prompt for AI Agents
In `@rs/moq-lite/src/model/broadcast.rs` around lines 195 - 201, The removal must
be guarded by identity so the cleanup task only evicts the exact producer
instance it created: when the cleanup task (spawned after producer insertion and
awaiting producer.unused()) wants to remove, lock state and check
state.producers.get(&producer.info.name) — only call
state.producers.remove(&producer.info.name) if the stored value is the same
instance (e.g. Arc::ptr_eq(stored, &producer) or compare a unique producer id
field); apply the same identity check before removing the "stale" entry in the
early-return path where you see state.producers.remove(&track.name) to avoid
removing a newly inserted producer with the same name.


// Otherwise we have never seen this track before and need to create a new producer.
Expand Down Expand Up @@ -395,6 +399,35 @@ mod test {
track5.assert_error();
}

#[tokio::test]
async fn stale_producer() {
let mut broadcast = Broadcast::produce();
let consumer = broadcast.consume();

// Subscribe to a track, creating a request
let track1 = consumer.subscribe_track(&Track::new("track1"));

// Get the requested producer and close it (simulating publisher disconnect)
let mut producer1 = broadcast.assert_request();
producer1.append_group();
producer1.close();

// The consumer should see the track as closed
track1.assert_closed();

// Subscribe again to the same track - should get a NEW producer, not the stale one
let mut track2 = consumer.subscribe_track(&Track::new("track1"));
track2.assert_not_closed();
track2.assert_not_clone(&track1);

// There should be a new request for the track
let mut producer2 = broadcast.assert_request();
producer2.append_group();

// The new consumer should receive the new group
track2.assert_group();
}

#[tokio::test]
async fn requested_unused() {
let mut broadcast = Broadcast::produce();
Expand Down
5 changes: 5 additions & 0 deletions rs/moq-lite/src/model/track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ impl TrackProducer {
}
}

/// Return true if the track has been closed or aborted.
pub fn is_closed(&self) -> bool {
self.state.borrow().closed.is_some()
}

/// Return true if this is the same track.
pub fn is_clone(&self, other: &Self) -> bool {
self.state.same_channel(&other.state)
Expand Down
Loading