Skip to content

Commit 3364608

Browse files
Hormoldclaude
andcommitted
fix: Use filter_fn instead of event_types to preserve generic abstraction
Addresses review feedback: FfiQueue is Generic[T], so we can't assume item has WhichOneof method. Instead, use a filter_fn callback that the caller provides - this keeps FfiQueue generic while allowing filtering. - FfiQueue.subscribe() now takes optional filter_fn: Callable[[T], bool] - AudioStream/VideoStream provide the filter that knows the concrete type - Tests updated to use filter_fn approach Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3371abd commit 3364608

File tree

4 files changed

+76
-86
lines changed

4 files changed

+76
-86
lines changed

livekit-rtc/livekit/rtc/_ffi_client.py

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import platform
2525
import atexit
2626
import threading
27-
from typing import Generic, List, Optional, Set, TypeVar
27+
from typing import Callable, Generic, List, Optional, TypeVar
2828

2929
from ._proto import ffi_pb2 as proto_ffi
3030
from ._utils import Queue, classproperty
@@ -95,25 +95,21 @@ def __repr__(self) -> str:
9595
class FfiQueue(Generic[T]):
9696
def __init__(self) -> None:
9797
self._lock = threading.RLock()
98-
# Format: (queue, loop, event_types or None)
98+
# Format: (queue, loop, filter_fn or None)
9999
self._subscribers: List[
100-
tuple[Queue[T], asyncio.AbstractEventLoop, Optional[Set[str]]]
100+
tuple[Queue[T], asyncio.AbstractEventLoop, Optional[Callable[[T], bool]]]
101101
] = []
102102

103103
def put(self, item: T) -> None:
104-
# Get event type for filtering (if item has WhichOneof method)
105-
which = None
106-
try:
107-
which = item.WhichOneof("message") # type: ignore
108-
except Exception:
109-
pass
110-
111104
with self._lock:
112-
for queue, loop, event_types in self._subscribers:
113-
# Filter: if event_types specified and we know the type, skip non-matching
114-
if event_types is not None and which is not None:
115-
if which not in event_types:
116-
continue
105+
for queue, loop, filter_fn in self._subscribers:
106+
# If filter provided, skip items that don't match
107+
if filter_fn is not None:
108+
try:
109+
if not filter_fn(item):
110+
continue
111+
except Exception:
112+
pass # On filter error, deliver the item
117113

118114
try:
119115
loop.call_soon_threadsafe(queue.put_nowait, item)
@@ -125,22 +121,23 @@ def put(self, item: T) -> None:
125121
def subscribe(
126122
self,
127123
loop: Optional[asyncio.AbstractEventLoop] = None,
128-
event_types: Optional[Set[str]] = None,
124+
filter_fn: Optional[Callable[[T], bool]] = None,
129125
) -> Queue[T]:
130126
"""Subscribe to FFI events.
131127
132128
Args:
133129
loop: Event loop to use (defaults to current).
134-
event_types: Optional set of event type names to receive (e.g., {"audio_stream_event"}).
135-
If None, receives all events (original behavior).
130+
filter_fn: Optional filter function. If provided, only items where
131+
filter_fn(item) returns True will be delivered.
132+
If None, receives all events (original behavior).
136133
137134
Returns:
138135
Queue to receive events from.
139136
"""
140137
with self._lock:
141138
queue = Queue[T]()
142139
loop = loop or asyncio.get_event_loop()
143-
self._subscribers.append((queue, loop, event_types))
140+
self._subscribers.append((queue, loop, filter_fn))
144141
return queue
145142

146143
def unsubscribe(self, queue: Queue[T]) -> None:

livekit-rtc/livekit/rtc/audio_stream.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ def __init__(
105105
# Only subscribe to audio_stream_event to avoid unnecessary memory allocations
106106
# from other event types (room_event, track_event, etc.)
107107
self._ffi_queue = FfiClient.instance.queue.subscribe(
108-
self._loop, event_types={"audio_stream_event"}
108+
self._loop,
109+
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
109110
)
110111
self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity)
111112

livekit-rtc/livekit/rtc/video_stream.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ def __init__(
5050
# Only subscribe to video_stream_event to avoid unnecessary memory allocations
5151
# from other event types (room_event, track_event, etc.)
5252
self._ffi_queue = FfiClient.instance.queue.subscribe(
53-
self._loop, event_types={"video_stream_event"}
53+
self._loop,
54+
filter_fn=lambda e: e.WhichOneof("message") == "video_stream_event",
5455
)
5556
self._queue: RingQueue[VideoFrameEvent | None] = RingQueue(capacity)
5657
self._track: Track | None = track

tests/rtc/test_ffi_queue.py

Lines changed: 56 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""Tests for FfiQueue event filtering functionality.
15+
"""Tests for FfiQueue filter_fn functionality.
1616
17-
These tests verify the event_types filtering feature of FfiQueue without
17+
These tests verify the filter_fn feature of FfiQueue without
1818
requiring the native FFI library.
1919
"""
2020

2121
import asyncio
2222
import threading
2323
from dataclasses import dataclass
24-
from typing import Generic, List, Optional, Set, TypeVar
24+
from typing import Callable, Generic, List, Optional, TypeVar
2525
from unittest.mock import MagicMock
2626

2727
import pytest
@@ -47,26 +47,23 @@ def empty(self) -> bool:
4747

4848

4949
class FfiQueue(Generic[T]):
50-
"""Copy of FfiQueue with event_types filtering for testing."""
50+
"""Copy of FfiQueue with filter_fn for testing."""
5151

5252
def __init__(self) -> None:
5353
self._lock = threading.RLock()
5454
self._subscribers: List[
55-
tuple[Queue[T], asyncio.AbstractEventLoop, Optional[Set[str]]]
55+
tuple[Queue[T], asyncio.AbstractEventLoop, Optional[Callable[[T], bool]]]
5656
] = []
5757

5858
def put(self, item: T) -> None:
59-
which = None
60-
try:
61-
which = item.WhichOneof("message") # type: ignore
62-
except Exception:
63-
pass
64-
6559
with self._lock:
66-
for queue, loop, event_types in self._subscribers:
67-
if event_types is not None and which is not None:
68-
if which not in event_types:
69-
continue
60+
for queue, loop, filter_fn in self._subscribers:
61+
if filter_fn is not None:
62+
try:
63+
if not filter_fn(item):
64+
continue
65+
except Exception:
66+
pass # On filter error, deliver the item
7067

7168
try:
7269
loop.call_soon_threadsafe(queue.put_nowait, item)
@@ -76,12 +73,12 @@ def put(self, item: T) -> None:
7673
def subscribe(
7774
self,
7875
loop: Optional[asyncio.AbstractEventLoop] = None,
79-
event_types: Optional[Set[str]] = None,
76+
filter_fn: Optional[Callable[[T], bool]] = None,
8077
) -> Queue[T]:
8178
with self._lock:
8279
queue = Queue[T]()
8380
loop = loop or asyncio.get_event_loop()
84-
self._subscribers.append((queue, loop, event_types))
81+
self._subscribers.append((queue, loop, filter_fn))
8582
return queue
8683

8784
def unsubscribe(self, queue: Queue[T]) -> None:
@@ -102,8 +99,8 @@ def WhichOneof(self, field: str) -> str:
10299
return self._message_type
103100

104101

105-
class TestFfiQueueEventFiltering:
106-
"""Test suite for FfiQueue event_types filtering."""
102+
class TestFfiQueueFilterFn:
103+
"""Test suite for FfiQueue filter_fn functionality."""
107104

108105
@pytest.fixture
109106
def event_loop(self):
@@ -113,11 +110,10 @@ def event_loop(self):
113110
loop.close()
114111

115112
def test_subscribe_without_filter_receives_all_events(self, event_loop):
116-
"""Subscriber without event_types filter receives all events."""
113+
"""Subscriber without filter_fn receives all events."""
117114
queue = FfiQueue()
118-
sub = queue.subscribe(event_loop, event_types=None)
115+
sub = queue.subscribe(event_loop, filter_fn=None)
119116

120-
# Send various event types
121117
events = [
122118
MockFfiEvent("room_event"),
123119
MockFfiEvent("audio_stream_event"),
@@ -128,22 +124,22 @@ def test_subscribe_without_filter_receives_all_events(self, event_loop):
128124
for event in events:
129125
queue.put(event)
130126

131-
# Run event loop to process callbacks
132127
event_loop.run_until_complete(asyncio.sleep(0.01))
133128

134-
# Should receive all 4 events
135129
received = []
136130
while not sub.empty():
137131
received.append(sub.get_nowait())
138132

139133
assert len(received) == 4
140134

141135
def test_subscribe_with_filter_receives_only_matching_events(self, event_loop):
142-
"""Subscriber with event_types filter only receives matching events."""
136+
"""Subscriber with filter_fn only receives matching events."""
143137
queue = FfiQueue()
144-
sub = queue.subscribe(event_loop, event_types={"audio_stream_event"})
138+
sub = queue.subscribe(
139+
event_loop,
140+
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
141+
)
145142

146-
# Send various event types
147143
events = [
148144
MockFfiEvent("room_event"),
149145
MockFfiEvent("audio_stream_event"),
@@ -155,10 +151,8 @@ def test_subscribe_with_filter_receives_only_matching_events(self, event_loop):
155151
for event in events:
156152
queue.put(event)
157153

158-
# Run event loop to process callbacks
159154
event_loop.run_until_complete(asyncio.sleep(0.01))
160155

161-
# Should receive only 2 audio_stream_events
162156
received = []
163157
while not sub.empty():
164158
received.append(sub.get_nowait())
@@ -170,16 +164,16 @@ def test_multiple_subscribers_different_filters(self, event_loop):
170164
"""Multiple subscribers can have different filters."""
171165
queue = FfiQueue()
172166

173-
# Subscriber 1: only audio events
174-
sub_audio = queue.subscribe(event_loop, event_types={"audio_stream_event"})
175-
176-
# Subscriber 2: only video events
177-
sub_video = queue.subscribe(event_loop, event_types={"video_stream_event"})
178-
179-
# Subscriber 3: all events
180-
sub_all = queue.subscribe(event_loop, event_types=None)
167+
sub_audio = queue.subscribe(
168+
event_loop,
169+
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
170+
)
171+
sub_video = queue.subscribe(
172+
event_loop,
173+
filter_fn=lambda e: e.WhichOneof("message") == "video_stream_event",
174+
)
175+
sub_all = queue.subscribe(event_loop, filter_fn=None)
181176

182-
# Send mixed events
183177
events = [
184178
MockFfiEvent("room_event"),
185179
MockFfiEvent("audio_stream_event"),
@@ -192,7 +186,6 @@ def test_multiple_subscribers_different_filters(self, event_loop):
192186

193187
event_loop.run_until_complete(asyncio.sleep(0.01))
194188

195-
# Count received events
196189
audio_count = 0
197190
while not sub_audio.empty():
198191
sub_audio.get_nowait()
@@ -208,15 +201,17 @@ def test_multiple_subscribers_different_filters(self, event_loop):
208201
sub_all.get_nowait()
209202
all_count += 1
210203

211-
assert audio_count == 2 # 2 audio events
212-
assert video_count == 1 # 1 video event
213-
assert all_count == 4 # all 4 events
204+
assert audio_count == 2
205+
assert video_count == 1
206+
assert all_count == 4
214207

215208
def test_filter_with_multiple_event_types(self, event_loop):
216-
"""Filter can accept multiple event types."""
209+
"""Filter can match multiple event types."""
217210
queue = FfiQueue()
218211
sub = queue.subscribe(
219-
event_loop, event_types={"audio_stream_event", "video_stream_event"}
212+
event_loop,
213+
filter_fn=lambda e: e.WhichOneof("message")
214+
in {"audio_stream_event", "video_stream_event"},
220215
)
221216

222217
events = [
@@ -235,48 +230,46 @@ def test_filter_with_multiple_event_types(self, event_loop):
235230
while not sub.empty():
236231
received.append(sub.get_nowait())
237232

238-
# Should receive audio and video events only
239233
assert len(received) == 2
240234
types = {e._message_type for e in received}
241235
assert types == {"audio_stream_event", "video_stream_event"}
242236

243237
def test_unsubscribe_works_with_filtered_subscriber(self, event_loop):
244238
"""Unsubscribe correctly removes filtered subscriber."""
245239
queue = FfiQueue()
246-
sub = queue.subscribe(event_loop, event_types={"audio_stream_event"})
240+
sub = queue.subscribe(
241+
event_loop,
242+
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
243+
)
247244

248245
queue.put(MockFfiEvent("audio_stream_event"))
249246
event_loop.run_until_complete(asyncio.sleep(0.01))
250247

251-
# Should have received 1 event
252248
assert not sub.empty()
253249

254-
# Unsubscribe
255250
queue.unsubscribe(sub)
256251

257-
# Clear the queue
258252
while not sub.empty():
259253
sub.get_nowait()
260254

261-
# Send more events
262255
queue.put(MockFfiEvent("audio_stream_event"))
263256
event_loop.run_until_complete(asyncio.sleep(0.01))
264257

265-
# Should not receive after unsubscribe
266258
assert sub.empty()
267259

268-
def test_event_without_which_oneof_passes_through(self, event_loop):
269-
"""Events without WhichOneof method pass through to all subscribers."""
260+
def test_filter_error_delivers_item(self, event_loop):
261+
"""If filter_fn raises, item is still delivered."""
270262
queue = FfiQueue()
271-
sub = queue.subscribe(event_loop, event_types={"audio_stream_event"})
272263

273-
# Event without WhichOneof
274-
plain_event = MagicMock(spec=[]) # No WhichOneof method
264+
def bad_filter(e):
265+
raise ValueError("oops")
275266

276-
queue.put(plain_event)
267+
sub = queue.subscribe(event_loop, filter_fn=bad_filter)
268+
269+
queue.put(MockFfiEvent("audio_stream_event"))
277270
event_loop.run_until_complete(asyncio.sleep(0.01))
278271

279-
# Should still receive it (can't filter without type info)
272+
# Item should be delivered despite filter error
280273
received = []
281274
while not sub.empty():
282275
received.append(sub.get_nowait())
@@ -300,18 +293,20 @@ def test_filtering_reduces_callback_calls(self, event_loop):
300293
# Create 10 subscribers, each only wants audio events
301294
subscribers = []
302295
for _ in range(10):
303-
sub = queue.subscribe(event_loop, event_types={"audio_stream_event"})
296+
sub = queue.subscribe(
297+
event_loop,
298+
filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
299+
)
304300
subscribers.append(sub)
305301

306302
# Generate 1000 events, only 5% are audio
307303
events = []
308304
for i in range(1000):
309-
if i < 50: # 5% audio events
305+
if i < 50:
310306
events.append(MockFfiEvent("audio_stream_event"))
311307
else:
312308
events.append(MockFfiEvent("room_event"))
313309

314-
# Process all events
315310
for event in events:
316311
queue.put(event)
317312

@@ -324,7 +319,3 @@ def test_filtering_reduces_callback_calls(self, event_loop):
324319
sub.get_nowait()
325320
count += 1
326321
assert count == 50
327-
328-
# Total callbacks made: 10 subscribers × 50 audio events = 500
329-
# Without filtering: 10 subscribers × 1000 events = 10,000
330-
# This is a 95% reduction in callback/object creation

0 commit comments

Comments
 (0)