Skip to content

Commit fbb085d

Browse files
committed
Introduce Realtime LipSync Client
1 parent 6e85756 commit fbb085d

File tree

5 files changed

+728
-0
lines changed

5 files changed

+728
-0
lines changed

decart/lipsync/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .client import RealtimeLipsyncClient
2+
3+
__all__ = ["RealtimeLipsyncClient"]

decart/lipsync/client.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import asyncio
2+
import websockets
3+
from typing import Optional, Tuple
4+
from .messages import (
5+
LipsyncClientMessage,
6+
LipsyncServerMessage,
7+
LipsyncServerMessageAdapter,
8+
LipsyncConfigMessage,
9+
LipsyncConfigAckMessage,
10+
LipsyncAudioInputMessage,
11+
LipsyncVideoInputMessage,
12+
LipsyncInterruptAudioMessage,
13+
LipsyncSyncedOutputMessage,
14+
LipsyncErrorMessage,
15+
)
16+
import fractions
17+
import time
18+
import logging
19+
import cv2
20+
import numpy as np
21+
22+
logger = logging.getLogger(__name__)
23+
24+
25+
class RealtimeLipsyncClient:
26+
27+
DECART_LIPSYNC_ENDPOINT = "/router/lipsync/ws"
28+
VIDEO_FPS = 25
29+
30+
def __init__(
31+
self,
32+
api_key: str,
33+
base_url: str = "https://api.decart.ai",
34+
audio_sample_rate: int = 16000,
35+
video_fps: int = VIDEO_FPS,
36+
sync_latency: float = 0.0,
37+
):
38+
"""
39+
Args:
40+
api_key: The API key for the Decart Lipsync server
41+
url: The URL of the Decart Lipsync server
42+
audio_sample_rate: The sample rate of the audio
43+
video_fps: The FPS of the video
44+
sync_latency: Delay next frame up to this many seconds, to account for variable latency
45+
"""
46+
self._url = f"{base_url}{self.DECART_LIPSYNC_ENDPOINT}".replace(
47+
"https://", "wss://"
48+
).replace("http://", "ws://")
49+
self._api_key = api_key
50+
self._audio_sample_rate = audio_sample_rate
51+
self._video_fps = video_fps
52+
self._sync_latency = sync_latency
53+
54+
self._websocket: Optional[websockets.ClientConnection] = None
55+
self._out_queue = asyncio.Queue()
56+
self._response_handling_task: Optional[asyncio.Task] = None
57+
58+
self._video_frame_interval = fractions.Fraction(1, video_fps)
59+
self._video_out_frame_index = 0
60+
self._video_out_start_time = 0
61+
62+
async def _recv(self) -> LipsyncServerMessage:
63+
response = await self._websocket.recv()
64+
return LipsyncServerMessageAdapter.validate_json(response)
65+
66+
async def _send(self, message: LipsyncClientMessage):
67+
msg = message.model_dump_json()
68+
await self._websocket.send(msg)
69+
70+
async def _handle_server_responses(self):
71+
try:
72+
while self._websocket is not None:
73+
response = await self._recv()
74+
if isinstance(response, LipsyncSyncedOutputMessage):
75+
await self._out_queue.put(response)
76+
elif isinstance(response, LipsyncErrorMessage):
77+
logger.error(f"Lipsync server error: {response.message}")
78+
raise Exception(response.message)
79+
else:
80+
logger.error(f"Unknown response from lipsync server: {response}")
81+
except asyncio.CancelledError:
82+
pass
83+
except websockets.exceptions.ConnectionClosedOK:
84+
logger.debug("Connection closed by server")
85+
86+
async def _decode_video_frame(self, video_frame: bytes) -> bytes:
87+
def _decode_video_frame_sync(video_frame: bytes) -> bytes:
88+
nparr = np.frombuffer(video_frame, np.uint8)
89+
video_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
90+
return video_frame
91+
92+
return await asyncio.to_thread(_decode_video_frame_sync, video_frame)
93+
94+
async def _encode_video_frame(self, image: np.ndarray) -> bytes:
95+
def _encode_video_frame_sync(image: np.ndarray) -> bytes:
96+
success, encoded_image = cv2.imencode(".jpeg", image)
97+
if not success:
98+
raise Exception("Failed to encode video frame as JPEG")
99+
return encoded_image.tobytes()
100+
101+
return await asyncio.to_thread(_encode_video_frame_sync, image)
102+
103+
async def _decode_audio_frame(self, audio_frame: bytes) -> bytes:
104+
return audio_frame
105+
106+
async def connect(self):
107+
logger.debug(f"Connecting to lipsync server at {self._url}")
108+
self._websocket = await websockets.connect(f"{self._url}?api_key={self._api_key}")
109+
logger.debug("WebSocket connected")
110+
# Initial handshake
111+
await self._send(
112+
LipsyncConfigMessage(
113+
video_fps=self._video_fps,
114+
audio_sample_rate=self._audio_sample_rate,
115+
)
116+
)
117+
logger.debug("Configuration sent")
118+
response = await self._recv()
119+
if not isinstance(response, LipsyncConfigAckMessage):
120+
raise Exception(f"Configuration not acknowledged by server: {response}")
121+
logger.debug("Configuration acknowledged")
122+
123+
self._response_handling_task = asyncio.create_task(self._handle_server_responses())
124+
125+
logger.debug("Connected to lipsync server")
126+
127+
async def disconnect(self):
128+
if self._websocket is not None:
129+
await self._websocket.close()
130+
self._websocket = None
131+
132+
if self._response_handling_task is not None:
133+
self._response_handling_task.cancel()
134+
try:
135+
await self._response_handling_task
136+
except asyncio.CancelledError:
137+
pass
138+
self._response_handling_task = None
139+
140+
async def send_audio(self, audio_data: bytes):
141+
await self._send(LipsyncAudioInputMessage(audio_data=audio_data))
142+
143+
async def send_video_frame_bytes(self, video_frame_bytes: bytes):
144+
await self._send(LipsyncVideoInputMessage(video_frame=video_frame_bytes))
145+
146+
async def send_video_frame(self, image: np.ndarray):
147+
encoded_image = await self._encode_video_frame(image)
148+
await self.send_video_frame_bytes(encoded_image)
149+
150+
async def interrupt_audio(self):
151+
await self._send(LipsyncInterruptAudioMessage())
152+
153+
async def get_synced_output(self, timeout: Optional[float] = None) -> Tuple[bytes, bytes]:
154+
synced_output: LipsyncSyncedOutputMessage = await asyncio.wait_for(
155+
self._out_queue.get(), timeout=timeout
156+
)
157+
158+
video_frame = await self._decode_video_frame(synced_output.video_frame)
159+
audio_frame = await self._decode_audio_frame(synced_output.audio_frame)
160+
161+
if self._video_out_frame_index == 0:
162+
self._video_out_start_time = time.time() + self._sync_latency
163+
164+
time_til_frame = (
165+
self._video_out_start_time
166+
+ (self._video_out_frame_index * self._video_frame_interval)
167+
- time.time()
168+
)
169+
if time_til_frame > 0:
170+
await asyncio.sleep(time_til_frame)
171+
172+
return video_frame, audio_frame

decart/lipsync/messages.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from pydantic import BaseModel, Field, ConfigDict, TypeAdapter
2+
from typing import Literal, Union, Annotated
3+
4+
5+
class LipsyncMessage(BaseModel):
6+
model_config = ConfigDict(ser_json_bytes="base64", val_json_bytes="base64")
7+
8+
9+
class LipsyncConfigMessage(LipsyncMessage):
10+
type: Literal["config"] = "config"
11+
video_fps: int
12+
audio_sample_rate: int
13+
14+
15+
class LipsyncConfigAckMessage(LipsyncMessage):
16+
type: Literal["config_ack"] = "config_ack"
17+
18+
19+
class LipsyncAudioInputMessage(LipsyncMessage):
20+
type: Literal["audio_input"] = "audio_input"
21+
audio_data: bytes
22+
23+
24+
class LipsyncVideoInputMessage(LipsyncMessage):
25+
type: Literal["video_input"] = "video_input"
26+
video_frame: bytes
27+
28+
29+
class LipsyncInterruptAudioMessage(LipsyncMessage):
30+
type: Literal["interrupt_audio"] = "interrupt_audio"
31+
32+
33+
class LipsyncSyncedOutputMessage(LipsyncMessage):
34+
type: Literal["synced_result"] = "synced_result"
35+
video_frame: bytes
36+
audio_frame: bytes
37+
38+
39+
class LipsyncErrorMessage(LipsyncMessage):
40+
type: Literal["error"] = "error"
41+
message: str
42+
43+
44+
LipsyncClientMessage = Annotated[
45+
Union[
46+
LipsyncConfigMessage,
47+
LipsyncAudioInputMessage,
48+
LipsyncVideoInputMessage,
49+
LipsyncInterruptAudioMessage,
50+
],
51+
Field(discriminator="type"),
52+
]
53+
LipsyncServerMessage = Annotated[
54+
Union[LipsyncConfigAckMessage, LipsyncSyncedOutputMessage, LipsyncErrorMessage],
55+
Field(discriminator="type"),
56+
]
57+
58+
LipsyncServerMessageAdapter = TypeAdapter(LipsyncServerMessage)

examples/lipsync_file.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#!/usr/bin/env python
2+
"""
3+
Example of using Decart's Realtime Lipsync API to synchronize audio with video.
4+
5+
This example loads a video file and an audio file, processes them through the
6+
Decart Lipsync API, and saves the lipsynced result to a new video file.
7+
8+
Usage:
9+
python lipsync_file.py <video_file> <audio_file> <output_file>
10+
11+
Example:
12+
python lipsync_file.py input.mp4 speech.wav output_lipsynced.mp4
13+
python lipsync_file.py input.mp4 speech.mp3 output_lipsynced.mp4
14+
"""
15+
16+
import asyncio
17+
import os
18+
import sys
19+
import cv2
20+
import subprocess
21+
import tempfile
22+
from pathlib import Path
23+
24+
# Add parent directory for importing decart (for development)
25+
sys.path.insert(0, str(Path(__file__).parent.parent))
26+
27+
from decart.lipsync import RealtimeLipsyncClient
28+
29+
30+
async def process_lipsync(video_path: str, audio_path: str, output_path: str):
31+
"""Process video and audio through Decart's lipsync API."""
32+
33+
# Get API key
34+
api_key = os.getenv("DECART_API_KEY")
35+
if not api_key:
36+
print("Error: Please set DECART_API_KEY environment variable")
37+
return
38+
39+
# Initialize client
40+
client = RealtimeLipsyncClient(api_key=api_key)
41+
42+
print(f"Processing: {video_path} + {audio_path} -> {output_path}")
43+
44+
# Connect to server
45+
await client.connect()
46+
print("Connected to Decart Lipsync server")
47+
48+
try:
49+
# Load audio data - handle different formats
50+
with open(audio_path, "rb") as f:
51+
audio_data = f.read()
52+
53+
# Send audio to server (server handles chunking)
54+
await client.send_audio(audio_data)
55+
56+
# Load video frames and convert to RGB
57+
frame_count = 0
58+
cap = cv2.VideoCapture(video_path)
59+
while True:
60+
ret, frame = cap.read()
61+
if not ret:
62+
break
63+
frame_count += 1
64+
# Convert from BGR (OpenCV default) to RGB
65+
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
66+
await client.send_video_frame(rgb_frame)
67+
cap.release()
68+
69+
# Receive lipsynced output
70+
out = cv2.VideoWriter(
71+
output_path,
72+
cv2.VideoWriter_fourcc(*"mp4v"),
73+
client._video_fps,
74+
(rgb_frame.shape[1], rgb_frame.shape[0]),
75+
)
76+
for i in range(frame_count):
77+
try:
78+
79+
video_frame, audio_frame = await client.get_synced_output(timeout=1.0)
80+
bgr_frame = cv2.cvtColor(video_frame, cv2.COLOR_RGB2BGR)
81+
out.write(bgr_frame)
82+
except asyncio.TimeoutError:
83+
print(f"Warning: Timeout at frame {i}")
84+
break
85+
out.release()
86+
87+
finally:
88+
await client.disconnect()
89+
print("Disconnected from server")
90+
91+
92+
async def main():
93+
"""Main entry point."""
94+
if len(sys.argv) != 4:
95+
print("Usage: python lipsync_file.py <video_file> <wav_audio_file> <output_file>")
96+
print("Example: python lipsync_file.py input.mp4 speech.wav output_lipsynced.mp4")
97+
sys.exit(1)
98+
99+
video_path = sys.argv[1]
100+
audio_path = sys.argv[2]
101+
output_path = sys.argv[3]
102+
103+
# Check input files exist
104+
if not Path(video_path).exists():
105+
print(f"Error: Video file not found: {video_path}")
106+
sys.exit(1)
107+
108+
if not Path(audio_path).exists():
109+
print(f"Error: Audio file not found: {audio_path}")
110+
sys.exit(1)
111+
112+
# Process the files
113+
await process_lipsync(video_path, audio_path, output_path)
114+
115+
116+
if __name__ == "__main__":
117+
asyncio.run(main())

0 commit comments

Comments
 (0)