Fix stale TrackProducer returned from cache #945
When a publisher disconnects, the cached TrackProducer in BroadcastConsumer::subscribe_track() was returned to late-joining subscribers even though it was already closed. Now we check is_closed() before returning a cached producer, and remove stale entries so a fresh request is issued.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Walkthrough
This change modifies the broadcast consumer's track subscription logic to handle stale closed producers. When a matching producer exists but is closed, the system now removes it and creates a new one instead of returning the closed producer. To support this detection, a new public method, TrackProducer::is_closed(), is added.
Pre-merge checks: 5 passed
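The liveness check described in the walkthrough can be sketched in isolation. This is a minimal illustration, not the moq-lite implementation: `Producer`, `Cache`, and `subscribe` are hypothetical stand-ins, and a shared `AtomicBool` is assumed to model the closed state.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a track producer (not the moq-lite type).
/// Clones share the same `closed` flag.
#[derive(Clone)]
struct Producer {
    closed: Arc<AtomicBool>,
}

impl Producer {
    fn new() -> Self {
        Self { closed: Arc::new(AtomicBool::new(false)) }
    }

    /// Mark the producer as closed, as if the publisher disconnected.
    fn close(&self) {
        self.closed.store(true, Ordering::SeqCst);
    }

    fn is_closed(&self) -> bool {
        self.closed.load(Ordering::SeqCst)
    }

    /// True when both handles point at the same underlying state.
    fn is_clone(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.closed, &other.closed)
    }
}

/// Hypothetical cache keyed by track name.
#[derive(Default)]
struct Cache {
    producers: HashMap<String, Producer>,
}

impl Cache {
    /// Return the cached producer if it is still live; otherwise evict
    /// the stale entry and insert a fresh producer under the same name.
    fn subscribe(&mut self, name: &str) -> Producer {
        if let Some(producer) = self.producers.get(name) {
            if !producer.is_closed() {
                return producer.clone();
            }
            // Remove the stale entry so a fresh one is created below.
            self.producers.remove(name);
        }
        let fresh = Producer::new();
        self.producers.insert(name.to_string(), fresh.clone());
        fresh
    }
}
```

In this sketch, two calls to `subscribe("video")` return handles to the same producer, while a call made after `close()` returns a new, open one.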
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@rs/moq-lite/src/model/broadcast.rs`:
- Around line 195-201: The removal must be guarded by identity so the cleanup
task only evicts the exact producer instance it created: when the cleanup task
(spawned after producer insertion and awaiting producer.unused()) wants to
remove, lock state and check state.producers.get(&producer.info.name) — only
call state.producers.remove(&producer.info.name) if the stored value is the same
instance (e.g. Arc::ptr_eq(stored, &producer) or compare a unique producer id
field); apply the same identity check before removing the "stale" entry in the
early-return path where you see state.producers.remove(&track.name) to avoid
removing a newly inserted producer with the same name.
🧹 Nitpick comments (1)
rs/moq-lite/src/model/broadcast.rs (1)
402-429: Good test coverage for the core fix.
The test correctly verifies that a closed producer is not returned from cache. Consider adding a follow-up assertion that drops track1 (the old consumer), yields to the runtime (tokio::task::yield_now().await), and then verifies track2 still works and a third subscribe_track still deduplicates with track2. This would catch the cleanup-task race described above.
  if let Some(producer) = state.producers.get(&track.name) {
-     return producer.consume();
+     if !producer.is_closed() {
+         return producer.consume();
+     }
+     // Remove the stale entry
+     state.producers.remove(&track.name);
  }
🧩 Analysis chain
🏁 Script executed:
# First, read the relevant section with context
cat -n rs/moq-lite/src/model/broadcast.rs | sed -n '180,240p'
Repository: moq-dev/moq
Length of output: 2265
🏁 Script executed:
# Check if is_clone() method exists on the producer type
rg -n "is_clone|fn is_clone" rs/moq-lite/src/model/broadcast.rs
Repository: moq-dev/moq
Length of output: 291
🏁 Script executed:
# Check the producer type definition and its methods
rg -n "impl.*Producer|fn unused" rs/moq-lite/src/model/broadcast.rs
Repository: moq-dev/moq
Length of output: 317
🏁 Script executed:
# Look for the full file structure to understand the type
git ls-files rs/moq-lite/src/model/ | head -20
Repository: moq-dev/moq
Length of output: 273
🏁 Script executed:
# Check if there are other places where producers.remove is used that might have same issue
rg -n "producers\.remove" rs/moq-lite/src/model/broadcast.rs
Repository: moq-dev/moq
Length of output: 243
🏁 Script executed:
# Examine the actual unused() implementation to confirm it waits for consumers
sed -n '112,125p' rs/moq-lite/src/model/broadcast.rs
Repository: moq-dev/moq
Length of output: 407
🏁 Script executed:
# Check the test coverage for this scenario
sed -n '280,310p' rs/moq-lite/src/model/broadcast.rs
Repository: moq-dev/moq
Length of output: 831
🏁 Script executed:
# Check if there are existing tests for concurrent subscribe_track scenarios
rg -n "#\[tokio::test\]|#\[test\]" rs/moq-lite/src/model/broadcast.rs | head -20
Repository: moq-dev/moq
Length of output: 194
🏁 Script executed:
# Verify the fix is syntactically correct by checking usage of is_clone elsewhere
rg -B3 -A3 "is_clone" rs/moq-lite/src/model/broadcast.rs | head -40
Repository: moq-dev/moq
Length of output: 1139
Stale cleanup task can remove the new producer from the map due to async race condition.
When a producer is first inserted, a cleanup task is spawned (lines 224–227) that removes the entry by track name once unused() resolves (all consumers dropped). If the same track is subscribed again while the old producer's cleanup task is still pending, the code removes the stale entry (line 200) and inserts a fresh producer at the same key (line 220). The old cleanup task will eventually call state.lock().producers.remove(&producer.info.name), which removes the new producer from the map by name, breaking deduplication for subsequent subscribers.
Guard the removal with an identity check to ensure only the original producer instance is evicted:
Proposed fix
  let state = self.state.clone();
+ let producer_ref = producer.clone();
  web_async::spawn(async move {
-     producer.unused().await;
-     state.lock().producers.remove(&producer.info.name);
+     producer_ref.unused().await;
+     let mut s = state.lock();
+     if let Some(existing) = s.producers.get(&producer_ref.info.name) {
+         if existing.is_clone(&producer_ref) {
+             s.producers.remove(&producer_ref.info.name);
+         }
+     }
  });
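The race in this review comment can be replayed deterministically without an async runtime. The sketch below is illustrative only: `Producer`, `Producers`, `remove_by_name`, `remove_if_same`, and `replay` are hypothetical names, identity is assumed to be `Arc` pointer equality, and the delayed cleanup task is modeled as a plain function call made after the replacement producer has been inserted.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical producer handle; clones share one allocation.
#[derive(Clone)]
struct Producer {
    name: Arc<String>,
}

impl Producer {
    fn new(name: &str) -> Self {
        Self { name: Arc::new(name.to_string()) }
    }

    /// Identity check: true only for clones of the same instance.
    fn is_clone(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.name, &other.name)
    }
}

type Producers = Arc<Mutex<HashMap<String, Producer>>>;

/// Unguarded cleanup: removes whatever is stored under the name.
fn remove_by_name(map: &Producers, producer: &Producer) {
    map.lock().unwrap().remove(producer.name.as_str());
}

/// Guarded cleanup: removes the entry only if it is the same instance.
fn remove_if_same(map: &Producers, producer: &Producer) {
    let mut state = map.lock().unwrap();
    let is_same = state
        .get(producer.name.as_str())
        .map_or(false, |existing| existing.is_clone(producer));
    if is_same {
        state.remove(producer.name.as_str());
    }
}

/// Replays the sequence: insert old, replace with new, then run the
/// old producer's delayed cleanup. Returns whether the new producer
/// is still cached afterwards.
fn replay(guarded: bool) -> bool {
    let map: Producers = Arc::new(Mutex::new(HashMap::new()));

    let old = Producer::new("video");
    map.lock().unwrap().insert("video".to_string(), old.clone());

    // The stale entry is replaced by a fresh producer with the same name.
    let new = Producer::new("video");
    map.lock().unwrap().insert("video".to_string(), new.clone());

    // The old producer's cleanup finally runs.
    if guarded {
        remove_if_same(&map, &old);
    } else {
        remove_by_name(&map, &old);
    }

    let state = map.lock().unwrap();
    state.get("video").map_or(false, |p| p.is_clone(&new))
}
```

Under these assumptions, `replay(false)` evicts the new producer, which would break deduplication for later subscribers, while `replay(true)` leaves it in the map.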
Summary
- TrackProducer::is_closed() to check if a track has been closed/aborted
- BroadcastConsumer::subscribe_track() now checks liveness before returning a cached producer, and removes stale entries so a fresh request is issued
- stale_producer test verifying late-joining subscribers get a new producer after publisher disconnect
Fixes #941
Test plan
- cargo test -p moq-lite: all 142 tests pass including new stale_producer test
- just check: full CI passes
🤖 Generated with Claude Code