Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
373 changes: 218 additions & 155 deletions backends/advanced/src/advanced_omi_backend/app_factory.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,25 @@
audio_logger = logging.getLogger("audio_processing")


async def _get_conversation_or_error(conversation_id: str, user: User):
"""Fetch a conversation and validate user access.

Returns (conversation, None) on success, or (None, error_response) on failure.
"""
conversation = await Conversation.find_one(
Conversation.conversation_id == conversation_id
)
if not conversation:
return None, JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)
if not user.is_superuser and conversation.user_id != str(user.user_id):
return None, JSONResponse(
status_code=403, content={"error": "Access forbidden"}
)
return conversation, None


async def close_current_conversation(client_id: str, user: User):
"""Close the current conversation for a specific client.

Expand Down Expand Up @@ -112,18 +131,9 @@ async def close_current_conversation(client_id: str, user: User):
async def get_conversation(conversation_id: str, user: User):
"""Get a single conversation with full transcript details."""
try:
# Find the conversation using Beanie
conversation = await Conversation.find_one(
Conversation.conversation_id == conversation_id
)
if not conversation:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

# Check ownership for non-admin users
if not user.is_superuser and conversation.user_id != str(user.user_id):
return JSONResponse(status_code=403, content={"error": "Access forbidden"})
conversation, error = await _get_conversation_or_error(conversation_id, user)
if error:
return error

# Build response with explicit curated fields
response = {
Expand Down Expand Up @@ -184,16 +194,9 @@ async def get_conversation(conversation_id: str, user: User):
async def get_conversation_memories(conversation_id: str, user: User, limit: int = 100):
"""Get memories extracted from a specific conversation."""
try:
conversation = await Conversation.find_one(
Conversation.conversation_id == conversation_id
)
if not conversation:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

if not user.is_superuser and conversation.user_id != str(user.user_id):
return JSONResponse(status_code=403, content={"error": "Access forbidden"})
conversation, error = await _get_conversation_or_error(conversation_id, user)
if error:
return error

memory_service = get_memory_service()
memories = await memory_service.get_memories_by_source(
Expand Down Expand Up @@ -671,29 +674,13 @@ async def delete_conversation(
f"Attempting to {'permanently ' if permanent else ''}delete conversation: {masked_id}"
)

# Find the conversation using Beanie
conversation = await Conversation.find_one(
Conversation.conversation_id == conversation_id
)

if not conversation:
return JSONResponse(
status_code=404,
content={"error": f"Conversation '{conversation_id}' not found"},
)

# Check ownership for non-admin users
if not user.is_superuser and conversation.user_id != str(user.user_id):
logger.warning(
f"User {user.user_id} attempted to delete conversation {conversation_id} without permission"
)
return JSONResponse(
status_code=403,
content={
"error": "Access forbidden. You can only delete your own conversations.",
"details": f"Conversation '{conversation_id}' does not belong to your account.",
},
)
conversation, error = await _get_conversation_or_error(conversation_id, user)
if error:
if error.status_code == 403:
logger.warning(
f"User {user.user_id} attempted to delete conversation {conversation_id} without permission"
)
return error

# Hard delete (admin only, permanent flag)
if permanent and user.is_superuser:
Expand All @@ -719,18 +706,9 @@ async def restore_conversation(conversation_id: str, user: User) -> JSONResponse
user: Requesting user
"""
try:
conversation = await Conversation.find_one(
Conversation.conversation_id == conversation_id
)

if not conversation:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

# Permission check
if not user.is_superuser and conversation.user_id != str(user.user_id):
return JSONResponse(status_code=403, content={"error": "Access denied"})
conversation, error = await _get_conversation_or_error(conversation_id, user)
if error:
return error

if not conversation.deleted:
return JSONResponse(
Expand Down Expand Up @@ -933,16 +911,9 @@ def _enqueue_speaker_reprocessing_chain(
async def toggle_star(conversation_id: str, user: User):
"""Toggle the starred/favorite status of a conversation."""
try:
conversation = await Conversation.find_one(
Conversation.conversation_id == conversation_id
)
if not conversation:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

if not user.is_superuser and conversation.user_id != str(user.user_id):
return JSONResponse(status_code=403, content={"error": "Access forbidden"})
conversation, error = await _get_conversation_or_error(conversation_id, user)
if error:
return error

# Toggle
conversation.starred = not conversation.starred
Expand Down Expand Up @@ -993,17 +964,9 @@ async def toggle_star(conversation_id: str, user: User):
async def reprocess_orphan(conversation_id: str, user: User):
"""Reprocess an orphan audio session - restore if deleted and enqueue full processing chain."""
try:
conversation = await Conversation.find_one(
Conversation.conversation_id == conversation_id
)
if not conversation:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

# Check ownership
if not user.is_superuser and conversation.user_id != str(user.user_id):
return JSONResponse(status_code=403, content={"error": "Access forbidden"})
conversation, error = await _get_conversation_or_error(conversation_id, user)
if error:
return error

# Verify audio chunks exist (check both deleted and non-deleted)
total_chunks = await AudioChunkDocument.find(
Expand Down Expand Up @@ -1068,23 +1031,11 @@ async def reprocess_orphan(conversation_id: str, user: User):
async def reprocess_transcript(conversation_id: str, user: User):
"""Reprocess transcript for a conversation. Users can only reprocess their own conversations."""
try:
# Find the conversation using Beanie
conversation_model = await Conversation.find_one(
Conversation.conversation_id == conversation_id
conversation_model, error = await _get_conversation_or_error(
conversation_id, user
)
if not conversation_model:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

# Check ownership for non-admin users
if not user.is_superuser and conversation_model.user_id != str(user.user_id):
return JSONResponse(
status_code=403,
content={
"error": "Access forbidden. You can only reprocess your own conversations."
},
)
if error:
return error

# Get audio_uuid from conversation
# Validate audio chunks exist in MongoDB
Expand Down Expand Up @@ -1137,24 +1088,11 @@ async def reprocess_memory(
):
"""Reprocess memory extraction for a specific transcript version. Users can only reprocess their own conversations."""
try:
# Find the conversation using Beanie
conversation_model = await Conversation.find_one(
Conversation.conversation_id == conversation_id
conversation_model, error = await _get_conversation_or_error(
conversation_id, user
)
if not conversation_model:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

# Check ownership for non-admin users
if not user.is_superuser and conversation_model.user_id != str(user.user_id):
return JSONResponse(
status_code=403,
content={
"error": "Access forbidden. You can only reprocess your own conversations."
},
)

if error:
return error
# Resolve transcript version ID (handle "active" special case)
error, transcript_version_id, transcript_version = _resolve_transcript_version(
conversation_model, transcript_version_id
Expand Down Expand Up @@ -1205,23 +1143,11 @@ async def reprocess_speakers(
"""
try:
# 1. Find conversation and validate ownership
conversation_model = await Conversation.find_one(
Conversation.conversation_id == conversation_id
conversation_model, error = await _get_conversation_or_error(
conversation_id, user
)
if not conversation_model:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

# Check ownership for non-admin users
if not user.is_superuser and conversation_model.user_id != str(user.user_id):
return JSONResponse(
status_code=403,
content={
"error": "Access forbidden. You can only reprocess your own conversations."
},
)

if error:
return error
# 2-3. Resolve source transcript version ID and find version object
error, source_version_id, source_version = _resolve_transcript_version(
conversation_model, transcript_version_id
Expand Down Expand Up @@ -1349,23 +1275,11 @@ async def activate_transcript_version(
):
"""Activate a specific transcript version. Users can only modify their own conversations."""
try:
# Find the conversation using Beanie
conversation_model = await Conversation.find_one(
Conversation.conversation_id == conversation_id
conversation_model, error = await _get_conversation_or_error(
conversation_id, user
)
if not conversation_model:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

# Check ownership for non-admin users
if not user.is_superuser and conversation_model.user_id != str(user.user_id):
return JSONResponse(
status_code=403,
content={
"error": "Access forbidden. You can only modify your own conversations."
},
)
if error:
return error

# Activate the transcript version using Beanie model method
success = conversation_model.set_active_transcript_version(version_id)
Expand Down Expand Up @@ -1401,23 +1315,11 @@ async def activate_transcript_version(
async def activate_memory_version(conversation_id: str, version_id: str, user: User):
"""Activate a specific memory version. Users can only modify their own conversations."""
try:
# Find the conversation using Beanie
conversation_model = await Conversation.find_one(
Conversation.conversation_id == conversation_id
conversation_model, error = await _get_conversation_or_error(
conversation_id, user
)
if not conversation_model:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

# Check ownership for non-admin users
if not user.is_superuser and conversation_model.user_id != str(user.user_id):
return JSONResponse(
status_code=403,
content={
"error": "Access forbidden. You can only modify your own conversations."
},
)
if error:
return error

# Activate the memory version using Beanie model method
success = conversation_model.set_active_memory_version(version_id)
Expand Down Expand Up @@ -1449,23 +1351,11 @@ async def activate_memory_version(conversation_id: str, version_id: str, user: U
async def get_conversation_version_history(conversation_id: str, user: User):
"""Get version history for a conversation. Users can only access their own conversations."""
try:
# Find the conversation using Beanie to check ownership
conversation_model = await Conversation.find_one(
Conversation.conversation_id == conversation_id
conversation_model, error = await _get_conversation_or_error(
conversation_id, user
)
if not conversation_model:
return JSONResponse(
status_code=404, content={"error": "Conversation not found"}
)

# Check ownership for non-admin users
if not user.is_superuser and conversation_model.user_id != str(user.user_id):
return JSONResponse(
status_code=403,
content={
"error": "Access forbidden. You can only access your own conversations."
},
)
if error:
return error

# Get version history from model
# Convert datetime objects to ISO strings for JSON serialization
Expand Down
Loading
Loading