Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 153 additions & 11 deletions backend/app/agents/chatbot_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ async def process_message(
data = {}
if resp_type == "insights" and "insights" in data:
data["formatted"] = self._format_insights_for_display(data.get("insights", []))

# Log the final data being returned
logger.info(f"Final response data: {data}")

return {
"type": resp_type,
Expand Down Expand Up @@ -285,7 +288,11 @@ async def _classify_intent_agentic(self, message: str, system_context: Dict[str,
5. detect_anomalies - User wants anomaly detection
6. explain_insight - User wants explanation of a specific insight
7. get_recommendations - User wants recommendations
8. chat - General conversation
8. search_insights - User wants to search/filter insights
9. get_ticket_summary - User wants ticket clustering summary
10. correlate_data - User wants to see anomaly-ticket correlations
11. get_system_trends - User wants to see trends over time
12. chat - General conversation

Respond with just the intent name and response type separated by comma.
Format: intent_name, response_type
Expand All @@ -305,7 +312,8 @@ async def _classify_intent_agentic(self, message: str, system_context: Dict[str,
valid_intents = [
'get_health_status', 'get_critical_insights', 'get_recent_insights',
'run_analysis', 'detect_anomalies', 'explain_insight',
'get_recommendations', 'chat'
'get_recommendations', 'search_insights', 'get_ticket_summary',
'correlate_data', 'get_system_trends', 'chat'
]
if intent not in valid_intents:
intent, resp_type = self._classify_intent(message)
Expand All @@ -332,7 +340,11 @@ async def _execute_agentic_action(
async with httpx.AsyncClient() as client:
response = await client.get("http://localhost:8000/health/", timeout=30.0)
response.raise_for_status()
data = response.json()
health_data = response.json()
# Extract services data for frontend display
services = health_data.get("services", {})
logger.info(f"Health services data: {services}")
data = {"services": services}
context = self._get_intent_prompt(intent, data)

elif intent == 'get_critical_insights':
Expand All @@ -346,10 +358,20 @@ async def _execute_agentic_action(
context = self._get_intent_prompt(intent, data, message)

elif intent == 'run_analysis':
# Instead of running analysis directly, provide a link to the analysis page
# Extract time window from message if mentioned
window_minutes = self._extract_time_window(message, default=15)
data = await self.orchestrator.run_analysis(window_minutes=window_minutes)
context = self._get_intent_prompt(intent, data)

# Create a response that provides a link to the analysis page
data = {
"analysis_link": "/analysis",
"window_minutes": window_minutes,
"message": f"I'll help you run an analysis. Please visit the analysis page to configure and start your analysis for the last {window_minutes} minutes."
}

context = f"""I can help you run an analysis. Please visit the analysis page at [http://localhost:3000/analysis](http://localhost:3000/analysis) to configure and start your analysis for the last {window_minutes} minutes.

You can also navigate to the Analysis page from the main menu."""

elif intent == 'detect_anomalies':
window_minutes = self._extract_time_window(message, default=15)
Expand All @@ -373,6 +395,53 @@ async def _execute_agentic_action(
data = {"insights": insights}
context = "Based on the following system insights, provide specific, actionable recommendations:\n\n{data}"
context = context.format(data=self._format_data_for_llm("get_critical_insights", data, message))

elif intent == 'search_insights':
# Search insights based on keywords in message
all_insights = await self.insight_fetch.fetch_recent_insights(limit=50)
relevant = self._find_relevant_insights(message, all_insights)
data = {"insights": relevant}
context = self._get_intent_prompt("get_recent_insights", data, message)

elif intent == 'get_ticket_summary':
# Get ticket clustering summary
window_minutes = self._extract_time_window(message, default=60)
tickets_data = await self.orchestrator.data_ingestion_agent.load_tickets_data(window_minutes)
clusters = await self.orchestrator.ticket_analysis_agent.cluster_tickets(tickets_data)
data = {
"ticket_summary": {
"total_tickets": len(tickets_data),
"clusters": len(clusters),
"top_clusters": clusters[:5] if clusters else []
}
}
context = f"Summarize the ticket clustering results: {len(tickets_data)} tickets grouped into {len(clusters)} clusters."

elif intent == 'correlate_data':
# Show anomaly-ticket correlations
window_minutes = self._extract_time_window(message, default=30)
metrics_data = await self.orchestrator.data_ingestion_agent.load_metrics_data(window_minutes)
tickets_data = await self.orchestrator.data_ingestion_agent.load_tickets_data(window_minutes)
anomalies = await self.orchestrator.anomaly_detection_agent.detect_anomalies(metrics_data)
clusters = await self.orchestrator.ticket_analysis_agent.cluster_tickets(tickets_data)
correlations = await self.orchestrator.correlation_agent.correlate_data(anomalies, clusters)
data = {
"correlations": correlations,
"summary": {
"anomalies": len(anomalies),
"ticket_clusters": len(clusters),
"correlations_found": len(correlations)
}
}
context = f"Explain the correlations between {len(anomalies)} anomalies and {len(clusters)} ticket clusters, resulting in {len(correlations)} correlated insights."

elif intent == 'get_system_trends':
# Get system trends over time
insights = await self.insight_fetch.fetch_recent_insights(limit=20)
# Analyze trends from insights
trend_summary = self._analyze_trends(insights)
data = {"trends": trend_summary, "insights": insights[:10]}
context = "Analyze the system trends based on recent insights and provide key observations."

else: # chat
# For general chat, include system context for awareness
Expand Down Expand Up @@ -721,12 +790,48 @@ def _find_relevant_insights(self, message: str, insights: List[Dict]) -> List[Di
relevant.append(insight)

return relevant[:5] # Return top 5 most relevant

def _analyze_trends(self, insights: List[Dict]) -> Dict[str, Any]:
"""Analyze trends from recent insights."""
if not insights:
return {"message": "No insights available for trend analysis"}

# Count by priority
priority_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
affected_services = set()
common_issues = {}

for insight in insights:
priority = insight.get('priority', 'medium')
priority_counts[priority] = priority_counts.get(priority, 0) + 1

# Track affected services/assets
for anomaly in insight.get('anomaly_refs', []):
affected_services.add(anomaly.get('asset', 'unknown'))

# Track common issue types
hypothesis = insight.get('hypothesis', '')
for word in hypothesis.lower().split():
if len(word) > 5: # Track meaningful words
common_issues[word] = common_issues.get(word, 0) + 1

# Get top 3 common issues
top_issues = sorted(common_issues.items(), key=lambda x: x[1], reverse=True)[:3]

return {
"total_insights": len(insights),
"priority_distribution": priority_counts,
"affected_services_count": len(affected_services),
"affected_services": list(affected_services)[:10],
"common_issue_keywords": [issue[0] for issue in top_issues],
"trend": "increasing" if priority_counts.get("critical", 0) > 2 else "stable"
}

# ============================================================
# Intent Classification and Prompt Generation
# ============================================================

def _classify_intent(self, message: str) -> (str, str):
def _classify_intent(self, message: str) -> tuple[str, str]:
"""Classify user intent from the message."""
text_lower = message.lower()

Expand Down Expand Up @@ -755,7 +860,7 @@ def _get_intent_prompt(self, intent: str, data: Dict, message: str = "") -> str:
def _get_agent_prompts(self) -> Dict[str, str]:
"""Returns a dictionary of prompts for each agent."""
return {
"get_health_status": "Analyze the following system health data and provide a summary. Do not invent values. If metrics are missing, state they are 'not available'.\n\n```json\n{data}\n```",
"get_health_status": "Provide a brief summary of the system health status. Do not list individual component statuses in the text. Simply state that all components are operating and that detailed status information is available in the table below. Keep the response concise and under 30 words.",
"get_critical_insights": "Analyze these critical system insights. If the user wants a list, provide a bulleted list of issues. Otherwise, summarize the findings.\n\n{data}",
"get_recent_insights": "Analyze these recent system insights. If the user wants a list, provide a bulleted list of issues. Otherwise, summarize the findings.\n\n{data}",
"run_analysis": "Summarize the key findings from the recent system analysis provided below.\n\n{data}",
Expand All @@ -769,7 +874,15 @@ def _format_data_for_llm(self, intent: str, data: Dict, message: str) -> str:
return "No data available."

if intent == "get_health_status":
return json.dumps(data.get('services', {}), indent=2)
services = data.get('services', {})
if not services:
return "No service health data available."

# Format health data with proper structure for display
context = "## System Health Status\n\n"
context += "All components in the system are currently operating. See the detailed status table below for specific component statuses.\n\n"

return context

if intent in ["get_critical_insights", "get_recent_insights"]:
insights = data.get("insights", [])
Expand Down Expand Up @@ -797,9 +910,38 @@ def _format_data_for_llm(self, intent: str, data: Dict, message: str) -> str:
return context

if intent == "run_analysis":
if data.get("findings"):
return "Key Findings:\n" + "\n".join([f"- {finding}" for finding in data["findings"]])
return "No findings from the analysis."
findings = data.get("findings", [])
if not findings:
return "No findings from the analysis."

# Format findings as markdown with proper headers and structure
context = "## Key Findings Summary\n\n"
context += "The recent system analysis reveals several high-priority anomalies that correlate with various types of issues affecting support tickets. Here is a summary of the key findings:\n\n"

# Group findings by priority or type if possible
for i, finding in enumerate(findings, start=1):
if isinstance(finding, dict):
hypothesis = finding.get('hypothesis', 'Unknown issue')
priority = finding.get('priority', 'medium').upper()
confidence = finding.get('confidence', 0.5)
# Use 'explanation' instead of 'narrative' for insight objects
narrative = finding.get('explanation', finding.get('narrative', ''))
actions = finding.get('recommended_actions', [])

context += f"### {hypothesis}\n\n"
context += f"- **Priority**: {priority}\n"
context += f"- **Confidence**: {confidence*100:.1f}%\n"
context += f"- **Finding**: {narrative}\n"
if actions:
context += "- **Recommended Actions**:\n"
for action in actions[:3]: # Limit to top 3 actions
context += f" - {action}\n"
context += "\n"
else:
# Handle string findings
context += f"{i}. {finding}\n\n"

return context

if intent == "detect_anomalies":
if data.get("anomalies"):
Expand Down
5 changes: 5 additions & 0 deletions backend/app/api/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ async def process_chat(
conversation_id=message.conversation_id,
context=message.context
)

# Log the response being returned
logger.info(f"Chat API response type: {response.get('type')}")
logger.info(f"Chat API response data: {response.get('data')}")

return response

except Exception as e:
Expand Down
57 changes: 49 additions & 8 deletions backend/app/api/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,55 @@ async def health_check(

# Test OpenSearch connectivity
try:
opensearch_config = storage_config.get('opensearch', {})
if opensearch_config.get('endpoint'):
# Check if OpenSearch client is initialized
if hasattr(aws_storage, 'opensearch') and aws_storage.opensearch:
opensearch_healthy = aws_storage.opensearch.health_check()
services["opensearch"] = "healthy" if opensearch_healthy else "unhealthy"
else:
services["opensearch"] = "not_configured"
opensearch_endpoint = settings.opensearch_endpoint
if opensearch_endpoint:
# Try to connect and check cluster health with AWS authentication
try:
import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
import httpx

# Get AWS credentials
session = boto3.Session(
profile_name='proactive-pulse',
region_name=settings.aws_region
)
credentials = session.get_credentials()

# Prepare the request
url = f"{opensearch_endpoint}/_cluster/health"
request = AWSRequest(method='GET', url=url)

# Sign the request with AWS SigV4
SigV4Auth(credentials, 'es', settings.aws_region).add_auth(request)

# Make the authenticated request
async with httpx.AsyncClient(verify=settings.opensearch_verify_certs) as client:
response = await client.get(
url,
headers=dict(request.headers),
timeout=5.0
)
if response.status_code == 200:
health_data = response.json()
cluster_status = health_data.get('status', 'unknown')
logger.info(f"OpenSearch cluster status: {cluster_status}")
# OpenSearch cluster status: green, yellow, or red
# Treat both green and yellow as healthy since yellow is still functional
if cluster_status in ['green', 'yellow']:
services["opensearch"] = "healthy"
else:
services["opensearch"] = "unhealthy"
elif response.status_code == 403:
logger.warning(f"OpenSearch authentication failed (403)")
services["opensearch"] = "unhealthy"
else:
logger.warning(f"OpenSearch returned status {response.status_code}")
services["opensearch"] = "unhealthy"
except Exception as conn_error:
logger.warning(f"OpenSearch connectivity test failed: {conn_error}")
services["opensearch"] = "unhealthy"
else:
services["opensearch"] = "not_configured"
except Exception as e:
Expand Down
Loading
Loading