diff --git a/.env.example b/.env.example
index 50acb9e3..6a0a23c6 100644
--- a/.env.example
+++ b/.env.example
@@ -176,4 +176,6 @@ AWS_BREEZE_PORTAL_URL="http://localhost:5173"
 GCP_BREEZE_PORTAL_URL="http://localhost:5174"

 DEFAULT_ANNOUNCEMENT_BANNER_BACKGROUND_COLOR=#714acd
-DEFAULT_ANNOUNCEMENT_BANNER_TEXT_COLOR=#714acd
\ No newline at end of file
+DEFAULT_ANNOUNCEMENT_BANNER_TEXT_COLOR=#714acd
+
+ENABLE_CONVERSATION_STORAGE=false
\ No newline at end of file
diff --git a/app/agents/voice/automatic/__init__.py b/app/agents/voice/automatic/__init__.py
index 6b4f051d..e5a800df 100644
--- a/app/agents/voice/automatic/__init__.py
+++ b/app/agents/voice/automatic/__init__.py
@@ -511,12 +511,18 @@ async def on_function_calls_started(service, function_calls):
             args.session_id,
             config.ENABLE_CHARTS,
             stt_mute_filter,
+            context_aggregator,
             "LLMSpyProcessor",
         )
         pipeline_components.extend([stt_mute_filter])
     else:
         tool_call_processor = LLMSpyProcessor(
-            rtvi, args.session_id, config.ENABLE_CHARTS, None, "LLMSpyProcessor"
+            rtvi,
+            args.session_id,
+            config.ENABLE_CHARTS,
+            None,
+            context_aggregator,
+            "LLMSpyProcessor",
         )

     pipeline_components.extend([rtvi, context_aggregator.user()])
diff --git a/app/agents/voice/automatic/processors/llm_spy.py b/app/agents/voice/automatic/processors/llm_spy.py
index 898dcfbf..5a33bbec 100644
--- a/app/agents/voice/automatic/processors/llm_spy.py
+++ b/app/agents/voice/automatic/processors/llm_spy.py
@@ -33,6 +33,9 @@
 from app.agents.voice.automatic.features.charts.rtvi.rtvi import emit_chart_components
 from app.agents.voice.automatic.features.hitl.utils import is_dangerous_operation
 from app.agents.voice.automatic.rtvi.rtvi import emit_rtvi_event
+from app.agents.voice.automatic.services.conversation.conversation_storage import (
+    get_conversation_storage_service,
+)
 from app.agents.voice.automatic.utils.conversation_manager import (
     get_conversation_manager,
 )
@@ -140,6 +143,7 @@ def __init__(
         session_id: str,
         enable_charts: bool,
         stt_mute_filter: Optional[FrameProcessor] = None,
+        context_aggregator=None,  # OpenAI LLM context aggregator pair
         name: str = "LLMSpyProcessor",
     ):
         super().__init__(name=name)
@@ -147,6 +151,7 @@ def __init__(
         self._session_id = session_id
         self._enable_charts = enable_charts
         self._stt_mute_filter = stt_mute_filter
+        self._context_aggregator = context_aggregator

         # Register this RTVI processor globally for function confirmations
         set_rtvi_processor(rtvi)
@@ -165,11 +170,23 @@ def __init__(
         # Conversation management (delegates to service)
         self._conversation_manager = get_conversation_manager()

+        # Conversation storage setup (completely independent from charts)
+        if config.ENABLE_CONVERSATION_STORAGE:
+            self._conversation_accumulated_text = ""
+            self._conversation_is_collecting = False
+            self._last_saved_user_message = None
+
     async def process_frame(self, frame: Frame, direction: FrameDirection):
         """Process frames and delegate conversation logic to ConversationManager."""
         await super().process_frame(frame, direction)

         if isinstance(frame, TextFrame):
+            # Collect text for conversation storage if we're in collecting mode
+            if config.ENABLE_CONVERSATION_STORAGE and getattr(
+                self, "_conversation_is_collecting", False
+            ):
+                self._conversation_accumulated_text += frame.text
+
             if config.SANITIZE_TEXT_FOR_TTS:
                 await self.push_frame(
                     TextFrame(text=sanitize_markdown(frame.text)), direction
@@ -182,16 +199,25 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
                 await self.push_frame(frame, direction)

         # LLM Response Start - begin collecting text and start conversation turn
-        elif isinstance(frame, LLMFullResponseStartFrame) and self._enable_charts:
-            self._is_collecting_response = True
-            self._accumulated_text = ""
+        elif isinstance(frame, LLMFullResponseStartFrame) and (
+            self._enable_charts or config.ENABLE_CONVERSATION_STORAGE
+        ):
+            # Charts logic
+            if self._enable_charts:
+                self._is_collecting_response = True
+                self._accumulated_text = ""

-            # Start conversation turn via ConversationManager
-            event = await self._conversation_manager.start_turn_with_events(
-                self._session_id
-            )
-            if event:
-                await emit_rtvi_event(self._rtvi, event, self._session_id)
+                # Start conversation turn via ConversationManager
+                event = await self._conversation_manager.start_turn_with_events(
+                    self._session_id
+                )
+                if event:
+                    await emit_rtvi_event(self._rtvi, event, self._session_id)
+
+            # Conversation storage logic (independent)
+            if config.ENABLE_CONVERSATION_STORAGE:
+                self._conversation_is_collecting = True
+                self._conversation_accumulated_text = ""

             await self.push_frame(frame, direction)

@@ -212,16 +238,32 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             await self.push_frame(frame, direction)

         # LLM Response Complete - send to ConversationManager
-        elif isinstance(frame, LLMFullResponseEndFrame) and self._enable_charts:
-            if self._accumulated_text.strip():
-                event = await self._conversation_manager.add_llm_response_with_events(
-                    self._session_id, self._accumulated_text.strip()
-                )
-                if event:
-                    await emit_rtvi_event(self._rtvi, event, self._session_id)
+        elif isinstance(frame, LLMFullResponseEndFrame) and (
+            self._enable_charts or config.ENABLE_CONVERSATION_STORAGE
+        ):
+            # Charts logic
+            if self._enable_charts:
+                if self._accumulated_text.strip():
+                    event = (
+                        await self._conversation_manager.add_llm_response_with_events(
+                            self._session_id, self._accumulated_text.strip()
+                        )
+                    )
+                    if event:
+                        await emit_rtvi_event(self._rtvi, event, self._session_id)
+
+                self._accumulated_text = ""
+                self._is_collecting_response = False
+
+            # Conversation storage logic (independent)
+            if config.ENABLE_CONVERSATION_STORAGE:
+                if self._conversation_accumulated_text.strip():
+                    await self._save_conversation_pair(
+                        self._conversation_accumulated_text.strip()
+                    )
+                self._conversation_accumulated_text = ""
+                self._conversation_is_collecting = False
-            self._accumulated_text = ""
-            self._is_collecting_response = False

             await self.push_frame(frame, direction)

         # Function Call Start - emit RTVI event and track in conversation
@@ -332,3 +374,49 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             )
         else:
             await self.push_frame(frame, direction)
+
+    async def _save_conversation_pair(self, assistant_message: str):
+        """Save user and assistant messages as a pair to database."""
+        try:
+            # Save both messages to database
+            conversation_service = get_conversation_storage_service()
+
+            # Extract user message from context if context_aggregator is available
+            user_message = None
+            if hasattr(self, "_context_aggregator") and self._context_aggregator:
+                try:
+                    context = self._context_aggregator._user._context
+                    context_messages = context.get_messages()
+
+                    for msg in reversed(context_messages):
+                        if msg.get("role") == "user":
+                            content = msg.get("content", "")
+                            if isinstance(content, str) and content.strip():
+                                user_message = content.strip()
+                                break
+                except Exception as e:
+                    logger.debug(f"Could not extract user message from context: {e}")
+
+            # If we have a user message, check if it's a duplicate
+            if user_message:
+                if user_message != self._last_saved_user_message:
+                    # Save user message first (only if not duplicate)
+                    await conversation_service.save_message(
+                        session_id=self._session_id,
+                        role="user",
+                        content=user_message,
+                    )
+
+                    # Track the saved user message to prevent duplicates
+                    self._last_saved_user_message = user_message
+
+            # Always save assistant message (even if user message was duplicate)
+            # This allows multiple assistant responses per user question
+            await conversation_service.save_message(
+                session_id=self._session_id,
+                role="assistant",
+                content=assistant_message,
+            )
+
+        except Exception as e:
+            logger.error(f"[{self._session_id}] Failed to save conversation pair: {e}")
diff --git a/app/agents/voice/automatic/services/conversation/conversation_storage.py b/app/agents/voice/automatic/services/conversation/conversation_storage.py
new file mode 100644
index 00000000..3b4dca71
--- /dev/null
+++ b/app/agents/voice/automatic/services/conversation/conversation_storage.py
@@ -0,0 +1,170 @@
+"""
+Conversation Storage Service.
+Handles all conversation and message storage operations.
+"""
+
+from typing import Any, Dict, List, Optional
+
+from app.core.logger import logger
+from app.database.accessor.conversation.conversations import (
+    complete_conversation as db_complete_conversation,
+)
+from app.database.accessor.conversation.conversations import (
+    create_conversation as db_create_conversation,
+)
+from app.database.accessor.conversation.conversations import (
+    get_conversation_by_session_id,
+)
+from app.database.accessor.conversation.conversations import (
+    save_message as db_save_message,
+)
+from app.database.accessor.conversation.conversations import (
+    update_conversation_activity,
+)
+from app.schemas import Conversation, ConversationMessage
+
+
+class ConversationStorageService:
+    """
+    Service for managing conversation storage.
+    Provides high-level interface for conversation operations.
+    """
+
+    async def create_conversation(
+        self,
+        session_id: str,
+        merchant_id: Optional[str] = None,
+        client_sid: Optional[str] = None,
+        user_email: Optional[str] = None,
+        user_name: Optional[str] = None,
+        shop_id: Optional[str] = None,
+        shop_url: Optional[str] = None,
+        reseller_id: Optional[str] = None,
+        mode: Optional[str] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> Optional[Conversation]:
+        """
+        Create a new conversation.
+
+        Args:
+            session_id: Unique session identifier
+            merchant_id: Merchant identifier
+            client_sid: Optional client session ID
+            user_email: User email
+            user_name: User name
+            shop_id: Shop identifier
+            shop_url: Shop URL
+            reseller_id: Reseller identifier
+            mode: Conversation mode (e.g., 'TEST' or 'LIVE')
+            metadata: Additional metadata
+
+        Returns:
+            Created Conversation object or None if failed
+        """
+        conversation = await db_create_conversation(
+            session_id=session_id,
+            merchant_id=merchant_id,
+            client_sid=client_sid,
+            user_email=user_email,
+            user_name=user_name,
+            shop_id=shop_id,
+            shop_url=shop_url,
+            reseller_id=reseller_id,
+            mode=mode,
+            metadata=metadata,
+        )
+
+        if conversation:
+            logger.info(f"Conversation created: {session_id}")
+
+        return conversation
+
+    async def save_message(
+        self,
+        session_id: str,
+        role: str,
+        content: str,
+        last_saved_user_message: Optional[str] = None,
+    ) -> tuple[Optional[ConversationMessage], Optional[str]]:
+        """
+        Save a message to a conversation.
+
+        Args:
+            session_id: Session identifier
+            role: Message role ('user' or 'assistant')
+            content: Message content
+            last_saved_user_message: Last saved user message to prevent duplicates (for user messages only)
+
+        Returns:
+            Tuple of (Saved ConversationMessage or None if failed, updated last_saved_user_message)
+        """
+        # For user messages, check if it's a duplicate
+        if role == "user" and last_saved_user_message == content:
+            logger.debug(f"Skipping duplicate user message for session: {session_id}")
+            return (None, last_saved_user_message)
+
+        # Get conversation
+        conversation = await get_conversation_by_session_id(session_id)
+        if not conversation:
+            logger.error(f"Conversation not found for session: {session_id}")
+            return (None, last_saved_user_message)
+
+        # Calculate sequence number (current count + 1)
+        sequence_number = conversation.message_count + 1
+
+        # Save message
+        message = await db_save_message(
+            conversation_id=conversation.id,
+            role=role,
+            content=content,
+            sequence_number=sequence_number,
+        )
+
+        if message:
+            # Update conversation activity
+            await update_conversation_activity(session_id)
+        else:
+            logger.error(f"Failed to save message to session: {session_id}")
+
+        # Update last_saved_user_message if this was a user message
+        new_last_saved = (
+            content if role == "user" and message else last_saved_user_message
+        )
+
+        return (message, new_last_saved)
+
+    async def complete_conversation(self, session_id: str) -> Optional[Conversation]:
+        """
+        Mark conversation as completed.
+
+        Args:
+            session_id: Session identifier
+
+        Returns:
+            Updated Conversation object or None if failed
+        """
+        conversation = await db_complete_conversation(session_id)
+
+        if conversation:
+            logger.info(f"Conversation completed: {session_id}")
+        else:
+            logger.error(f"Failed to complete conversation: {session_id}")
+
+        return conversation
+
+
+# Global service instance
+_conversation_storage_service: Optional[ConversationStorageService] = None
+
+
+def get_conversation_storage_service() -> ConversationStorageService:
+    """
+    Get the global conversation storage service instance.
+
+    Returns:
+        ConversationStorageService instance
+    """
+    global _conversation_storage_service
+    if _conversation_storage_service is None:
+        _conversation_storage_service = ConversationStorageService()
+    return _conversation_storage_service
diff --git a/app/core/config.py b/app/core/config.py
index 32b601d1..e882ac34 100644
--- a/app/core/config.py
+++ b/app/core/config.py
@@ -452,6 +452,11 @@ def get_required_env(var_name: str) -> str:
 BREEZE_BUDDY_DASHBOARD_PASSWORD = os.getenv("BREEZE_BUDDY_DASHBOARD_PASSWORD", "")
 BREEZE_BUDDY_SESSION_SECRET_KEY = os.getenv("BREEZE_BUDDY_SESSION_SECRET_KEY", "")

+# Conversation Storage Configuration
+ENABLE_CONVERSATION_STORAGE = (
+    os.environ.get("ENABLE_CONVERSATION_STORAGE", "false").lower() == "true"
+)
+
 ENABLE_BREEZE_BUDDY_TRACING = (
     os.getenv("ENABLE_BREEZE_BUDDY_TRACING", "false").lower() == "true"
 )
diff --git a/app/database/accessor/conversation/conversations.py b/app/database/accessor/conversation/conversations.py
new file mode 100644
index 00000000..72e2b8c8
--- /dev/null
+++ b/app/database/accessor/conversation/conversations.py
@@ -0,0 +1,180 @@
+"""
+Database accessor functions for conversations.
+"""
+
+from typing import Any, Dict, List, Optional
+
+from app.core.logger import logger
+from app.database.decoder.conversation.conversations import (
+    decode_conversation,
+    decode_conversation_message,
+)
+from app.database.queries import run_parameterized_query
+from app.database.queries.conversation.conversations import (
+    complete_conversation_query,
+    get_conversation_by_session_id_query,
+    insert_conversation_query,
+    insert_message_query,
+    update_conversation_activity_query,
+    update_conversation_message_count_query,
+)
+from app.schemas import Conversation, ConversationMessage
+
+
+# Conversation accessors
+async def create_conversation(
+    session_id: str,
+    merchant_id: Optional[str] = None,
+    client_sid: Optional[str] = None,
+    user_email: Optional[str] = None,
+    user_name: Optional[str] = None,
+    shop_id: Optional[str] = None,
+    shop_url: Optional[str] = None,
+    reseller_id: Optional[str] = None,
+    mode: Optional[str] = None,
+    metadata: Optional[Dict[str, Any]] = None,
+) -> Optional[Conversation]:
+    """
+    Create a new conversation record.
+    """
+    logger.info(f"Creating conversation with session_id: {session_id}")
+
+    try:
+        query_text, values = insert_conversation_query(
+            session_id=session_id,
+            merchant_id=merchant_id,
+            client_sid=client_sid,
+            user_email=user_email,
+            user_name=user_name,
+            shop_id=shop_id,
+            shop_url=shop_url,
+            reseller_id=reseller_id,
+            mode=mode,
+            metadata=metadata,
+        )
+
+        result = await run_parameterized_query(query_text, values)
+        if result:
+            decoded_result = decode_conversation(result)
+            logger.info(f"Conversation created successfully: {session_id}")
+            return decoded_result
+
+        logger.error("Failed to create conversation")
+        return None
+
+    except Exception as e:
+        logger.error(f"Error creating conversation: {e}")
+        return None
+
+
+async def get_conversation_by_session_id(
+    session_id: str,
+) -> Optional[Conversation]:
+    """
+    Get conversation by session_id.
+    """
+    try:
+        query_text, values = get_conversation_by_session_id_query(session_id)
+        result = await run_parameterized_query(query_text, values)
+
+        if result:
+            return decode_conversation(result)
+
+        return None
+
+    except Exception as e:
+        logger.error(f"Error getting conversation by session_id: {e}")
+        return None
+
+
+async def update_conversation_activity(
+    session_id: str,
+) -> Optional[Conversation]:
+    """
+    Update conversation last activity time.
+    """
+    try:
+        query_text, values = update_conversation_activity_query(session_id)
+        result = await run_parameterized_query(query_text, values)
+
+        if result:
+            return decode_conversation(result)
+
+        return None
+
+    except Exception as e:
+        logger.error(f"Error updating conversation activity: {e}")
+        return None
+
+
+async def complete_conversation(
+    session_id: str,
+) -> Optional[Conversation]:
+    """
+    Mark conversation as completed.
+    """
+    try:
+        query_text, values = complete_conversation_query(session_id)
+        result = await run_parameterized_query(query_text, values)
+
+        if result:
+            logger.info(f"Conversation completed: {session_id}")
+            return decode_conversation(result)
+
+        return None
+
+    except Exception as e:
+        logger.error(f"Error completing conversation: {e}")
+        return None
+
+
+# Message accessors
+async def save_message(
+    conversation_id: str,
+    role: str,
+    content: str,
+    sequence_number: int,
+) -> Optional[ConversationMessage]:
+    """
+    Save a conversation message.
+    """
+    try:
+        query_text, values = insert_message_query(
+            conversation_id=conversation_id,
+            role=role,
+            content=content,
+            sequence_number=sequence_number,
+        )
+
+        result = await run_parameterized_query(query_text, values)
+        if result:
+            decoded_result = decode_conversation_message(result)
+
+            # Update message count
+            await update_message_count(conversation_id)
+
+            return decoded_result
+
+        return None
+
+    except Exception as e:
+        logger.error(f"Error saving message: {e}")
+        return None
+
+
+async def update_message_count(conversation_id: str) -> Optional[Conversation]:
+    """
+    Update message count in conversation.
+    """
+    try:
+        query_text, values = update_conversation_message_count_query(conversation_id)
+        result = await run_parameterized_query(query_text, values)
+
+        if result:
+            return decode_conversation(result)
+
+        return None
+
+    except Exception as e:
+        logger.error(f"Error updating message count: {e}")
+        return None
diff --git a/app/database/decoder/conversation/conversations.py b/app/database/decoder/conversation/conversations.py
new file mode 100644
index 00000000..461d9efe
--- /dev/null
+++ b/app/database/decoder/conversation/conversations.py
@@ -0,0 +1,68 @@
+"""
+Decoder functions for conversations.
+"""
+
+import json
+from typing import List, Optional
+
+import asyncpg
+
+from app.schemas import Conversation, ConversationMessage, ConversationStatus
+
+
+def decode_conversation(result: List[asyncpg.Record]) -> Optional[Conversation]:
+    """
+    Decode conversation from database result using Pydantic model.
+    """
+    if not result or len(result) == 0:
+        return None
+
+    row = result[0]
+
+    # Parse metadata JSON string back to dict
+    metadata = row["metadata"]
+    if metadata and isinstance(metadata, str):
+        metadata = json.loads(metadata)
+
+    return Conversation(
+        id=str(row["id"]),
+        session_id=row["session_id"],
+        client_sid=row["client_sid"],
+        merchant_id=row["merchant_id"],
+        user_email=row["user_email"],
+        user_name=row["user_name"],
+        shop_id=row["shop_id"],
+        shop_url=row["shop_url"],
+        reseller_id=row["reseller_id"],
+        mode=row["mode"],
+        status=ConversationStatus(row["status"]),
+        summary=row["summary"],
+        message_count=row["message_count"],
+        started_at=row["started_at"],
+        last_activity_at=row["last_activity_at"],
+        completed_at=row["completed_at"],
+        metadata=metadata,
+        created_at=row["created_at"],
+        updated_at=row["updated_at"],
+    )
+
+
+def decode_conversation_message(
+    result: List[asyncpg.Record],
+) -> Optional[ConversationMessage]:
+    """
+    Decode conversation message from database result using Pydantic model.
+    """
+    if not result or len(result) == 0:
+        return None
+
+    row = result[0]
+    return ConversationMessage(
+        id=str(row["id"]),
+        conversation_id=str(row["conversation_id"]),
+        role=row["role"],
+        content=row["content"],
+        sequence_number=row["sequence_number"],
+        timestamp=row["timestamp"],
+        created_at=row["created_at"],
+    )
diff --git a/app/database/queries/conversation/conversations.py b/app/database/queries/conversation/conversations.py
new file mode 100644
index 00000000..05c040ea
--- /dev/null
+++ b/app/database/queries/conversation/conversations.py
@@ -0,0 +1,168 @@
+"""
+Database query functions for conversations.
+"""
+
+import json
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Tuple
+
+# Table names
+CONVERSATIONS_TABLE = "conversations"
+CONVERSATION_MESSAGES_TABLE = "conversation_messages"
+
+
+# Conversation queries
+def insert_conversation_query(
+    session_id: str,
+    merchant_id: str,
+    client_sid: Optional[str] = None,
+    user_email: Optional[str] = None,
+    user_name: Optional[str] = None,
+    shop_id: Optional[str] = None,
+    shop_url: Optional[str] = None,
+    reseller_id: Optional[str] = None,
+    mode: Optional[str] = None,
+    metadata: Optional[Dict[str, Any]] = None,
+) -> Tuple[str, List[Any]]:
+    """
+    Generate query to insert conversation record.
+    """
+    text = f"""
+        INSERT INTO "{CONVERSATIONS_TABLE}"
+        (
+            "session_id",
+            "client_sid",
+            "merchant_id",
+            "user_email",
+            "user_name",
+            "shop_id",
+            "shop_url",
+            "reseller_id",
+            "mode",
+            "status",
+            "metadata",
+            "started_at",
+            "last_activity_at",
+            "created_at",
+            "updated_at"
+        )
+        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) RETURNING *;
+    """
+
+    now = datetime.now()
+
+    # Convert metadata dict to JSON string for PostgreSQL JSONB
+    metadata_json = json.dumps(metadata) if metadata else None
+
+    values = [
+        session_id,
+        client_sid,
+        merchant_id,
+        user_email,
+        user_name,
+        shop_id,
+        shop_url,
+        reseller_id,
+        mode,
+        "active",  # Default status
+        metadata_json,
+        now,
+        now,
+        now,
+        now,
+    ]
+
+    return text, values
+
+
+def get_conversation_by_session_id_query(session_id: str) -> Tuple[str, List[Any]]:
+    """
+    Generate query to get conversation by session_id.
+    """
+    text = f'SELECT * FROM "{CONVERSATIONS_TABLE}" WHERE "session_id" = $1;'
+    values = [session_id]
+    return text, values
+
+
+def update_conversation_activity_query(session_id: str) -> Tuple[str, List[Any]]:
+    """
+    Generate query to update conversation last activity time.
+    """
+    text = f"""
+        UPDATE "{CONVERSATIONS_TABLE}"
+        SET "last_activity_at" = NOW(), "updated_at" = NOW()
+        WHERE "session_id" = $1
+        RETURNING *;
+    """
+    values = [session_id]
+    return text, values
+
+
+def complete_conversation_query(session_id: str) -> Tuple[str, List[Any]]:
+    """
+    Generate query to mark conversation as completed.
+    """
+    text = f"""
+        UPDATE "{CONVERSATIONS_TABLE}"
+        SET "status" = $2, "completed_at" = NOW(), "updated_at" = NOW()
+        WHERE "session_id" = $1
+        RETURNING *;
+    """
+    values = [session_id, "completed"]
+    return text, values
+
+
+# Message queries
+def insert_message_query(
+    conversation_id: str,
+    role: str,
+    content: str,
+    sequence_number: int,
+) -> Tuple[str, List[Any]]:
+    """
+    Generate query to insert a message.
+    """
+    text = f"""
+        INSERT INTO "{CONVERSATION_MESSAGES_TABLE}"
+        (
+            "conversation_id",
+            "role",
+            "content",
+            "sequence_number",
+            "timestamp",
+            "created_at"
+        )
+        VALUES ($1, $2, $3, $4, $5, $6) RETURNING *;
+    """
+
+    now = datetime.now()
+    values = [
+        conversation_id,
+        role,
+        content,
+        sequence_number,
+        now,
+        now,
+    ]
+
+    return text, values
+
+
+def update_conversation_message_count_query(
+    conversation_id: str,
+) -> Tuple[str, List[Any]]:
+    """
+    Generate query to update message count in conversation.
+    """
+    text = f"""
+        UPDATE "{CONVERSATIONS_TABLE}"
+        SET "message_count" = (
+            SELECT COUNT(*) FROM "{CONVERSATION_MESSAGES_TABLE}"
+            WHERE "conversation_id" = $1
+        ),
+        "updated_at" = NOW()
+        WHERE "id" = $1
+        RETURNING *;
+    """
+    values = [conversation_id]
+    return text, values
diff --git a/app/helpers/automatic/session_manager.py b/app/helpers/automatic/session_manager.py
index 5bffbf3d..2d5b3166 100644
--- a/app/helpers/automatic/session_manager.py
+++ b/app/helpers/automatic/session_manager.py
@@ -28,6 +28,10 @@
 import subprocess  # for TimeoutExpired
 from typing import Any, Dict

+from app.agents.voice.automatic.services.conversation.conversation_storage import (
+    get_conversation_storage_service,
+)
+from app.core import config
 from app.core.logger import logger

 # The global registry for all running voice agent subprocesses.
@@ -69,6 +73,7 @@ async def session_cleanup_callback(session_id: str):
     1. Iterates through the `bot_procs` dictionary.
     2. Finds the entry where the session ID matches.
     3. Calls `cleanup_session_tracking` with the corresponding PID.
+    4. Marks the conversation as completed in the database.
     """
     pid_to_remove = None
     for pid, proc_info in list(bot_procs.items()):
@@ -78,6 +83,15 @@ async def session_cleanup_callback(session_id: str):
     if pid_to_remove:
         await cleanup_session_tracking(pid_to_remove)

+    # Mark conversation as completed in database
+    try:
+        if config.ENABLE_CONVERSATION_STORAGE:
+            conversation_service = get_conversation_storage_service()
+            await conversation_service.complete_conversation(session_id)
+            logger.info(f"Conversation marked as completed for session: {session_id}")
+    except Exception as e:
+        logger.error(f"Failed to complete conversation for session {session_id}: {e}")
+

 async def monitor_session_cleanup():
     """
diff --git a/app/main.py b/app/main.py
index 00758e49..a56403bb 100644
--- a/app/main.py
+++ b/app/main.py
@@ -14,6 +14,9 @@
 from pipecat.transports.daily.utils import DailyRESTHelper

 from app import __version__
+from app.agents.voice.automatic.services.conversation.conversation_storage import (
+    get_conversation_storage_service,
+)
 from app.api.routers import automatic, breeze_buddy
 from app.core.config import (
     DAILY_API_KEY,
@@ -21,6 +24,7 @@
     DAILY_ROOM_MAX_POOL_SIZE,
     DAILY_ROOM_POOL_SIZE,
     ENABLE_AUTOMATIC_DAILY_RECORDING,
+    ENABLE_CONVERSATION_STORAGE,
     HOST,
     MAX_DAILY_SESSION_LIMIT,
     PORT,
@@ -213,6 +217,40 @@ async def bot_connect(
         "Voice agent session mapping created"
     )

+    # 3.5. Create conversation record if storage is enabled
+    if ENABLE_CONVERSATION_STORAGE:
+        try:
+            conversation_service = get_conversation_storage_service()
+            conversation = await conversation_service.create_conversation(
+                session_id=session_id,
+                merchant_id=request.merchantId,
+                client_sid=client_sid,
+                user_email=request.email,
+                user_name=request.userName,
+                shop_id=request.shopId,
+                shop_url=request.shopUrl,
+                reseller_id=request.resellerId,
+                mode=request.mode.upper() if request.mode else None,
+                metadata={
+                    "tts_provider": session_params.get("tts_provider"),
+                    "voice_name": session_params.get("voice_name"),
+                    "shop_type": session_params.get("shop_type"),
+                    "platform_integrations": session_params.get(
+                        "platform_integrations"
+                    ),
+                },
+            )
+            if conversation:
+                logger.bind(session_id=session_id).info(
+                    f"Conversation record created for session {session_id}"
+                )
+            else:
+                logger.error(
+                    f"Failed to create conversation record for session {session_id}"
+                )
+        except Exception as e:
+            logger.error(f"Error creating conversation record: {e}")
+
     # 4. Try to get process from pool
     pool = get_voice_agent_pool()
     try:
diff --git a/app/schemas.py b/app/schemas.py
index cc6e34d6..56f96006 100644
--- a/app/schemas.py
+++ b/app/schemas.py
@@ -139,3 +139,49 @@ class TokenData(BaseModel):
     username: Optional[str] = None
     email: Optional[str] = None
     scopes: list[str] = Field(default_factory=list)
+
+
+class ConversationStatus(str, Enum):
+    ACTIVE = "active"
+    COMPLETED = "completed"
+
+
+class MessageRole(str, Enum):
+    USER = "user"
+    ASSISTANT = "assistant"
+
+
+class Conversation(BaseModel):
+    """Conversation model"""
+
+    id: str
+    session_id: str
+    client_sid: Optional[str] = None
+    merchant_id: Optional[str] = None
+    user_email: Optional[str] = None
+    user_name: Optional[str] = None
+    shop_id: Optional[str] = None
+    shop_url: Optional[str] = None
+    reseller_id: Optional[str] = None
+    mode: Optional[str] = None
+    status: ConversationStatus = ConversationStatus.ACTIVE
+    summary: Optional[str] = None
+    message_count: int = 0
+    started_at: datetime
+    last_activity_at: Optional[datetime] = None
+    completed_at: Optional[datetime] = None
+    metadata: Optional[Dict[str, Any]] = None
+    created_at: datetime
+    updated_at: datetime
+
+
+class ConversationMessage(BaseModel):
+    """Conversation message model"""
+
+    id: str
+    conversation_id: str
+    role: MessageRole
+    content: str
+    sequence_number: int
+    timestamp: datetime
+    created_at: datetime
diff --git a/scripts/create_tables.py b/scripts/create_tables.py
index a16d08d5..11c86120 100644
--- a/scripts/create_tables.py
+++ b/scripts/create_tables.py
@@ -15,6 +15,8 @@
 OUTBOUND_NUMBERS_TABLE = "outbound_number"
 CALL_EXECUTION_CONFIG_TABLE = "call_execution_config"
 LEAD_CALL_TRACKER_TABLE = "lead_call_tracker"
+CONVERSATIONS_TABLE = "conversations"
+CONVERSATION_MESSAGES_TABLE = "conversation_messages"


 def create_lead_call_tracker_table_query() -> str:
@@ -71,6 +73,68 @@ def create_call_execution_config_table_query() -> str:
     """


+def create_conversations_table_query() -> str:
+    """
+    Generate query to create conversations table.
+    """
+    return f"""
+        CREATE TABLE IF NOT EXISTS "{CONVERSATIONS_TABLE}" (
+            "id" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+            "session_id" VARCHAR(255) UNIQUE NOT NULL,
+            "client_sid" VARCHAR(255),
+            "merchant_id" VARCHAR(255),
+            "user_email" VARCHAR(255),
+            "user_name" VARCHAR(255),
+            "shop_id" VARCHAR(255),
+            "shop_url" VARCHAR(255),
+            "reseller_id" VARCHAR(255),
+            "mode" VARCHAR(50),
+            "status" VARCHAR(50) DEFAULT 'active' NOT NULL,
+            "summary" TEXT,
+            "message_count" INTEGER DEFAULT 0,
+            "started_at" TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
+            "last_activity_at" TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
+            "completed_at" TIMESTAMP WITH TIME ZONE,
+            "metadata" JSONB,
+            "created_at" TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
+            "updated_at" TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL
+        );
+
+        CREATE INDEX IF NOT EXISTS "idx_conversations_session_id" ON "{CONVERSATIONS_TABLE}" ("session_id");
+        CREATE INDEX IF NOT EXISTS "idx_conversations_merchant_id" ON "{CONVERSATIONS_TABLE}" ("merchant_id");
+        CREATE INDEX IF NOT EXISTS "idx_conversations_user_email" ON "{CONVERSATIONS_TABLE}" ("user_email");
+        CREATE INDEX IF NOT EXISTS "idx_conversations_shop_id" ON "{CONVERSATIONS_TABLE}" ("shop_id");
+        CREATE INDEX IF NOT EXISTS "idx_conversations_status" ON "{CONVERSATIONS_TABLE}" ("status");
+        CREATE INDEX IF NOT EXISTS "idx_conversations_started_at" ON "{CONVERSATIONS_TABLE}" ("started_at" DESC);
+        CREATE INDEX IF NOT EXISTS "idx_conversations_user_lookup" ON "{CONVERSATIONS_TABLE}" ("merchant_id", "user_email", "started_at" DESC);
+        CREATE INDEX IF NOT EXISTS "idx_conversations_shop_lookup" ON "{CONVERSATIONS_TABLE}" ("merchant_id", "shop_id", "started_at" DESC);
+    """
+
+
+def create_conversation_messages_table_query() -> str:
+    """
+    Generate query to create conversation_messages table.
+    """
+    return f"""
+        CREATE TABLE IF NOT EXISTS "{CONVERSATION_MESSAGES_TABLE}" (
+            "id" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+            "conversation_id" UUID NOT NULL,
+            "role" VARCHAR(20) NOT NULL,
+            "content" TEXT NOT NULL,
+            "sequence_number" INTEGER NOT NULL,
+            "timestamp" TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
+            "created_at" TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
+            CONSTRAINT "fk_conversation_id" FOREIGN KEY ("conversation_id")
+                REFERENCES "{CONVERSATIONS_TABLE}" ("id") ON DELETE CASCADE,
+            CONSTRAINT "unique_conversation_sequence" UNIQUE ("conversation_id", "sequence_number")
+        );
+
+        CREATE INDEX IF NOT EXISTS "idx_conversation_messages_conversation_id" ON "{CONVERSATION_MESSAGES_TABLE}" ("conversation_id");
+        CREATE INDEX IF NOT EXISTS "idx_conversation_messages_sequence" ON "{CONVERSATION_MESSAGES_TABLE}" ("conversation_id", "sequence_number");
+        CREATE INDEX IF NOT EXISTS "idx_conversation_messages_timestamp" ON "{CONVERSATION_MESSAGES_TABLE}" ("timestamp" DESC);
+    """
+
+
 def create_outbound_numbers_table_query() -> str:
     """
     Generate query to create outbound_numbers table.
@@ -137,6 +201,36 @@ async def create_lead_call_tracker_table():
         return False


+async def create_conversations_table():
+    """
+    Create the conversations table with all constraints and indexes.
+    """
+    try:
+        async for conn in get_db_connection():
+            print("Creating conversations table...")
+            await conn.execute(create_conversations_table_query())
+            print("Conversations table created successfully")
+            return True
+    except Exception as e:
+        print(f"Error creating conversations table: {e}")
+        return False
+
+
+async def create_conversation_messages_table():
+    """
+    Create the conversation_messages table with all constraints and indexes.
+    """
+    try:
+        async for conn in get_db_connection():
+            print("Creating conversation_messages table...")
+            await conn.execute(create_conversation_messages_table_query())
+            print("Conversation messages table created successfully")
+            return True
+    except Exception as e:
+        print(f"Error creating conversation_messages table: {e}")
+        return False
+
+
 async def create_all_tables():
     """
     Create all database tables.
@@ -147,11 +241,15 @@ async def create_all_tables():
     outbound_numbers_success = await create_outbound_numbers_table()
     call_execution_config_success = await create_call_execution_config_table()
     lead_call_tracker_success = await create_lead_call_tracker_table()
+    conversations_success = await create_conversations_table()
+    conversation_messages_success = await create_conversation_messages_table()

     if (
         outbound_numbers_success
         and call_execution_config_success
         and lead_call_tracker_success
+        and conversations_success
+        and conversation_messages_success
     ):
         print("All database tables created successfully")
         return True
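Taken together, the storage path this patch adds is: `bot_connect` creates the conversation row when `ENABLE_CONVERSATION_STORAGE` is on, `LLMSpyProcessor._save_conversation_pair` writes the user/assistant pair after each `LLMFullResponseEndFrame`, and `session_cleanup_callback` marks the conversation completed. A minimal lifecycle sketch against the new `ConversationStorageService` (the session id and message strings are made-up placeholders, and a configured database is assumed):

```python
import asyncio

from app.agents.voice.automatic.services.conversation.conversation_storage import (
    get_conversation_storage_service,
)


async def demo_conversation_lifecycle() -> None:
    # Hypothetical session id; in the app this comes from bot_connect.
    session_id = "demo-session-123"
    service = get_conversation_storage_service()

    # 1. Create the conversation row (bot_connect does this when the flag is on).
    await service.create_conversation(session_id=session_id, merchant_id="m-1")

    # 2. Save a user/assistant pair (LLMSpyProcessor does this per LLM response).
    # save_message returns (message, last_saved_user_message); passing the previous
    # user text back in suppresses duplicate user rows when one question produces
    # several assistant responses.
    _, last_user = await service.save_message(
        session_id=session_id, role="user", content="What were yesterday's sales?"
    )
    await service.save_message(
        session_id=session_id, role="assistant", content="You had 42 orders yesterday."
    )
    # A repeated user turn with the same text is skipped thanks to last_user.
    await service.save_message(
        session_id=session_id,
        role="user",
        content="What were yesterday's sales?",
        last_saved_user_message=last_user,
    )

    # 3. Mark the conversation completed (session_cleanup_callback does this).
    await service.complete_conversation(session_id)


if __name__ == "__main__":
    asyncio.run(demo_conversation_lifecycle())
```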
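To turn the feature on in a development environment, the pieces in this diff suggest two steps: set the flag in `.env` (mirroring the new `.env.example` entry) and provision the two new tables once via the updated create-tables script. A bootstrap sketch, assuming database credentials are already configured for `get_db_connection`:

```python
# .env:
#   ENABLE_CONVERSATION_STORAGE=true

import asyncio

from scripts.create_tables import create_all_tables

# Creates conversations and conversation_messages alongside the existing tables.
asyncio.run(create_all_tables())
```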