diff --git a/livekit-rtc/livekit/rtc/_utils.py b/livekit-rtc/livekit/rtc/_utils.py index c7fbc092..fd7e452f 100644 --- a/livekit-rtc/livekit/rtc/_utils.py +++ b/livekit-rtc/livekit/rtc/_utils.py @@ -17,7 +17,7 @@ from collections import deque import ctypes import random -from typing import Callable, Generic, List, TypeVar +from typing import Callable, Generator, Generic, List, TypeVar logger = logging.getLogger("livekit") @@ -133,12 +133,13 @@ def generate_random_base62(length=12): # adapted from https://stackoverflow.com/a/6043797 -def split_utf8(s: str, n: int): +def split_utf8(s: str, n: int) -> Generator[bytes, None, None]: """Split UTF-8 s into chunks of maximum length n.""" - while len(s) > n: + encoded = s.encode() + while len(encoded) > n: k = n - while (ord(s[k]) & 0xC0) == 0x80: + while (encoded[k] & 0xC0) == 0x80: k -= 1 - yield s[:k] - s = s[k:] - yield s + yield encoded[:k] + encoded = encoded[k:] + yield encoded diff --git a/livekit-rtc/livekit/rtc/data_stream.py b/livekit-rtc/livekit/rtc/data_stream.py index 770a19e1..c7a22bf5 100644 --- a/livekit-rtc/livekit/rtc/data_stream.py +++ b/livekit-rtc/livekit/rtc/data_stream.py @@ -27,10 +27,10 @@ from ._utils import split_utf8 from typing import TYPE_CHECKING + if TYPE_CHECKING: from .participant import LocalParticipant - STREAM_CHUNK_SIZE = 15_000 @@ -65,7 +65,6 @@ def __init__( attachments=list(header.text_header.attached_stream_ids), ) self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue() - self._chunks: Dict[int, proto_DataStream.Chunk] = {} async def _on_chunk_update(self, chunk: proto_DataStream.Chunk): await self._queue.put(chunk) @@ -146,6 +145,7 @@ def __init__( total_size: int | None = None, mime_type: str = "", destination_identities: Optional[List[str]] = None, + sender_identity: str | None = None, ): self._local_participant = local_participant if stream_id is None: @@ -161,6 +161,7 @@ def __init__( ) self._next_chunk_index: int = 0 self._destination_identities = destination_identities + self._sender_identity = sender_identity or self._local_participant.identity async def _send_header(self): req = proto_ffi.FfiRequest( @@ -168,7 +169,7 @@ async def _send_header(self): header=self._header, local_participant_handle=self._local_participant._ffi_handle.handle, destination_identities=self._destination_identities, - sender_identity=self._local_participant.identity, + sender_identity=self._sender_identity, ) ) @@ -230,10 +231,12 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer): if cb.send_stream_chunk.error: raise ConnectionError(cb.send_stream_trailer.error) - async def aclose(self): + async def aclose( + self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None + ): await self._send_trailer( trailer=proto_DataStream.Trailer( - stream_id=self._header.stream_id, reason="" + stream_id=self._header.stream_id, reason=reason, attributes=attributes ) ) @@ -249,6 +252,7 @@ def __init__( total_size: int | None = None, reply_to_id: str | None = None, destination_identities: Optional[List[str]] = None, + sender_identity: str | None = None, ) -> None: super().__init__( local_participant, @@ -258,6 +262,7 @@ def __init__( total_size, mime_type="text/plain", destination_identities=destination_identities, + sender_identity=sender_identity, ) self._header.text_header.operation_type = proto_DataStream.OperationType.CREATE if reply_to_id: @@ -276,7 +281,7 @@ def __init__( async def write(self, text: str): async with self._write_lock: for chunk in split_utf8(text, STREAM_CHUNK_SIZE): - content = chunk.encode() + content = chunk chunk_index = self._next_chunk_index self._next_chunk_index += 1 chunk_msg = proto_DataStream.Chunk( diff --git a/livekit-rtc/livekit/rtc/participant.py b/livekit-rtc/livekit/rtc/participant.py index a520dd4d..e22f6648 100644 --- a/livekit-rtc/livekit/rtc/participant.py +++ b/livekit-rtc/livekit/rtc/participant.py @@ -585,8 +585,10 @@ async def stream_text( destination_identities: Optional[List[str]] = None, topic: str = "", attributes: Optional[Dict[str, str]] = None, + stream_id: str | None = None, reply_to_id: str | None = None, total_size: int | None = None, + sender_identity: str | None = None, ) -> TextStreamWriter: """ Returns a TextStreamWriter that allows to write individual chunks of text to a text stream. @@ -599,6 +601,8 @@ async def stream_text( reply_to_id=reply_to_id, destination_identities=destination_identities, total_size=total_size, + stream_id=stream_id, + sender_identity=sender_identity, ) await writer._send_header()