From 09503f4ae3245814ffcc4dc5eaf424b78f26dd8c Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Tue, 3 Sep 2024 11:34:15 -0700 Subject: [PATCH] Cleanup media streams when finished (#411) * [WIP] Cleanup stream * . * . * revert previous idea * Handle dropped broadcaster * fmt * remove unused * remove useless log * handle based watching --- livekit-ffi/src/server/audio_stream.rs | 20 ++++++++++++-------- livekit-ffi/src/server/mod.rs | 24 ++++++++++++++++++++++-- livekit-ffi/src/server/room.rs | 14 +++++++++++++- livekit-ffi/src/server/video_stream.rs | 18 +++++++++++------- 4 files changed, 58 insertions(+), 18 deletions(-) diff --git a/livekit-ffi/src/server/audio_stream.rs b/livekit-ffi/src/server/audio_stream.rs index 9cb0f6c21..d0fd42972 100644 --- a/livekit-ffi/src/server/audio_stream.rs +++ b/livekit-ffi/src/server/audio_stream.rs @@ -24,7 +24,7 @@ pub struct FfiAudioStream { pub stream_type: proto::AudioStreamType, #[allow(dead_code)] - close_tx: oneshot::Sender<()>, // Close the stream on drop + self_dropped_tx: oneshot::Sender<()>, // Close the stream on drop } impl FfiHandle for FfiAudioStream {} @@ -44,18 +44,18 @@ impl FfiAudioStream { ) -> FfiResult { let ffi_track = server.retrieve_handle::(new_stream.track_handle)?.clone(); let rtc_track = ffi_track.track.rtc_track(); + let (self_dropped_tx, self_dropped_rx) = oneshot::channel(); let MediaStreamTrack::Audio(rtc_track) = rtc_track else { return Err(FfiError::InvalidRequest("not an audio track".into())); }; - let (close_tx, close_rx) = oneshot::channel(); let stream_type = new_stream.r#type(); let handle_id = server.next_id(); let audio_stream = match stream_type { #[cfg(not(target_arch = "wasm32"))] proto::AudioStreamType::AudioStreamNative => { - let audio_stream = Self { handle_id, stream_type, close_tx }; + let audio_stream = Self { handle_id, stream_type, self_dropped_tx }; let sample_rate = if new_stream.sample_rate == 0 { 48000 } else { new_stream.sample_rate as i32 }; @@ -69,7 +69,8 @@ impl FfiAudioStream { server, handle_id, native_stream, - close_rx, + self_dropped_rx, + server.watch_handle_dropped(new_stream.track_handle), )); server.watch_panic(handle); Ok::(audio_stream) @@ -91,11 +92,15 @@ impl FfiAudioStream { server: &'static server::FfiServer, stream_handle_id: FfiHandleId, mut native_stream: NativeAudioStream, - mut close_rx: oneshot::Receiver<()>, + mut self_dropped_rx: oneshot::Receiver<()>, + mut handle_dropped_rx: oneshot::Receiver<()>, ) { loop { tokio::select! { - _ = &mut close_rx => { + _ = &mut self_dropped_rx => { + break; + } + _ = &mut handle_dropped_rx => { break; } frame = native_stream.next() => { @@ -126,14 +131,13 @@ impl FfiAudioStream { } } } - if let Err(err) = server.send_event(proto::ffi_event::Message::AudioStreamEvent( proto::AudioStreamEvent { stream_handle: stream_handle_id, message: Some(proto::audio_stream_event::Message::Eos(proto::AudioStreamEos {})), }, )) { - log::warn!("failed to send audio EOS: {}", err); + log::warn!("failed to send audio eos: {}", err); } } } diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index 2d3cf6086..1a26fd6ae 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::{ + collections::HashMap, error::Error, sync::{ atomic::{AtomicU64, Ordering}, @@ -26,7 +27,10 @@ use dashmap::{mapref::one::MappedRef, DashMap}; use downcast_rs::{impl_downcast, Downcast}; use livekit::webrtc::{native::audio_resampler::AudioResampler, prelude::*}; use parking_lot::{deadlock, Mutex}; -use tokio::task::JoinHandle; +use tokio::{ + sync::{broadcast, oneshot}, + task::JoinHandle, +}; use crate::{proto, proto::FfiEvent, FfiError, FfiHandleId, FfiResult, INVALID_HANDLE}; @@ -74,6 +78,7 @@ pub struct FfiServer { next_id: AtomicU64, config: Mutex>, logger: &'static logger::FfiLogger, + handle_dropped_txs: DashMap>>, } impl Default for FfiServer { @@ -111,6 +116,7 @@ impl Default for FfiServer { async_runtime, config: Default::default(), logger, + handle_dropped_txs: Default::default(), } } } @@ -142,6 +148,7 @@ impl FfiServer { // Drop all handles self.ffi_handles.clear(); + self.handle_dropped_txs.clear(); *self.config.lock() = None; // Invalidate the config } @@ -192,7 +199,20 @@ impl FfiServer { } pub fn drop_handle(&self, id: FfiHandleId) -> bool { - self.ffi_handles.remove(&id).is_some() + let existed = self.ffi_handles.remove(&id).is_some(); + self.handle_dropped_txs.remove(&id); + return existed; + } + + pub fn watch_handle_dropped(&self, handle: FfiHandleId) -> oneshot::Receiver<()> { + // Create vec if not exists + if self.handle_dropped_txs.get(&handle).is_none() { + self.handle_dropped_txs.insert(handle, Vec::new()); + } + let (tx, rx) = oneshot::channel::<()>(); + let mut tx_vec = self.handle_dropped_txs.get_mut(&handle).unwrap(); + tx_vec.push(tx); + return rx; } pub fn send_panic(&self, err: Box) { diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 967254ecf..20a4c9b0c 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::{collections::HashSet, slice, sync::Arc, time::Duration}; -use livekit::participant; use livekit::prelude::*; +use livekit::{participant, track}; use parking_lot::Mutex; use tokio::sync::{broadcast, mpsc, oneshot, Mutex as AsyncMutex}; use tokio::task::JoinHandle; @@ -70,6 +71,8 @@ pub struct RoomInner { pending_published_tracks: Mutex>, // Used to wait for the LocalTrackUnpublished event pending_unpublished_tracks: Mutex>, + + track_handle_lookup: Arc>>, } struct Handle { @@ -145,6 +148,7 @@ impl FfiRoom { dtmf_tx, pending_published_tracks: Default::default(), pending_unpublished_tracks: Default::default(), + track_handle_lookup: Default::default(), }); let (local_info, remote_infos) = @@ -815,10 +819,12 @@ async fn forward_event( } RoomEvent::TrackSubscribed { track, publication: _, participant } => { let handle_id = server.next_id(); + let track_sid = track.sid(); let ffi_track = FfiTrack { handle: handle_id, track: track.into() }; let track_info = proto::TrackInfo::from(&ffi_track); server.store_handle(ffi_track.handle, ffi_track); + inner.track_handle_lookup.lock().insert(track_sid, handle_id); let _ = send_event(proto::room_event::Message::TrackSubscribed(proto::TrackSubscribed { @@ -830,6 +836,12 @@ async fn forward_event( })); } RoomEvent::TrackUnsubscribed { track, publication: _, participant } => { + let track_sid = track.sid(); + if let Some(handle) = inner.track_handle_lookup.lock().remove(&track_sid) { + server.drop_handle(handle); + } else { + log::warn!("track {} was not found in the lookup table", track_sid); + } let _ = send_event(proto::room_event::Message::TrackUnsubscribed( proto::TrackUnsubscribed { participant_identity: participant.identity().to_string(), diff --git a/livekit-ffi/src/server/video_stream.rs b/livekit-ffi/src/server/video_stream.rs index 83fc79b54..4512e4778 100644 --- a/livekit-ffi/src/server/video_stream.rs +++ b/livekit-ffi/src/server/video_stream.rs @@ -24,7 +24,7 @@ pub struct FfiVideoStream { pub stream_type: proto::VideoStreamType, #[allow(dead_code)] - close_tx: oneshot::Sender<()>, // Close the stream on drop + self_dropped_tx: oneshot::Sender<()>, // Close the stream on drop } impl FfiHandle for FfiVideoStream {} @@ -49,20 +49,21 @@ impl FfiVideoStream { return Err(FfiError::InvalidRequest("not a video track".into())); }; - let (close_tx, close_rx) = oneshot::channel(); + let (self_dropped_tx, self_dropped_rx) = oneshot::channel(); let stream_type = new_stream.r#type(); let handle_id = server.next_id(); let stream = match stream_type { #[cfg(not(target_arch = "wasm32"))] proto::VideoStreamType::VideoStreamNative => { - let video_stream = Self { handle_id, close_tx, stream_type }; + let video_stream = Self { handle_id, self_dropped_tx, stream_type }; let handle = server.async_runtime.spawn(Self::native_video_stream_task( server, handle_id, new_stream.format.and_then(|_| Some(new_stream.format())), new_stream.normalize_stride, NativeVideoStream::new(rtc_track), - close_rx, + self_dropped_rx, + server.watch_handle_dropped(new_stream.track_handle), )); server.watch_panic(handle); Ok::(video_stream) @@ -86,11 +87,15 @@ impl FfiVideoStream { dst_type: Option, normalize_stride: bool, mut native_stream: NativeVideoStream, - mut close_rx: oneshot::Receiver<()>, + mut self_dropped_rx: oneshot::Receiver<()>, + mut handle_dropped_rx: oneshot::Receiver<()>, ) { loop { tokio::select! { - _ = &mut close_rx => { + _ = &mut self_dropped_rx => { + break; + } + _ = &mut handle_dropped_rx => { break; } frame = native_stream.next() => { @@ -98,7 +103,6 @@ impl FfiVideoStream { break; }; - let Ok((buffer, info)) = colorcvt::to_video_buffer_info(frame.buffer, dst_type, normalize_stride) else { log::error!("video stream failed to convert video frame to {:?}", dst_type); continue;