5 changes: 5 additions & 0 deletions .changeset/eighty-bikes-speak.md
@@ -0,0 +1,5 @@
---
"livekit-agents": patch
---

Add text stream sink and multi text sink
2 changes: 1 addition & 1 deletion examples/avatar/agent_worker.py
@@ -18,7 +18,7 @@
    RoomTranscriptEventSink,
)
from livekit.agents.pipeline.transcription import TextSynchronizer
from livekit.plugins import openai
from livekit.plugins import cartesia, deepgram, openai

logger = logging.getLogger("avatar-example")
logger.setLevel(logging.INFO)
89 changes: 89 additions & 0 deletions examples/ds_chat_test.py
@@ -0,0 +1,89 @@
import asyncio
import os
import signal

from livekit import api, rtc

tasks = set()

def on_text_received(reader: rtc.TextStreamReader, participant_identity: str):
    """Callback for when text is received on the data stream"""
    async def _on_text_received():
        text = await reader.read_all()
        stream_id = reader.info.stream_id
        final = reader.info.attributes.get("lk.transcription_final", "null")
        print(f"[{participant_identity}][{stream_id}][final={final}]: {text!r}")

    task = asyncio.create_task(_on_text_received())
    tasks.add(task)
    task.add_done_callback(tasks.discard)


async def main(room: rtc.Room, room_name: str):
    # Create an access token (the API key and secret are read from LIVEKIT_API_KEY / LIVEKIT_API_SECRET by default)
    token = (
        api.AccessToken()
        .with_identity("chat-listener")
        .with_name("Chat Listener")
        .with_grants(
            api.VideoGrants(
                room_join=True,
                room=room_name,
            )
        )
        .to_jwt()
    )

    url = os.getenv("LIVEKIT_URL")
    if not url:
        print("Please set LIVEKIT_URL environment variable")
        return

    # Connect to the room
    try:
        print("Connecting to LiveKit room...")
        await room.connect(url, token)
        print(f"Connected to room: {room.name}")

        # Register handler for text messages on the lk.chat topic
        room.register_text_stream_handler("lk.chat", on_text_received)

        print("Listening for chat messages. Press Ctrl+C to exit...")
        # Instead of running the loop forever here, we'll use an Event to keep the connection alive
        stop_event = asyncio.Event()
        await stop_event.wait()

    except KeyboardInterrupt:
        print("\nDisconnecting...")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        await room.disconnect()
        print("Disconnected from room")


if __name__ == "__main__":
    import sys

    if len(sys.argv) != 2:
        print("Usage: python ds_chat_test.py <room-name>")
        sys.exit(1)

    room_name = sys.argv[1]

    # Use asyncio.new_event_loop() instead of get_event_loop()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    room = rtc.Room(loop=loop)

    async def cleanup():
        await room.disconnect()
        loop.stop()

    try:
        loop.run_until_complete(main(room, room_name))
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(room.disconnect())
        loop.close()
3 changes: 3 additions & 0 deletions examples/minimal_worker.py
@@ -61,6 +61,9 @@ async def on_enter(self) -> None:
    async def on_exit(self) -> None:
        self.agent.say("Goodbye!")

    async def on_exit(self) -> None:
        self.agent.say("Goodbye!")

    @ai_function
    async def talk_to_echo(self, ctx: CallContext[Userdata]):
        return ctx.userdata.echo_task, "Transferring you to Echo."
8 changes: 4 additions & 4 deletions examples/roomio_worker.py
@@ -4,7 +4,7 @@
from livekit import rtc
from livekit.agents import JobContext, WorkerOptions, WorkerType, cli
from livekit.agents.llm import ai_function
from livekit.agents.pipeline import AgentContext, AgentTask, PipelineAgent
from livekit.agents.pipeline import AgentTask, CallContext, PipelineAgent
from livekit.agents.pipeline.io import PlaybackFinishedEvent
from livekit.plugins import cartesia, deepgram, openai

@@ -25,7 +25,7 @@ def __init__(self) -> None:
        )

    @ai_function
    async def talk_to_alloy(self, context: AgentContext):
    async def talk_to_alloy(self, context: CallContext):
        return AlloyTask(), "Transferring you to Alloy."


@@ -37,15 +37,15 @@ def __init__(self) -> None:
        )

    @ai_function
    async def talk_to_echo(self, context: AgentContext):
    async def talk_to_echo(self, context: CallContext):
        return EchoTask(), "Transferring you to Echo."


async def entrypoint(ctx: JobContext):
    await ctx.connect()

    agent = PipelineAgent(
        task=AlloyTask(),
        task=EchoTask(),
    )

    await agent.start(room=ctx.room)
12 changes: 12 additions & 0 deletions livekit-agents/livekit/agents/pipeline/io.py
@@ -141,6 +141,18 @@ def flush(self) -> None:
        ...


class ParallelTextSink(TextSink):
    def __init__(self, *sinks: TextSink) -> None:
        self._sinks = sinks

    async def capture_text(self, text: str) -> None:
        await asyncio.gather(*[sink.capture_text(text) for sink in self._sinks])

    def flush(self) -> None:
        for sink in self._sinks:
            sink.flush()


# TODO(theomonnom): Add documentation to VideoSink
class VideoSink(ABC):
    @abstractmethod
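A minimal usage sketch for the new ParallelTextSink (not part of the PR). PrintSink below is a hypothetical stand-in for real sinks such as a room transcript sink; the only assumption is the TextSink interface visible in the diff above, i.e. an async capture_text() plus a synchronous flush():

import asyncio

from livekit.agents.pipeline.io import ParallelTextSink, TextSink


class PrintSink(TextSink):
    """Hypothetical sink that just echoes captured text to stdout."""

    def __init__(self, name: str) -> None:
        self._name = name

    async def capture_text(self, text: str) -> None:
        print(f"[{self._name}] captured: {text!r}")

    def flush(self) -> None:
        print(f"[{self._name}] flushed")


async def main() -> None:
    # Every capture is fanned out to all wrapped sinks concurrently via asyncio.gather.
    multi_sink = ParallelTextSink(PrintSink("transcript"), PrintSink("datastream"))
    await multi_sink.capture_text("Hello, ")
    await multi_sink.capture_text("world!")
    multi_sink.flush()


asyncio.run(main())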