Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 261 additions & 0 deletions backend/app/agents/chatbot_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
import json
import logging
from typing import Any, Dict, List, Optional, Tuple

import boto3
import httpx

from .base import ProactivePulseAgent
from .insight_fetch import InsightFetchAgent
from .orchestrator import OrchestratorAgent

logger = logging.getLogger(__name__)

class ChatbotAgent(ProactivePulseAgent):
    """Natural-language chat agent backed by an AWS Bedrock text model.

    Flow per message: classify intent with keyword rules, gather data from
    the matching agent (health endpoint, insight storage, orchestrator),
    then ask the Bedrock model (Nova-Lite message schema) to phrase the
    answer in markdown.
    """

    def __init__(self, config):
        """Initialize sub-agents and the Bedrock runtime client.

        Args:
            config: Settings object; must expose ``aws_region`` and
                ``aws_bedrock_model_text``. Optionally ``aws_profile``
                (defaults to ``'proactive-pulse'``) and ``api_base_url``
                (defaults to ``http://localhost:8000``).

        Raises:
            Exception: re-raised if the Bedrock client cannot be created.
        """
        super().__init__(
            name="Chatbot Agent",
            description="Natural language interface powered by AWS Bedrock"
        )

        self.config = config
        self.orchestrator = OrchestratorAgent()
        self.insight_fetch = InsightFetchAgent(config)

        try:
            # Profile is configurable; falls back to the historical default
            # so existing deployments keep working unchanged.
            session = boto3.Session(
                profile_name=getattr(config, 'aws_profile', 'proactive-pulse'),
                region_name=config.aws_region
            )
            self.bedrock_client = session.client('bedrock-runtime')
            logger.info(f"✅ Bedrock client initialized in region {config.aws_region}")
        except Exception as e:
            logger.error(f"❌ Failed to initialize Bedrock client: {str(e)}")
            raise

    # ============================================================
    # Main conversational logic with Nova-Lite compliant message body
    # ============================================================
    async def process_message(self, message: str, conversation_id: Optional[str] = None) -> Dict[str, Any]:
        """Answer a user message.

        Args:
            message: the user's natural-language question.
            conversation_id: reserved for future multi-turn support; unused.

        Returns:
            Dict with ``type`` (response category), ``message`` (model text)
            and ``data`` (raw agent payload, never None). On failure returns
            a ``type == "error"`` payload instead of raising.
        """
        try:
            logger.info(f"Processing message using Bedrock model: {self.config.aws_bedrock_model_text}")

            # Step 1: Analyze user intent
            intent, resp_type = self._classify_intent(message)

            # Step 2: Execute agent based on intent
            data, context = await self._gather_context(intent, message)

            # Step 3: Build Nova-Lite message payload
            body = {
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "text": (
                                    "You are ProactivePulse AI, a helpful assistant that analyzes system health "
                                    "and performance. Use markdown for readability. "
                                    "If the user asks to *list* insights, respond with a clean bullet list only, "
                                    "no explanations or analysis.\n\n"
                                    f"{context}\n\nUser Question: {message}"
                                )
                            }
                        ]
                    }
                ],
                "inferenceConfig": {
                    "maxTokens": 512,
                    "temperature": 0.5,
                    "topP": 0.9
                }
            }

            # Step 4: Invoke Bedrock Nova-Lite model
            response = self.bedrock_client.invoke_model(
                modelId=self.config.aws_bedrock_model_text,
                body=json.dumps(body)
            )

            # Step 5: Parse and extract completion. The body is a streaming
            # object for the real client but may already be bytes in tests.
            result_body = response["body"].read() if hasattr(response["body"], "read") else response["body"]
            result = json.loads(result_body)
            completion = self._extract_completion(result)

            # Step 6: Ensure data is non-null and formatted
            if data is None:
                data = {}
            if resp_type == "insights":
                data["formatted"] = self._format_insights_for_display(data.get("insights", []))

            return {
                "type": resp_type,
                "message": completion,
                "data": data
            }

        except Exception as e:
            logger.error(f"Error processing message: {e}", exc_info=True)
            return {
                "type": "error",
                "message": "I encountered an error processing your request.",
                "data": {"error": str(e)}
            }

    async def _gather_context(self, intent: str, message: str) -> Tuple[Optional[Dict[str, Any]], str]:
        """Run the agent matching *intent*; return (raw data, LLM context).

        ``data`` stays None for plain-chat intents with no backing agent.
        """
        data: Optional[Dict[str, Any]] = None
        context = ""

        if intent == 'get_health_status':
            # Base URL is configurable; falls back to the historical default.
            base_url = getattr(self.config, 'api_base_url', 'http://localhost:8000')
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{base_url}/health/", timeout=30.0)
                response.raise_for_status()
                data = response.json()
            context = self._get_intent_prompt(intent, data)
        elif intent == 'get_critical_insights':
            insights = await self.insight_fetch.fetch_critical_insights()
            data = {"insights": insights}
            context = self._get_intent_prompt(intent, data, message)
        elif intent == 'get_recent_insights':
            insights = await self.insight_fetch.fetch_recent_insights()
            data = {"insights": insights}
            context = self._get_intent_prompt(intent, data, message)
        elif intent == 'run_analysis':
            data = await self.orchestrator.run_analysis(window_minutes=15)
            context = self._get_intent_prompt(intent, data)
        elif intent == 'detect_anomalies':
            # Prefer a dedicated detector if the orchestrator provides one.
            if hasattr(self.orchestrator, "detect_anomalies"):
                data = await self.orchestrator.detect_anomalies(window_minutes=15)
            else:
                data = await self.orchestrator.run_analysis(window_minutes=15)
            context = self._get_intent_prompt(intent, data)

        return data, context

    # ============================================================
    # Intent Classification and Prompt Generation
    # ============================================================

    def _classify_intent(self, message: str) -> Tuple[str, str]:
        """Classify user intent from the message.

        Returns:
            (intent name, response type). Checks are ordered by priority;
            first match wins, falling back to plain chat.
        """
        text_lower = message.lower()

        if "health" in text_lower or "status" in text_lower:
            return "get_health_status", "health"
        if "critical" in text_lower or "urgent" in text_lower:
            return "get_critical_insights", "insights"
        if "recent" in text_lower and ("insight" in text_lower or "analysis" in text_lower):
            return "get_recent_insights", "insights"
        # "analy" already matches "analysis"/"analyze" etc.
        if any(k in text_lower for k in ("analy", "metrics")):
            return "run_analysis", "analysis"
        # "anomal" matches "anomaly"/"anomalies".
        if "anomal" in text_lower:
            return "detect_anomalies", "anomaly"

        return "chat", "chat"

    def _get_intent_prompt(self, intent: str, data: Dict, message: str = "") -> str:
        """Generate a generalized prompt for the LLM based on the intent."""
        prompts = self._get_agent_prompts()
        prompt_template = prompts.get(intent, prompts["chat"])

        formatted_data = self._format_data_for_llm(intent, data, message)

        return prompt_template.format(data=formatted_data)

    def _get_agent_prompts(self) -> Dict[str, str]:
        """Returns a dictionary of prompt templates ({data} placeholder) per intent."""
        return {
            "get_health_status": "Analyze the following system health data and provide a summary. Do not invent values. If metrics are missing, state they are 'not available'.\n\n```json\n{data}\n```",
            "get_critical_insights": "Analyze these critical system insights. If the user wants a list, provide a bulleted list of issues. Otherwise, summarize the findings.\n\n{data}",
            "get_recent_insights": "Analyze these recent system insights. If the user wants a list, provide a bulleted list of issues. Otherwise, summarize the findings.\n\n{data}",
            "run_analysis": "Summarize the key findings from the recent system analysis provided below.\n\n{data}",
            "detect_anomalies": "Summarize the detected system anomalies provided below.\n\n{data}",
            "chat": "You are a helpful assistant. Please respond to the user's question.",
        }

    def _format_data_for_llm(self, intent: str, data: Dict, message: str) -> str:
        """Formats agent data into text the LLM can reason over.

        Insight/anomaly dicts are assumed to carry 'hypothesis', 'priority',
        'confidence', 'narrative' / 'metric', 'asset', 'score', 'value' keys
        — TODO confirm against the storage schema.
        """
        if not data:
            return "No data available."

        if intent == "get_health_status":
            return json.dumps(data.get('services', {}), indent=2)

        if intent in ["get_critical_insights", "get_recent_insights"]:
            insights = data.get("insights", [])
            if not insights:
                return "No insights available."

            text_lower = message.lower()
            # Compact numbered list when the user explicitly asked for one.
            if "list" in text_lower or "show" in text_lower:
                lines = ["List of insights:"]
                for i, ins in enumerate(insights, start=1):
                    lines.append(f"{i}. {ins['hypothesis']} (Priority: {ins['priority'].upper()}, Confidence: {ins['confidence']*100:.1f}%)")
                return "\n".join(lines)

            # Otherwise a detailed per-insight breakdown for summarization.
            context = ""
            for insight in insights:
                context += f"- Issue: {insight['hypothesis']}\n"
                context += f"  Priority: {insight['priority'].upper()}\n"
                context += f"  Confidence: {insight['confidence']*100:.1f}%\n"
                context += f"  Finding: {insight['narrative']}\n"
                if insight.get('recommended_actions'):
                    context += "  Recommended Actions:\n"
                    for action in insight['recommended_actions'][:2]:
                        context += f"    * {action}\n"
                context += "\n"
            return context

        if intent == "run_analysis":
            if data.get("findings"):
                return "Key Findings:\n" + "\n".join([f"- {finding}" for finding in data["findings"]])
            return "No findings from the analysis."

        if intent == "detect_anomalies":
            if data.get("anomalies"):
                context = ""
                for anomaly in data["anomalies"]:
                    context += f"- {anomaly['metric']} on {anomaly['asset']}\n"
                    context += f"  Score: {anomaly['score']*100:.1f}%\n"
                    context += f"  Value: {anomaly['value']}\n\n"
                return context
            return "No anomalies detected."

        return json.dumps(data, indent=2)

    def _extract_completion(self, result: Dict) -> str:
        """Extract the model text from a parsed Bedrock response.

        Tries the Nova 'output.message.content' shape first, then legacy
        fallbacks; returns '' rather than raising on unexpected shapes.
        """
        try:
            if "output" in result and "message" in result["output"]:
                content = result["output"]["message"].get("content", [])
                if content and isinstance(content[0], dict):
                    return content[0].get("text", "")
            if isinstance(result.get("messages"), list) and result["messages"]:
                return result["messages"][0].get("content", [])[0].get("text", "")
            return result.get("outputText", "") or result.get("text", "") or ""
        except Exception:
            return ""

    def _format_insights_for_display(self, insights: List[Dict]) -> Dict[str, Any]:
        """Format insights into structured display data (tables + summary).

        NOTE(review): filters on lowercase 'critical' while InsightFetchAgent
        queries storage with 'CRITICAL' — confirm the canonical casing.
        """
        return {
            "tables": {
                "critical_issues": [
                    {
                        "issue": i['hypothesis'],
                        "priority": i['priority'],
                        "confidence": f"{i['confidence']*100:.1f}%",
                        "actions": i['recommended_actions']
                    }
                    for i in insights if i['priority'] == 'critical'
                ],
                "affected_systems": [
                    {
                        "asset": a['asset'],
                        "metric": a['metric'],
                        "value": f"{a['value']:.2f}",
                        "score": f"{a['score']*100:.1f}%"
                    }
                    for i in insights
                    for a in i.get('anomaly_refs', [])
                ]
            },
            "summary": {
                "total_insights": len(insights),
                "critical_count": len([i for i in insights if i['priority'] == 'critical']),
                "affected_systems": len(set(a['asset'] for i in insights for a in i.get('anomaly_refs', [])))
            }
        }
66 changes: 66 additions & 0 deletions backend/app/agents/insight_fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from .base import ProactivePulseAgent
from app.dependencies import get_storage_dependency
import logging

logger = logging.getLogger(__name__)

class InsightFetchAgent(ProactivePulseAgent):
    """Agent responsible for fetching and querying insights from storage."""

    def __init__(self, config):
        """Wire up storage access.

        Args:
            config: application settings (kept for parity with other agents).
        """
        super().__init__(
            name="Insight Fetch Agent",
            description="Retrieves and filters insights from storage"
        )
        self.config = config
        self.storage = get_storage_dependency()

    async def _guarded_list(self, limit: int, filters: Optional[Dict[str, Any]],
                            error_label: str) -> List[Dict[str, Any]]:
        """Shared guarded call to ``storage.list_insights``.

        Returns plain dicts; logs and returns [] on any storage error so
        callers never raise.
        """
        try:
            if filters is None:
                # Keep the no-filters call shape identical to the original
                # (storage may not accept filters=None).
                insights, _ = await self.storage.list_insights(limit=limit)
            else:
                insights, _ = await self.storage.list_insights(limit=limit, filters=filters)
            return [insight.dict() for insight in insights]
        except Exception as e:
            logger.error(f"{error_label}: {e}")
            return []

    async def fetch_recent_insights(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Fetch most recent insights."""
        return await self._guarded_list(limit, None, "Error fetching recent insights")

    async def fetch_insights_by_type(self, insight_type: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Fetch insights by type (analysis, anomaly, etc)."""
        return await self._guarded_list(limit, {"type": insight_type},
                                        "Error fetching insights by type")

    async def fetch_critical_insights(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Fetch critical priority insights.

        NOTE(review): queries with uppercase 'CRITICAL' while the chatbot's
        display formatter compares lowercase 'critical' — confirm the
        canonical priority casing in storage.
        """
        return await self._guarded_list(limit, {"priority": "CRITICAL"},
                                        "Error fetching critical insights")

    async def search_insights(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Search insights by content (case-insensitive substring match)."""
        try:
            # Basic keyword filtering (enhance with proper text search):
            # over-fetch, then filter client-side.
            insights, _ = await self.storage.list_insights(limit=50)  # Get more to filter
            needle = query.lower()
            matched = [
                insight.dict() for insight in insights
                if needle in insight.summary.lower()
                or needle in insight.explanation.lower()
            ]
            return matched[:limit]
        except Exception as e:
            logger.error(f"Error searching insights: {e}")
            return []
39 changes: 39 additions & 0 deletions backend/app/api/chat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from typing import Dict, Any, Optional
from ..agents.chatbot_agent import ChatbotAgent
from ..config import get_settings
import logging

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/chat", tags=["chat"])

class ChatMessage(BaseModel):
    """Request body for POST /api/chat/."""
    # Raw natural-language question from the user.
    text: str

class ChatResponse(BaseModel):
    """Response body mirroring ChatbotAgent.process_message's dict."""
    # Response category, e.g. "health", "insights", "chat", or "error".
    type: str
    # Model-generated (or error) text to show the user.
    message: str
    # Raw agent payload backing the answer; omitted when not applicable.
    data: Optional[Dict[str, Any]] = None

# Process-wide agent cache: ChatbotAgent.__init__ builds a boto3 session and
# sub-agents, which is too expensive to repeat on every request.
_chatbot_instance: Optional[ChatbotAgent] = None

@router.post("/", response_model=ChatResponse)
async def process_chat(
    message: ChatMessage,
    settings = Depends(get_settings)
):
    """Chat endpoint: route the user's text through the chatbot agent.

    Returns a ChatResponse; failures are reported as type == "error"
    rather than an HTTP 5xx.
    """
    global _chatbot_instance
    try:
        # Create the chatbot once and reuse it across requests.
        if _chatbot_instance is None:
            _chatbot_instance = ChatbotAgent(settings)

        # Process message
        response = await _chatbot_instance.process_message(message.text)
        return response

    except Exception as e:
        # logger.exception records the traceback for debugging.
        logger.exception(f"Chat processing error: {str(e)}")
        return ChatResponse(
            type="error",
            message="An error occurred while processing your message",
            data={"error": str(e)}
        )
5 changes: 3 additions & 2 deletions backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import sys
from io import StringIO
from app.config import get_settings
from app.api import insights, analysis, health
from app.api import chat, insights, analysis, health
from app.utils.logging import setup_logging
from app.agents.orchestrator import OrchestratorAgent
from app.storage.factory import get_storage_service
Expand Down Expand Up @@ -138,7 +138,7 @@ async def lifespan(app: FastAPI):
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins,
allow_origins=["http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
Expand Down Expand Up @@ -173,6 +173,7 @@ async def global_exception_handler(request: Request, exc: Exception):


# Include routers
app.include_router(chat.router)
app.include_router(health.router, prefix="/health", tags=["health"])
app.include_router(insights.router, prefix="/insights", tags=["insights"])
app.include_router(analysis.router, prefix="/analysis", tags=["analysis"])
Expand Down
Loading
Loading