Skip to content

Commit 75ffe1e

Browse files
authored
FCE-2750 Add support for agent image capture (#67)
* Add support for agent image capture * Fix linter * Fix pyright * Fix test build * Adjust to CR
1 parent 3435bab commit 75ffe1e

File tree

19 files changed

+568
-5
lines changed

19 files changed

+568
-5
lines changed

compile_proto.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ printf "DONE\n\n"
1111
FILES=("protos/fishjam/agent_notifications.proto" "protos/fishjam/server_notifications.proto")
1212

1313
printf "Compiling file: %s\n" "${FILES[@]}"
14-
protoc -I protos --python_betterproto_out="./fishjam/events/_protos" "${FILES[@]}"
14+
uv run protoc -I protos --python_betterproto_out="./fishjam/events/_protos" "${FILES[@]}"
1515
printf "\tDONE\n"

examples/multimodal/README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Multimodal Demo
2+
3+
A Fishjam SDK example that uses the Gemini Live API for multimodal real-time interaction. Users can speak to ask questions about captured video frames, and Gemini responds via voice.
4+
5+
## How It Works
6+
7+
1. A peer connects with both audio and video tracks
8+
2. The agent periodically captures images from video tracks
9+
3. When you speak ("What do you see?", "Describe this"), your audio and the captured images are sent to Gemini
10+
4. Gemini analyzes the visual content and responds with voice
11+
12+
## Setup
13+
14+
1. Set environment variables:
15+
16+
```bash
17+
export FISHJAM_ID="your-fishjam-id"
18+
export FISHJAM_MANAGEMENT_TOKEN="your-token"
19+
export GOOGLE_API_KEY="your-gemini-api-key"
20+
```
21+
22+
2. Optionally configure the image capture interval (default: 5 seconds):
23+
24+
```bash
25+
export IMAGE_CAPTURE_INTERVAL="3.0"
26+
```
27+
28+
## Running
29+
30+
```bash
31+
cd examples/multimodal
32+
uv run uvicorn main:app
33+
```
34+
35+
## Usage
36+
37+
1. The server will start on `http://localhost:8000`
38+
2. GET `/` returns a peer token for connecting a browser client
39+
3. Connect a peer with video and audio tracks
40+
4. Speak questions about what your camera sees
41+
5. Gemini will respond with voice analysis of the captured frames

examples/multimodal/main.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from contextlib import asynccontextmanager
2+
from typing import Annotated
3+
4+
from fastapi import Depends, FastAPI
5+
from multimodal.notifier import make_notifier
6+
from multimodal.room import RoomService, fishjam
7+
from multimodal.worker import async_worker
8+
9+
# Process-wide RoomService; populated by the lifespan handler, None until
# the application has actually started.
_room_service: RoomService | None = None
10+
11+
12+
def get_room_service():
    """FastAPI dependency returning the process-wide RoomService.

    Raises:
        RuntimeError: if called before the lifespan handler has run.
    """
    service = _room_service
    if not service:
        raise RuntimeError("Application skipped lifespan events!")
    return service
16+
17+
18+
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Application lifespan: start the background worker, create the room
    service, and keep the Fishjam notifier connected for the app's lifetime."""
    global _room_service

    async with async_worker() as worker:
        service = RoomService(worker)
        _room_service = service
        worker.run_in_background(make_notifier(service).connect())
        yield
27+
28+
29+
# ASGI application; the lifespan hook wires up worker, room, and notifier.
app = FastAPI(lifespan=lifespan)
30+
31+
32+
@app.get("/")
def get_peer(room_service: Annotated[RoomService, Depends(get_room_service)]):
    """Create a new peer in the current room and return its join token."""
    room = room_service.get_room()
    peer_and_token = fishjam.create_peer(room.id)
    return peer_and_token[1]

examples/multimodal/multimodal/__init__.py

Whitespace-only changes.
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import asyncio
2+
3+
from fishjam.agent import Agent, AgentSession, IncomingTrackData, IncomingTrackImage
4+
from fishjam.integrations.gemini import GeminiIntegration
5+
6+
from .config import IMAGE_CAPTURE_INTERVAL
7+
from .session import MultimodalSession
8+
from .worker import BackgroundWorker
9+
10+
11+
class MultimodalAgent:
12+
def __init__(self, room_id: str, agent: Agent, worker: BackgroundWorker):
13+
self._room_id = room_id
14+
self._agent = agent
15+
self._worker = worker
16+
self._task: asyncio.Task[None] | None = None
17+
self._capture_task: asyncio.Task[None] | None = None
18+
19+
# Session management per peer
20+
self._sessions: dict[str, MultimodalSession] = {}
21+
22+
# Track caching: peer_id -> set of video track_ids
23+
self._peer_tracks: dict[str, set[str]] = {}
24+
# Reverse lookup: track_id -> peer_id
25+
self._track_to_peer: dict[str, str] = {}
26+
27+
# Agent session reference for capturing images and sending audio
28+
self._agent_session: AgentSession | None = None
29+
30+
async def _start(self):
31+
async with self._agent.connect() as session:
32+
self._agent_session = session
33+
34+
# Create output track for Gemini audio responses
35+
36+
self._output_track = await session.add_track(
37+
GeminiIntegration.GEMINI_OUTPUT_AUDIO_SETTINGS
38+
)
39+
print(f"Agent connected to room {self._room_id}")
40+
41+
async for message in session.receive():
42+
match message:
43+
case IncomingTrackImage(track_id=track_id, data=data):
44+
peer_id = self._track_to_peer.get(track_id)
45+
if peer_id and peer_id in self._sessions:
46+
self._sessions[peer_id].send_image(data)
47+
print(f"Sent image from {track_id} to {peer_id}")
48+
49+
case IncomingTrackData(peer_id=peer_id, data=data):
50+
if peer_id in self._sessions:
51+
self._sessions[peer_id].send_audio(data)
52+
53+
self._agent_session = None
54+
print(f"Agent disconnected from room {self._room_id}")
55+
56+
async def _periodic_capture(self):
57+
while True:
58+
await asyncio.sleep(IMAGE_CAPTURE_INTERVAL)
59+
60+
if not self._agent_session:
61+
continue
62+
63+
# Capture images from all known video tracks
64+
for track_id in self._track_to_peer.keys():
65+
try:
66+
await self._agent_session.capture_image(track_id)
67+
print(f"Requested image capture for track {track_id}")
68+
except Exception as e:
69+
print(f"Error capturing image from track {track_id}: {e}")
70+
71+
def _handle_audio_response(self, peer_id: str, audio: bytes):
72+
if self._output_track:
73+
self._worker.run_in_background(self._output_track.send_chunk(audio))
74+
75+
def on_peer_enter(self, peer_id: str):
76+
if peer_id in self._sessions:
77+
return
78+
79+
# Initialize track cache for this peer
80+
self._peer_tracks[peer_id] = set()
81+
82+
# Start agent connection if this is the first peer
83+
if len(self._sessions) == 0:
84+
self._task = self._worker.run_in_background(self._start())
85+
capture_coro = self._periodic_capture()
86+
self._capture_task = self._worker.run_in_background(capture_coro)
87+
88+
# Create multimodal session for this peer
89+
def on_audio(audio: bytes, pid: str = peer_id):
90+
self._handle_audio_response(pid, audio)
91+
92+
session = MultimodalSession(on_audio)
93+
self._sessions[peer_id] = session
94+
self._worker.run_in_background(session.start(peer_id))
95+
96+
def on_peer_leave(self, peer_id: str):
97+
if peer_id not in self._sessions:
98+
return
99+
100+
# Clean up track cache
101+
if peer_id in self._peer_tracks:
102+
for track_id in self._peer_tracks[peer_id]:
103+
self._track_to_peer.pop(track_id, None)
104+
del self._peer_tracks[peer_id]
105+
106+
# End the session
107+
self._sessions.pop(peer_id).end()
108+
109+
# Stop agent if no more sessions
110+
if len(self._sessions) == 0:
111+
if self._task is not None:
112+
self._task.cancel()
113+
self._task = None
114+
if self._capture_task is not None:
115+
self._capture_task.cancel()
116+
self._capture_task = None
117+
118+
def on_track_added(self, peer_id: str, track_id: str, is_video: bool):
119+
if not is_video:
120+
return
121+
122+
if peer_id not in self._peer_tracks:
123+
self._peer_tracks[peer_id] = set()
124+
125+
self._peer_tracks[peer_id].add(track_id)
126+
self._track_to_peer[track_id] = peer_id
127+
print(f"Added video track {track_id} for peer {peer_id}")
128+
129+
def on_track_removed(self, peer_id: str, track_id: str, is_video: bool):
130+
if not is_video:
131+
return
132+
133+
if peer_id in self._peer_tracks:
134+
self._peer_tracks[peer_id].discard(track_id)
135+
self._track_to_peer.pop(track_id, None)
136+
print(f"Removed video track {track_id} for peer {peer_id}")
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
"""Environment-driven configuration for the multimodal example."""

import os

from google.genai.types import LiveConnectConfigDict, Modality

# Fishjam instance id; defaults to "" when unset (validated downstream).
FISHJAM_ID = os.getenv("FISHJAM_ID", "")
# Management token is mandatory: raises KeyError at import time if missing.
FISHJAM_TOKEN = os.environ["FISHJAM_MANAGEMENT_TOKEN"]

# Gemini Live model used for the realtime multimodal session.
MULTIMODAL_MODEL = "gemini-2.5-flash-native-audio-preview-12-2025"

# Live API connection settings: audio-only responses, "thought" parts off.
MULTIMODAL_CONFIG: LiveConnectConfigDict = {
    "response_modalities": [Modality.AUDIO],
    "thinking_config": {
        "include_thoughts": False,
    },
}

# Interval in seconds between image captures
IMAGE_CAPTURE_INTERVAL = float(os.getenv("IMAGE_CAPTURE_INTERVAL", "5.0"))
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
from fishjam import FishjamNotifier
2+
from fishjam.events import (
3+
ServerMessagePeerConnected,
4+
ServerMessagePeerDisconnected,
5+
ServerMessagePeerType,
6+
ServerMessageTrackAdded,
7+
ServerMessageTrackRemoved,
8+
TrackType,
9+
)
10+
from fishjam.events.allowed_notifications import AllowedNotification
11+
12+
from .config import FISHJAM_ID, FISHJAM_TOKEN
13+
from .room import RoomService
14+
15+
16+
def make_notifier(room_service: RoomService):
17+
notifier = FishjamNotifier(
18+
FISHJAM_ID,
19+
FISHJAM_TOKEN,
20+
)
21+
22+
@notifier.on_server_notification
23+
def _(notification: AllowedNotification):
24+
match notification:
25+
case ServerMessagePeerConnected(
26+
peer_id=peer_id,
27+
room_id=room_id,
28+
peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC,
29+
):
30+
handle_peer_connected(peer_id, room_id)
31+
32+
case ServerMessagePeerDisconnected(
33+
peer_id=peer_id,
34+
room_id=room_id,
35+
peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC,
36+
):
37+
handle_peer_disconnected(peer_id, room_id)
38+
39+
case ServerMessageTrackAdded(
40+
peer_id=peer_id,
41+
room_id=room_id,
42+
track=track,
43+
):
44+
handle_track_added(peer_id, room_id, track)
45+
46+
case ServerMessageTrackRemoved(
47+
peer_id=peer_id,
48+
room_id=room_id,
49+
track=track,
50+
):
51+
handle_track_removed(peer_id, room_id, track)
52+
53+
def handle_peer_connected(peer_id: str, room_id: str):
54+
if room_id == room_service.room.id:
55+
room_service.agent.on_peer_enter(peer_id)
56+
57+
def handle_peer_disconnected(peer_id: str, room_id: str):
58+
if room_id == room_service.room.id:
59+
room_service.agent.on_peer_leave(peer_id)
60+
61+
def handle_track_added(peer_id: str, room_id: str, track):
62+
if room_id == room_service.room.id:
63+
is_video = track.type == TrackType.TRACK_TYPE_VIDEO
64+
room_service.agent.on_track_added(peer_id, track.id, is_video)
65+
66+
def handle_track_removed(peer_id: str, room_id: str, track):
67+
if room_id == room_service.room.id:
68+
is_video = track.type == TrackType.TRACK_TYPE_VIDEO
69+
room_service.agent.on_track_removed(peer_id, track.id, is_video)
70+
71+
return notifier
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from fishjam import AgentOptions, FishjamClient, Room
2+
from fishjam.errors import NotFoundError
3+
from fishjam.integrations.gemini import GeminiIntegration
4+
5+
from .agent import MultimodalAgent
6+
from .config import FISHJAM_ID, FISHJAM_TOKEN
7+
from .worker import BackgroundWorker
8+
9+
# Module-level Fishjam management client shared by the whole example.
fishjam = FishjamClient(FISHJAM_ID, FISHJAM_TOKEN)
10+
11+
12+
class RoomService:
    """Owns the single Fishjam room used by the demo, plus its agent.

    A room is created eagerly on construction. ``get_room()`` re-validates
    the room against Fishjam and transparently provisions a replacement
    (with a fresh agent) when the room no longer exists.
    """

    def __init__(self, worker: BackgroundWorker):
        self._worker = worker
        self._create_room()

    def get_room(self) -> Room:
        """Return the current room, recreating it if Fishjam dropped it."""
        try:
            self.room = fishjam.get_room(self.room.id)
        except NotFoundError:
            # Room vanished server-side: provision a new room and agent.
            self._create_room()
        return self.room

    def _create_room(self):
        # Provision the room, then attach a multimodal agent to it.
        self.room = fishjam.create_room()
        options = AgentOptions(output=GeminiIntegration.GEMINI_INPUT_AUDIO_SETTINGS)
        fishjam_agent = fishjam.create_agent(self.room.id, options)
        self.agent = MultimodalAgent(self.room.id, fishjam_agent, self._worker)

0 commit comments

Comments
 (0)