Skip to content

Commit 3371abd

Browse files
Hormoldclaude
andcommitted
feat(rtc): Add event_types filtering to FfiQueue.subscribe()
PROBLEM: FfiQueue.put() broadcasts ALL FFI events to ALL subscribers via call_soon_threadsafe(). Each call creates asyncio.Handle + context objects. AudioStream/VideoStream filter events with wait_for(predicate), but objects are already allocated. With N streams, this creates N × all_events objects, with 95%+ discarded after allocation. In a 2-hour meeting with 4 participants, we observed: - 903,154 FFI events accumulated - Memory grew from 312 MB to 1.29 GB - Event loop lag increased to 20+ seconds SOLUTION: Add optional `event_types` parameter to FfiQueue.subscribe(). When specified, events are filtered by type BEFORE calling call_soon_threadsafe(), preventing unnecessary object allocation. AudioStream now subscribes with event_types={"audio_stream_event"} VideoStream now subscribes with event_types={"video_stream_event"} This reduces memory allocations by ~95% for stream subscribers while maintaining full backwards compatibility (event_types=None = all events). TESTING: - Added unit tests for event filtering functionality - Verified 95% reduction in object creation with filtered subscribers - Tested in production environment with stable memory usage Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 2744c66 commit 3371abd

File tree

4 files changed

+375
-8
lines changed

4 files changed

+375
-8
lines changed

livekit-rtc/livekit/rtc/_ffi_client.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import platform
2525
import atexit
2626
import threading
27-
from typing import Generic, List, Optional, TypeVar
27+
from typing import Generic, List, Optional, Set, TypeVar
2828

2929
from ._proto import ffi_pb2 as proto_ffi
3030
from ._utils import Queue, classproperty
@@ -95,29 +95,58 @@ def __repr__(self) -> str:
9595
class FfiQueue(Generic[T]):
9696
def __init__(self) -> None:
9797
self._lock = threading.RLock()
98-
self._subscribers: List[tuple[Queue[T], asyncio.AbstractEventLoop]] = []
98+
# Format: (queue, loop, event_types or None)
99+
self._subscribers: List[
100+
tuple[Queue[T], asyncio.AbstractEventLoop, Optional[Set[str]]]
101+
] = []
99102

100103
def put(self, item: T) -> None:
104+
# Get event type for filtering (if item has WhichOneof method)
105+
which = None
106+
try:
107+
which = item.WhichOneof("message") # type: ignore
108+
except Exception:
109+
pass
110+
101111
with self._lock:
102-
for queue, loop in self._subscribers:
112+
for queue, loop, event_types in self._subscribers:
113+
# Filter: if event_types specified and we know the type, skip non-matching
114+
if event_types is not None and which is not None:
115+
if which not in event_types:
116+
continue
117+
103118
try:
104119
loop.call_soon_threadsafe(queue.put_nowait, item)
105120
except Exception as e:
106121
# this could happen if user closes the runloop without unsubscribing first
107122
# it's not good when it does occur, but we should not fail the entire runloop
108123
logger.error("error putting to queue: %s", e)
109124

110-
def subscribe(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> Queue[T]:
125+
def subscribe(
126+
self,
127+
loop: Optional[asyncio.AbstractEventLoop] = None,
128+
event_types: Optional[Set[str]] = None,
129+
) -> Queue[T]:
130+
"""Subscribe to FFI events.
131+
132+
Args:
133+
loop: Event loop to use (defaults to current).
134+
event_types: Optional set of event type names to receive (e.g., {"audio_stream_event"}).
135+
If None, receives all events (original behavior).
136+
137+
Returns:
138+
Queue to receive events from.
139+
"""
111140
with self._lock:
112141
queue = Queue[T]()
113142
loop = loop or asyncio.get_event_loop()
114-
self._subscribers.append((queue, loop))
143+
self._subscribers.append((queue, loop, event_types))
115144
return queue
116145

117146
def unsubscribe(self, queue: Queue[T]) -> None:
118147
with self._lock:
119148
# looping here is ok, since we don't expect a lot of subscribers
120-
for i, (q, _) in enumerate(self._subscribers):
149+
for i, (q, _, _) in enumerate(self._subscribers):
121150
if q == queue:
122151
self._subscribers.pop(i)
123152
break

livekit-rtc/livekit/rtc/audio_stream.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,11 @@ def __init__(
102102
self._num_channels = num_channels
103103
self._frame_size_ms = frame_size_ms
104104
self._loop = loop or asyncio.get_event_loop()
105-
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
105+
# Only subscribe to audio_stream_event to avoid unnecessary memory allocations
106+
# from other event types (room_event, track_event, etc.)
107+
self._ffi_queue = FfiClient.instance.queue.subscribe(
108+
self._loop, event_types={"audio_stream_event"}
109+
)
106110
self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity)
107111

108112
self._audio_filter_module: str | None = None

livekit-rtc/livekit/rtc/video_stream.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ def __init__(
4747
**kwargs,
4848
) -> None:
4949
self._loop = loop or asyncio.get_event_loop()
50-
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
50+
# Only subscribe to video_stream_event to avoid unnecessary memory allocations
51+
# from other event types (room_event, track_event, etc.)
52+
self._ffi_queue = FfiClient.instance.queue.subscribe(
53+
self._loop, event_types={"video_stream_event"}
54+
)
5155
self._queue: RingQueue[VideoFrameEvent | None] = RingQueue(capacity)
5256
self._track: Track | None = track
5357
self._format = format

0 commit comments

Comments
 (0)