4 changes: 3 additions & 1 deletion .env.example
@@ -176,4 +176,6 @@ AWS_BREEZE_PORTAL_URL="http://localhost:5173"
GCP_BREEZE_PORTAL_URL="http://localhost:5174"

DEFAULT_ANNOUNCEMENT_BANNER_BACKGROUND_COLOR=#714acd
DEFAULT_ANNOUNCEMENT_BANNER_TEXT_COLOR=#714acd
DEFAULT_ANNOUNCEMENT_BANNER_TEXT_COLOR=#714acd

ENABLE_CONVERSATION_STORAGE=false
8 changes: 7 additions & 1 deletion app/agents/voice/automatic/__init__.py
@@ -511,12 +511,18 @@ async def on_function_calls_started(service, function_calls):
args.session_id,
config.ENABLE_CHARTS,
stt_mute_filter,
context_aggregator,
"LLMSpyProcessor",
)
pipeline_components.extend([stt_mute_filter])
else:
tool_call_processor = LLMSpyProcessor(
rtvi, args.session_id, config.ENABLE_CHARTS, None, "LLMSpyProcessor"
rtvi,
args.session_id,
config.ENABLE_CHARTS,
None,
context_aggregator,
"LLMSpyProcessor",
)

pipeline_components.extend([rtvi, context_aggregator.user()])
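The new context_aggregator parameter sits between stt_mute_filter and name, so the positional calls above only line up because every argument is spelled out. A stand-in sketch (illustrative only, not part of the PR) of how keyword arguments keep that call unambiguous:

# Stand-in with the same parameter order as the updated LLMSpyProcessor
# signature shown in llm_spy.py below; all names here are illustrative.
from typing import Any, Optional

class LLMSpyProcessorStub:
    def __init__(
        self,
        rtvi: Any,
        session_id: str,
        enable_charts: bool,
        stt_mute_filter: Optional[Any] = None,
        context_aggregator: Any = None,
        name: str = "LLMSpyProcessor",
    ):
        self.name = name

# Keyword arguments make the ordering explicit at the call site:
processor = LLMSpyProcessorStub(
    rtvi=object(),
    session_id="session-123",
    enable_charts=False,
    stt_mute_filter=None,
    context_aggregator=None,
)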
124 changes: 106 additions & 18 deletions app/agents/voice/automatic/processors/llm_spy.py
@@ -33,6 +33,9 @@
from app.agents.voice.automatic.features.charts.rtvi.rtvi import emit_chart_components
from app.agents.voice.automatic.features.hitl.utils import is_dangerous_operation
from app.agents.voice.automatic.rtvi.rtvi import emit_rtvi_event
from app.agents.voice.automatic.services.conversation.conversation_storage import (
get_conversation_storage_service,
)
from app.agents.voice.automatic.utils.conversation_manager import (
get_conversation_manager,
)
@@ -140,13 +143,15 @@ def __init__(
session_id: str,
enable_charts: bool,
stt_mute_filter: Optional[FrameProcessor] = None,
context_aggregator=None, # OpenAI LLM context aggregator pair
name: str = "LLMSpyProcessor",
):
super().__init__(name=name)
self._rtvi = rtvi
self._session_id = session_id
self._enable_charts = enable_charts
self._stt_mute_filter = stt_mute_filter
self._context_aggregator = context_aggregator

# Register this RTVI processor globally for function confirmations
set_rtvi_processor(rtvi)
@@ -165,11 +170,23 @@ def __init__(
# Conversation management (delegates to service)
self._conversation_manager = get_conversation_manager()

# Conversation storage setup (completely independent from charts)
if config.ENABLE_CONVERSATION_STORAGE:
self._conversation_accumulated_text = ""
self._conversation_is_collecting = False
self._last_saved_user_message = None

async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and delegate conversation logic to ConversationManager."""
await super().process_frame(frame, direction)

if isinstance(frame, TextFrame):
# Collect text for conversation storage if we're in collecting mode
if config.ENABLE_CONVERSATION_STORAGE and getattr(
self, "_conversation_is_collecting", False
):
self._conversation_accumulated_text += frame.text

if config.SANITIZE_TEXT_FOR_TTS:
await self.push_frame(
TextFrame(text=sanitize_markdown(frame.text)), direction
@@ -182,16 +199,25 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await self.push_frame(frame, direction)

# LLM Response Start - begin collecting text and start conversation turn
elif isinstance(frame, LLMFullResponseStartFrame) and self._enable_charts:
self._is_collecting_response = True
self._accumulated_text = ""
elif isinstance(frame, LLMFullResponseStartFrame) and (
self._enable_charts or config.ENABLE_CONVERSATION_STORAGE
):
# Charts logic
if self._enable_charts:
self._is_collecting_response = True
self._accumulated_text = ""

# Start conversation turn via ConversationManager
event = await self._conversation_manager.start_turn_with_events(
self._session_id
)
if event:
await emit_rtvi_event(self._rtvi, event, self._session_id)
# Start conversation turn via ConversationManager
event = await self._conversation_manager.start_turn_with_events(
self._session_id
)
if event:
await emit_rtvi_event(self._rtvi, event, self._session_id)

# Conversation storage logic (independent)
if config.ENABLE_CONVERSATION_STORAGE:
self._conversation_is_collecting = True
self._conversation_accumulated_text = ""

await self.push_frame(frame, direction)

@@ -212,16 +238,32 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await self.push_frame(frame, direction)

# LLM Response Complete - send to ConversationManager
elif isinstance(frame, LLMFullResponseEndFrame) and self._enable_charts:
if self._accumulated_text.strip():
event = await self._conversation_manager.add_llm_response_with_events(
self._session_id, self._accumulated_text.strip()
)
if event:
await emit_rtvi_event(self._rtvi, event, self._session_id)
elif isinstance(frame, LLMFullResponseEndFrame) and (
self._enable_charts or config.ENABLE_CONVERSATION_STORAGE
):
# Charts logic
if self._enable_charts:
if self._accumulated_text.strip():
event = (
await self._conversation_manager.add_llm_response_with_events(
self._session_id, self._accumulated_text.strip()
)
)
if event:
await emit_rtvi_event(self._rtvi, event, self._session_id)

self._accumulated_text = ""
self._is_collecting_response = False

# Conversation storage logic (independent)
if config.ENABLE_CONVERSATION_STORAGE:
if self._conversation_accumulated_text.strip():
await self._save_conversation_pair(
self._conversation_accumulated_text.strip()
)
self._conversation_accumulated_text = ""
self._conversation_is_collecting = False

self._accumulated_text = ""
self._is_collecting_response = False
await self.push_frame(frame, direction)

# Function Call Start - emit RTVI event and track in conversation
@@ -332,3 +374,49 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
)
else:
await self.push_frame(frame, direction)

async def _save_conversation_pair(self, assistant_message: str):
"""Save user and assistant messages as a pair to database."""
try:
# Save both messages to database
conversation_service = get_conversation_storage_service()

# Extract user message from context if context_aggregator is available
user_message = None
if hasattr(self, "_context_aggregator") and self._context_aggregator:
try:
context = self._context_aggregator._user._context
context_messages = context.get_messages()

for msg in reversed(context_messages):
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, str) and content.strip():
user_message = content.strip()
break
except Exception as e:
logger.debug(f"Could not extract user message from context: {e}")

# If we have a user message, check if it's a duplicate
if user_message:
if user_message != self._last_saved_user_message:
# Save user message first (only if not duplicate)
await conversation_service.save_message(
session_id=self._session_id,
role="user",
content=user_message,
)

# Track the saved user message to prevent duplicates
self._last_saved_user_message = user_message

# Always save assistant message (even if user message was duplicate)
# This allows multiple assistant responses per user question
await conversation_service.save_message(
session_id=self._session_id,
role="assistant",
content=assistant_message,
)

except Exception as e:
logger.error(f"[{self._session_id}] Failed to save conversation pair: {e}")
170 changes: 170 additions & 0 deletions app/agents/voice/automatic/services/conversation/conversation_storage.py
@@ -0,0 +1,170 @@
"""
Conversation Storage Service.
Handles all conversation and message storage operations.
"""

from typing import Any, Dict, List, Optional

from app.core.logger import logger
from app.database.accessor.conversation.conversations import (
complete_conversation as db_complete_conversation,
)
from app.database.accessor.conversation.conversations import (
create_conversation as db_create_conversation,
)
from app.database.accessor.conversation.conversations import (
get_conversation_by_session_id,
)
from app.database.accessor.conversation.conversations import (
save_message as db_save_message,
)
from app.database.accessor.conversation.conversations import (
update_conversation_activity,
)
from app.schemas import Conversation, ConversationMessage


class ConversationStorageService:
"""
Service for managing conversation storage.
Provides high-level interface for conversation operations.
"""

async def create_conversation(
self,
session_id: str,
merchant_id: Optional[str] = None,
client_sid: Optional[str] = None,
user_email: Optional[str] = None,
user_name: Optional[str] = None,
shop_id: Optional[str] = None,
shop_url: Optional[str] = None,
reseller_id: Optional[str] = None,
mode: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Optional[Conversation]:
"""
Create a new conversation.

Args:
session_id: Unique session identifier
merchant_id: Merchant identifier
client_sid: Optional client session ID
user_email: User email
user_name: User name
shop_id: Shop identifier
shop_url: Shop URL
reseller_id: Reseller identifier
mode: Conversation mode (e.g., 'TEST' or 'LIVE')
metadata: Additional metadata

Returns:
Created Conversation object or None if failed
"""
conversation = await db_create_conversation(
session_id=session_id,
merchant_id=merchant_id,
client_sid=client_sid,
user_email=user_email,
user_name=user_name,
shop_id=shop_id,
shop_url=shop_url,
reseller_id=reseller_id,
mode=mode,
metadata=metadata,
)

if conversation:
logger.info(f"Conversation created: {session_id}")

return conversation

async def save_message(
self,
session_id: str,
role: str,
content: str,
last_saved_user_message: Optional[str] = None,
) -> tuple[Optional[ConversationMessage], Optional[str]]:
"""
Save a message to a conversation.

Args:
session_id: Session identifier
role: Message role ('user' or 'assistant')
content: Message content
last_saved_user_message: Last saved user message to prevent duplicates (for user messages only)

Returns:
Tuple of (Saved ConversationMessage or None if failed, updated last_saved_user_message)
"""
# For user messages, check if it's a duplicate
if role == "user" and last_saved_user_message == content:
logger.debug(f"Skipping duplicate user message for session: {session_id}")
return (None, last_saved_user_message)

# Get conversation
conversation = await get_conversation_by_session_id(session_id)
if not conversation:
logger.error(f"Conversation not found for session: {session_id}")
return (None, last_saved_user_message)

# Calculate sequence number (current count + 1)
sequence_number = conversation.message_count + 1

# Save message
message = await db_save_message(
conversation_id=conversation.id,
role=role,
content=content,
sequence_number=sequence_number,
)

if message:
# Update conversation activity
await update_conversation_activity(session_id)
else:
logger.error(f"Failed to save message to session: {session_id}")

# Update last_saved_user_message if this was a user message
new_last_saved = (
content if role == "user" and message else last_saved_user_message
)

return (message, new_last_saved)

async def complete_conversation(self, session_id: str) -> Optional[Conversation]:
"""
Mark conversation as completed.

Args:
session_id: Session identifier

Returns:
Updated Conversation object or None if failed
"""
conversation = await db_complete_conversation(session_id)

if conversation:
logger.info(f"Conversation completed: {session_id}")
else:
logger.error(f"Failed to complete conversation: {session_id}")

return conversation


# Global service instance
_conversation_storage_service: Optional[ConversationStorageService] = None


def get_conversation_storage_service() -> ConversationStorageService:
"""
Get the global conversation storage service instance.

Returns:
ConversationStorageService instance
"""
global _conversation_storage_service
if _conversation_storage_service is None:
_conversation_storage_service = ConversationStorageService()
return _conversation_storage_service
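A minimal usage sketch of the service, assuming the database accessors above are wired up; the session id and message contents are placeholders:

import asyncio

from app.agents.voice.automatic.services.conversation.conversation_storage import (
    get_conversation_storage_service,
)

async def demo(session_id: str) -> None:
    service = get_conversation_storage_service()

    # Create the conversation row up front.
    await service.create_conversation(session_id=session_id, mode="TEST")

    # save_message returns (message, updated last_saved_user_message);
    # feeding the second value back in suppresses duplicate user rows.
    last_user = None
    _, last_user = await service.save_message(
        session_id=session_id,
        role="user",
        content="show refunds",
        last_saved_user_message=last_user,
    )
    await service.save_message(
        session_id=session_id,
        role="assistant",
        content="Here are the refunds...",
    )

    await service.complete_conversation(session_id)

# asyncio.run(demo("example-session-id"))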
5 changes: 5 additions & 0 deletions app/core/config.py
@@ -452,6 +452,11 @@ def get_required_env(var_name: str) -> str:
BREEZE_BUDDY_DASHBOARD_PASSWORD = os.getenv("BREEZE_BUDDY_DASHBOARD_PASSWORD", "")
BREEZE_BUDDY_SESSION_SECRET_KEY = os.getenv("BREEZE_BUDDY_SESSION_SECRET_KEY", "")

# Conversation Storage Configuration
ENABLE_CONVERSATION_STORAGE = (
os.environ.get("ENABLE_CONVERSATION_STORAGE", "false").lower() == "true"
)

ENABLE_BREEZE_BUDDY_TRACING = (
os.getenv("ENABLE_BREEZE_BUDDY_TRACING", "false").lower() == "true"
)
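
One detail of the ENABLE_CONVERSATION_STORAGE parsing above: only the literal string "true" (case-insensitive) enables the feature. A quick standalone check of that rule (illustrative helper, not part of the PR):

from typing import Optional

# Mirrors the os.environ.get(...).lower() == "true" pattern used above.
def parse_bool_flag(raw: Optional[str]) -> bool:
    return (raw or "false").lower() == "true"

assert parse_bool_flag("true") is True
assert parse_bool_flag("TRUE") is True
assert parse_bool_flag("1") is False   # "1" does not turn the feature on
assert parse_bool_flag(None) is False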