Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions livekit-rtc/livekit/rtc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
from .synchronizer import AVSynchronizer
from .data_stream import (
TextStreamInfo,
TextStreamUpdate,
ByteStreamInfo,
TextStreamReader,
TextStreamWriter,
Expand Down Expand Up @@ -155,7 +154,6 @@
"EventEmitter",
"combine_audio_frames",
"AVSynchronizer",
"TextStreamUpdate",
"TextStreamInfo",
"ByteStreamInfo",
"TextStreamReader",
Expand Down
53 changes: 19 additions & 34 deletions livekit-rtc/livekit/rtc/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from ._proto import ffi_pb2 as proto_ffi
from ._proto import room_pb2 as proto_room
from ._ffi_client import FfiClient

from ._utils import split_utf8
from typing import TYPE_CHECKING

if TYPE_CHECKING:
Expand All @@ -47,14 +47,6 @@ class BaseStreamInfo:
@dataclass
class TextStreamInfo(BaseStreamInfo):
attachments: List[str]
pass


@dataclass
class TextStreamUpdate:
current: str
index: int
collected: str


class TextStreamReader:
Expand All @@ -81,31 +73,24 @@ async def _on_chunk_update(self, chunk: proto_DataStream.Chunk):
async def _on_stream_close(self, trailer: proto_DataStream.Trailer):
await self._queue.put(None)

def __aiter__(self) -> AsyncIterator[TextStreamUpdate]:
def __aiter__(self) -> AsyncIterator[str]:
return self

async def __anext__(self) -> TextStreamUpdate:
async def __anext__(self) -> str:
item = await self._queue.get()
if item is None:
raise StopAsyncIteration
decodedStr = item.content.decode()

self._chunks[item.chunk_index] = item
chunk_list = list(self._chunks.values())
chunk_list.sort(key=lambda chunk: chunk.chunk_index)
collected: str = "".join(map(lambda chunk: chunk.content.decode(), chunk_list))
return TextStreamUpdate(
current=decodedStr, index=item.chunk_index, collected=collected
)
return decodedStr

@property
def info(self) -> TextStreamInfo:
return self._info

async def read_all(self) -> str:
final_string = ""
async for update in self:
final_string = update.collected
async for chunk in self:
final_string += chunk
return final_string


Expand Down Expand Up @@ -286,20 +271,20 @@ def __init__(
attributes=dict(self._header.attributes),
attachments=list(self._header.text_header.attached_stream_ids),
)
self._write_lock = asyncio.Lock()

async def write(self, text: str, chunk_index: int | None = None):
content = text.encode()
if len(content) > STREAM_CHUNK_SIZE:
raise ValueError("maximum chunk size exceeded")
if chunk_index is None:
chunk_index = self._next_chunk_index
self._next_chunk_index += 1
chunk_msg = proto_DataStream.Chunk(
stream_id=self._header.stream_id,
chunk_index=chunk_index,
content=content,
)
await self._send_chunk(chunk_msg)
async def write(self, text: str):
async with self._write_lock:
for chunk in split_utf8(text, STREAM_CHUNK_SIZE):
content = chunk.encode()
chunk_index = self._next_chunk_index
self._next_chunk_index += 1
chunk_msg = proto_DataStream.Chunk(
stream_id=self._header.stream_id,
chunk_index=chunk_index,
content=content,
)
await self._send_chunk(chunk_msg)

@property
def info(self) -> TextStreamInfo:
Expand Down
5 changes: 2 additions & 3 deletions livekit-rtc/livekit/rtc/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from ._proto.track_pb2 import (
ParticipantTrackPermission,
)
from ._utils import BroadcastQueue, split_utf8
from ._utils import BroadcastQueue
from .track import LocalTrack
from .track_publication import (
LocalTrackPublication,
Expand Down Expand Up @@ -623,8 +623,7 @@ async def send_text(
total_size=total_size,
)

for chunk in split_utf8(text, STREAM_CHUNK_SIZE):
await writer.write(chunk)
await writer.write(text)
await writer.aclose()

return writer.info
Expand Down
Loading