avatar plugin based on v1.0 #1391

Merged
longcw merged 33 commits into dev-1.0 from longc/avatar-plugin
Feb 11, 2025

Contributor

@longcw longcw commented Jan 20, 2025

  1. AudioSink based on DataStream (Add data stream support python-sdks#347)
  2. Avatar worker example with video generation and A/V sync


changeset-bot bot commented Jan 20, 2025

⚠️ No Changeset found

Latest commit: f714070

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.


@longcw longcw changed the base branch from main to dev-1.0 January 20, 2025 03:45
Member

@davidzhao davidzhao left a comment

this looks great! just a few comments.

we'll also need some error handling in various parts. how do both sides handle the case where the other side is disconnected? if the avatar participant is gone for longer than a reasonable timeout, the agent would likely need to report that error and shut itself down.

similarly, if the controller is gone, the service on the other side might want to avoid consuming resources and exit as well.
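
A minimal sketch of the timeout behavior described above, using only asyncio. `DisconnectWatchdog`, its method names, and the timeout value are hypothetical and not part of the PR; in real code the two `peer_*` methods would be called from the room's participant connect and disconnect events.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class DisconnectWatchdog:
    """Calls `on_timeout` if the peer stays away longer than `timeout` seconds.

    Hypothetical helper, not an API from the PR.
    """

    def __init__(self, timeout: float, on_timeout: Callable[[], Awaitable[None]]) -> None:
        self._timeout = timeout
        self._on_timeout = on_timeout
        self._task: Optional[asyncio.Task] = None

    def peer_disconnected(self) -> None:
        # Start the countdown once; repeated disconnect events are ignored.
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._countdown())

    def peer_connected(self) -> None:
        # The peer came back in time, so cancel the pending shutdown.
        if self._task is not None:
            self._task.cancel()
            self._task = None

    async def _countdown(self) -> None:
        await asyncio.sleep(self._timeout)
        await self._on_timeout()
```

The same helper could serve both sides: the agent would report the error and shut down in `on_timeout`, while the avatar service would release its resources and exit.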


AUDIO_SENDER_ATTR = "__livekit_avatar_audio_sender"
AUDIO_RECEIVER_ATTR = "__livekit_avatar_audio_receiver"
RPC_INTERRUPT_PLAYBACK = "__livekit_avatar_interrupt_playback"
Member

nit: for consistency, use the lk. namespace to identify LiveKit-specific actions

Suggested change
- RPC_INTERRUPT_PLAYBACK = "__livekit_avatar_interrupt_playback"
+ RPC_INTERRUPT_PLAYBACK = "lk.interrupt_playback"

async def start(self) -> None:
    """Wait for worker participant to join and start streaming"""
    # mark self as sender
    await self._room.local_participant.set_attributes({AUDIO_SENDER_ATTR: "true"})
Member

was thinking we can simplify this step. instead the receiver could just wait for an audio stream of a particular name?

"""Wait for worker participant to join and start streaming"""
# mark self as sender
await self._room.local_participant.set_attributes({AUDIO_SENDER_ATTR: "true"})
self._remote_participant = await wait_for_participant(
Member

what if.. instead of waiting for an attribute, we could:

  • take avatar_identity as a param in the sink (with a sane default)
  • create a token for that identity and send it to the other side as part of initial handshake
  • here we can just wait for that agreed-upon identity
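
A rough sketch of the last step in the list above: waiting for an agreed-upon identity instead of a participant attribute. `IdentityWaiter`, `DEFAULT_AVATAR_IDENTITY`, and the default timeout are assumptions made for illustration; the class is a stand-in for the room's participant bookkeeping, not an SDK API.

```python
import asyncio
from typing import Dict, Set

DEFAULT_AVATAR_IDENTITY = "avatar_worker"  # assumed default, not taken from the PR


class IdentityWaiter:
    """Resolves once a participant with the agreed-upon identity has joined.

    Hypothetical stand-in: call `on_participant_connected` from the real
    "participant connected" event of the room.
    """

    def __init__(self) -> None:
        self._present: Set[str] = set()
        self._events: Dict[str, asyncio.Event] = {}

    def on_participant_connected(self, identity: str) -> None:
        self._present.add(identity)
        event = self._events.get(identity)
        if event is not None:
            event.set()

    async def wait_for(
        self, identity: str = DEFAULT_AVATAR_IDENTITY, timeout: float = 10.0
    ) -> str:
        # Return immediately if the participant joined before we started waiting.
        if identity in self._present:
            return identity
        event = self._events.setdefault(identity, asyncio.Event())
        # Raises asyncio.TimeoutError if the identity never shows up.
        await asyncio.wait_for(event.wait(), timeout)
        return identity
```

Because the identity is fixed up front and baked into the token sent during the handshake, neither side needs to publish or poll marker attributes.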

Contributor Author

sounds good! on the avatar side, it waits for the audio stream with a particular name from the participant with kind=='agent'.

Contributor Author

updated

# start new stream
# TODO: any better option to send the metadata?
name = f"audio_{frame.sample_rate}_{frame.num_channels}"
self._stream_writer = await self._room.local_participant.stream_file(
Member

this is a good use of stream extensions:

writer = await room.local_participant.stream_file("audio",
    extensions={"sample_rate": "48000", "channels": "1"})

or

writer = await room.local_participant.stream_file("audio",
    extensions={"audio_settings": json.dumps({"sample_rate": 48000, "channels": 1})})
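
To make the second variant concrete, here is a small sketch of packing and unpacking the audio settings on each side. It assumes the extensions map is string-to-string, as both examples above suggest; the helper names are hypothetical.

```python
import json
from typing import Dict, Tuple


def encode_audio_settings(sample_rate: int, num_channels: int) -> Dict[str, str]:
    """Pack audio settings into a single JSON-encoded string value."""
    return {
        "audio_settings": json.dumps(
            {"sample_rate": sample_rate, "channels": num_channels}
        )
    }


def decode_audio_settings(extensions: Dict[str, str]) -> Tuple[int, int]:
    """Recover (sample_rate, num_channels) on the receiving side."""
    settings = json.loads(extensions["audio_settings"])
    return int(settings["sample_rate"]), int(settings["channels"])
```

Compared with encoding the values into the stream name (`audio_48000_1`), the receiver can match on a fixed name and read the settings without string parsing.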

Contributor Author

Oh I see, so extensions is some kind of metadata? Then why is it named extensions?

Member

yeah.. I think attributes is probably a better name

# mark self as receiver
await self._room.local_participant.set_attributes({AUDIO_RECEIVER_ATTR: "true"})

self._remote_participant = await wait_for_participant(
Member

it seems here we can just wait for participant.kind == agent?

if we wanted to handle multiple avatars in the room, then the integration should take in the controller's identity.

    reader: rtc.FileStreamReader, remote_participant_id: str
) -> None:
    if remote_participant_id != self._remote_participant.identity:
        logger.warning(
Contributor

would we really want to warn on any other incoming file stream? that seems like a rather narrow use case for this plugin

Contributor Author

oh I see, I'll filter for the audio stream first, so other data streams can still be processed by other handlers.

Btw, what is the use case for file_name in a data stream? Can I pass a tag and metadata like sample_rate and num_channels through the file name, or is there a better option for this?
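
The filtering step mentioned in this comment could look roughly like the sketch below. The handler signature, `AUDIO_STREAM_NAME`, and the boolean return convention are all assumptions for illustration; the real stream handler in the SDK receives a reader object rather than a raw payload.

```python
from typing import Callable

AUDIO_STREAM_NAME = "audio"  # assumed stream name


def make_audio_filter(
    expected_sender: str, on_audio: Callable[[bytes], None]
) -> Callable[[str, str, bytes], bool]:
    """Build a handler that only consumes the avatar audio stream."""

    def handler(stream_name: str, sender_identity: str, payload: bytes) -> bool:
        if stream_name != AUDIO_STREAM_NAME:
            # Not ours: leave it to other handlers without logging a warning.
            return False
        if sender_identity != expected_sender:
            # Audio from an unexpected participant is ignored as well.
            return False
        on_audio(payload)
        return True

    return handler
```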

Contributor

see @davidzhao's comment above, the best option is the extensions map on the stream.

reader = self._stream_readers.pop(0)
async for data in reader.stream_reader:
    yield rtc.AudioFrame(
        data=data,
Contributor

this pattern would suggest that we're sure a single audio frame never exceeds STREAM_CHUNK_SIZE (~15kb)

Contributor Author

For audio bytes, splitting large chunks into smaller ones before sending is fine, and each should stay under 15 KB. But for other use cases, receiving a different number of chunks than was sent may not be good behavior. Maybe add a size limit on the sending side?
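
A sketch of sender-side splitting for audio, assuming 16-bit PCM and a limit of 15,000 bytes (the thread only says "~15kb", so the exact value is a guess). The function name is hypothetical. Chunks are cut on whole sample frames so that each one can be turned into a valid audio frame by the receiver.

```python
from typing import Iterator

STREAM_CHUNK_SIZE = 15_000  # assumed value; the thread only says "~15kb"


def split_audio(
    data: bytes,
    num_channels: int,
    max_bytes: int = STREAM_CHUNK_SIZE,
    bytes_per_sample: int = 2,  # assumes 16-bit PCM
) -> Iterator[bytes]:
    """Yield chunks no larger than `max_bytes`, cut on whole sample frames."""
    frame_bytes = num_channels * bytes_per_sample
    if max_bytes < frame_bytes:
        raise ValueError("max_bytes is smaller than one sample frame")
    # Round the step down to a multiple of the sample-frame size.
    step = max_bytes - (max_bytes % frame_bytes)
    for start in range(0, len(data), step):
        yield data[start : start + step]
```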

Contributor

I originally had a size limit in there; @theomonnom's wish was that we wouldn't enforce such a limit. But I agree, it might make things trickier if we don't have a sender-side size limit.

@longcw longcw changed the title from "[draft] avatar plugin based on v1.0" to "avatar plugin based on v1.0" Feb 4, 2025
) -> None:
    """Wait for worker participant to join and start streaming"""
    # create a token for the avatar worker
    # TODO(long): do we need to set agent=True here? in playground if not the video track is not automatically displayed
Contributor

we might want a new kind to distinguish but we probably shouldn't join with the standard participant kind

.with_name("Avatar Worker")
.with_grants(api.VideoGrants(room_join=True, room=ctx.room.name, agent=True))
.with_metadata("avatar_worker")
.to_jwt()
Contributor

we should specify the identity of the "original" agent as well, so that the integrator can add a guard in their RPC handler methods (if data.caller_identity != agent_identity: print("RPC call from unexpected participant"); return)

logger.info(f"Sending connection info to avatar dispatcher {avatar_dispatcher_url}")
connection_info = AvatarConnectionInfo(
    room_name=ctx.room.name, url=ctx._info.url, token=token
)
Contributor

we should specify the identity of the "original" agent as well, so that the integrator can add a guard in their RPC handler methods (if data.caller_identity != agent_identity: print("RPC call from unexpected participant"); return)

Contributor Author

This makes sense! I added the check in the RPC handlers.
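
For reference, the guard discussed in this thread might be sketched as follows. `RpcInvocation` is a hypothetical stand-in for the invocation data an RPC handler receives, and the string return values are placeholders; only the `caller_identity` comparison comes from the review comment.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class RpcInvocation:
    """Hypothetical stand-in for the data passed to an RPC handler."""

    caller_identity: str
    payload: str


def make_interrupt_handler(
    agent_identity: str, on_interrupt: Callable[[], None]
) -> Callable[[RpcInvocation], str]:
    """Build a handler that only accepts calls from the original agent."""

    def handle_interrupt(data: RpcInvocation) -> str:
        if data.caller_identity != agent_identity:
            # Unexpected caller: refuse instead of interrupting playback.
            return "rejected"
        on_interrupt()
        return "ok"

    return handle_interrupt
```

This only works if the avatar side knows `agent_identity`, which is why the comment asks for it to be included in the connection info.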

@longcw longcw merged commit 26fa0a9 into dev-1.0 Feb 11, 2025
1 check passed
@longcw longcw deleted the longc/avatar-plugin branch February 11, 2025 08:52
async def _read_stream():
    async for event in self._audio_stream:
        yield event.frame
        await asyncio.sleep(0)
Member

any reason why this is needed?

Contributor Author

It was added for debugging an event loop issue. Will remove it.

theomonnom pushed a commit that referenced this pull request Feb 21, 2025
Co-authored-by: David Zhao <dz@livekit.io>

5 participants