Skip to content

Comments

Add text stream sink and multi text sink#1497

Merged
longcw merged 12 commits intodev-1.0from
lukas/ds-sink
Feb 21, 2025
Merged

Add text stream sink and multi text sink#1497
longcw merged 12 commits intodev-1.0from
lukas/ds-sink

Conversation

@lukasIO
Copy link
Contributor

@lukasIO lukasIO commented Feb 14, 2025

@changeset-bot
Copy link

changeset-bot bot commented Feb 14, 2025

🦋 Changeset detected

Latest commit: 6b9a3ff

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 1 package
Name Type
livekit-agents Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@lukasIO lukasIO changed the title text stream sink Add text stream sink and multi text sink Feb 17, 2025
@lukasIO lukasIO requested review from longcw and theomonnom and removed request for longcw February 17, 2025 12:03
@lukasIO lukasIO marked this pull request as ready for review February 17, 2025 12:05
@lukasIO lukasIO requested a review from davidzhao February 17, 2025 12:05
# TODO: support multiple participants
self._text_sink = RoomTranscriptEventSink(
room=self._room, participant=self._participant, is_stream=False
self._text_sink = MultiTextSink(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we make this configurable that enables one or both of the sinks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what other decisions were made for the io API, but in my opinion it would be nice for users to be able to set a TextSink themselves

self._track_id = track.sid


class DataStreamSink(TextSink):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps move this to the datastream_io

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just wanted to keep all the text sinks together, but don't have a strong opinion if you think it makes more sense to move it to datastream_io.

maybe a text_sinks.py makes sense where we have all of them?

Copy link
Member

@theomonnom theomonnom Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be merged inside the RoomOutput? I don't think we need to have it in a separate class. Also we have an ongoing discussion about merging RoomInput and RoomOutput together in Slack.

Users that only want text would be an option added to the RoomIO constructor.


async def _capture_text():
await self._base_text_sink.capture_text(segment.text)
await self._base_text_sink.capture_text(segment.text, segment_id=segment.id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this won't work for other text sinks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a check for the segment id before capture

        if self._current_segment_id != segment.id:
            self._base_text_sink.flush()
            self._current_segment_id = segment.id

self._participant_identity = identity
self._latest_text = ""

async def capture_text(self, text: str, *, segment_id: str | None = None) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to not add segment_id here to keep the same api for all text sinks, user should use flush() to mark the end of the segment ideally.

Though actually I also want a segment id here in case the text with different ids comes not in order like id1, id2, id1, .... wdyt @theomonnom ?

class TextSink(ABC):
@abstractmethod
async def capture_text(self, text: str) -> None:
async def capture_text(self, text: str, *, segment_id: str | None = None) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, it was added here.

Copy link
Contributor Author

@lukasIO lukasIO Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think having it is better than not having it, guessing you're ok with the change as long as it's part of the base class here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still have a concern that the segment_id is conflicted with flush. If calling flush means the end of the segment, what is the "correct" behavior if the segment_id is the same as before after flush?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's a problem that's caused by the segment_id being part of the params.

Is you concern about replacing current_id with segment_id?
I just wanted to avoid any instances where there are multiple places trying to call capture_text with different segment ids. In my head the place to handle this is the sink itself, but we can also leave it as is if we can be certain that every caller checks for the segment_id and flushes if necessary

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not about the current_id or segment_id, but just not clear what is the expect behavior if the same segment_id captured after flush. Sounds like the flush and segment_id are overlapped in functionality.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see a logic change in the DatastreamSink from this commit 446e3a9.

For agent transcription with delta=True, the change of the segment_id is checked before calling the sink.capture_text() here https://github.com/livekit/agents/blob/lukas/ds-sink/livekit-agents/livekit/agents/pipeline/transcription/synchronizer.py#L356-L363,

        if self._current_segment_id != segment.id:
            self._base_text_sink.flush()
            self._current_segment_id = segment.id

        async def _capture_text():
            await self._base_text_sink.capture_text(segment.text)
            if segment.final:
                self._base_text_sink.flush()

        task = asyncio.create_task(_capture_text())

and for user transcription with delta=False, the capture is called here https://github.com/livekit/agents/blob/lukas/ds-sink/livekit-agents/livekit/agents/pipeline/room_io.py#L420-L426 so it only flush after the final transcript received.

        async def _capture_text():
            if ev.alternatives:
                data = ev.alternatives[0]
                await self._text_sink.capture_text(data.text)

            if ev.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
                self._text_sink.flush()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that means it needs to be guaranteed that captureText() is not called for the same segment_id again after it has been flushed once with that segment_id.
Is that guaranteed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For user transcription, yes, there is no segment id naturally in that case.
For agent transcription, it's not guaranteed but it streams delta so it should be fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you point out what is the logic difference introduced by the commit if that's the concern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the difference is that previously if a segment_id was provided to capture_text it would re-use that id for sending the text stream.

if we create a new id after a flush it will result in streams getting a new id, even though it might be originating from the same segment_id.
On the receiving side a new id would show up as a new message, while having the same id for the same segments would prompt the receiving side to replace the previous message instead of adding a new one.
Does that make it clearer?
Happy to jump on a call and discuss

Comment on lines 144 to 149
class MultiTextSink(TextSink):
def __init__(self, sinks: list[TextSink]) -> None:
self._sinks = sinks

async def capture_text(self, text: str, *, segment_id: str | None = None) -> None:
await asyncio.gather(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the public API minimal for V1.0, I wouldn't expose it for now and keep this class private.

Copy link
Contributor Author

@lukasIO lukasIO left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm! thanks for fixing the outstanding issues 🙏

@longcw longcw merged commit f2c469b into dev-1.0 Feb 21, 2025
1 check passed
@longcw longcw deleted the lukas/ds-sink branch February 21, 2025 13:35
jayesh-mivi pushed a commit to mivi-dev-org/custom-livekit-agents that referenced this pull request Jun 4, 2025
Co-authored-by: Long Chen <longch1024@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants