From 8d2034f6c882afc95de3b3c6159db9c27f361f96 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 12 Feb 2026 07:14:45 -0800 Subject: [PATCH] Fix stale TrackProducer returned from cache (#941) When a publisher disconnects, the cached TrackProducer in BroadcastConsumer::subscribe_track() was returned to late-joining subscribers even though it was already closed. Now we check is_closed() before returning a cached producer, and remove stale entries so a fresh request is issued. Co-Authored-By: Claude Opus 4.6 --- rs/moq-lite/src/model/broadcast.rs | 35 +++++++++++++++++++++++++++++- rs/moq-lite/src/model/track.rs | 5 +++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/rs/moq-lite/src/model/broadcast.rs b/rs/moq-lite/src/model/broadcast.rs index d57a19031..f4949395c 100644 --- a/rs/moq-lite/src/model/broadcast.rs +++ b/rs/moq-lite/src/model/broadcast.rs @@ -193,7 +193,11 @@ impl BroadcastConsumer { let mut state = self.state.lock(); if let Some(producer) = state.producers.get(&track.name) { - return producer.consume(); + if !producer.is_closed() { + return producer.consume(); + } + // Remove the stale entry + state.producers.remove(&track.name); } // Otherwise we have never seen this track before and need to create a new producer. @@ -395,6 +399,35 @@ mod test { track5.assert_error(); } + #[tokio::test] + async fn stale_producer() { + let mut broadcast = Broadcast::produce(); + let consumer = broadcast.consume(); + + // Subscribe to a track, creating a request + let track1 = consumer.subscribe_track(&Track::new("track1")); + + // Get the requested producer and close it (simulating publisher disconnect) + let mut producer1 = broadcast.assert_request(); + producer1.append_group(); + producer1.close(); + + // The consumer should see the track as closed + track1.assert_closed(); + + // Subscribe again to the same track - should get a NEW producer, not the stale one + let mut track2 = consumer.subscribe_track(&Track::new("track1")); + track2.assert_not_closed(); + track2.assert_not_clone(&track1); + + // There should be a new request for the track + let mut producer2 = broadcast.assert_request(); + producer2.append_group(); + + // The new consumer should receive the new group + track2.assert_group(); + } + #[tokio::test] async fn requested_unused() { let mut broadcast = Broadcast::produce(); diff --git a/rs/moq-lite/src/model/track.rs b/rs/moq-lite/src/model/track.rs index 14c18505b..9001a7843 100644 --- a/rs/moq-lite/src/model/track.rs +++ b/rs/moq-lite/src/model/track.rs @@ -160,6 +160,11 @@ impl TrackProducer { } } + /// Return true if the track has been closed or aborted. + pub fn is_closed(&self) -> bool { + self.state.borrow().closed.is_some() + } + /// Return true if this is the same track. pub fn is_clone(&self, other: &Self) -> bool { self.state.same_channel(&other.state)