Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
a3aa0a0
Add data stream support
lukasIO Jan 15, 2025
b32ebc9
generated protobuf
github-actions[bot] Jan 15, 2025
f3dc6b6
return info on file send
lukasIO Jan 15, 2025
e04c6e3
expose stream_file
lukasIO Jan 15, 2025
0fa62f9
split written data into chunks
lukasIO Jan 15, 2025
758060a
intendation
lukasIO Jan 15, 2025
8db05c2
fix chunk size
lukasIO Jan 15, 2025
e311a53
fix imports
lukasIO Jan 15, 2025
61eb7fd
add write lock to file streams
lukasIO Jan 16, 2025
318b8d4
add send_file method
lukasIO Jan 16, 2025
15f100c
add explicit return types
lukasIO Jan 16, 2025
a506295
Add index to textstreamupdate
lukasIO Jan 16, 2025
ed2d981
add aiofiles to setup requirements
lukasIO Jan 16, 2025
585a220
fix protocol trailer implementation
lukasIO Jan 16, 2025
521179d
add read_all method for text streams
lukasIO Jan 16, 2025
df4de63
fix bugs and add text stream example
lukasIO Jan 17, 2025
0eeaa3f
generated protobuf
github-actions[bot] Jan 17, 2025
84f286c
update rust sdk
lukasIO Jan 17, 2025
6da1b83
fix proto
lukasIO Jan 17, 2025
3c2a980
generated protobuf
github-actions[bot] Jan 17, 2025
ff8aa7b
bump rust-ffi and split utf8 string properly
lukasIO Jan 17, 2025
8942fea
Merge branch 'lukas/data-streams' of https://github.com/livekit/pytho…
lukasIO Jan 17, 2025
b68d21a
Update naming
lukasIO Jan 22, 2025
e6c6576
generated protobuf
github-actions[bot] Jan 22, 2025
1b6f3a2
stream handlers and address comments
lukasIO Jan 22, 2025
5d16a05
Merge branch 'lukas/data-streams' of https://github.com/livekit/pytho…
lukasIO Jan 22, 2025
5ef54c8
fix interop
lukasIO Jan 22, 2025
cb8c32e
update lfs settings
lukasIO Jan 22, 2025
a3b34f6
add example asset
lukasIO Jan 22, 2025
bc5e695
point submodule to latest ffi release
lukasIO Jan 23, 2025
0989149
dedicated method to remove handlers
lukasIO Jan 23, 2025
464a6f8
Update livekit-rtc/livekit/rtc/data_stream.py
lukasIO Jan 24, 2025
ad21259
Update livekit-rtc/livekit/rtc/data_stream.py
lukasIO Jan 24, 2025
fde9e30
Update livekit-rtc/livekit/rtc/participant.py
lukasIO Jan 24, 2025
c5e6c3c
replace RingQueue with asyncio.Queue
lukasIO Jan 24, 2025
9e7974a
Merge branch 'lukas/data-streams' of https://github.com/livekit/pytho…
lukasIO Jan 24, 2025
b134299
update async stuff
lukasIO Jan 24, 2025
be8fa59
write to disk in example and fix asyncio bug
lukasIO Jan 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
**/*.dll filter=lfs diff=lfs merge=lfs -text
**/*.so filter=lfs diff=lfs merge=lfs -text
**/*.dylib filter=lfs diff=lfs merge=lfs -text
**/*.jpg filter=lfs diff=lfs merge=lfs -text
livekit-protocol/livekit/protocol/** linguist-generated=true
livekit-rtc/livekit/rtc/_proto/** linguist-generated=true
104 changes: 104 additions & 0 deletions examples/data-streams/data_streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os
import logging
import asyncio
from signal import SIGINT, SIGTERM
from livekit import rtc

# Set the following environment variables with your own values
TOKEN = os.environ.get("LIVEKIT_TOKEN")
URL = os.environ.get("LIVEKIT_URL")


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd be good to include some comments on what this example demonstrates, and what other components the user should prepare for in order to see a demo.

it seems like it's built to work with the JS example?

async def main(room: rtc.Room):
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def greetParticipant(identity: str):
text_writer = await room.local_participant.stream_text(
destination_identities=[identity], topic="chat"
)
for char in "Hi! Just a friendly message":
await text_writer.write(char)
await text_writer.aclose()

await room.local_participant.send_file(
"./green_tree_python.jpg",
destination_identities=[identity],
topic="welcome",
)

async def on_chat_message_received(
reader: rtc.TextStreamReader, participant_identity: str
):
full_text = await reader.read_all()
logger.info(
"Received chat message from %s: '%s'", participant_identity, full_text
)

async def on_welcome_image_received(
reader: rtc.ByteStreamReader, participant_identity: str
):
logger.info(
"Received image from %s: '%s'", participant_identity, reader.info["name"]
)
with open(reader.info["name"], mode="wb") as f:
async for chunk in reader:
f.write(chunk)

f.close()

@room.on("participant_connected")
def on_participant_connected(participant: rtc.RemoteParticipant):
logger.info(
"participant connected: %s %s", participant.sid, participant.identity
)
asyncio.create_task(greetParticipant(participant.identity))

room.set_text_stream_handler(
lambda reader, participant_identity: asyncio.create_task(
on_chat_message_received(reader, participant_identity)
),
"chat",
)

room.set_byte_stream_handler(
lambda reader, participant_identity: asyncio.create_task(
on_welcome_image_received(reader, participant_identity)
),
"welcome",
)

# By default, autosubscribe is enabled. The participant will be subscribed to
# all published tracks in the room
await room.connect(URL, TOKEN)
logger.info("connected to room %s", room.name)

for identity, participant in room.remote_participants.items():
logger.info("Sending a welcome message to %s", identity)
await greetParticipant(participant.identity)


if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
handlers=[
logging.FileHandler("data_stream_example.log"),
logging.StreamHandler(),
],
)

loop = asyncio.get_event_loop()
room = rtc.Room(loop=loop)

async def cleanup():
await room.disconnect()
loop.stop()

asyncio.ensure_future(main(room))
for signal in [SIGINT, SIGTERM]:
loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup()))

try:
loop.run_forever()
finally:
loop.close()
3 changes: 3 additions & 0 deletions examples/data-streams/green_tree_python.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
16 changes: 16 additions & 0 deletions livekit-rtc/livekit/rtc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@
from .utils import combine_audio_frames
from .rpc import RpcError, RpcInvocationData
from .synchronizer import AVSynchronizer
from .data_stream import (
TextStreamInfo,
TextStreamUpdate,
ByteStreamInfo,
TextStreamReader,
TextStreamWriter,
ByteStreamWriter,
ByteStreamReader,
)

__all__ = [
"ConnectionQuality",
Expand Down Expand Up @@ -140,5 +149,12 @@
"EventEmitter",
"combine_audio_frames",
"AVSynchronizer",
"TextStreamUpdate",
"TextStreamInfo",
"ByteStreamInfo",
"TextStreamReader",
"TextStreamWriter",
"ByteStreamReader",
"ByteStreamWriter",
"__version__",
]
40 changes: 20 additions & 20 deletions livekit-rtc/livekit/rtc/_proto/ffi_pb2.py

Large diffs are not rendered by default.

70 changes: 61 additions & 9 deletions livekit-rtc/livekit/rtc/_proto/ffi_pb2.pyi

Large diffs are not rendered by default.

228 changes: 132 additions & 96 deletions livekit-rtc/livekit/rtc/_proto/room_pb2.py

Large diffs are not rendered by default.

430 changes: 387 additions & 43 deletions livekit-rtc/livekit/rtc/_proto/room_pb2.pyi

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion livekit-rtc/livekit/rtc/_proto/video_frame_pb2.pyi

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions livekit-rtc/livekit/rtc/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,15 @@ def generate_random_base62(length=12):
"""
global _base62_characters
return "".join(random.choice(_base62_characters) for _ in range(length))


# adapted from https://stackoverflow.com/a/6043797
def split_utf8(s: str, n: int):
"""Split UTF-8 s into chunks of maximum length n."""
while len(s) > n:
k = n
while (ord(s[k]) & 0xC0) == 0x80:
k -= 1
yield s[:k]
s = s[k:]
yield s
Loading
Loading