Skip to content

Conversation

@satvikbatra
Copy link

@satvikbatra satvikbatra commented Oct 30, 2025

Implements persistent conversation storage for voice assistant interactions in PostgreSQL. Stores user messages (actual speech transcriptions) and assistant responses.

Key Features

  • Auto-disable safety: Feature auto-disables if PostgreSQL env vars missing (prevents crashes)
  • Actual transcriptions: Extracts real user speech from context_aggregator
  • Multi-response support: Saves multiple assistant responses per user question (e.g., LLM response + function call result)
  • Session lifecycle: Marks conversations as completed on session cleanup

Implementation

Layered architecture following existing patterns:

  • Database queries with parameterized SQL
  • Accessor layer with error handling
  • Decoder layer for Pydantic models
  • Service layer API

Database Schema

conversations: session_id, merchant_id, user_email, status, message_count, metadata (JSONB), timestamps

conversation_messages: conversation_id (FK), role (user/assistant), content, sequence_number, timestamp

Configuration

ENABLE_CONVERSATION_STORAGE=true (default)

Requires: POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD

@coderabbitai
Copy link

coderabbitai bot commented Oct 30, 2025

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

The PR introduces a complete conversation storage system for the voice agent platform. It adds PostgreSQL database schema (conversations and conversation_messages tables), implements database accessors and decoders, creates a service layer for conversation lifecycle management, integrates conversation creation/completion hooks into session management, updates LLMSpyProcessor to accept a context aggregator and save conversation pairs, and gates the feature behind an ENABLE_CONVERSATION_STORAGE configuration flag validated against database connectivity.

Changes

Cohort / File(s) Summary
Configuration & Feature Flag
app/core/config.py
Added ENABLE_CONVERSATION_STORAGE configuration flag with automatic validation of PostgreSQL connection parameters; disables storage if any required credentials are missing.
Database Schema & Migration
scripts/create_tables.py
Added table creation functions for conversations and conversation_messages with appropriate columns, constraints, and indexes; integrated into main table creation flow.
Database Query Builders
app/database/queries/conversations.py
New module providing parameterized SQL builders for conversation CRUD operations, message persistence, activity tracking, and message count management.
Database Accessors
app/database/accessor/conversations.py
New async accessor layer exposing conversation creation, retrieval, updates (status, activity, summary), completion, and message operations with consistent error handling.
Database Decoders
app/database/decoder/conversations.py
New decoding utilities converting asyncpg records to Pydantic models for conversations and messages with metadata JSON deserialization.
Domain Models
app/schemas.py
Added ConversationStatus enum, Conversation, ConversationMessage, and ConversationWithMessages Pydantic models to represent conversation data structures.
Service Layer
app/services/conversation_storage.py
New high-level service wrapping database accessors for conversation and message management, with singleton accessor function.
LLM Processing & Context
app/agents/voice/automatic/processors/llm_spy.py
Enhanced LLMSpyProcessor with optional context_aggregator parameter, added _save_conversation_pair() method to persist user/assistant message pairs, and modified frame handling to accumulate and save conversations when storage enabled.
Processor Initialization
app/agents/voice/automatic/__init__.py
Updated LLMSpyProcessor constructor calls to pass context_aggregator parameter in both STTMuteFilter and non-filter branches.
Session Lifecycle Integration
app/main.py
Added conversation creation on bot connect when storage enabled, with exception handling and logging.
Session Cleanup
app/helpers/automatic/session_manager.py
Added conversation completion call in cleanup callback when storage enabled, with error handling.

Sequence Diagram

sequenceDiagram
    participant Client
    participant MainServer as Main<br/>(bot_connect)
    participant SessionMgr as SessionManager<br/>(cleanup)
    participant LLMSpyProc as LLMSpyProcessor
    participant ConvService as ConvStorageService
    participant Database as PostgreSQL

    Client->>MainServer: Session initiated
    MainServer->>ConvService: create_conversation(session_id, ...)
    ConvService->>Database: INSERT INTO conversations
    Database-->>ConvService: conversation record
    ConvService-->>MainServer: Conversation created

    MainServer->>LLMSpyProc: Initialize with context_aggregator
    
    LLMSpyProc->>LLMSpyProc: process_frame (LLMTextFrame)
    LLMSpyProc->>LLMSpyProc: Accumulate text when collecting
    
    LLMSpyProc->>LLMSpyProc: process_frame (LLMFullResponseEndFrame)
    LLMSpyProc->>LLMSpyProc: _save_conversation_pair(assistant_msg)
    LLMSpyProc->>ConvService: save_message(conv_id, "assistant", ...)
    ConvService->>Database: INSERT INTO conversation_messages
    Database-->>ConvService: Message saved
    ConvService-->>LLMSpyProc: Success

    Client->>SessionMgr: Session ends
    SessionMgr->>ConvService: complete_conversation(session_id)
    ConvService->>Database: UPDATE conversations SET status='completed'
    Database-->>ConvService: Updated
    ConvService-->>SessionMgr: Conversation completed
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Areas requiring extra attention:

  • LLMSpyProcessor logic changes (app/agents/voice/automatic/processors/llm_spy.py): The conditional handling of conversation storage alongside existing chart functionality needs careful review to ensure no regressions in frame processing order and TTS sanitization paths.
  • Database schema design (scripts/create_tables.py, app/database/queries/conversations.py): Verify foreign key constraints, indexes for performance (particularly session_id lookups), and timestamp handling across tables.
  • Context aggregator integration (app/agents/voice/automatic/__init__.py, llm_spy.py): Ensure context_aggregator is properly initialized and passed through the processor pipeline; verify null-handling when aggregator is None.
  • Configuration validation (app/core/config.py): Verify PostgreSQL credential validation logic doesn't mask legitimate connection failures; check environment variable reading robustness.
  • Conversation pair save logic (llm_spy.py::_save_conversation_pair): Review deduplication using _last_saved_user_message, error handling, and interaction with context_aggregator to extract user messages.

Possibly related PRs

  • BZ-45188: feat: Mute until first bot complete #308: Modifies the same LLMSpyProcessor constructor signature and voice pipeline initialization; involves stt_mute_filter integration alongside the context_aggregator addition.
  • Format Clairvoyance #212: Contains earlier modifications to LLMSpyProcessor in app/agents/voice/automatic/processors/llm_spy.py affecting the same processor initialization path.
  • Correct ordering of import #223: Modifies LLMSpyProcessor registration and initialization wiring in the voice pipeline; overlaps with processor signature and initialization changes.

Suggested reviewers

  • badri-singhal
  • murdore

Poem

🐰 Hops through conversations, storing every word,
Context aggregates where assistant's voice is heard,
From sessions born to conversations saved,
In PostgreSQL's garden, dialogue is paved!

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The pull request title "feat: Implement conversation storage feature" clearly and directly describes the primary objective of the changeset. The changes across all modified files—including new database accessors, decoders, and queries; a conversation storage service; configuration flags; schema definitions; and integration points in the bot connection flow and session cleanup—all work cohesively to implement a comprehensive conversation storage feature. The title is concise, specific, and avoids vague terminology, making it immediately clear to reviewers that this PR introduces persistence capabilities for conversations in a database.
Docstring Coverage ✅ Passed Docstring coverage is 96.43% which is sufficient. The required threshold is 80.00%.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (1)
app/services/conversation_storage.py (1)

280-294: Consider thread-safe singleton pattern.

The current singleton implementation is not safe for concurrent access. Multiple concurrent calls to get_conversation_storage_service() could create multiple instances. However, since ConversationStorageService is stateless, this is unlikely to cause functional issues.

If strict singleton semantics are desired, consider using a more robust pattern:

from asyncio import Lock

_conversation_storage_service: Optional[ConversationStorageService] = None
_service_lock = Lock()

async def get_conversation_storage_service() -> ConversationStorageService:
    """Get the global conversation storage service instance."""
    global _conversation_storage_service
    if _conversation_storage_service is None:
        async with _service_lock:
            if _conversation_storage_service is None:
                _conversation_storage_service = ConversationStorageService()
    return _conversation_storage_service

Or use a simpler synchronous approach if the service is always initialized at startup:

_conversation_storage_service = ConversationStorageService()

def get_conversation_storage_service() -> ConversationStorageService:
    """Get the global conversation storage service instance."""
    return _conversation_storage_service
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3b7b13e and 26fea01.

📒 Files selected for processing (11)
  • app/agents/voice/automatic/__init__.py (1 hunks)
  • app/agents/voice/automatic/processors/llm_spy.py (4 hunks)
  • app/core/config.py (1 hunks)
  • app/database/accessor/conversations.py (1 hunks)
  • app/database/decoder/conversations.py (1 hunks)
  • app/database/queries/conversations.py (1 hunks)
  • app/helpers/automatic/session_manager.py (2 hunks)
  • app/main.py (2 hunks)
  • app/schemas.py (1 hunks)
  • app/services/conversation_storage.py (1 hunks)
  • scripts/create_tables.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (9)
app/database/decoder/conversations.py (1)
app/schemas.py (3)
  • Conversation (145-166)
  • ConversationMessage (169-178)
  • ConversationStatus (140-142)
app/main.py (2)
app/services/conversation_storage.py (2)
  • get_conversation_storage_service (284-294)
  • create_conversation (35-82)
app/database/accessor/conversations.py (1)
  • create_conversation (34-76)
app/agents/voice/automatic/processors/llm_spy.py (5)
app/agents/voice/automatic/utils/conversation_manager.py (3)
  • get_conversation_manager (470-475)
  • start_turn_with_events (230-251)
  • add_llm_response_with_events (253-275)
app/agents/voice/automatic/services/mem0/memory.py (1)
  • process_frame (380-530)
app/agents/voice/automatic/rtvi/rtvi.py (1)
  • emit_rtvi_event (6-13)
app/services/conversation_storage.py (2)
  • get_conversation_storage_service (284-294)
  • save_message (122-159)
app/database/accessor/conversations.py (1)
  • save_message (229-259)
scripts/create_tables.py (1)
app/database/__init__.py (1)
  • get_db_connection (66-74)
app/schemas.py (1)
app/agents/voice/automatic/features/charts/conversation.py (1)
  • ConversationMessage (63-96)
app/helpers/automatic/session_manager.py (2)
app/services/conversation_storage.py (2)
  • get_conversation_storage_service (284-294)
  • complete_conversation (209-226)
app/database/accessor/conversations.py (1)
  • complete_conversation (159-177)
app/database/accessor/conversations.py (5)
app/database/decoder/conversations.py (4)
  • decode_conversation (13-47)
  • decode_conversation_list (50-88)
  • decode_conversation_message (91-109)
  • decode_conversation_message_list (112-132)
app/database/queries/__init__.py (1)
  • run_parameterized_query (14-30)
app/database/queries/conversations.py (13)
  • complete_conversation_query (126-137)
  • get_conversation_by_id_query (87-93)
  • get_conversation_by_session_id_query (78-84)
  • get_last_n_messages_query (251-264)
  • get_message_count_query (267-276)
  • get_messages_by_conversation_id_query (228-248)
  • get_recent_conversations_query (156-189)
  • insert_conversation_query (15-75)
  • insert_message_query (193-225)
  • update_conversation_activity_query (96-107)
  • update_conversation_message_count_query (279-296)
  • update_conversation_status_query (110-123)
  • update_conversation_summary_query (140-153)
app/schemas.py (2)
  • Conversation (145-166)
  • ConversationMessage (169-178)
app/services/conversation_storage.py (3)
  • create_conversation (35-82)
  • complete_conversation (209-226)
  • save_message (122-159)
app/agents/voice/automatic/__init__.py (1)
app/agents/voice/automatic/processors/llm_spy.py (1)
  • LLMSpyProcessor (126-403)
app/services/conversation_storage.py (2)
app/database/accessor/conversations.py (9)
  • complete_conversation (159-177)
  • create_conversation (34-76)
  • get_conversation_by_session_id (79-96)
  • get_last_n_messages (285-305)
  • get_messages_by_conversation_id (262-282)
  • get_recent_conversations (200-225)
  • save_message (229-259)
  • update_conversation_activity (119-136)
  • update_conversation_summary (180-197)
app/schemas.py (3)
  • Conversation (145-166)
  • ConversationMessage (169-178)
  • ConversationWithMessages (181-185)
🪛 Ruff (0.14.2)
app/main.py

252-252: Do not catch blind exception: Exception

(BLE001)

app/agents/voice/automatic/processors/llm_spy.py

378-378: Do not catch blind exception: Exception

(BLE001)


402-402: Do not catch blind exception: Exception

(BLE001)

scripts/create_tables.py

214-214: Do not catch blind exception: Exception

(BLE001)


229-229: Do not catch blind exception: Exception

(BLE001)

app/database/queries/conversations.py

30-50: Possible SQL injection vector through string-based query construction

(S608)


82-82: Possible SQL injection vector through string-based query construction

(S608)


91-91: Possible SQL injection vector through string-based query construction

(S608)


100-105: Possible SQL injection vector through string-based query construction

(S608)


116-121: Possible SQL injection vector through string-based query construction

(S608)


130-135: Possible SQL injection vector through string-based query construction

(S608)


146-151: Possible SQL injection vector through string-based query construction

(S608)


182-187: Possible SQL injection vector through string-based query construction

(S608)


202-213: Possible SQL injection vector through string-based query construction

(S608)


235-239: Possible SQL injection vector through string-based query construction

(S608)


257-262: Possible SQL injection vector through string-based query construction

(S608)


271-274: Possible SQL injection vector through string-based query construction

(S608)


285-294: Possible SQL injection vector through string-based query construction

(S608)

app/helpers/automatic/session_manager.py

94-94: Do not catch blind exception: Exception

(BLE001)

app/database/accessor/conversations.py

72-72: Consider moving this statement to an else block

(TRY300)


74-74: Do not catch blind exception: Exception

(BLE001)


92-92: Consider moving this statement to an else block

(TRY300)


94-94: Do not catch blind exception: Exception

(BLE001)


112-112: Consider moving this statement to an else block

(TRY300)


114-114: Do not catch blind exception: Exception

(BLE001)


132-132: Consider moving this statement to an else block

(TRY300)


134-134: Do not catch blind exception: Exception

(BLE001)


152-152: Consider moving this statement to an else block

(TRY300)


154-154: Do not catch blind exception: Exception

(BLE001)


173-173: Consider moving this statement to an else block

(TRY300)


175-175: Do not catch blind exception: Exception

(BLE001)


193-193: Consider moving this statement to an else block

(TRY300)


195-195: Do not catch blind exception: Exception

(BLE001)


221-221: Consider moving this statement to an else block

(TRY300)


223-223: Do not catch blind exception: Exception

(BLE001)


255-255: Consider moving this statement to an else block

(TRY300)


257-257: Do not catch blind exception: Exception

(BLE001)


278-278: Consider moving this statement to an else block

(TRY300)


280-280: Do not catch blind exception: Exception

(BLE001)


301-301: Consider moving this statement to an else block

(TRY300)


303-303: Do not catch blind exception: Exception

(BLE001)


319-319: Consider moving this statement to an else block

(TRY300)


321-321: Do not catch blind exception: Exception

(BLE001)


337-337: Consider moving this statement to an else block

(TRY300)


339-339: Do not catch blind exception: Exception

(BLE001)

🔇 Additional comments (34)
app/services/conversation_storage.py (8)

35-82: LGTM!

The method provides a clean wrapper around the database accessor with appropriate logging.


84-94: LGTM!

Clean delegation to the database accessor.


96-120: LGTM!

The method correctly retrieves conversation and messages separately, returning a composed object.


161-179: LGTM!

The method correctly retrieves the last N messages with appropriate fallback behavior.


181-207: LGTM!

Clean wrapper around the database accessor with appropriate parameter passing.


209-226: LGTM!

The method correctly marks conversations as completed with appropriate logging.


228-246: LGTM!

The method correctly updates the conversation summary with appropriate error logging.


248-277: LGTM!

The method correctly builds LLM context from conversation data with appropriate structure.

app/database/queries/conversations.py (13)

78-84: LGTM!

Simple, correctly parameterized query.


87-93: LGTM!

Correctly parameterized query.


96-107: LGTM!

Correctly uses database NOW() for timestamps.


110-123: LGTM!

Properly parameterized UPDATE query.


126-137: Verify status value matches enum.

The status is hardcoded as "completed" (line 136). Ensure this matches ConversationStatus.COMPLETED from app/schemas.py.

The verification script from the previous comment will also show this enum value.


140-153: LGTM!

Correctly parameterized UPDATE query.


156-189: LGTM!

Dynamic query construction with proper parameter indexing.


193-225: LGTM!

Correctly structured INSERT query.


228-248: LGTM!

Correctly handles optional LIMIT parameter.


251-264: LGTM!

Correctly retrieves last N messages in DESC order (reversed by caller).


267-276: LGTM!

Straightforward COUNT query.


279-296: LGTM!

The query correctly uses $1 for both the subquery filter (conversation_id FK) and the outer UPDATE filter (id PK), as they refer to the same UUID value.


15-75: No action required.

The verification confirms that the hardcoded "active" string matches ConversationStatus.ACTIVE from the enum definition. The code is correct and consistent.

app/database/accessor/conversations.py (13)

34-76: LGTM!

The function correctly creates a conversation with appropriate error handling and logging.


79-96: LGTM!

Clean accessor with proper error handling.


99-116: LGTM!

Correct implementation following established patterns.


119-136: LGTM!

Correctly updates activity timestamp.


139-156: LGTM!

Correct status update implementation.


159-177: LGTM!

Correctly completes conversations with appropriate logging.


180-197: LGTM!

Correct summary update implementation.


200-225: LGTM!

Correctly retrieves recent conversations with appropriate fallback behavior.


229-259: LGTM!

The accessor correctly saves messages and updates the count. Note that the race condition for sequence numbers exists in the service layer (already flagged).


262-282: LGTM!

Correctly retrieves messages with optional limit.


285-305: LGTM!

Correctly retrieves and reorders last N messages.


308-323: LGTM!

Correctly retrieves message count with appropriate default.


326-341: LGTM!

Correctly updates message count using a COUNT subquery.

Comment on lines 366 to 419
user_message = None
if self._context_aggregator:
try:
context = self._context_aggregator._user._context
context_messages = context.get_messages()

for msg in reversed(context_messages):
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, str) and content.strip():
user_message = content.strip()
break
except Exception as e:
logger.debug(f"Could not extract user message from context: {e}")

# If we have a user message, check if it's a duplicate
if user_message:
if user_message != self._last_saved_user_message:
# Save user message first (only if not duplicate)
await conversation_service.save_message(
session_id=self._session_id,
role="user",
content=user_message,
)

# Track the saved user message to prevent duplicates
self._last_saved_user_message = user_message

# Always save assistant message (even if user message was duplicate)
# This allows multiple assistant responses per user question
await conversation_service.save_message(
session_id=self._session_id,
role="assistant",
content=assistant_message,
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle list-based message content when extracting user text

Line 372 currently treats msg["content"] as a plain string. In practice the aggregator can emit OpenAI-style payloads where content is a list of {type: "text", text: ...} dicts (see the Mem0 memory service for identical handling). In that very common case, user_message stays None, so we never persist the user side and the transcript becomes one-sided. Please expand the extraction logic to cover list/dict content so we reliably store user turns.

-            if self._context_aggregator:
-                try:
-                    context = self._context_aggregator._user._context
-                    context_messages = context.get_messages()
-
-                    for msg in reversed(context_messages):
-                        if msg.get("role") == "user":
-                            content = msg.get("content", "")
-                            if isinstance(content, str) and content.strip():
-                                user_message = content.strip()
-                                break
+            if self._context_aggregator:
+                try:
+                    context = self._context_aggregator._user._context
+                    context_messages = context.get_messages()
+
+                    for msg in reversed(context_messages):
+                        if msg.get("role") == "user":
+                            content = msg.get("content")
+                            if isinstance(content, str) and content.strip():
+                                user_message = content.strip()
+                                break
+                            if isinstance(content, list):
+                                for part in content:
+                                    if (
+                                        isinstance(part, dict)
+                                        and part.get("type") == "text"
+                                    ):
+                                        text = part.get("text", "")
+                                        if isinstance(text, str) and text.strip():
+                                            user_message = text.strip()
+                                            break
+                                if user_message:
+                                    break
🧰 Tools
🪛 Ruff (0.14.2)

378-378: Do not catch blind exception: Exception

(BLE001)

Comment on lines 217 to 251
# 3.5. Create conversation record if storage is enabled
if ENABLE_CONVERSATION_STORAGE:
try:
from app.services.conversation_storage import (
get_conversation_storage_service,
)

conversation_service = get_conversation_storage_service()
conversation = await conversation_service.create_conversation(
session_id=session_id,
merchant_id=request.merchantId,
client_sid=client_sid,
user_email=request.email,
user_name=request.userName,
shop_id=request.shopId,
shop_url=request.shopUrl,
reseller_id=request.resellerId,
mode=request.mode.upper() if request.mode else None,
metadata={
"tts_provider": session_params.get("tts_provider"),
"voice_name": session_params.get("voice_name"),
"shop_type": session_params.get("shop_type"),
"platform_integrations": session_params.get(
"platform_integrations"
),
},
)
if conversation:
logger.bind(session_id=session_id).info(
f"Conversation record created for session {session_id}"
)
else:
logger.error(
f"Failed to create conversation record for session {session_id}"
)
except Exception as e:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard conversation creation when merchantId is missing

Line 228 hits the conversations table, where merchant_id is NOT NULL (see create_conversations_table_query). However, AutomaticVoiceUserConnectRequest.merchantId is still optional, and we have existing flows that omit it. When that happens we pass None, immediately raising a NOT NULL violation, so the insert fails and every subsequent save attempts log “conversation not found,” effectively breaking storage for those sessions. Please short-circuit this path when merchantId is absent instead of letting the database error out.

-    if ENABLE_CONVERSATION_STORAGE:
-        try:
+    if ENABLE_CONVERSATION_STORAGE:
+        if not request.merchantId:
+            logger.bind(session_id=session_id).warning(
+                "Conversation storage enabled but merchantId missing; skipping record creation."
+            )
+        else:
+            try:
             from app.services.conversation_storage import (
                 get_conversation_storage_service,
             )
 
             conversation_service = get_conversation_storage_service()
             conversation = await conversation_service.create_conversation(
                 session_id=session_id,
                 merchant_id=request.merchantId,
                 client_sid=client_sid,
                 user_email=request.email,
                 user_name=request.userName,
                 shop_id=request.shopId,
                 shop_url=request.shopUrl,
                 reseller_id=request.resellerId,
                 mode=request.mode.upper() if request.mode else None,
                 metadata={
                     "tts_provider": session_params.get("tts_provider"),
                     "voice_name": session_params.get("voice_name"),
                     "shop_type": session_params.get("shop_type"),
                     "platform_integrations": session_params.get(
                         "platform_integrations"
                     ),
                 },
             )
             if conversation:
                 logger.bind(session_id=session_id).info(
                     f"Conversation record created for session {session_id}"
                 )
             else:
                 logger.error(
                     f"Failed to create conversation record for session {session_id}"
                 )
-        except Exception as e:
-            logger.error(f"Error creating conversation record: {e}")
+        except Exception as e:
+            logger.error(f"Error creating conversation record: {e}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# 3.5. Create conversation record if storage is enabled
if ENABLE_CONVERSATION_STORAGE:
try:
from app.services.conversation_storage import (
get_conversation_storage_service,
)
conversation_service = get_conversation_storage_service()
conversation = await conversation_service.create_conversation(
session_id=session_id,
merchant_id=request.merchantId,
client_sid=client_sid,
user_email=request.email,
user_name=request.userName,
shop_id=request.shopId,
shop_url=request.shopUrl,
reseller_id=request.resellerId,
mode=request.mode.upper() if request.mode else None,
metadata={
"tts_provider": session_params.get("tts_provider"),
"voice_name": session_params.get("voice_name"),
"shop_type": session_params.get("shop_type"),
"platform_integrations": session_params.get(
"platform_integrations"
),
},
)
if conversation:
logger.bind(session_id=session_id).info(
f"Conversation record created for session {session_id}"
)
else:
logger.error(
f"Failed to create conversation record for session {session_id}"
)
except Exception as e:
# 3.5. Create conversation record if storage is enabled
if ENABLE_CONVERSATION_STORAGE:
if not request.merchantId:
logger.bind(session_id=session_id).warning(
"Conversation storage enabled but merchantId missing; skipping record creation."
)
else:
try:
from app.services.conversation_storage import (
get_conversation_storage_service,
)
conversation_service = get_conversation_storage_service()
conversation = await conversation_service.create_conversation(
session_id=session_id,
merchant_id=request.merchantId,
client_sid=client_sid,
user_email=request.email,
user_name=request.userName,
shop_id=request.shopId,
shop_url=request.shopUrl,
reseller_id=request.resellerId,
mode=request.mode.upper() if request.mode else None,
metadata={
"tts_provider": session_params.get("tts_provider"),
"voice_name": session_params.get("voice_name"),
"shop_type": session_params.get("shop_type"),
"platform_integrations": session_params.get(
"platform_integrations"
),
},
)
if conversation:
logger.bind(session_id=session_id).info(
f"Conversation record created for session {session_id}"
)
else:
logger.error(
f"Failed to create conversation record for session {session_id}"
)
except Exception as e:
logger.error(f"Error creating conversation record: {e}")
🧰 Tools
🪛 Ruff (0.14.2)

252-252: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In app/main.py around lines 217 to 252, the conversation creation block calls
create_conversation with request.merchantId which can be None and causes a NOT
NULL DB error; guard this by checking if request.merchantId is present before
importing/instantiating the conversation service or calling create_conversation,
and if missing short-circuit the flow (e.g., log a warning/info that merchantId
is absent and skip creation) so we never attempt to insert a conversation with a
null merchant_id.

Comment on lines 122 to 159
async def save_message(
self, session_id: str, role: str, content: str
) -> Optional[ConversationMessage]:
"""
Save a message to a conversation.
Args:
session_id: Session identifier
role: Message role ('user' or 'assistant')
content: Message content
Returns:
Saved ConversationMessage or None if failed
"""
# Get conversation
conversation = await get_conversation_by_session_id(session_id)
if not conversation:
logger.error(f"Conversation not found for session: {session_id}")
return None

# Calculate sequence number (current count + 1)
sequence_number = conversation.message_count + 1

# Save message
message = await db_save_message(
conversation_id=conversation.id,
role=role,
content=content,
sequence_number=sequence_number,
)

if message:
# Update conversation activity
await update_conversation_activity(session_id)
else:
logger.error(f"Failed to save message to session: {session_id}")

return message
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Race condition in sequence number calculation.

Between reading message_count (line 143) and inserting the message (line 146), concurrent calls to save_message for the same conversation can read the same count and assign duplicate sequence numbers. This breaks message ordering guarantees.

Consider one of these solutions:

  1. Use a database-side sequence or auto-increment for sequence_number
  2. Use a SELECT FOR UPDATE when reading the conversation to lock it
  3. Use an atomic UPDATE ... RETURNING pattern to increment and retrieve the count in one operation
  4. Add application-level locking per conversation (e.g., using asyncio.Lock in a dict keyed by conversation_id)

Example of solution 4:

from asyncio import Lock
from typing import Dict

# At class level
_message_locks: Dict[str, Lock] = {}

async def save_message(
    self, session_id: str, role: str, content: str
) -> Optional[ConversationMessage]:
    """Save a message to a conversation."""
    # Get conversation
    conversation = await get_conversation_by_session_id(session_id)
    if not conversation:
        logger.error(f"Conversation not found for session: {session_id}")
        return None

    # Get or create lock for this conversation
    if conversation.id not in self._message_locks:
        self._message_locks[conversation.id] = Lock()
    
    async with self._message_locks[conversation.id]:
        # Re-fetch to get latest count under lock
        conversation = await get_conversation_by_session_id(session_id)
        if not conversation:
            return None
            
        sequence_number = conversation.message_count + 1

        # Save message
        message = await db_save_message(
            conversation_id=conversation.id,
            role=role,
            content=content,
            sequence_number=sequence_number,
        )

        if message:
            await update_conversation_activity(session_id)
        else:
            logger.error(f"Failed to save message to session: {session_id}")

        return message

@satvikbatra satvikbatra force-pushed the feat/conversation-storage branch 2 times, most recently from ea7ffcb to 80762da Compare November 3, 2025 10:03
if self._enable_charts:
# LLM response collection
# LLM response collection - needed for both charts and conversation storage
if self._enable_charts or config.ENABLE_CONVERSATION_STORAGE:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test your code when charts are enabed

@satvikbatra satvikbatra force-pushed the feat/conversation-storage branch from 80762da to 3a121eb Compare November 4, 2025 10:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants