Skip to content

Commit ed33f59

Browse files
Hormoldclaude
andauthored
feat(rtc): Add event_types filtering to FfiQueue.subscribe() to reduce memory allocations (#564)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 2744c66 commit ed33f59

File tree

4 files changed

+302
-8
lines changed

4 files changed

+302
-8
lines changed

livekit-rtc/livekit/rtc/_ffi_client.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import platform
2525
import atexit
2626
import threading
27-
from typing import Generic, List, Optional, TypeVar
27+
from typing import Callable, Generic, List, Optional, TypeVar
2828

2929
from ._proto import ffi_pb2 as proto_ffi
3030
from ._utils import Queue, classproperty
@@ -95,29 +95,55 @@ def __repr__(self) -> str:
9595
class FfiQueue(Generic[T]):
9696
def __init__(self) -> None:
9797
self._lock = threading.RLock()
98-
self._subscribers: List[tuple[Queue[T], asyncio.AbstractEventLoop]] = []
98+
# Format: (queue, loop, filter_fn or None)
99+
self._subscribers: List[
100+
tuple[Queue[T], asyncio.AbstractEventLoop, Optional[Callable[[T], bool]]]
101+
] = []
99102

100103
def put(self, item: T) -> None:
101104
with self._lock:
102-
for queue, loop in self._subscribers:
105+
for queue, loop, filter_fn in self._subscribers:
106+
# If filter provided, skip items that don't match
107+
if filter_fn is not None:
108+
try:
109+
if not filter_fn(item):
110+
continue
111+
except Exception:
112+
pass # On filter error, deliver the item
113+
103114
try:
104115
loop.call_soon_threadsafe(queue.put_nowait, item)
105116
except Exception as e:
106117
# this could happen if user closes the runloop without unsubscribing first
107118
# it's not good when it does occur, but we should not fail the entire runloop
108119
logger.error("error putting to queue: %s", e)
109120

110-
def subscribe(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> Queue[T]:
121+
def subscribe(
122+
self,
123+
loop: Optional[asyncio.AbstractEventLoop] = None,
124+
filter_fn: Optional[Callable[[T], bool]] = None,
125+
) -> Queue[T]:
126+
"""Subscribe to FFI events.
127+
128+
Args:
129+
loop: Event loop to use (defaults to current).
130+
filter_fn: Optional filter function. If provided, only items where
131+
filter_fn(item) returns True will be delivered.
132+
If None, receives all events (original behavior).
133+
134+
Returns:
135+
Queue to receive events from.
136+
"""
111137
with self._lock:
112138
queue = Queue[T]()
113139
loop = loop or asyncio.get_event_loop()
114-
self._subscribers.append((queue, loop))
140+
self._subscribers.append((queue, loop, filter_fn))
115141
return queue
116142

117143
def unsubscribe(self, queue: Queue[T]) -> None:
118144
with self._lock:
119145
# looping here is ok, since we don't expect a lot of subscribers
120-
for i, (q, _) in enumerate(self._subscribers):
146+
for i, (q, _, _) in enumerate(self._subscribers):
121147
if q == queue:
122148
self._subscribers.pop(i)
123149
break

livekit-rtc/livekit/rtc/audio_stream.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,12 @@ def __init__(
102102
self._num_channels = num_channels
103103
self._frame_size_ms = frame_size_ms
104104
self._loop = loop or asyncio.get_event_loop()
105-
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
105+
# Only subscribe to audio_stream_event to avoid unnecessary memory allocations
106+
# from other event types (room_event, track_event, etc.)
107+
self._ffi_queue = FfiClient.instance.queue.subscribe(
108+
self._loop,
109+
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
110+
)
106111
self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity)
107112

108113
self._audio_filter_module: str | None = None

livekit-rtc/livekit/rtc/video_stream.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,12 @@ def __init__(
4747
**kwargs,
4848
) -> None:
4949
self._loop = loop or asyncio.get_event_loop()
50-
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
50+
# Only subscribe to video_stream_event to avoid unnecessary memory allocations
51+
# from other event types (room_event, track_event, etc.)
52+
self._ffi_queue = FfiClient.instance.queue.subscribe(
53+
self._loop,
54+
filter_fn=lambda e: e.WhichOneof("message") == "video_stream_event",
55+
)
5156
self._queue: RingQueue[VideoFrameEvent | None] = RingQueue(capacity)
5257
self._track: Track | None = track
5358
self._format = format

tests/rtc/test_ffi_queue.py

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
# Copyright 2023 LiveKit, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Tests for FfiQueue filter_fn functionality.
16+
17+
These tests verify the filter_fn feature of FfiQueue.
18+
FfiQueue can be imported without loading the native FFI library.
19+
"""
20+
21+
import asyncio
22+
from dataclasses import dataclass
23+
24+
import pytest
25+
26+
from livekit.rtc._ffi_client import FfiQueue
27+
28+
29+
@dataclass
30+
class MockFfiEvent:
31+
"""Mock FFI event with WhichOneof support."""
32+
33+
_message_type: str
34+
35+
def WhichOneof(self, field: str) -> str:
36+
return self._message_type
37+
38+
39+
class TestFfiQueueFilterFn:
40+
"""Test suite for FfiQueue filter_fn functionality."""
41+
42+
@pytest.fixture
43+
def event_loop(self):
44+
"""Create event loop for tests."""
45+
loop = asyncio.new_event_loop()
46+
yield loop
47+
loop.close()
48+
49+
def test_subscribe_without_filter_receives_all_events(self, event_loop):
50+
"""Subscriber without filter_fn receives all events."""
51+
queue = FfiQueue()
52+
sub = queue.subscribe(event_loop, filter_fn=None)
53+
54+
events = [
55+
MockFfiEvent("room_event"),
56+
MockFfiEvent("audio_stream_event"),
57+
MockFfiEvent("video_stream_event"),
58+
MockFfiEvent("track_event"),
59+
]
60+
61+
for event in events:
62+
queue.put(event)
63+
64+
event_loop.run_until_complete(asyncio.sleep(0.01))
65+
66+
received = []
67+
while not sub.empty():
68+
received.append(sub.get_nowait())
69+
70+
assert len(received) == 4
71+
72+
def test_subscribe_with_filter_receives_only_matching_events(self, event_loop):
73+
"""Subscriber with filter_fn only receives matching events."""
74+
queue = FfiQueue()
75+
sub = queue.subscribe(
76+
event_loop,
77+
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
78+
)
79+
80+
events = [
81+
MockFfiEvent("room_event"),
82+
MockFfiEvent("audio_stream_event"),
83+
MockFfiEvent("video_stream_event"),
84+
MockFfiEvent("audio_stream_event"),
85+
MockFfiEvent("track_event"),
86+
]
87+
88+
for event in events:
89+
queue.put(event)
90+
91+
event_loop.run_until_complete(asyncio.sleep(0.01))
92+
93+
received = []
94+
while not sub.empty():
95+
received.append(sub.get_nowait())
96+
97+
assert len(received) == 2
98+
assert all(e._message_type == "audio_stream_event" for e in received)
99+
100+
def test_multiple_subscribers_different_filters(self, event_loop):
101+
"""Multiple subscribers can have different filters."""
102+
queue = FfiQueue()
103+
104+
sub_audio = queue.subscribe(
105+
event_loop,
106+
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
107+
)
108+
sub_video = queue.subscribe(
109+
event_loop,
110+
filter_fn=lambda e: e.WhichOneof("message") == "video_stream_event",
111+
)
112+
sub_all = queue.subscribe(event_loop, filter_fn=None)
113+
114+
events = [
115+
MockFfiEvent("room_event"),
116+
MockFfiEvent("audio_stream_event"),
117+
MockFfiEvent("video_stream_event"),
118+
MockFfiEvent("audio_stream_event"),
119+
]
120+
121+
for event in events:
122+
queue.put(event)
123+
124+
event_loop.run_until_complete(asyncio.sleep(0.01))
125+
126+
audio_count = 0
127+
while not sub_audio.empty():
128+
sub_audio.get_nowait()
129+
audio_count += 1
130+
131+
video_count = 0
132+
while not sub_video.empty():
133+
sub_video.get_nowait()
134+
video_count += 1
135+
136+
all_count = 0
137+
while not sub_all.empty():
138+
sub_all.get_nowait()
139+
all_count += 1
140+
141+
assert audio_count == 2
142+
assert video_count == 1
143+
assert all_count == 4
144+
145+
def test_filter_with_multiple_event_types(self, event_loop):
146+
"""Filter can match multiple event types."""
147+
queue = FfiQueue()
148+
sub = queue.subscribe(
149+
event_loop,
150+
filter_fn=lambda e: e.WhichOneof("message")
151+
in {"audio_stream_event", "video_stream_event"},
152+
)
153+
154+
events = [
155+
MockFfiEvent("room_event"),
156+
MockFfiEvent("audio_stream_event"),
157+
MockFfiEvent("video_stream_event"),
158+
MockFfiEvent("track_event"),
159+
]
160+
161+
for event in events:
162+
queue.put(event)
163+
164+
event_loop.run_until_complete(asyncio.sleep(0.01))
165+
166+
received = []
167+
while not sub.empty():
168+
received.append(sub.get_nowait())
169+
170+
assert len(received) == 2
171+
types = {e._message_type for e in received}
172+
assert types == {"audio_stream_event", "video_stream_event"}
173+
174+
def test_unsubscribe_works_with_filtered_subscriber(self, event_loop):
175+
"""Unsubscribe correctly removes filtered subscriber."""
176+
queue = FfiQueue()
177+
sub = queue.subscribe(
178+
event_loop,
179+
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
180+
)
181+
182+
queue.put(MockFfiEvent("audio_stream_event"))
183+
event_loop.run_until_complete(asyncio.sleep(0.01))
184+
185+
assert not sub.empty()
186+
187+
queue.unsubscribe(sub)
188+
189+
while not sub.empty():
190+
sub.get_nowait()
191+
192+
queue.put(MockFfiEvent("audio_stream_event"))
193+
event_loop.run_until_complete(asyncio.sleep(0.01))
194+
195+
assert sub.empty()
196+
197+
def test_filter_error_delivers_item(self, event_loop):
198+
"""If filter_fn raises, item is still delivered."""
199+
queue = FfiQueue()
200+
201+
def bad_filter(e):
202+
raise ValueError("oops")
203+
204+
sub = queue.subscribe(event_loop, filter_fn=bad_filter)
205+
206+
queue.put(MockFfiEvent("audio_stream_event"))
207+
event_loop.run_until_complete(asyncio.sleep(0.01))
208+
209+
# Item should be delivered despite filter error
210+
received = []
211+
while not sub.empty():
212+
received.append(sub.get_nowait())
213+
214+
assert len(received) == 1
215+
216+
217+
class TestFfiQueueMemoryReduction:
218+
"""Test that filtering actually reduces object creation."""
219+
220+
@pytest.fixture
221+
def event_loop(self):
222+
loop = asyncio.new_event_loop()
223+
yield loop
224+
loop.close()
225+
226+
def test_filtering_reduces_callback_calls(self, event_loop):
227+
"""Verify filtering prevents call_soon_threadsafe for non-matching events."""
228+
queue = FfiQueue()
229+
230+
# Create 10 subscribers, each only wants audio events
231+
subscribers = []
232+
for _ in range(10):
233+
sub = queue.subscribe(
234+
event_loop,
235+
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
236+
)
237+
subscribers.append(sub)
238+
239+
# Generate 1000 events, only 5% are audio
240+
events = []
241+
for i in range(1000):
242+
if i < 50:
243+
events.append(MockFfiEvent("audio_stream_event"))
244+
else:
245+
events.append(MockFfiEvent("room_event"))
246+
247+
for event in events:
248+
queue.put(event)
249+
250+
event_loop.run_until_complete(asyncio.sleep(0.1))
251+
252+
# Each subscriber should have received only 50 events (not 1000)
253+
for sub in subscribers:
254+
count = 0
255+
while not sub.empty():
256+
sub.get_nowait()
257+
count += 1
258+
assert count == 50

0 commit comments

Comments
 (0)