|
14 | 14 |
|
15 | 15 | """Tests for FfiQueue filter_fn functionality. |
16 | 16 |
|
17 | | -These tests verify the filter_fn feature of FfiQueue without |
18 | | -requiring the native FFI library. |
| 17 | +These tests verify the filter_fn feature of FfiQueue. |
| 18 | +FfiQueue can be imported without loading the native FFI library. |
19 | 19 | """ |
20 | 20 |
|
21 | 21 | import asyncio |
22 | | -import threading |
23 | 22 | from dataclasses import dataclass |
24 | | -from typing import Callable, Generic, List, Optional, TypeVar |
25 | | -from unittest.mock import MagicMock |
26 | 23 |
|
27 | 24 | import pytest |
28 | 25 |
|
29 | | -# Re-implement FfiQueue locally for testing (avoids FFI library dependency) |
30 | | -T = TypeVar("T") |
31 | | - |
32 | | - |
33 | | -class Queue(Generic[T]): |
34 | | - """Simple asyncio-compatible queue for testing.""" |
35 | | - |
36 | | - def __init__(self) -> None: |
37 | | - self._items: List[T] = [] |
38 | | - |
39 | | - def put_nowait(self, item: T) -> None: |
40 | | - self._items.append(item) |
41 | | - |
42 | | - def get_nowait(self) -> T: |
43 | | - return self._items.pop(0) |
44 | | - |
45 | | - def empty(self) -> bool: |
46 | | - return len(self._items) == 0 |
47 | | - |
48 | | - |
49 | | -class FfiQueue(Generic[T]): |
50 | | - """Copy of FfiQueue with filter_fn for testing.""" |
51 | | - |
52 | | - def __init__(self) -> None: |
53 | | - self._lock = threading.RLock() |
54 | | - self._subscribers: List[ |
55 | | - tuple[Queue[T], asyncio.AbstractEventLoop, Optional[Callable[[T], bool]]] |
56 | | - ] = [] |
57 | | - |
58 | | - def put(self, item: T) -> None: |
59 | | - with self._lock: |
60 | | - for queue, loop, filter_fn in self._subscribers: |
61 | | - if filter_fn is not None: |
62 | | - try: |
63 | | - if not filter_fn(item): |
64 | | - continue |
65 | | - except Exception: |
66 | | - pass # On filter error, deliver the item |
67 | | - |
68 | | - try: |
69 | | - loop.call_soon_threadsafe(queue.put_nowait, item) |
70 | | - except Exception: |
71 | | - pass |
72 | | - |
73 | | - def subscribe( |
74 | | - self, |
75 | | - loop: Optional[asyncio.AbstractEventLoop] = None, |
76 | | - filter_fn: Optional[Callable[[T], bool]] = None, |
77 | | - ) -> Queue[T]: |
78 | | - with self._lock: |
79 | | - queue = Queue[T]() |
80 | | - loop = loop or asyncio.get_event_loop() |
81 | | - self._subscribers.append((queue, loop, filter_fn)) |
82 | | - return queue |
83 | | - |
84 | | - def unsubscribe(self, queue: Queue[T]) -> None: |
85 | | - with self._lock: |
86 | | - for i, (q, _, _) in enumerate(self._subscribers): |
87 | | - if q == queue: |
88 | | - self._subscribers.pop(i) |
89 | | - break |
| 26 | +from livekit.rtc._ffi_client import FfiQueue |
90 | 27 |
|
91 | 28 |
|
92 | 29 | @dataclass |
|
0 commit comments