Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions livekit-rtc/livekit/rtc/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from collections import deque
import ctypes
import random
from typing import Callable, Generic, List, TypeVar
from typing import Callable, Generator, Generic, List, TypeVar

logger = logging.getLogger("livekit")

Expand Down Expand Up @@ -133,12 +133,13 @@ def generate_random_base62(length=12):


# adapted from https://stackoverflow.com/a/6043797
def split_utf8(s: str, n: int):
def split_utf8(s: str, n: int) -> Generator[bytes, None, None]:
"""Split UTF-8 s into chunks of maximum length n."""
while len(s) > n:
encoded = s.encode()
while len(encoded) > n:
k = n
while (ord(s[k]) & 0xC0) == 0x80:
while (encoded[k] & 0xC0) == 0x80:
k -= 1
yield s[:k]
s = s[k:]
yield s
yield encoded[:k]
encoded = encoded[k:]
yield encoded
17 changes: 11 additions & 6 deletions livekit-rtc/livekit/rtc/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
from ._utils import split_utf8
from typing import TYPE_CHECKING


if TYPE_CHECKING:
from .participant import LocalParticipant


STREAM_CHUNK_SIZE = 15_000


Expand Down Expand Up @@ -65,7 +65,6 @@ def __init__(
attachments=list(header.text_header.attached_stream_ids),
)
self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue()
self._chunks: Dict[int, proto_DataStream.Chunk] = {}

async def _on_chunk_update(self, chunk: proto_DataStream.Chunk):
await self._queue.put(chunk)
Expand Down Expand Up @@ -146,6 +145,7 @@ def __init__(
total_size: int | None = None,
mime_type: str = "",
destination_identities: Optional[List[str]] = None,
sender_identity: str | None = None,
):
self._local_participant = local_participant
if stream_id is None:
Expand All @@ -161,14 +161,15 @@ def __init__(
)
self._next_chunk_index: int = 0
self._destination_identities = destination_identities
self._sender_identity = sender_identity or self._local_participant.identity

async def _send_header(self):
req = proto_ffi.FfiRequest(
send_stream_header=proto_room.SendStreamHeaderRequest(
header=self._header,
local_participant_handle=self._local_participant._ffi_handle.handle,
destination_identities=self._destination_identities,
sender_identity=self._local_participant.identity,
sender_identity=self._sender_identity,
)
)

Expand Down Expand Up @@ -230,10 +231,12 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer):
if cb.send_stream_chunk.error:
raise ConnectionError(cb.send_stream_trailer.error)

async def aclose(self):
async def aclose(
self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None
):
await self._send_trailer(
trailer=proto_DataStream.Trailer(
stream_id=self._header.stream_id, reason=""
stream_id=self._header.stream_id, reason=reason, attributes=attributes
)
)

Expand All @@ -249,6 +252,7 @@ def __init__(
total_size: int | None = None,
reply_to_id: str | None = None,
destination_identities: Optional[List[str]] = None,
sender_identity: str | None = None,
) -> None:
super().__init__(
local_participant,
Expand All @@ -258,6 +262,7 @@ def __init__(
total_size,
mime_type="text/plain",
destination_identities=destination_identities,
sender_identity=sender_identity,
)
self._header.text_header.operation_type = proto_DataStream.OperationType.CREATE
if reply_to_id:
Expand All @@ -276,7 +281,7 @@ def __init__(
async def write(self, text: str):
async with self._write_lock:
for chunk in split_utf8(text, STREAM_CHUNK_SIZE):
content = chunk.encode()
content = chunk
chunk_index = self._next_chunk_index
self._next_chunk_index += 1
chunk_msg = proto_DataStream.Chunk(
Expand Down
4 changes: 4 additions & 0 deletions livekit-rtc/livekit/rtc/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,10 @@ async def stream_text(
destination_identities: Optional[List[str]] = None,
topic: str = "",
attributes: Optional[Dict[str, str]] = None,
stream_id: str | None = None,
reply_to_id: str | None = None,
total_size: int | None = None,
sender_identity: str | None = None,
) -> TextStreamWriter:
"""
Returns a TextStreamWriter that allows to write individual chunks of text to a text stream.
Expand All @@ -599,6 +601,8 @@ async def stream_text(
reply_to_id=reply_to_id,
destination_identities=destination_identities,
total_size=total_size,
stream_id=stream_id,
sender_identity=sender_identity,
)

await writer._send_header()
Expand Down
Loading