From d334100910191f27283cc92e46505c47037881ec Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 14 Feb 2025 13:47:38 +0100 Subject: [PATCH 1/5] Add auto chunking for text stream writer --- livekit-rtc/livekit/rtc/data_stream.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/livekit-rtc/livekit/rtc/data_stream.py b/livekit-rtc/livekit/rtc/data_stream.py index 778c96e7..35c76cab 100644 --- a/livekit-rtc/livekit/rtc/data_stream.py +++ b/livekit-rtc/livekit/rtc/data_stream.py @@ -24,7 +24,7 @@ from ._proto import ffi_pb2 as proto_ffi from ._proto import room_pb2 as proto_room from ._ffi_client import FfiClient - +from ._utils import split_utf8 from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -287,19 +287,17 @@ def __init__( attachments=list(self._header.text_header.attached_stream_ids), ) - async def write(self, text: str, chunk_index: int | None = None): - content = text.encode() - if len(content) > STREAM_CHUNK_SIZE: - raise ValueError("maximum chunk size exceeded") - if chunk_index is None: + async def write(self, text: str): + for chunk in split_utf8(text, STREAM_CHUNK_SIZE): + content = chunk.encode() chunk_index = self._next_chunk_index self._next_chunk_index += 1 - chunk_msg = proto_DataStream.Chunk( - stream_id=self._header.stream_id, - chunk_index=chunk_index, - content=content, - ) - await self._send_chunk(chunk_msg) + chunk_msg = proto_DataStream.Chunk( + stream_id=self._header.stream_id, + chunk_index=chunk_index, + content=content, + ) + await self._send_chunk(chunk_msg) @property def info(self) -> TextStreamInfo: From 18ed02a02c62f7e7b9c30d491270a1aae1d28aad Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 14 Feb 2025 13:50:24 +0100 Subject: [PATCH 2/5] Simplify send_text implementation --- livekit-rtc/livekit/rtc/participant.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/livekit-rtc/livekit/rtc/participant.py b/livekit-rtc/livekit/rtc/participant.py index b984a8be..a520dd4d 100644 --- a/livekit-rtc/livekit/rtc/participant.py +++ b/livekit-rtc/livekit/rtc/participant.py @@ -34,7 +34,7 @@ from ._proto.track_pb2 import ( ParticipantTrackPermission, ) -from ._utils import BroadcastQueue, split_utf8 +from ._utils import BroadcastQueue from .track import LocalTrack from .track_publication import ( LocalTrackPublication, @@ -623,8 +623,7 @@ async def send_text( total_size=total_size, ) - for chunk in split_utf8(text, STREAM_CHUNK_SIZE): - await writer.write(chunk) + await writer.write(text) await writer.aclose() return writer.info From 1962bd505df794dde234c4bce914c6d61ac457a6 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 14 Feb 2025 13:58:25 +0100 Subject: [PATCH 3/5] add write lock --- livekit-rtc/livekit/rtc/data_stream.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/livekit-rtc/livekit/rtc/data_stream.py b/livekit-rtc/livekit/rtc/data_stream.py index 35c76cab..bfd57108 100644 --- a/livekit-rtc/livekit/rtc/data_stream.py +++ b/livekit-rtc/livekit/rtc/data_stream.py @@ -286,18 +286,20 @@ def __init__( attributes=dict(self._header.attributes), attachments=list(self._header.text_header.attached_stream_ids), ) + self._write_lock = asyncio.Lock() async def write(self, text: str): - for chunk in split_utf8(text, STREAM_CHUNK_SIZE): - content = chunk.encode() - chunk_index = self._next_chunk_index - self._next_chunk_index += 1 - chunk_msg = proto_DataStream.Chunk( - stream_id=self._header.stream_id, - chunk_index=chunk_index, - content=content, - ) - await self._send_chunk(chunk_msg) + async with self._write_lock: + for chunk in split_utf8(text, STREAM_CHUNK_SIZE): + content = chunk.encode() + chunk_index = self._next_chunk_index + self._next_chunk_index += 1 + chunk_msg = proto_DataStream.Chunk( + stream_id=self._header.stream_id, + chunk_index=chunk_index, + content=content, + ) + await self._send_chunk(chunk_msg) @property def info(self) -> TextStreamInfo: From 453bd5841637cd4d6f41f01f2f4ef2b1a753e5d5 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 14 Feb 2025 14:06:33 +0100 Subject: [PATCH 4/5] use simple async text iterator for text streams --- livekit-rtc/livekit/rtc/data_stream.py | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/livekit-rtc/livekit/rtc/data_stream.py b/livekit-rtc/livekit/rtc/data_stream.py index bfd57108..770a19e1 100644 --- a/livekit-rtc/livekit/rtc/data_stream.py +++ b/livekit-rtc/livekit/rtc/data_stream.py @@ -47,14 +47,6 @@ class BaseStreamInfo: @dataclass class TextStreamInfo(BaseStreamInfo): attachments: List[str] - pass - - -@dataclass -class TextStreamUpdate: - current: str - index: int - collected: str class TextStreamReader: @@ -81,22 +73,15 @@ async def _on_chunk_update(self, chunk: proto_DataStream.Chunk): async def _on_stream_close(self, trailer: proto_DataStream.Trailer): await self._queue.put(None) - def __aiter__(self) -> AsyncIterator[TextStreamUpdate]: + def __aiter__(self) -> AsyncIterator[str]: return self - async def __anext__(self) -> TextStreamUpdate: + async def __anext__(self) -> str: item = await self._queue.get() if item is None: raise StopAsyncIteration decodedStr = item.content.decode() - - self._chunks[item.chunk_index] = item - chunk_list = list(self._chunks.values()) - chunk_list.sort(key=lambda chunk: chunk.chunk_index) - collected: str = "".join(map(lambda chunk: chunk.content.decode(), chunk_list)) - return TextStreamUpdate( - current=decodedStr, index=item.chunk_index, collected=collected - ) + return decodedStr @property def info(self) -> TextStreamInfo: @@ -104,8 +89,8 @@ def info(self) -> TextStreamInfo: async def read_all(self) -> str: final_string = "" - async for update in self: - final_string = update.collected + async for chunk in self: + final_string += chunk return final_string From 417fb02094704cdeee651ce420c45eacf5dc6b69 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 14 Feb 2025 15:22:09 +0100 Subject: [PATCH 5/5] remove textstreamupdate --- livekit-rtc/livekit/rtc/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/livekit-rtc/livekit/rtc/__init__.py b/livekit-rtc/livekit/rtc/__init__.py index 89954b25..6ee1f6c6 100644 --- a/livekit-rtc/livekit/rtc/__init__.py +++ b/livekit-rtc/livekit/rtc/__init__.py @@ -81,7 +81,6 @@ from .synchronizer import AVSynchronizer from .data_stream import ( TextStreamInfo, - TextStreamUpdate, ByteStreamInfo, TextStreamReader, TextStreamWriter, @@ -155,7 +154,6 @@ "EventEmitter", "combine_audio_frames", "AVSynchronizer", - "TextStreamUpdate", "TextStreamInfo", "ByteStreamInfo", "TextStreamReader",