Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions backends/advanced/docker-compose-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Isolated test environment for integration tests
# Uses different ports to avoid conflicts with development environment

name: backend-test

services:
chronicle-backend-test:
build:
Expand All @@ -16,7 +18,8 @@ services:
- ./data/test_audio_chunks:/app/audio_chunks
- ./data/test_debug_dir:/app/debug # Fixed: mount to /app/debug for plugin database
- ./data/test_data:/app/data
- ../../config:/app/config # Mount config directory with defaults.yml and config.yml
- ../../config:/app/config # Mount config directory with defaults.yml
- ../../tests/configs:/app/test-configs:ro # Mount test-specific configs
- ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/config/plugins.yml # Mount test plugins config to correct location
environment:
# Override with test-specific settings
Expand All @@ -25,6 +28,8 @@ services:
- QDRANT_PORT=6333
- REDIS_URL=redis://redis-test:6379/0
- DEBUG_DIR=/app/debug # Fixed: match plugin database mount path
# Test configuration file
- CONFIG_FILE=${TEST_CONFIG_FILE:-/app/test-configs/deepgram-openai.yml}
# Import API keys from environment
- DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY}
- OPENAI_API_KEY=${OPENAI_API_KEY}
Expand All @@ -45,13 +50,16 @@ services:
# Speaker recognition controlled by config.yml (disabled in test config for CI performance)
- SPEAKER_SERVICE_URL=http://speaker-service-test:8085
- CORS_ORIGINS=http://localhost:3001,http://localhost:8001,https://localhost:3001,https://localhost:8001
# Set low inactivity timeout for tests (2 seconds instead of 60)
- SPEECH_INACTIVITY_THRESHOLD_SECONDS=2
# Set inactivity timeout for tests (20 seconds of audio time)
# This is audio duration, not wall-clock time
- SPEECH_INACTIVITY_THRESHOLD_SECONDS=20
# Set low speech detection thresholds for tests
- SPEECH_DETECTION_MIN_DURATION=2.0 # 2 seconds instead of 10
- SPEECH_DETECTION_MIN_WORDS=5 # 5 words instead of 10
# Wait for audio queue to drain before timing out (test mode)
- WAIT_FOR_AUDIO_QUEUE_DRAIN=true
# Mock speaker recognition for tests (avoids resource-intensive ML service)
- USE_MOCK_SPEAKER_CLIENT=true
depends_on:
qdrant-test:
condition: service_started
Expand Down Expand Up @@ -168,7 +176,8 @@ services:
- ./data/test_audio_chunks:/app/audio_chunks
- ./data/test_debug_dir:/app/debug # Fixed: mount to /app/debug for plugin database
- ./data/test_data:/app/data
- ../../config:/app/config # Mount config directory with defaults.yml and config.yml
- ../../config:/app/config # Mount config directory with defaults.yml
- ../../tests/configs:/app/test-configs:ro # Mount test-specific configs
- ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/config/plugins.yml # Mount test plugins config to correct location
environment:
# Same environment as backend
Expand All @@ -177,6 +186,8 @@ services:
- QDRANT_PORT=6333
- REDIS_URL=redis://redis-test:6379/0
- DEBUG_DIR=/app/debug # Fixed: match plugin database mount path
# Test configuration file
- CONFIG_FILE=${TEST_CONFIG_FILE:-/app/test-configs/deepgram-openai.yml}
- DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- GROQ_API_KEY=${GROQ_API_KEY}
Expand All @@ -192,13 +203,16 @@ services:
- MYCELIA_DB=mycelia_test
# Speaker recognition controlled by config.yml (disabled in test config for CI performance)
- SPEAKER_SERVICE_URL=http://speaker-service-test:8085
# Set low inactivity timeout for tests (2 seconds instead of 60)
- SPEECH_INACTIVITY_THRESHOLD_SECONDS=2
# Set inactivity timeout for tests (20 seconds of audio time)
# This is audio duration, not wall-clock time
- SPEECH_INACTIVITY_THRESHOLD_SECONDS=20
# Set low speech detection thresholds for tests
- SPEECH_DETECTION_MIN_DURATION=2.0 # 2 seconds instead of 10
- SPEECH_DETECTION_MIN_WORDS=5 # 5 words instead of 10
# Wait for audio queue to drain before timing out (test mode)
- WAIT_FOR_AUDIO_QUEUE_DRAIN=true
# Mock speaker recognition for tests (avoids resource-intensive ML service)
- USE_MOCK_SPEAKER_CLIENT=true
depends_on:
chronicle-backend-test:
condition: service_healthy
Expand Down
12 changes: 2 additions & 10 deletions backends/advanced/run-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,13 @@ if [ -d "./data/test_audio_chunks/" ] || [ -d "./data/test_data/" ] || [ -d "./d
docker run --rm -v "$(pwd)/data:/data" alpine sh -c 'rm -rf /data/test_*' 2>/dev/null || true
fi

# Use unique project name to avoid conflicts with development environment
export COMPOSE_PROJECT_NAME="advanced-backend-test"
# Note: Project name 'backend-test' is set in docker-compose-test.yml
# No need to export COMPOSE_PROJECT_NAME - it's handled by the compose file

# Stop any existing test containers
print_info "Stopping existing test containers..."
# Try cleanup with current project name
docker compose -f docker-compose-test.yml down -v || true

# Also try cleanup with default project name (in case containers were started without COMPOSE_PROJECT_NAME)
COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true

# Run integration tests
print_info "Running integration tests..."
print_info "Using fresh mode (CACHED_MODE=False) for clean testing"
Expand Down Expand Up @@ -268,8 +264,6 @@ else
if [ "${CLEANUP_CONTAINERS:-true}" != "false" ]; then
print_info "Cleaning up test containers after failure..."
docker compose -f docker-compose-test.yml down -v || true
# Also cleanup with default project name
COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true
docker system prune -f || true
else
print_warning "Skipping cleanup (CLEANUP_CONTAINERS=false) - containers left running for debugging"
Expand All @@ -282,8 +276,6 @@ fi
if [ "${CLEANUP_CONTAINERS:-true}" != "false" ]; then
print_info "Cleaning up test containers..."
docker compose -f docker-compose-test.yml down -v || true
# Also cleanup with default project name
COMPOSE_PROJECT_NAME=advanced docker compose -f docker-compose-test.yml down -v 2>/dev/null || true
docker system prune -f || true
else
print_warning "Skipping cleanup (CLEANUP_CONTAINERS=false) - containers left running"
Expand Down
3 changes: 1 addition & 2 deletions backends/advanced/src/advanced_omi_backend/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ class AppConfig:
def __init__(self):
# MongoDB Configuration
self.mongodb_uri = os.getenv("MONGODB_URI", "mongodb://mongo:27017")
# default to legacy value to avoid breaking peoples .env
self.mongodb_database = os.getenv("MONGODB_DATABASE", "friend-lite")
self.mongodb_database = os.getenv("MONGODB_DATABASE", "chronicle")
self.mongo_client = AsyncIOMotorClient(self.mongodb_uri)
self.db = self.mongo_client.get_default_database(self.mongodb_database)
self.users_col = self.db["users"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,39 @@ async def _stop():
logger.info(f"Stream {stream_id} stopped, sent {total_chunks} chunks")
return total_chunks

def close_stream_without_stop(self, stream_id: str) -> int:
    """Close the WebSocket connection without sending an audio-stop event.

    This simulates abrupt disconnection (network failure, client crash)
    and should trigger the websocket_disconnect end_reason on the server.

    Args:
        stream_id: Stream session ID previously returned when the stream
            was started.

    Returns:
        Total chunks sent during this session.

    Raises:
        ValueError: If stream_id does not correspond to an active session.
    """
    session = self._sessions.get(stream_id)
    if not session:
        raise ValueError(f"Unknown stream_id: {stream_id}")

    async def _close_abruptly():
        # Just close the connection without audio-stop
        await session.client.close()

    try:
        # Run the close on the session's own event loop thread.
        future = asyncio.run_coroutine_threadsafe(_close_abruptly(), session.loop)
        future.result(timeout=10)
    finally:
        # Always tear down the loop thread and session bookkeeping, even if
        # the close call raised or timed out — otherwise the background
        # thread keeps running and the stale session entry leaks into
        # subsequent tests.
        session.loop.call_soon_threadsafe(session.loop.stop)
        session.thread.join(timeout=5)
        del self._sessions[stream_id]

    total_chunks = session.chunk_count

    logger.info(f"Stream {stream_id} closed abruptly (no audio-stop), sent {total_chunks} chunks")
    return total_chunks

def get_session(self, stream_id: str) -> Optional[StreamSession]:
    """Return the active session for a stream ID, or None if unknown."""
    try:
        return self._sessions[stream_id]
    except KeyError:
        return None
Expand Down
9 changes: 8 additions & 1 deletion backends/advanced/src/advanced_omi_backend/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ def load_config(force_reload: bool = False) -> DictConfig:

config_dir = get_config_dir()
defaults_path = config_dir / "defaults.yml"
config_path = config_dir / "config.yml"

# Support CONFIG_FILE env var for test configurations
config_file = os.getenv("CONFIG_FILE", "config.yml")
# Handle both absolute paths and relative filenames
if os.path.isabs(config_file):
config_path = Path(config_file)
else:
config_path = config_dir / config_file

# Load defaults
defaults = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@
from fastapi import UploadFile
from fastapi.responses import JSONResponse

from advanced_omi_backend.controllers.queue_controller import (
JOB_RESULT_TTL,
start_post_conversation_jobs,
transcription_queue,
)
from advanced_omi_backend.models.conversation import create_conversation
from advanced_omi_backend.models.user import User
from advanced_omi_backend.services.transcription import is_transcription_available
from advanced_omi_backend.utils.audio_chunk_utils import convert_audio_to_chunks
from advanced_omi_backend.utils.audio_utils import (
AudioValidationError,
validate_and_prepare_audio,
)
from advanced_omi_backend.workers.transcription_jobs import (
transcribe_full_audio_job,
)

logger = logging.getLogger(__name__)
audio_logger = logging.getLogger("audio_processing")
Expand Down Expand Up @@ -94,7 +103,8 @@ async def upload_and_process_audio_files(
audio_data, sample_rate, sample_width, channels, duration = await validate_and_prepare_audio(
audio_data=content,
expected_sample_rate=16000, # Expecting 16kHz
convert_to_mono=True # Convert stereo to mono
convert_to_mono=True, # Convert stereo to mono
auto_resample=True # Auto-resample if sample rate doesn't match
)
except AudioValidationError as e:
processed_files.append({
Expand Down Expand Up @@ -166,54 +176,61 @@ async def upload_and_process_audio_files(
continue

# Enqueue batch transcription job first (file uploads need transcription)
from advanced_omi_backend.controllers.queue_controller import (
JOB_RESULT_TTL,
start_post_conversation_jobs,
transcription_queue,
)
from advanced_omi_backend.workers.transcription_jobs import (
transcribe_full_audio_job,
)

version_id = str(uuid.uuid4())
transcribe_job_id = f"transcribe_{conversation_id[:12]}"

transcription_job = transcription_queue.enqueue(
transcribe_full_audio_job,
conversation_id,
version_id,
"batch", # trigger
job_timeout=1800, # 30 minutes
result_ttl=JOB_RESULT_TTL,
job_id=transcribe_job_id,
description=f"Transcribe uploaded file {conversation_id[:8]}",
meta={'conversation_id': conversation_id, 'client_id': client_id}
)

audio_logger.info(f"📥 Enqueued transcription job {transcription_job.id} for uploaded file")
# Check if transcription provider is available before enqueueing
transcription_job = None
if is_transcription_available(mode="batch"):
transcription_job = transcription_queue.enqueue(
transcribe_full_audio_job,
conversation_id,
version_id,
"batch", # trigger
job_timeout=1800, # 30 minutes
result_ttl=JOB_RESULT_TTL,
job_id=transcribe_job_id,
description=f"Transcribe uploaded file {conversation_id[:8]}",
meta={'conversation_id': conversation_id, 'client_id': client_id}
)
audio_logger.info(f"📥 Enqueued transcription job {transcription_job.id} for uploaded file")
else:
audio_logger.warning(
f"⚠️ Skipping transcription for conversation {conversation_id}: "
"No transcription provider configured"
)

# Enqueue post-conversation processing job chain (depends on transcription)
job_ids = start_post_conversation_jobs(
conversation_id=conversation_id,
user_id=user.user_id,
transcript_version_id=version_id, # Pass the version_id from transcription job
depends_on_job=transcription_job, # Wait for transcription to complete
depends_on_job=transcription_job, # Wait for transcription to complete (or None)
client_id=client_id # Pass client_id for UI tracking
)

processed_files.append({
"filename": file.filename,
"status": "processing",
"status": "started", # RQ standard: job has been enqueued
"conversation_id": conversation_id,
"transcript_job_id": transcription_job.id,
"transcript_job_id": transcription_job.id if transcription_job else None,
"speaker_job_id": job_ids['speaker_recognition'],
"memory_job_id": job_ids['memory'],
"duration_seconds": round(duration, 2),
})

# Build job chain description
job_chain = []
if transcription_job:
job_chain.append(transcription_job.id)
if job_ids['speaker_recognition']:
job_chain.append(job_ids['speaker_recognition'])
if job_ids['memory']:
job_chain.append(job_ids['memory'])

audio_logger.info(
f"✅ Processed {file.filename} → conversation {conversation_id}, "
f"jobs: {transcription_job.id}{job_ids['speaker_recognition']} → {job_ids['memory']}"
f"jobs: {''.join(job_chain) if job_chain else 'none'}"
)

except (OSError, IOError) as e:
Expand All @@ -233,20 +250,33 @@ async def upload_and_process_audio_files(
"error": str(e),
})

successful_files = [f for f in processed_files if f.get("status") == "processing"]
successful_files = [f for f in processed_files if f.get("status") == "started"]
failed_files = [f for f in processed_files if f.get("status") == "error"]

return {
response_body = {
"message": f"Uploaded and processing {len(successful_files)} file(s)",
"client_id": client_id,
"files": processed_files,
"summary": {
"total": len(files),
"processing": len(successful_files),
"started": len(successful_files), # RQ standard
"failed": len(failed_files),
},
}

# Return appropriate HTTP status code based on results
if len(failed_files) == len(files):
# ALL files failed - return 400 Bad Request
audio_logger.error(f"All {len(files)} file(s) failed to upload")
return JSONResponse(status_code=400, content=response_body)
elif len(failed_files) > 0:
# SOME files failed (partial success) - return 207 Multi-Status
audio_logger.warning(f"Partial upload: {len(successful_files)} succeeded, {len(failed_files)} failed")
return JSONResponse(status_code=207, content=response_body)
else:
# All files succeeded - return 200 OK
return response_body

except (OSError, IOError) as e:
# File system errors during upload handling
audio_logger.exception("File I/O error in upload_and_process_audio_files")
Expand Down
Loading
Loading