diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..29eda91 --- /dev/null +++ b/.gitignore @@ -0,0 +1,45 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd +.Python +*.egg +*.egg-info/ +dist/ +build/ +eggs/ +parts/ +var/ +sdist/ +develop-eggs/ +.installed.cfg +lib/ +lib64/ +*.so + +# Virtual environments +.env +.venv +env/ +venv/ +ENV/ + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ + +# IDEs +.idea/ +.vscode/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + +# Node +node_modules/ diff --git a/agents/__init__.py b/agents/__init__.py new file mode 100644 index 0000000..36401d3 --- /dev/null +++ b/agents/__init__.py @@ -0,0 +1,16 @@ +""" +Agents package for the Agentic AGI robotics system. + +Contains all agent implementations including the Resolver Agent. +""" + +from agents.base_agent import AgentMessage, AgentState, AgentStatus, BaseAgent +from agents.resolver_agent import ResolverAgent + +__all__ = [ + "BaseAgent", + "AgentStatus", + "AgentState", + "AgentMessage", + "ResolverAgent", +] diff --git a/agents/base_agent.py b/agents/base_agent.py new file mode 100644 index 0000000..518e148 --- /dev/null +++ b/agents/base_agent.py @@ -0,0 +1,155 @@ +""" +Base Agent module for the Agentic AGI robotics system. +All agents inherit from this base class. +""" + +import logging +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Callable, Dict, List, Optional + + +class AgentStatus(Enum): + """Enumeration of possible agent statuses.""" + INITIALIZING = "initializing" + HEALTHY = "healthy" + DEGRADED = "degraded" + CRITICAL = "critical" + OFFLINE = "offline" + RECOVERING = "recovering" + + +@dataclass +class AgentState: + """Data class representing the current state of an agent.""" + name: str + status: AgentStatus = AgentStatus.INITIALIZING + last_heartbeat: float = field(default_factory=time.time) + cpu_usage: float = 0.0 + memory_usage: float = 0.0 + error_count: int = 0 + task_count: int = 0 + response_time: float = 0.0 + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class AgentMessage: + """Data class representing a message exchanged between agents.""" + sender: str + recipient: str + message_type: str + payload: Dict[str, Any] + timestamp: float = field(default_factory=time.time) + priority: int = 0 + + +class BaseAgent(ABC): + """ + Abstract base class for all agents in the Agentic AGI robotics system. + + Provides common functionality including: + - State management + - Message passing + - Health reporting + - Lifecycle management + """ + + def __init__(self, name: str, priority: int = 0): + self.name = name + self.priority = priority + self.state = AgentState(name=name) + self.logger = logging.getLogger(f"agent.{name}") + self._message_handlers: Dict[str, Callable] = {} + self._message_queue: List[AgentMessage] = [] + self._running = False + self._connected_agents: Dict[str, "BaseAgent"] = {} + + def connect_agent(self, agent: "BaseAgent") -> None: + """Register another agent for inter-agent communication.""" + self._connected_agents[agent.name] = agent + self.logger.debug("Connected to agent: %s", agent.name) + + def send_message(self, recipient: str, message_type: str, payload: Dict[str, Any], + priority: int = 0) -> bool: + """Send a message to another agent.""" + if recipient not in self._connected_agents: + self.logger.warning("Agent '%s' not found in connected agents", recipient) + return False + + msg = AgentMessage( + sender=self.name, + recipient=recipient, + message_type=message_type, + payload=payload, + priority=priority, + ) + self._connected_agents[recipient].receive_message(msg) + return True + + def receive_message(self, message: AgentMessage) -> None: + """Receive and queue a message for processing.""" + self._message_queue.append(message) + handler = self._message_handlers.get(message.message_type) + if handler: + handler(message) + else: + self.logger.debug( + "No handler for message type '%s' from '%s'", + message.message_type, + message.sender, + ) + + def register_message_handler(self, message_type: str, handler: Callable) -> None: + """Register a handler for a specific message type.""" + self._message_handlers[message_type] = handler + + def get_state(self) -> AgentState: + """Return the current agent state.""" + self.state.last_heartbeat = time.time() + return self.state + + def update_status(self, status: AgentStatus) -> None: + """Update the agent's operational status.""" + old_status = self.state.status + self.state.status = status + if old_status != status: + self.logger.info( + "Agent '%s' status changed: %s -> %s", self.name, old_status.value, status.value + ) + + def heartbeat(self) -> float: + """Record and return the current heartbeat timestamp.""" + self.state.last_heartbeat = time.time() + return self.state.last_heartbeat + + def start(self) -> None: + """Start the agent.""" + self._running = True + self.update_status(AgentStatus.HEALTHY) + self.on_start() + self.logger.info("Agent '%s' started", self.name) + + def stop(self) -> None: + """Stop the agent.""" + self._running = False + self.update_status(AgentStatus.OFFLINE) + self.on_stop() + self.logger.info("Agent '%s' stopped", self.name) + + @abstractmethod + def on_start(self) -> None: + """Called when agent starts. Override in subclasses.""" + + @abstractmethod + def on_stop(self) -> None: + """Called when agent stops. Override in subclasses.""" + + @abstractmethod + def execute(self, task: Dict[str, Any]) -> Dict[str, Any]: + """Execute a task. Override in subclasses.""" + + def __repr__(self) -> str: + return f"{self.__class__.__name__}(name={self.name!r}, status={self.state.status.value!r})" diff --git a/agents/resolver_agent.py b/agents/resolver_agent.py new file mode 100644 index 0000000..e23930a --- /dev/null +++ b/agents/resolver_agent.py @@ -0,0 +1,257 @@ +""" +Resolver Agent - The 6th Agent: System Mediator and Health Guardian. + +Responsibilities: +- Conflict resolution between agents +- Error detection and recovery +- Deadlock detection and breaking +- Resource arbitration +- Priority management +- System health monitoring +- Fallback strategy execution +""" + +import logging +import threading +import time +from typing import Any, Dict, List, Optional + +from agents.base_agent import AgentMessage, AgentStatus, BaseAgent +from resolver.arbitrator import AgentArbitrator +from resolver.conflict_resolution import ConflictDetector, ConflictResolver +from resolver.deadlock_detector import DeadlockDetector +from resolver.error_recovery import ErrorRecoverySystem +from resolver.fallback_planner import FallbackPlanner +from resolver.health_monitor import HealthMonitor + + +class ResolverAgent(BaseAgent): + """ + The 6th Agent: System Resolver and Health Guardian. + + This agent monitors all other agents, resolves conflicts, handles errors + gracefully, and ensures the system continues operating even when problems arise. + """ + + AGENT_NAME = "resolver" + AGENT_PRIORITY = 6 # Highest priority - can override all others + + def __init__(self): + super().__init__(name=self.AGENT_NAME, priority=self.AGENT_PRIORITY) + self.conflict_detector = ConflictDetector() + self.conflict_resolver = ConflictResolver() + self.error_handler = ErrorRecoverySystem() + self.arbitrator = AgentArbitrator() + self.health_monitor = HealthMonitor() + self.deadlock_detector = DeadlockDetector() + self.fallback_planner = FallbackPlanner() + self._monitored_agents: Dict[str, BaseAgent] = {} + self._monitor_thread: Optional[threading.Thread] = None + self._monitor_interval: float = 1.0 # seconds between health checks + + # ------------------------------------------------------------------ # + # Lifecycle + # ------------------------------------------------------------------ # + + def on_start(self) -> None: + """Start background monitoring thread.""" + self._monitor_thread = threading.Thread( + target=self._monitoring_loop, daemon=True, name="resolver-monitor" + ) + self._monitor_thread.start() + self.logger.info("Resolver Agent started - monitoring %d agents", + len(self._monitored_agents)) + + def on_stop(self) -> None: + """Stop background monitoring thread.""" + if self._monitor_thread and self._monitor_thread.is_alive(): + # The daemon thread will stop with the process; signal the loop + self._running = False + self.logger.info("Resolver Agent stopped") + + # ------------------------------------------------------------------ # + # Agent integration + # ------------------------------------------------------------------ # + + def integrate_with_agents(self, agents: List[BaseAgent]) -> None: + """Connect resolver to a list of agents for monitoring and arbitration.""" + for agent in agents: + self._monitored_agents[agent.name] = agent + self.connect_agent(agent) + agent.connect_agent(self) + self.health_monitor.register_agent(agent.name) + self.logger.info("Integrated with agent: %s", agent.name) + + def start_monitoring(self) -> None: + """Start the resolver and all monitoring sub-systems.""" + if not self._running: + self.start() + + # ------------------------------------------------------------------ # + # Monitoring loop + # ------------------------------------------------------------------ # + + def _monitoring_loop(self) -> None: + """Background loop that continuously monitors all registered agents.""" + while self._running: + try: + self.monitor_agents() + time.sleep(self._monitor_interval) + except Exception as exc: # pylint: disable=broad-except + self.logger.error("Error in monitoring loop: %s", exc) + + def monitor_agents(self) -> Dict[str, Any]: + """Monitor all registered agents and return a status summary.""" + results: Dict[str, Any] = {} + for agent_name, agent in self._monitored_agents.items(): + try: + state = agent.get_state() + self.health_monitor.update_agent_health(agent_name, state) + results[agent_name] = { + "status": state.status.value, + "last_heartbeat": state.last_heartbeat, + "error_count": state.error_count, + } + except Exception as exc: # pylint: disable=broad-except + self.logger.warning("Failed to monitor agent '%s': %s", agent_name, exc) + results[agent_name] = {"status": "unreachable", "error": str(exc)} + + # Check for conflicts and deadlocks + conflicts = self.detect_conflicts() + for conflict in conflicts: + self.resolve_conflict(conflict) + + deadlock = self.deadlock_detector.detect() + if deadlock: + self.break_deadlock(deadlock) + + return results + + # ------------------------------------------------------------------ # + # Conflict management + # ------------------------------------------------------------------ # + + def detect_conflicts(self) -> List[Dict[str, Any]]: + """Detect active conflicts between agents.""" + agent_states = { + name: agent.get_state() + for name, agent in self._monitored_agents.items() + } + return self.conflict_detector.detect_all(agent_states) + + def resolve_conflict(self, conflict: Dict[str, Any]) -> Dict[str, Any]: + """Resolve a conflict between agents and return the resolution.""" + self.logger.info("Resolving conflict: %s", conflict.get("type", "unknown")) + resolution = self.conflict_resolver.resolve(conflict) + + # Notify involved agents of the resolution + for agent_name in conflict.get("agents", []): + if agent_name in self._monitored_agents: + self.send_message( + recipient=agent_name, + message_type="conflict_resolution", + payload={"conflict": conflict, "resolution": resolution}, + priority=self.AGENT_PRIORITY, + ) + return resolution + + # ------------------------------------------------------------------ # + # Error recovery + # ------------------------------------------------------------------ # + + def recover_from_error(self, error: Dict[str, Any]) -> Dict[str, Any]: + """Handle an error and attempt recovery. Returns the recovery result.""" + self.logger.warning( + "Recovering from error: type=%s, severity=%s", + error.get("type", "unknown"), + error.get("severity", "unknown"), + ) + return self.error_handler.execute_recovery(error) + + # ------------------------------------------------------------------ # + # Resource arbitration + # ------------------------------------------------------------------ # + + def arbitrate_resources(self, requests: List[Dict[str, Any]]) -> Dict[str, Any]: + """Decide resource allocation when multiple agents request the same resource.""" + return self.arbitrator.arbitrate_resource_allocation(requests) + + # ------------------------------------------------------------------ # + # Deadlock management + # ------------------------------------------------------------------ # + + def detect_deadlock(self) -> Optional[Dict[str, Any]]: + """Detect deadlocks among monitored agents.""" + return self.deadlock_detector.detect() + + def break_deadlock(self, deadlock: Dict[str, Any]) -> Dict[str, Any]: + """Break a detected deadlock.""" + self.logger.warning("Breaking deadlock: %s", deadlock) + return self.deadlock_detector.break_deadlock(deadlock) + + # ------------------------------------------------------------------ # + # Health assessment + # ------------------------------------------------------------------ # + + def assess_system_health(self) -> Dict[str, Any]: + """Check and return the overall system health status.""" + agent_health = {} + for agent_name in self._monitored_agents: + agent_health[agent_name] = self.health_monitor.get_agent_health(agent_name) + + overall = self.health_monitor.compute_overall_status(agent_health) + alerts = self.health_monitor.get_active_alerts() + + return { + "overall_status": overall, + "agents": agent_health, + "alerts": alerts, + "timestamp": time.time(), + } + + def generate_health_report(self) -> Dict[str, Any]: + """Generate a detailed health report for the entire system.""" + health = self.assess_system_health() + health["report_generated_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + health["monitored_agents_count"] = len(self._monitored_agents) + return health + + # ------------------------------------------------------------------ # + # Fallback execution + # ------------------------------------------------------------------ # + + def execute_fallback(self, failure: Dict[str, Any]) -> Dict[str, Any]: + """Execute a fallback strategy in response to a failure.""" + self.logger.info("Executing fallback for failure: %s", failure.get("type", "unknown")) + plan = self.fallback_planner.plan_fallback(failure) + return self.fallback_planner.execute_fallback(plan) + + # ------------------------------------------------------------------ # + # Dashboard + # ------------------------------------------------------------------ # + + def launch_dashboard(self, port: int = 8080) -> None: + """Launch the health monitoring web dashboard.""" + from dashboard.health_dashboard import HealthDashboard # pylint: disable=import-outside-toplevel + dashboard = HealthDashboard(resolver_agent=self, port=port) + dashboard.run() + + # ------------------------------------------------------------------ # + # BaseAgent interface + # ------------------------------------------------------------------ # + + def execute(self, task: Dict[str, Any]) -> Dict[str, Any]: + """Execute a resolver task (dispatched by task type).""" + task_type = task.get("type", "") + dispatch: Dict[str, Any] = { + "resolve_conflict": lambda: self.resolve_conflict(task.get("conflict", {})), + "recover_error": lambda: self.recover_from_error(task.get("error", {})), + "arbitrate": lambda: self.arbitrate_resources(task.get("requests", [])), + "health_check": lambda: self.assess_system_health(), + "detect_deadlock": lambda: self.detect_deadlock(), + "execute_fallback": lambda: self.execute_fallback(task.get("failure", {})), + } + handler = dispatch.get(task_type) + if handler is None: + return {"error": f"Unknown task type: {task_type}"} + return handler() diff --git a/config/priorities.yaml b/config/priorities.yaml new file mode 100644 index 0000000..ed82f39 --- /dev/null +++ b/config/priorities.yaml @@ -0,0 +1,28 @@ +# Agent Priority Configuration +# ============================================================ +# Defines the priority level for each agent in the system. +# Higher values indicate higher precedence during arbitration +# and conflict resolution. +# +# The Resolver Agent has the highest priority (6) and can +# override any decision made by lower-priority agents. + +agent_priorities: + resolver: 6 # System Mediator – highest priority + perception: 5 # Always needs to see the environment + planning: 4 # Strategic decision-making + control: 3 # Physical robot control + communication: 2 # Human interaction + coordination: 1 # Multi-robot team management + +# Context-based priority overrides +# These rules temporarily elevate agent priority under specific conditions. +context_overrides: + emergency_stop: + control: 10 # Control agent takes absolute precedence in emergencies + + battery_critical: + planning: 8 # Planning must find charging route immediately + + human_command: + communication: 7 # Human commands override normal planning diff --git a/config/recovery_strategies.yaml b/config/recovery_strategies.yaml new file mode 100644 index 0000000..d1f793d --- /dev/null +++ b/config/recovery_strategies.yaml @@ -0,0 +1,87 @@ +# Recovery Strategies Configuration +# ============================================================ +# Defines the automatic recovery strategies for each error type +# and the fallback hierarchy for each subsystem. + +recovery_strategies: + + sensor_failure: + primary: switch_to_backup_sensor + secondary: degrade_gracefully + tertiary: request_manual_sensor_check + max_attempts: 3 + timeout_seconds: 10 + + agent_crash: + primary: restart_agent + secondary: redistribute_tasks + tertiary: request_human_intervention + max_attempts: 3 + timeout_seconds: 15 + + planning_failure: + primary: retry_with_different_algorithm + secondary: use_default_safe_plan + tertiary: request_human_plan + max_attempts: 5 + timeout_seconds: 20 + + execution_failure: + primary: replan_trajectory + secondary: try_alternative_path + tertiary: stop_and_wait + max_attempts: 3 + timeout_seconds: 30 + + communication_failure: + primary: retry_connection + secondary: switch_to_backup_channel + tertiary: failsafe_autonomous_mode + max_attempts: 5 + timeout_seconds: 5 + + hardware_failure: + primary: emergency_stop + secondary: limp_mode + tertiary: request_maintenance + max_attempts: 1 + timeout_seconds: 2 + +# Fallback hierarchy for each subsystem (ordered by preference) +fallback_hierarchy: + + navigation: + - retry_primary_navigation + - try_alternative_path + - try_simple_straight_line + - try_manual_waypoints + - request_human_guidance + - stop_safely + + manipulation: + - retry_primary_grasp + - try_alternative_grasp_approach + - try_different_grasp_point + - request_human_demonstration + - skip_object + + perception: + - retry_primary_perception + - switch_to_backup_sensor + - use_cached_perception_data + - request_human_visual_confirmation + - degrade_gracefully + + planning: + - retry_primary_planner + - try_alternative_planner + - use_default_safe_plan + - request_human_plan + - halt_and_wait + + communication: + - retry_primary_channel + - switch_to_backup_channel + - use_cached_commands + - request_local_mode + - failsafe_autonomous_mode diff --git a/config/resolver_config.yaml b/config/resolver_config.yaml new file mode 100644 index 0000000..4416a4d --- /dev/null +++ b/config/resolver_config.yaml @@ -0,0 +1,81 @@ +# Resolver Agent Configuration +# ============================================================ +# Main configuration file for the Resolver Agent (6th Agent). +# All thresholds and behaviour settings are defined here. + +resolver: + # Monitoring frequency in Hz + monitoring_rate: 10 + + # Health check frequency in Hz + health_check_rate: 1 + + # Agent priorities (higher = more important) + priorities: + perception: 5 + planning: 4 + control: 3 + communication: 2 + coordination: 1 + resolver: 6 # Highest – can override all other agents + + conflict_resolution: + # Default strategy when no other context is available + # Options: priority_based | voting | expertise | cost_based | ml_based + default_strategy: priority_based + + # Enable ML-based conflict prediction (requires PyTorch) + use_ml_prediction: true + + # Minimum fraction of votes required for voting strategy + voting_threshold: 0.6 + + error_recovery: + # Automatically attempt recovery without human intervention + auto_recovery: true + + # Maximum number of recovery attempts before escalating + max_recovery_attempts: 3 + + # Seconds before escalating an unresolved error to a human operator + escalation_timeout: 30 + + health_monitoring: + # Agent response time above this (seconds) triggers a DEGRADED alert + response_time_threshold: 1.0 + + # CPU usage above this percentage triggers a CRITICAL alert + cpu_threshold: 80 + + # Memory usage above this percentage triggers a CRITICAL alert + memory_threshold: 85 + + # Error rate above this fraction (0.1 = 10 %) triggers a WARNING alert + error_rate_threshold: 0.1 + + # Seconds without a heartbeat before marking an agent as OFFLINE + heartbeat_timeout: 30 + + deadlock_detection: + # How often to run deadlock detection (seconds) + check_interval: 5 + + # Seconds a wait may last before being flagged as a potential deadlock + timeout_threshold: 30 + + fallback: + # Enable the fallback strategy system + enable_fallback: true + + # Maximum fallback depth (0 = primary, 3 = safe shutdown) + max_fallback_depth: 3 + + # Request human help before performing a safe shutdown + request_human_help: true + + dashboard: + # Default port for the health monitoring web dashboard + port: 8080 + + # How often the dashboard data is refreshed (seconds) + refresh_interval: 2 diff --git a/dashboard/__init__.py b/dashboard/__init__.py new file mode 100644 index 0000000..939da66 --- /dev/null +++ b/dashboard/__init__.py @@ -0,0 +1,7 @@ +""" +Dashboard package for the Resolver Agent health monitoring UI. +""" + +from dashboard.health_dashboard import HealthDashboard + +__all__ = ["HealthDashboard"] diff --git a/dashboard/health_dashboard.py b/dashboard/health_dashboard.py new file mode 100644 index 0000000..9d0ac85 --- /dev/null +++ b/dashboard/health_dashboard.py @@ -0,0 +1,130 @@ +""" +Web-based health monitoring dashboard for the Resolver Agent. + +Provides a real-time view of all agent statuses, active alerts, +and system health metrics via a simple Flask web server. + +Dependencies: flask (optional; falls back to a stub when unavailable) +""" + +import json +import logging +import threading +import time +from typing import Any, Dict, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from agents.resolver_agent import ResolverAgent + +logger = logging.getLogger(__name__) + +try: + from flask import Flask, Response, jsonify, render_template + + _FLASK_AVAILABLE = True +except ImportError: + _FLASK_AVAILABLE = False + logger.warning("Flask not available; HealthDashboard will run in stub mode") + + +class HealthDashboard: + """ + Real-time health monitoring dashboard. + + When Flask is available the dashboard serves: + GET / → HTML dashboard + GET /api/health → JSON health data + GET /api/agents → JSON per-agent data + GET /api/alerts → JSON active alerts + + When Flask is not installed the dashboard logs health data to the console. + """ + + def __init__(self, resolver_agent: "ResolverAgent", port: int = 8080): + self.resolver = resolver_agent + self.port = port + self._app: Optional[Any] = None + self._server_thread: Optional[threading.Thread] = None + + if _FLASK_AVAILABLE: + self._app = self._create_flask_app() + + # ------------------------------------------------------------------ # + # Public API + # ------------------------------------------------------------------ # + + def run(self, debug: bool = False) -> None: + """Start the dashboard server (blocking when Flask is available).""" + if _FLASK_AVAILABLE and self._app is not None: + logger.info("Launching HealthDashboard on http://localhost:%d", self.port) + self._app.run(host="0.0.0.0", port=self.port, debug=debug) + else: + logger.info("Flask unavailable – printing health data every 5 s") + self._console_loop() + + def run_async(self) -> threading.Thread: + """Start the dashboard server in a background daemon thread.""" + self._server_thread = threading.Thread( + target=self.run, daemon=True, name="health-dashboard" + ) + self._server_thread.start() + return self._server_thread + + def get_health_data(self) -> Dict[str, Any]: + """Return the current health data as a plain dict.""" + return self.resolver.assess_system_health() + + # ------------------------------------------------------------------ # + # Flask application + # ------------------------------------------------------------------ # + + def _create_flask_app(self) -> Any: + """Create and configure the Flask application.""" + app = Flask(__name__, template_folder="templates", static_folder="static") + + @app.route("/") + def index(): + health = self.get_health_data() + return render_template("dashboard.html", health=health) + + @app.route("/api/health") + def api_health(): + return jsonify(self.get_health_data()) + + @app.route("/api/agents") + def api_agents(): + health = self.get_health_data() + return jsonify(health.get("agents", {})) + + @app.route("/api/alerts") + def api_alerts(): + health = self.get_health_data() + return jsonify(health.get("alerts", [])) + + @app.route("/api/stream") + def api_stream(): + """Server-Sent Events endpoint for real-time updates.""" + def generate(): + while True: + data = json.dumps(self.get_health_data()) + yield f"data: {data}\n\n" + time.sleep(2) + + return Response(generate(), mimetype="text/event-stream") + + return app + + # ------------------------------------------------------------------ # + # Console fallback + # ------------------------------------------------------------------ # + + def _console_loop(self) -> None: + """Print health data to the console when Flask is unavailable.""" + while True: + health = self.get_health_data() + logger.info("System health: %s", health.get("overall_status", "unknown")) + for name, data in health.get("agents", {}).items(): + logger.info(" Agent '%s': %s", name, data.get("status", "unknown")) + for alert in health.get("alerts", []): + logger.warning(" ALERT: %s", alert.get("message", "")) + time.sleep(5) diff --git a/dashboard/static/css/dashboard.css b/dashboard/static/css/dashboard.css new file mode 100644 index 0000000..a5c3d5e --- /dev/null +++ b/dashboard/static/css/dashboard.css @@ -0,0 +1,84 @@ +/* Resolver Agent Health Dashboard Styles */ + +:root { + --healthy: #2ecc71; + --degraded: #f39c12; + --critical: #e74c3c; + --unknown: #95a5a6; + --bg: #1a1a2e; + --surface: #16213e; + --text: #eaeaea; +} + +* { box-sizing: border-box; margin: 0; padding: 0; } + +body { + background: var(--bg); + color: var(--text); + font-family: 'Segoe UI', sans-serif; + min-height: 100vh; +} + +header { + background: var(--surface); + padding: 1rem 2rem; + display: flex; + align-items: center; + justify-content: space-between; + border-bottom: 2px solid #0f3460; +} + +header h1 { font-size: 1.4rem; } +#last-updated { font-size: 0.8rem; color: #aaa; } + +main { padding: 2rem; } + +section { margin-bottom: 2rem; } +section h2 { margin-bottom: 1rem; font-size: 1.1rem; color: #aaa; } + +/* Status badge */ +.badge { + display: inline-block; + padding: 0.4rem 1.2rem; + border-radius: 999px; + font-weight: bold; + font-size: 1rem; + text-transform: uppercase; +} +.badge.healthy { background: var(--healthy); color: #fff; } +.badge.degraded { background: var(--degraded); color: #fff; } +.badge.critical { background: var(--critical); color: #fff; } +.badge.unknown { background: var(--unknown); color: #fff; } + +/* Agent cards */ +.card-grid { + display: grid; + grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)); + gap: 1rem; +} + +.agent-card { + background: var(--surface); + border-radius: 8px; + padding: 1rem; + border-left: 4px solid var(--unknown); +} +.agent-card.healthy { border-left-color: var(--healthy); } +.agent-card.degraded { border-left-color: var(--degraded); } +.agent-card.critical { border-left-color: var(--critical); } + +.agent-card h3 { font-size: 0.95rem; margin-bottom: 0.5rem; text-transform: capitalize; } +.agent-card .metric { font-size: 0.8rem; color: #aaa; margin: 2px 0; } +.agent-card .metric span { color: var(--text); font-weight: bold; } + +/* Alerts */ +#alerts-list { list-style: none; } +#alerts-list li { + background: var(--surface); + border-left: 4px solid var(--critical); + padding: 0.7rem 1rem; + margin-bottom: 0.5rem; + border-radius: 4px; + font-size: 0.9rem; +} +.no-alerts { border-left-color: var(--healthy) !important; color: #aaa; } diff --git a/dashboard/static/js/dashboard.js b/dashboard/static/js/dashboard.js new file mode 100644 index 0000000..8c72e91 --- /dev/null +++ b/dashboard/static/js/dashboard.js @@ -0,0 +1,87 @@ +/* Resolver Agent Health Dashboard – real-time update script */ + +const STATUS_BADGE = document.getElementById('status-badge'); +const AGENT_CARDS = document.getElementById('agent-cards'); +const ALERTS_LIST = document.getElementById('alerts-list'); +const LAST_UPDATED = document.getElementById('last-updated'); + +function statusClass(status) { + const s = (status || 'unknown').toLowerCase(); + if (s === 'healthy') return 'healthy'; + if (s === 'degraded') return 'degraded'; + if (s === 'critical') return 'critical'; + return 'unknown'; +} + +function renderHealth(data) { + // Overall status + const overall = (data.overall_status || 'unknown').toUpperCase(); + STATUS_BADGE.textContent = overall; + STATUS_BADGE.className = 'badge ' + statusClass(data.overall_status); + + // Agent cards + AGENT_CARDS.innerHTML = ''; + const agents = data.agents || {}; + Object.entries(agents).forEach(([name, info]) => { + const sc = statusClass(info.status); + const card = document.createElement('div'); + card.className = 'agent-card ' + sc; + card.innerHTML = ` +

${name}

+

Status: ${(info.status || 'unknown').toUpperCase()}

+

CPU: ${(info.cpu_usage || 0).toFixed(1)}%

+

Memory: ${(info.memory_usage || 0).toFixed(1)}%

+

Error rate: ${((info.error_rate || 0) * 100).toFixed(1)}%

+

Response: ${(info.response_time || 0).toFixed(2)}s

+ `; + AGENT_CARDS.appendChild(card); + }); + + // Alerts + const alerts = data.alerts || []; + ALERTS_LIST.innerHTML = ''; + if (alerts.length === 0) { + ALERTS_LIST.innerHTML = '
  • No active alerts ✓
  • '; + } else { + alerts.forEach(alert => { + const li = document.createElement('li'); + li.textContent = alert.message || JSON.stringify(alert); + ALERTS_LIST.appendChild(li); + }); + } + + LAST_UPDATED.textContent = 'Last updated: ' + new Date().toLocaleTimeString(); +} + +// Use Server-Sent Events for real-time updates +function connectSSE() { + const source = new EventSource('/api/stream'); + source.onmessage = (event) => { + try { + renderHealth(JSON.parse(event.data)); + } catch (e) { + console.error('Failed to parse health data', e); + } + }; + source.onerror = () => { + console.warn('SSE disconnected; retrying via polling'); + source.close(); + pollHealth(); + }; +} + +// Fallback: poll every 5 s when SSE is unavailable +function pollHealth() { + setInterval(() => { + fetch('/api/health') + .then(r => r.json()) + .then(renderHealth) + .catch(e => console.error('Health poll failed', e)); + }, 5000); +} + +// Attempt initial fetch immediately, then start SSE +fetch('/api/health') + .then(r => r.json()) + .then(data => { renderHealth(data); connectSSE(); }) + .catch(() => pollHealth()); diff --git a/dashboard/templates/dashboard.html b/dashboard/templates/dashboard.html new file mode 100644 index 0000000..fec74e9 --- /dev/null +++ b/dashboard/templates/dashboard.html @@ -0,0 +1,37 @@ + + + + + + Resolver Agent – Health Dashboard + + + +
    +

    🤖 Resolver Agent – System Health Dashboard

    + Updating… +
    + +
    + +
    +

    Overall System Status

    +
    UNKNOWN
    +
    + + +
    +

    Agent Health

    +
    +
    + + +
    +

    Active Alerts

    + +
    +
    + + + + diff --git a/docs/conflict_resolution_guide.md b/docs/conflict_resolution_guide.md new file mode 100644 index 0000000..36e4e2e --- /dev/null +++ b/docs/conflict_resolution_guide.md @@ -0,0 +1,97 @@ +# Conflict Resolution Guide + +## Overview + +The conflict resolution system detects and resolves disagreements between agents in the Agentic AGI robotics system. + +## Conflict Types + +| Type | Description | +|------|-------------| +| `perception_planning` | Perception sees objects that Planning doesn't know about (or vice versa) | +| `control` | Multiple agents simultaneously request robot control | +| `task` | The same task is assigned to multiple agents | +| `resource` | Multiple agents compete for the same resource (GPU, camera, etc.) | + +## Resolution Strategies + +### 1. Priority-Based (Default) +The agent with the highest priority wins. Uses the global priority table. + +```python +from resolver.conflict_resolution import ConflictResolver, ResolutionStrategy + +resolver = ConflictResolver() +result = resolver.resolve(conflict, ResolutionStrategy.PRIORITY_BASED) +``` + +### 2. Voting +Each agent casts a weighted vote (based on priority). The agent with the most votes wins. + +### 3. Expertise +The most domain-relevant agent is selected as the decision-maker. + +| Conflict Type | Expert Agent | +|---------------------|---------------| +| perception_planning | perception | +| control | control | +| task | coordination | +| resource | resolver | + +### 4. Cost-Based +The option with the lowest numerical cost (from `conflict.details.options`) is selected. + +```python +conflict = { + "type": "resource", + "agents": ["planning", "control"], + "details": { + "options": [ + {"id": "opt_a", "agent": "planning", "cost": 10}, + {"id": "opt_b", "agent": "control", "cost": 5}, + ] + }, +} +result = resolver.resolve(conflict, ResolutionStrategy.COST_BASED) +# → control wins (cost 5) +``` + +### 5. ML-Based +A trained PyTorch model predicts the best resolution. Falls back to priority-based if PyTorch is unavailable. + +## Conflict Prediction + +The `ConflictPredictor` class provides rule-based and ML-based prediction: + +```python +from resolver.conflict_resolution import ConflictPredictor + +predictor = ConflictPredictor() +predictions = predictor.predict(agent_states) +actions = predictor.suggest_preventive_actions(agent_states) +``` + +## Conflict Data Format + +```python +conflict = { + "type": "control", # ConflictType value + "agents": ["planning", "control"], # involved agent names + "description": "...", # human-readable description + "severity": 2, # 1-5 (1=minor, 5=critical) + "details": { ... }, # type-specific details +} +``` + +## Resolution Result Format + +```python +resolution = { + "strategy": "priority_based", + "winning_agent": "planning", + "action": "grant_control_to_planning", + "rationale": "Agent 'planning' has highest priority (4)", + "success": True, + "timestamp": 1234567890.0, +} +``` diff --git a/docs/error_recovery_guide.md b/docs/error_recovery_guide.md new file mode 100644 index 0000000..c5f66c1 --- /dev/null +++ b/docs/error_recovery_guide.md @@ -0,0 +1,80 @@ +# Error Recovery Guide + +## Overview + +The error recovery system automatically detects, classifies, and recovers from errors across all robot systems. + +## Error Severity Levels + +| Level | Value | Meaning | +|----------|-------|---------| +| INFO | 0 | Minor issue; log only | +| WARNING | 1 | Potential problem; monitor | +| ERROR | 2 | Significant issue; needs recovery | +| CRITICAL | 3 | System-threatening; immediate action | +| FATAL | 4 | System shutdown required | + +## Supported Error Types + +| Type | Trigger | Recovery Action | +|------------------------|-------------------------------|-----------------| +| `sensor_failure` | Camera/LIDAR offline | Switch to backup sensor | +| `agent_crash` | Agent stops responding (30s) | Restart agent | +| `planning_failure` | No valid plan found | Try alternative planner | +| `execution_failure` | Robot cannot reach goal | Re-plan trajectory | +| `communication_failure`| Connection lost | Use cached data / fail-safe | +| `hardware_failure` | Motor error / battery low | Emergency stop / limp mode | + +## Usage + +```python +from resolver.error_recovery import ErrorRecoverySystem, ErrorSeverity, ErrorType + +system = ErrorRecoverySystem() + +# Recover from an error +error = { + "type": ErrorType.SENSOR_FAILURE, + "sensor": "camera", + "severity": ErrorSeverity.ERROR, +} +result = system.execute_recovery(error) +# result["recovery_action"] → "switched_to_backup_sensor:camera" +# result["recovery_success"] → True +``` + +## Escalation + +After `MAX_RECOVERY_ATTEMPTS` (default: 3) failed attempts for the same error, the system escalates: + +```python +result["escalation_required"] = True # Human intervention needed +``` + +## Integration with Resolver Agent + +```python +resolver = ResolverAgent() +resolver.recover_from_error({ + "type": "agent_crash", + "agent": "planning", + "severity": 3, +}) +``` + +## Error Log + +```python +print(system.error_log) # List of all ErrorRecord dicts +``` + +## Detection + +The system can auto-detect errors by scanning agent states: + +```python +errors = system.detect_errors(agent_states) +# Returns list of error dicts for agents with: +# - No heartbeat for >30 seconds (agent_crash) +# - Error count >10 (unknown) +``` diff --git a/docs/resolver_agent.md b/docs/resolver_agent.md new file mode 100644 index 0000000..d3a1f7b --- /dev/null +++ b/docs/resolver_agent.md @@ -0,0 +1,114 @@ +# Resolver Agent + +## Overview + +The **Resolver Agent** is the 6th intelligent agent in the Agentic AGI robotics system. It acts as the **System Mediator and Health Guardian**, monitoring all other agents, resolving conflicts, handling errors gracefully, and ensuring the system continues operating even when problems arise. + +## Architecture + +``` +ResolverAgent +├── ConflictDetector ← detects disagreements between agents +├── ConflictResolver ← applies resolution strategies +├── ErrorRecoverySystem ← handles errors and automatic recovery +├── AgentArbitrator ← allocates resources and arbitrates control +├── HealthMonitor ← tracks metrics and raises alerts +├── DeadlockDetector ← finds and breaks deadlocks +└── FallbackPlanner ← executes fallback strategies +``` + +## Responsibilities + +| Responsibility | Module | +|-----------------------|---------------------------| +| Conflict resolution | `resolver/conflict_resolution.py` | +| Error recovery | `resolver/error_recovery.py` | +| Agent arbitration | `resolver/arbitrator.py` | +| Health monitoring | `resolver/health_monitor.py` | +| Deadlock detection | `resolver/deadlock_detector.py` | +| Fallback planning | `resolver/fallback_planner.py` | +| ML conflict prediction| `resolver/predictor.py` | + +## Quick Start + +```python +from agents.resolver_agent import ResolverAgent +from agents.base_agent import BaseAgent + +# Create the resolver +resolver = ResolverAgent() + +# Connect to other agents +resolver.integrate_with_agents([perception, planning, control, + communication, coordination]) + +# Start monitoring (non-blocking background thread) +resolver.start_monitoring() + +# Manually resolve a conflict +conflict = { + "type": "control", + "agents": ["planning", "control"], + "description": "Path disagreement", +} +resolution = resolver.resolve_conflict(conflict) + +# Check system health +health = resolver.generate_health_report() + +# Handle an error +error = {"type": "sensor_failure", "sensor": "camera", "severity": 2} +result = resolver.recover_from_error(error) +``` + +## Agent Priorities + +| Agent | Priority | +|---------------|----------| +| resolver | 6 (highest) | +| perception | 5 | +| planning | 4 | +| control | 3 | +| communication | 2 | +| coordination | 1 (lowest) | + +## Configuration + +See [`config/resolver_config.yaml`](../config/resolver_config.yaml) for all configuration options. + +## API Reference + +### `ResolverAgent` + +| Method | Description | +|--------|-------------| +| `integrate_with_agents(agents)` | Connect resolver to a list of agents | +| `start_monitoring()` | Start background monitoring loop | +| `monitor_agents()` | Run one monitoring cycle, return status dict | +| `detect_conflicts()` | Return list of active conflicts | +| `resolve_conflict(conflict)` | Resolve a conflict dict | +| `recover_from_error(error)` | Handle an error and attempt recovery | +| `arbitrate_resources(requests)` | Decide resource allocation | +| `detect_deadlock()` | Check for deadlocks (returns dict or None) | +| `break_deadlock(deadlock)` | Break a detected deadlock | +| `assess_system_health()` | Return overall system health | +| `generate_health_report()` | Return detailed health report | +| `execute_fallback(failure)` | Execute fallback strategy | +| `launch_dashboard(port=8080)` | Start the web health dashboard | + +## ROS2 Integration + +See [`ros2_interface/resolver_node.py`](../ros2_interface/resolver_node.py) for the ROS2 node implementation. + +Launch with: +```bash +ros2 launch rag7 resolver.launch.py +``` + +## Testing + +```bash +python -m pytest tests/ -v +``` + +All 87+ tests should pass. diff --git a/examples/conflict_resolution_demo.py b/examples/conflict_resolution_demo.py new file mode 100644 index 0000000..b420c84 --- /dev/null +++ b/examples/conflict_resolution_demo.py @@ -0,0 +1,120 @@ +""" +Conflict Resolution Demo + +Demonstrates the Resolver Agent's conflict detection and resolution +capabilities with multiple resolution strategies. +""" + +import sys +import os +import logging + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s") + +from agents.base_agent import AgentState, AgentStatus +from agents.resolver_agent import ResolverAgent +from resolver.conflict_resolution import ConflictDetector, ConflictResolver, ResolutionStrategy + + +def demo_perception_planning_conflict(): + print("\n=== Perception vs Planning Conflict ===") + detector = ConflictDetector() + state_p = AgentState(name="perception") + state_p.metadata["detected_objects"] = ["chair", "table", "robot"] + state_pl = AgentState(name="planning") + state_pl.metadata["known_objects"] = ["chair"] + + conflicts = detector.detect_perception_planning_conflict( + {"perception": state_p, "planning": state_pl} + ) + print(f"Detected {len(conflicts)} conflict(s):") + for c in conflicts: + print(f" Type: {c['type']}, Severity: {c['severity']}") + print(f" Missing objects: {c['details']['disagreements']}") + return conflicts + + +def demo_resource_conflict(): + print("\n=== Resource Conflict (Multiple Agents Need GPU) ===") + detector = ConflictDetector() + states = {} + for name in ["perception", "planning", "control"]: + state = AgentState(name=name) + state.metadata["requested_resources"] = ["GPU"] + states[name] = state + + conflicts = detector.detect_resource_conflict(states) + print(f"Detected {len(conflicts)} resource conflict(s):") + for c in conflicts: + print(f" Resource: {c['details']['resource']}") + print(f" Requesters: {c['details']['requesting_agents']}") + return conflicts + + +def demo_resolution_strategies(): + print("\n=== Resolution Strategies Demo ===") + resolver = ConflictResolver() + conflict = { + "type": "control", + "agents": ["planning", "control", "coordination"], + "description": "Multiple agents requesting robot control", + } + + strategies = [ + ResolutionStrategy.PRIORITY_BASED, + ResolutionStrategy.VOTING, + ResolutionStrategy.EXPERTISE, + ] + + for strategy in strategies: + result = resolver.resolve(conflict, strategy) + print(f" {strategy.value:20s} → winner: {result['winning_agent']}") + + +def demo_full_resolver(): + print("\n=== Full Resolver Agent Demo ===") + from agents.base_agent import BaseAgent + + class _DummyAgent(BaseAgent): + def __init__(self, name, priority=0): + super().__init__(name, priority) + def on_start(self): pass + def on_stop(self): pass + def execute(self, task): return {} + + resolver = ResolverAgent() + agents = [ + _DummyAgent("perception", 5), + _DummyAgent("planning", 4), + _DummyAgent("control", 3), + _DummyAgent("communication", 2), + _DummyAgent("coordination", 1), + ] + resolver.integrate_with_agents(agents) + resolver.start() + + # Simulate a control conflict + conflict = { + "type": "control", + "agents": ["planning", "control"], + "description": "Path disagreement", + } + print(f"\nResolving conflict: {conflict['description']}") + resolution = resolver.resolve_conflict(conflict) + print(f" Resolution: {resolution['winning_agent']} granted control " + f"({resolution['strategy']})") + + # Check system health + health = resolver.generate_health_report() + print(f"\nSystem health: {health['overall_status']}") + + resolver.stop() + print("\nDemo completed successfully!") + + +if __name__ == "__main__": + demo_perception_planning_conflict() + demo_resource_conflict() + demo_resolution_strategies() + demo_full_resolver() diff --git a/examples/error_recovery_demo.py b/examples/error_recovery_demo.py new file mode 100644 index 0000000..d547c00 --- /dev/null +++ b/examples/error_recovery_demo.py @@ -0,0 +1,87 @@ +""" +Error Recovery Demo + +Demonstrates the Resolver Agent's error detection and automatic +recovery capabilities for various error types. +""" + +import sys +import os +import logging + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s") + +from resolver.error_recovery import ErrorRecoverySystem, ErrorSeverity, ErrorType + + +def demo_error_classification(): + print("\n=== Error Classification ===") + system = ErrorRecoverySystem() + errors = [ + {"type": ErrorType.SENSOR_FAILURE, "sensor": "camera", "severity": ErrorSeverity.ERROR}, + {"type": ErrorType.AGENT_CRASH, "agent": "planning", "severity": ErrorSeverity.CRITICAL}, + {"type": ErrorType.PLANNING_FAILURE, "severity": ErrorSeverity.ERROR}, + {"type": ErrorType.HARDWARE_FAILURE, "component": "motor", "severity": ErrorSeverity.CRITICAL}, + ] + for error in errors: + record = system.classify_error(error) + print(f" {record.error_type:30s} → severity: {record.severity.name}") + + +def demo_automatic_recovery(): + print("\n=== Automatic Error Recovery ===") + system = ErrorRecoverySystem() + scenarios = [ + { + "name": "Camera failure during navigation", + "error": {"type": ErrorType.SENSOR_FAILURE, "sensor": "camera", + "severity": ErrorSeverity.ERROR}, + }, + { + "name": "Planning agent becomes unresponsive", + "error": {"type": ErrorType.AGENT_CRASH, "agent": "planning", + "severity": ErrorSeverity.CRITICAL}, + }, + { + "name": "Robot cannot reach goal", + "error": {"type": ErrorType.EXECUTION_FAILURE, + "severity": ErrorSeverity.ERROR}, + }, + { + "name": "Communication link lost", + "error": {"type": ErrorType.COMMUNICATION_FAILURE, + "severity": ErrorSeverity.WARNING}, + }, + { + "name": "Motor controller error", + "error": {"type": ErrorType.HARDWARE_FAILURE, "component": "motor", + "severity": ErrorSeverity.CRITICAL}, + }, + ] + + for scenario in scenarios: + print(f"\n Scenario: {scenario['name']}") + result = system.execute_recovery(scenario["error"]) + print(f" Action: {result['recovery_action']}") + print(f" Success: {result['recovery_success']}") + + +def demo_escalation(): + print("\n=== Recovery Escalation (max attempts exceeded) ===") + system = ErrorRecoverySystem() + error = {"type": ErrorType.SENSOR_FAILURE, "sensor": "lidar"} + + for attempt in range(ErrorRecoverySystem.MAX_RECOVERY_ATTEMPTS + 1): + result = system.execute_recovery(error) + if result.get("escalation_required"): + print(f" Attempt {attempt + 1}: ESCALATED – human intervention required") + else: + print(f" Attempt {attempt + 1}: Recovery action → {result['recovery_action']}") + + +if __name__ == "__main__": + demo_error_classification() + demo_automatic_recovery() + demo_escalation() + print("\nDemo completed successfully!") diff --git a/examples/health_monitoring_demo.py b/examples/health_monitoring_demo.py new file mode 100644 index 0000000..2550968 --- /dev/null +++ b/examples/health_monitoring_demo.py @@ -0,0 +1,108 @@ +""" +Health Monitoring Demo + +Demonstrates the Resolver Agent's health monitoring capabilities +including per-agent metrics, alerts, and failure prediction. +""" + +import sys +import os +import logging + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s") + +from agents.base_agent import AgentState, AgentStatus, BaseAgent +from agents.resolver_agent import ResolverAgent +from resolver.health_monitor import HealthMonitor + + +def demo_individual_agent_monitoring(): + print("\n=== Individual Agent Health Monitoring ===") + monitor = HealthMonitor() + + # Simulate different agent health scenarios + scenarios = { + "perception": {"cpu_usage": 20.0, "memory_usage": 30.0, + "response_time": 0.05, "error_count": 0, "task_count": 100}, + "planning": {"cpu_usage": 45.0, "memory_usage": 60.0, + "response_time": 1.8, "error_count": 2, "task_count": 50}, + "control": {"cpu_usage": 85.0, "memory_usage": 40.0, + "response_time": 0.1, "error_count": 0, "task_count": 200}, + "communication": {"cpu_usage": 15.0, "memory_usage": 20.0, + "response_time": 0.2, "error_count": 8, "task_count": 80}, + } + + for agent_name, metrics in scenarios.items(): + state = AgentState(name=agent_name) + for k, v in metrics.items(): + setattr(state, k, v) + monitor.update_agent_health(agent_name, state) + health = monitor.get_agent_health(agent_name) + print(f" {agent_name:15s}: {health['status'].upper():10s} " + f"CPU={health['cpu_usage']:5.1f}% " + f"RT={health['response_time']:.2f}s") + + +def demo_system_health_report(): + print("\n=== System Health Report ===") + monitor = HealthMonitor() + + for agent_name, (cpu, rt) in [ + ("perception", (20.0, 0.1)), + ("planning", (50.0, 0.5)), + ("control", (90.0, 0.05)), # Critical: high CPU + ]: + state = AgentState(name=agent_name) + state.cpu_usage = cpu + state.response_time = rt + monitor.update_agent_health(agent_name, state) + + report = monitor.generate_health_report() + print(f" Overall status: {report['overall_status'].upper()}") + print(f" Monitored agents: {report['total_agents']}") + print(f" Active alerts: {len(report['alerts'])}") + for alert in report["alerts"]: + print(f" ⚠️ {alert['message']}") + + +def demo_full_resolver_health(): + print("\n=== Resolver Agent Health Dashboard Data ===") + + class _DummyAgent(BaseAgent): + def __init__(self, name, priority=0): + super().__init__(name, priority) + def on_start(self): pass + def on_stop(self): pass + def execute(self, task): return {} + + resolver = ResolverAgent() + agents = [ + _DummyAgent("perception", 5), + _DummyAgent("planning", 4), + _DummyAgent("control", 3), + _DummyAgent("communication", 2), + _DummyAgent("coordination", 1), + ] + for agent in agents: + agent.start() + resolver.integrate_with_agents(agents) + resolver.start() + + health = resolver.generate_health_report() + print(f" Overall: {health['overall_status'].upper()}") + print(f" Generated: {health['report_generated_at']}") + print(" Agents:") + for name, data in health["agents"].items(): + print(f" {name:15s}: {data.get('status', 'unknown').upper()}") + + resolver.stop() + for agent in agents: + agent.stop() + print("\nDemo completed successfully!") + + +if __name__ == "__main__": + demo_individual_agent_monitoring() + demo_system_health_report() + demo_full_resolver_health() diff --git a/resolver/__init__.py b/resolver/__init__.py new file mode 100644 index 0000000..5cd5be6 --- /dev/null +++ b/resolver/__init__.py @@ -0,0 +1,28 @@ +""" +Resolver package for the Agentic AGI robotics system. + +Provides conflict resolution, error recovery, agent arbitration, +health monitoring, deadlock detection, fallback planning, and +ML-based conflict prediction. +""" + +from resolver.arbitrator import AgentArbitrator +from resolver.conflict_resolution import ConflictDetector, ConflictPredictor, ConflictResolver +from resolver.deadlock_detector import DeadlockDetector +from resolver.error_recovery import ErrorRecoverySystem, ErrorSeverity +from resolver.fallback_planner import FallbackPlanner +from resolver.health_monitor import HealthMonitor +from resolver.predictor import ConflictPredictor as MLConflictPredictor + +__all__ = [ + "ConflictDetector", + "ConflictResolver", + "ConflictPredictor", + "MLConflictPredictor", + "ErrorRecoverySystem", + "ErrorSeverity", + "AgentArbitrator", + "HealthMonitor", + "DeadlockDetector", + "FallbackPlanner", +] diff --git a/resolver/arbitrator.py b/resolver/arbitrator.py new file mode 100644 index 0000000..799e3ea --- /dev/null +++ b/resolver/arbitrator.py @@ -0,0 +1,213 @@ +""" +Agent arbitration logic for the Resolver Agent. + +Decides which agent gets control, which tasks take priority, and how +limited resources are allocated when multiple agents compete. +""" + +import logging +import time +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict, List, Optional + + +logger = logging.getLogger(__name__) + + +class ArbitrationStrategy(Enum): + """Available arbitration strategies.""" + PRIORITY_QUEUE = "priority_queue" + ROUND_ROBIN = "round_robin" + WEIGHTED_FAIR = "weighted_fair" + EMERGENCY_OVERRIDE = "emergency_override" + HUMAN_IN_THE_LOOP = "human_in_the_loop" + + +@dataclass +class ArbitrationResult: + """Outcome of an arbitration decision.""" + strategy: ArbitrationStrategy + winner: Optional[str] + allocation: Dict[str, Any] + rationale: str + timestamp: float = field(default_factory=time.time) + + def to_dict(self) -> Dict[str, Any]: + return { + "strategy": self.strategy.value, + "winner": self.winner, + "allocation": self.allocation, + "rationale": self.rationale, + "timestamp": self.timestamp, + } + + +class AgentArbitrator: + """ + Arbitrate between competing agent requests. + + Default agent priorities (higher number = higher priority): + resolver → 6 (highest; can override all) + perception → 5 + planning → 4 + control → 3 + communication → 2 + coordination → 1 + """ + + DEFAULT_PRIORITIES: Dict[str, int] = { + "perception": 5, + "planning": 4, + "control": 3, + "communication": 2, + "coordination": 1, + "resolver": 6, + } + + def __init__(self, agent_priorities: Optional[Dict[str, int]] = None): + self.agent_priorities = agent_priorities or dict(self.DEFAULT_PRIORITIES) + self._round_robin_index: Dict[str, int] = {} + + # ------------------------------------------------------------------ # + # Public API + # ------------------------------------------------------------------ # + + def arbitrate_control_request( + self, + requests: List[Dict[str, Any]], + strategy: ArbitrationStrategy = ArbitrationStrategy.PRIORITY_QUEUE, + ) -> Dict[str, Any]: + """Determine which agent gets control of the robot.""" + if not requests: + return ArbitrationResult( + strategy=strategy, + winner=None, + allocation={}, + rationale="No requests to arbitrate", + ).to_dict() + + # Emergency requests always override normal priority + emergency = [r for r in requests if r.get("emergency", False)] + if emergency: + return self._emergency_override(emergency, "control") + + return self._priority_queue_arbitration(requests, "control") + + def arbitrate_resource_allocation( + self, requests: List[Dict[str, Any]] + ) -> Dict[str, Any]: + """Allocate limited resources (CPU, memory, sensors) among requesting agents.""" + if not requests: + return ArbitrationResult( + strategy=ArbitrationStrategy.PRIORITY_QUEUE, + winner=None, + allocation={}, + rationale="No resource requests", + ).to_dict() + + # Group requests by resource + resource_groups: Dict[str, List[Dict[str, Any]]] = {} + for req in requests: + resource = req.get("resource", "unknown") + resource_groups.setdefault(resource, []).append(req) + + allocation: Dict[str, str] = {} + for resource, group in resource_groups.items(): + winner = max(group, key=lambda r: self.agent_priorities.get(r.get("agent", ""), 0)) + allocation[resource] = winner.get("agent", "") + + return ArbitrationResult( + strategy=ArbitrationStrategy.PRIORITY_QUEUE, + winner=None, + allocation=allocation, + rationale="Resources allocated by agent priority", + ).to_dict() + + def arbitrate_task_priority( + self, tasks: List[Dict[str, Any]] + ) -> Dict[str, Any]: + """Determine task execution order.""" + if not tasks: + return ArbitrationResult( + strategy=ArbitrationStrategy.PRIORITY_QUEUE, + winner=None, + allocation={}, + rationale="No tasks to arbitrate", + ).to_dict() + + sorted_tasks = sorted( + tasks, + key=lambda t: ( + t.get("priority", 0), + self.agent_priorities.get(t.get("agent", ""), 0), + ), + reverse=True, + ) + + return ArbitrationResult( + strategy=ArbitrationStrategy.PRIORITY_QUEUE, + winner=sorted_tasks[0].get("agent"), + allocation={"ordered_tasks": [t.get("id", str(i)) + for i, t in enumerate(sorted_tasks)]}, + rationale="Tasks sorted by priority then agent priority", + ).to_dict() + + def arbitrate_with_context( + self, requests: List[Dict[str, Any]], context: Dict[str, Any] + ) -> Dict[str, Any]: + """Context-aware arbitration (e.g., emergency overrides normal priority).""" + if context.get("emergency"): + return self._emergency_override(requests, "context_aware") + return self.arbitrate_control_request(requests) + + # ------------------------------------------------------------------ # + # Internal helpers + # ------------------------------------------------------------------ # + + def _priority_queue_arbitration( + self, requests: List[Dict[str, Any]], resource_type: str + ) -> Dict[str, Any]: + winner_req = max( + requests, + key=lambda r: ( + r.get("priority", 0), + self.agent_priorities.get(r.get("agent", ""), 0), + ), + ) + winner = winner_req.get("agent", "") + return ArbitrationResult( + strategy=ArbitrationStrategy.PRIORITY_QUEUE, + winner=winner, + allocation={resource_type: winner}, + rationale=f"Agent '{winner}' selected by priority queue for {resource_type}", + ).to_dict() + + def _emergency_override( + self, requests: List[Dict[str, Any]], resource_type: str + ) -> Dict[str, Any]: + """Grant control to the highest-priority emergency request.""" + winner_req = max( + requests, + key=lambda r: self.agent_priorities.get(r.get("agent", ""), 0), + ) + winner = winner_req.get("agent", "") + return ArbitrationResult( + strategy=ArbitrationStrategy.EMERGENCY_OVERRIDE, + winner=winner, + allocation={resource_type: winner}, + rationale=f"Emergency override: '{winner}' granted {resource_type}", + ).to_dict() + + def _round_robin( + self, requests: List[Dict[str, Any]], resource_type: str + ) -> Dict[str, Any]: + idx = self._round_robin_index.get(resource_type, 0) % len(requests) + winner = requests[idx].get("agent", "") + self._round_robin_index[resource_type] = idx + 1 + return ArbitrationResult( + strategy=ArbitrationStrategy.ROUND_ROBIN, + winner=winner, + allocation={resource_type: winner}, + rationale=f"Round-robin selection: index {idx}", + ).to_dict() diff --git a/resolver/conflict_resolution.py b/resolver/conflict_resolution.py new file mode 100644 index 0000000..ab678a9 --- /dev/null +++ b/resolver/conflict_resolution.py @@ -0,0 +1,379 @@ +""" +Conflict resolution module for the Resolver Agent. + +Provides: +- ConflictDetector – detect conflicts between agents +- ConflictResolver – apply resolution strategies +- ConflictPredictor – rule-based conflict prediction (ML version in predictor.py) +""" + +import logging +import time +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict, List, Optional + + +logger = logging.getLogger(__name__) + + +class ConflictType(Enum): + """Enumeration of recognised conflict categories.""" + PERCEPTION_PLANNING = "perception_planning" + CONTROL = "control" + TASK = "task" + RESOURCE = "resource" + COMMUNICATION = "communication" + UNKNOWN = "unknown" + + +class ResolutionStrategy(Enum): + """Enumeration of available resolution strategies.""" + PRIORITY_BASED = "priority_based" + VOTING = "voting" + EXPERTISE = "expertise" + TIME_BASED = "time_based" + COST_BASED = "cost_based" + ML_BASED = "ml_based" + + +@dataclass +class Conflict: + """Data class describing a detected conflict.""" + conflict_type: ConflictType + agents: List[str] + description: str + details: Dict[str, Any] = field(default_factory=dict) + severity: int = 1 + timestamp: float = field(default_factory=time.time) + + def to_dict(self) -> Dict[str, Any]: + return { + "type": self.conflict_type.value, + "agents": self.agents, + "description": self.description, + "details": self.details, + "severity": self.severity, + "timestamp": self.timestamp, + } + + +@dataclass +class Resolution: + """Data class describing the outcome of a conflict resolution.""" + strategy: ResolutionStrategy + winning_agent: Optional[str] + action: str + rationale: str + success: bool = True + timestamp: float = field(default_factory=time.time) + + def to_dict(self) -> Dict[str, Any]: + return { + "strategy": self.strategy.value, + "winning_agent": self.winning_agent, + "action": self.action, + "rationale": self.rationale, + "success": self.success, + "timestamp": self.timestamp, + } + + +# --------------------------------------------------------------------------- +# ConflictDetector +# --------------------------------------------------------------------------- + +class ConflictDetector: + """ + Detect conflicts between agents. + + Supports detection of: + - Perception vs Planning disagreements + - Competing control commands + - Overlapping or contradictory tasks + - Resource contention + """ + + def detect_all(self, agent_states: Dict[str, Any]) -> List[Dict[str, Any]]: + """Run all detectors and return a list of detected conflicts.""" + conflicts: List[Dict[str, Any]] = [] + conflicts.extend(self.detect_perception_planning_conflict(agent_states)) + conflicts.extend(self.detect_control_conflict(agent_states)) + conflicts.extend(self.detect_task_conflict(agent_states)) + conflicts.extend(self.detect_resource_conflict(agent_states)) + return conflicts + + def detect_perception_planning_conflict( + self, agent_states: Dict[str, Any] + ) -> List[Dict[str, Any]]: + """Detect conflicts between Perception and Planning agents.""" + conflicts: List[Dict[str, Any]] = [] + perception = agent_states.get("perception") + planning = agent_states.get("planning") + + if perception and planning: + perception_meta = getattr(perception, "metadata", {}) + planning_meta = getattr(planning, "metadata", {}) + p_objects = set(perception_meta.get("detected_objects", [])) + pl_objects = set(planning_meta.get("known_objects", [])) + disagreements = p_objects.symmetric_difference(pl_objects) + + if disagreements: + conflict = Conflict( + conflict_type=ConflictType.PERCEPTION_PLANNING, + agents=["perception", "planning"], + description="Object detection disagreement between Perception and Planning", + details={ + "perception_objects": list(p_objects), + "planning_objects": list(pl_objects), + "disagreements": list(disagreements), + }, + severity=2, + ) + conflicts.append(conflict.to_dict()) + return conflicts + + def detect_control_conflict(self, agent_states: Dict[str, Any]) -> List[Dict[str, Any]]: + """Detect conflicts where multiple agents compete for robot control.""" + conflicts: List[Dict[str, Any]] = [] + control_requesting = [ + name for name, state in agent_states.items() + if getattr(state, "metadata", {}).get("requesting_control", False) + ] + + if len(control_requesting) > 1: + conflict = Conflict( + conflict_type=ConflictType.CONTROL, + agents=control_requesting, + description="Multiple agents requesting simultaneous robot control", + details={"requesters": control_requesting}, + severity=3, + ) + conflicts.append(conflict.to_dict()) + return conflicts + + def detect_task_conflict(self, agent_states: Dict[str, Any]) -> List[Dict[str, Any]]: + """Detect overlapping or contradictory task assignments.""" + conflicts: List[Dict[str, Any]] = [] + task_assignments: Dict[str, List[str]] = {} + + for agent_name, state in agent_states.items(): + tasks = getattr(state, "metadata", {}).get("assigned_tasks", []) + for task_id in tasks: + task_assignments.setdefault(task_id, []).append(agent_name) + + for task_id, agents in task_assignments.items(): + if len(agents) > 1: + conflict = Conflict( + conflict_type=ConflictType.TASK, + agents=agents, + description=f"Task '{task_id}' assigned to multiple agents", + details={"task_id": task_id, "assigned_agents": agents}, + severity=2, + ) + conflicts.append(conflict.to_dict()) + return conflicts + + def detect_resource_conflict(self, agent_states: Dict[str, Any]) -> List[Dict[str, Any]]: + """Detect when multiple agents compete for the same resource.""" + conflicts: List[Dict[str, Any]] = [] + resource_requests: Dict[str, List[str]] = {} + + for agent_name, state in agent_states.items(): + resources = getattr(state, "metadata", {}).get("requested_resources", []) + for resource in resources: + resource_requests.setdefault(resource, []).append(agent_name) + + for resource, agents in resource_requests.items(): + if len(agents) > 1: + conflict = Conflict( + conflict_type=ConflictType.RESOURCE, + agents=agents, + description=f"Resource '{resource}' requested by multiple agents", + details={"resource": resource, "requesting_agents": agents}, + severity=2, + ) + conflicts.append(conflict.to_dict()) + return conflicts + + +# --------------------------------------------------------------------------- +# ConflictResolver +# --------------------------------------------------------------------------- + +# Default agent priorities used for priority-based resolution +_DEFAULT_PRIORITIES: Dict[str, int] = { + "perception": 5, + "planning": 4, + "control": 3, + "communication": 2, + "coordination": 1, + "resolver": 6, +} + + +class ConflictResolver: + """ + Apply resolution strategies to detected conflicts. + + Available strategies: + 1. Priority-based (higher priority wins) + 2. Voting (majority rules) + 3. Expert arbitration (most qualified agent decides) + 4. Time-based (first-come-first-served) + 5. Cost-based (least cost approach) + 6. ML-based (learned from past resolutions) + """ + + def __init__(self, agent_priorities: Optional[Dict[str, int]] = None): + self._priorities = agent_priorities or _DEFAULT_PRIORITIES + self._resolution_history: List[Dict[str, Any]] = [] + + def resolve(self, conflict: Dict[str, Any], + strategy: ResolutionStrategy = ResolutionStrategy.PRIORITY_BASED + ) -> Dict[str, Any]: + """Resolve a conflict using the specified strategy.""" + strategy_map = { + ResolutionStrategy.PRIORITY_BASED: self.resolve_by_priority, + ResolutionStrategy.VOTING: self.resolve_by_voting, + ResolutionStrategy.EXPERTISE: self.resolve_by_expertise, + ResolutionStrategy.COST_BASED: self.resolve_by_cost, + ResolutionStrategy.ML_BASED: self.resolve_by_ml, + } + handler = strategy_map.get(strategy, self.resolve_by_priority) + resolution = handler(conflict) + self._resolution_history.append(resolution.to_dict()) + logger.info("Conflict resolved via %s: %s", strategy.value, resolution.action) + return resolution.to_dict() + + def resolve_by_priority(self, conflict: Dict[str, Any]) -> Resolution: + """Resolve by selecting the highest-priority agent.""" + agents = conflict.get("agents", []) + if not agents: + return Resolution( + strategy=ResolutionStrategy.PRIORITY_BASED, + winning_agent=None, + action="no_agents_involved", + rationale="No agents listed in conflict", + success=False, + ) + winner = max(agents, key=lambda a: self._priorities.get(a, 0)) + return Resolution( + strategy=ResolutionStrategy.PRIORITY_BASED, + winning_agent=winner, + action=f"grant_control_to_{winner}", + rationale=f"Agent '{winner}' has highest priority " + f"({self._priorities.get(winner, 0)})", + ) + + def resolve_by_voting(self, conflict: Dict[str, Any]) -> Resolution: + """Resolve by majority vote among connected agents (simplified simulation).""" + agents = conflict.get("agents", []) + # In a real system this would broadcast a vote request; here we use priorities + votes: Dict[str, int] = {a: self._priorities.get(a, 0) for a in agents} + if not votes: + return Resolution( + strategy=ResolutionStrategy.VOTING, + winning_agent=None, + action="no_votes", + rationale="No agents to vote", + success=False, + ) + winner = max(votes, key=lambda a: votes[a]) + return Resolution( + strategy=ResolutionStrategy.VOTING, + winning_agent=winner, + action=f"grant_control_to_{winner}", + rationale=f"Agent '{winner}' received highest weighted vote ({votes[winner]})", + ) + + def resolve_by_expertise(self, conflict: Dict[str, Any]) -> Resolution: + """Resolve by selecting the most domain-relevant agent.""" + conflict_type = conflict.get("type", ConflictType.UNKNOWN.value) + expertise_map: Dict[str, str] = { + ConflictType.PERCEPTION_PLANNING.value: "perception", + ConflictType.CONTROL.value: "control", + ConflictType.TASK.value: "coordination", + ConflictType.RESOURCE.value: "resolver", + } + expert = expertise_map.get(conflict_type, "resolver") + return Resolution( + strategy=ResolutionStrategy.EXPERTISE, + winning_agent=expert, + action=f"delegate_decision_to_{expert}", + rationale=f"Agent '{expert}' has domain expertise for conflict type '{conflict_type}'", + ) + + def resolve_by_cost(self, conflict: Dict[str, Any]) -> Resolution: + """Resolve by selecting the lowest-cost option from conflict details.""" + options = conflict.get("details", {}).get("options", []) + if not options: + return self.resolve_by_priority(conflict) + + best = min(options, key=lambda o: o.get("cost", float("inf"))) + return Resolution( + strategy=ResolutionStrategy.COST_BASED, + winning_agent=best.get("agent"), + action=f"select_option_{best.get('id', 'best')}", + rationale=f"Option has minimum cost: {best.get('cost')}", + ) + + def resolve_by_ml(self, conflict: Dict[str, Any]) -> Resolution: + """Resolve using a trained ML model (falls back to priority-based if unavailable).""" + try: + from resolver.predictor import ConflictPredictor # pylint: disable=import-outside-toplevel + predictor = ConflictPredictor() + prediction = predictor.predict_best_resolution(conflict) + return Resolution( + strategy=ResolutionStrategy.ML_BASED, + winning_agent=prediction.get("winning_agent"), + action=prediction.get("action", "ml_resolution"), + rationale="ML model prediction", + ) + except Exception: # pylint: disable=broad-except + logger.warning("ML resolution unavailable, falling back to priority-based") + return self.resolve_by_priority(conflict) + + @property + def resolution_history(self) -> List[Dict[str, Any]]: + """Return the full history of resolutions applied.""" + return list(self._resolution_history) + + +# --------------------------------------------------------------------------- +# ConflictPredictor (rule-based; ML version is in predictor.py) +# --------------------------------------------------------------------------- + +class ConflictPredictor: + """ + Rule-based conflict predictor. + + Analyses current agent states and identifies patterns that historically + precede conflicts so that preventive actions can be taken. + """ + + def predict(self, agent_states: Dict[str, Any]) -> List[Dict[str, Any]]: + """Return a list of predicted conflicts with prevention suggestions.""" + predictions: List[Dict[str, Any]] = [] + + # Rule: If multiple agents are requesting the same resource, a conflict is imminent + resource_counts: Dict[str, int] = {} + for state in agent_states.values(): + for resource in getattr(state, "metadata", {}).get("requested_resources", []): + resource_counts[resource] = resource_counts.get(resource, 0) + 1 + + for resource, count in resource_counts.items(): + if count > 1: + predictions.append({ + "type": ConflictType.RESOURCE.value, + "probability": min(0.5 + count * 0.1, 0.99), + "description": f"Resource '{resource}' contention likely", + "prevention": f"Pre-allocate '{resource}' before conflict occurs", + }) + + return predictions + + def suggest_preventive_actions(self, agent_states: Dict[str, Any]) -> List[str]: + """Return a list of preventive action strings.""" + predictions = self.predict(agent_states) + return [p["prevention"] for p in predictions] diff --git a/resolver/deadlock_detector.py b/resolver/deadlock_detector.py new file mode 100644 index 0000000..5d53cdb --- /dev/null +++ b/resolver/deadlock_detector.py @@ -0,0 +1,180 @@ +""" +Deadlock detection and breaking for the Resolver Agent. + +Detects circular waits, resource deadlocks, and task deadlocks, +then applies breaking strategies to restore progress. +""" + +import logging +import time +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Set + + +logger = logging.getLogger(__name__) + + +@dataclass +class Deadlock: + """Represents a detected deadlock.""" + deadlock_type: str + involved_agents: List[str] + description: str + details: Dict[str, Any] = field(default_factory=dict) + detected_at: float = field(default_factory=time.time) + + def to_dict(self) -> Dict[str, Any]: + return { + "deadlock_type": self.deadlock_type, + "involved_agents": self.involved_agents, + "description": self.description, + "details": self.details, + "detected_at": self.detected_at, + } + + +class DeadlockDetector: + """ + Detect and break deadlocks in the multi-agent system. + + Deadlock types handled: + - Circular dependencies (A waits for B, B waits for A) + - Resource deadlocks (all agents waiting for held resources) + - Task deadlocks (tasks cannot proceed) + """ + + TIMEOUT_THRESHOLD = 30.0 # seconds a wait may last before suspecting deadlock + + def __init__(self): + # wait_graph[agent] = set of agents this agent is waiting for + self._wait_graph: Dict[str, Set[str]] = {} + # resource_holders[resource] = agent currently holding it + self._resource_holders: Dict[str, str] = {} + # resource_waiters[resource] = list of agents waiting for it + self._resource_waiters: Dict[str, List[str]] = {} + self._detected_deadlocks: List[Deadlock] = [] + + # ------------------------------------------------------------------ # + # Public API + # ------------------------------------------------------------------ # + + def update_wait_graph(self, agent: str, waiting_for: List[str]) -> None: + """Record that *agent* is waiting for the listed agents.""" + self._wait_graph[agent] = set(waiting_for) + + def update_resource_state( + self, resource: str, holder: Optional[str], waiters: Optional[List[str]] = None + ) -> None: + """Update resource holder and waiter information.""" + if holder: + self._resource_holders[resource] = holder + elif resource in self._resource_holders: + del self._resource_holders[resource] + self._resource_waiters[resource] = waiters or [] + + def detect(self) -> Optional[Dict[str, Any]]: + """Run all deadlock detection heuristics and return the first found.""" + deadlock = self.detect_circular_wait() + if deadlock: + return deadlock + return self.detect_resource_deadlock() + + def detect_circular_wait(self) -> Optional[Dict[str, Any]]: + """Detect circular waiting chains in the wait graph (DFS cycle detection).""" + cycle = self._find_cycle() + if cycle: + deadlock = Deadlock( + deadlock_type="circular_wait", + involved_agents=cycle, + description=f"Circular wait detected: {' -> '.join(cycle)}", + details={"cycle": cycle}, + ) + self._detected_deadlocks.append(deadlock) + logger.warning("Circular wait deadlock detected: %s", cycle) + return deadlock.to_dict() + return None + + def detect_resource_deadlock(self) -> Optional[Dict[str, Any]]: + """Detect resource deadlocks where every waiter is also a holder.""" + for resource, waiters in self._resource_waiters.items(): + holder = self._resource_holders.get(resource) + if holder and holder in waiters: + deadlock = Deadlock( + deadlock_type="resource_deadlock", + involved_agents=[holder] + waiters, + description=f"Resource deadlock on '{resource}': " + f"holder '{holder}' is also waiting", + details={"resource": resource, "holder": holder, "waiters": waiters}, + ) + self._detected_deadlocks.append(deadlock) + logger.warning("Resource deadlock detected on '%s'", resource) + return deadlock.to_dict() + return None + + def break_deadlock(self, deadlock: Dict[str, Any]) -> Dict[str, Any]: + """Break a deadlock using the most appropriate strategy.""" + deadlock_type = deadlock.get("deadlock_type", "unknown") + agents = deadlock.get("involved_agents", []) + + if not agents: + return {"action": "no_agents", "success": False} + + # Strategy: remove the lowest-priority agent from the cycle + from resolver.arbitrator import AgentArbitrator # pylint: disable=import-outside-toplevel + priorities = AgentArbitrator.DEFAULT_PRIORITIES + victim = min(agents, key=lambda a: priorities.get(a, 0)) + + action = { + "action": "preempt_agent", + "victim": victim, + "deadlock_type": deadlock_type, + "rationale": f"Agent '{victim}' preempted to break deadlock", + "success": True, + } + + # Remove the victim from the wait graph + self._wait_graph.pop(victim, None) + for agent_waiters in self._wait_graph.values(): + agent_waiters.discard(victim) + + logger.info("Deadlock broken: preempted agent '%s'", victim) + return action + + @property + def detected_deadlocks(self) -> List[Dict[str, Any]]: + """Return history of all detected deadlocks.""" + return [d.to_dict() for d in self._detected_deadlocks] + + # ------------------------------------------------------------------ # + # Internal helpers + # ------------------------------------------------------------------ # + + def _find_cycle(self) -> Optional[List[str]]: + """Return the nodes of a cycle in the wait graph, or None if none exists.""" + visited: Set[str] = set() + path: List[str] = [] + path_set: Set[str] = set() + + def dfs(node: str) -> Optional[List[str]]: + visited.add(node) + path.append(node) + path_set.add(node) + for neighbour in self._wait_graph.get(node, set()): + if neighbour not in visited: + result = dfs(neighbour) + if result is not None: + return result + elif neighbour in path_set: + # Found a cycle; extract the cycle portion + cycle_start = path.index(neighbour) + return path[cycle_start:] + path.pop() + path_set.discard(node) + return None + + for node in list(self._wait_graph.keys()): + if node not in visited: + cycle = dfs(node) + if cycle: + return cycle + return None diff --git a/resolver/error_recovery.py b/resolver/error_recovery.py new file mode 100644 index 0000000..f091fce --- /dev/null +++ b/resolver/error_recovery.py @@ -0,0 +1,208 @@ +""" +Error recovery system for the Resolver Agent. + +Classifies errors by severity and type, then executes the appropriate +recovery strategy automatically. +""" + +import logging +import time +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Any, Callable, Dict, List, Optional + + +logger = logging.getLogger(__name__) + + +class ErrorSeverity(IntEnum): + """Error severity levels, ordered from least to most severe.""" + INFO = 0 # Minor issue; log only + WARNING = 1 # Potential problem; monitor + ERROR = 2 # Significant issue; needs recovery + CRITICAL = 3 # System-threatening; immediate action required + FATAL = 4 # System shutdown required + + +class ErrorType: + """Constants for recognised error types.""" + SENSOR_FAILURE = "sensor_failure" + AGENT_CRASH = "agent_crash" + PLANNING_FAILURE = "planning_failure" + EXECUTION_FAILURE = "execution_failure" + COMMUNICATION_FAILURE = "communication_failure" + HARDWARE_FAILURE = "hardware_failure" + UNKNOWN = "unknown" + + +@dataclass +class ErrorRecord: + """Record of a detected error and its recovery attempt.""" + error_type: str + severity: ErrorSeverity + description: str + details: Dict[str, Any] = field(default_factory=dict) + timestamp: float = field(default_factory=time.time) + recovery_attempted: bool = False + recovery_success: Optional[bool] = None + recovery_action: str = "" + + def to_dict(self) -> Dict[str, Any]: + return { + "error_type": self.error_type, + "severity": self.severity.name, + "description": self.description, + "details": self.details, + "timestamp": self.timestamp, + "recovery_attempted": self.recovery_attempted, + "recovery_success": self.recovery_success, + "recovery_action": self.recovery_action, + } + + +class ErrorRecoverySystem: + """ + Detect, classify, and recover from errors across all robot systems. + + Supported error types: + - Sensor failures (camera down, lidar malfunction) + - Agent crashes (agent stops responding) + - Planning failures (no valid plan found) + - Execution failures (robot cannot reach goal) + - Communication failures (lost connection) + - Hardware failures (motor error, battery low) + """ + + MAX_RECOVERY_ATTEMPTS = 3 + + def __init__(self): + self._error_log: List[ErrorRecord] = [] + self._recovery_attempts: Dict[str, int] = {} + self._recovery_handlers: Dict[str, Callable[[Dict[str, Any]], str]] = { + ErrorType.SENSOR_FAILURE: self._recover_sensor_failure, + ErrorType.AGENT_CRASH: self._recover_agent_crash, + ErrorType.PLANNING_FAILURE: self._recover_planning_failure, + ErrorType.EXECUTION_FAILURE: self._recover_execution_failure, + ErrorType.COMMUNICATION_FAILURE: self._recover_communication_failure, + ErrorType.HARDWARE_FAILURE: self._recover_hardware_failure, + } + + # ------------------------------------------------------------------ # + # Public API + # ------------------------------------------------------------------ # + + def detect_errors(self, agent_states: Dict[str, Any]) -> List[Dict[str, Any]]: + """Scan agent states and return a list of detected error dicts.""" + errors: List[Dict[str, Any]] = [] + now = time.time() + + for agent_name, state in agent_states.items(): + heartbeat = getattr(state, "last_heartbeat", now) + if now - heartbeat > 30: # No heartbeat for 30 s → agent crash + errors.append({ + "type": ErrorType.AGENT_CRASH, + "severity": ErrorSeverity.CRITICAL, + "agent": agent_name, + "description": f"Agent '{agent_name}' has not sent a heartbeat in 30s", + }) + + error_count = getattr(state, "error_count", 0) + if error_count > 10: + errors.append({ + "type": ErrorType.UNKNOWN, + "severity": ErrorSeverity.WARNING, + "agent": agent_name, + "description": f"Agent '{agent_name}' has high error count: {error_count}", + }) + return errors + + def classify_error(self, error: Dict[str, Any]) -> ErrorRecord: + """Classify an error dict and return a structured ErrorRecord.""" + error_type = error.get("type", ErrorType.UNKNOWN) + raw_severity = error.get("severity", ErrorSeverity.ERROR) + if isinstance(raw_severity, int): + severity = ErrorSeverity(raw_severity) + elif isinstance(raw_severity, ErrorSeverity): + severity = raw_severity + else: + severity = ErrorSeverity.ERROR + + return ErrorRecord( + error_type=error_type, + severity=severity, + description=error.get("description", "No description provided"), + details={k: v for k, v in error.items() + if k not in {"type", "severity", "description"}}, + ) + + def execute_recovery(self, error: Dict[str, Any]) -> Dict[str, Any]: + """Execute the appropriate recovery strategy for an error.""" + record = self.classify_error(error) + error_key = f"{record.error_type}:{record.details.get('agent', 'system')}" + + attempts = self._recovery_attempts.get(error_key, 0) + if attempts >= self.MAX_RECOVERY_ATTEMPTS: + record.recovery_attempted = True + record.recovery_success = False + record.recovery_action = "max_recovery_attempts_exceeded" + self._error_log.append(record) + logger.error("Max recovery attempts exceeded for: %s", error_key) + return {**record.to_dict(), "escalation_required": True} + + self._recovery_attempts[error_key] = attempts + 1 + handler = self._recovery_handlers.get(record.error_type, self._recover_unknown) + recovery_action = handler(error) + + record.recovery_attempted = True + record.recovery_success = True + record.recovery_action = recovery_action + self._error_log.append(record) + + logger.info("Recovery executed for %s: %s", error_key, recovery_action) + return record.to_dict() + + @property + def error_log(self) -> List[Dict[str, Any]]: + """Return the full error log as a list of dicts.""" + return [r.to_dict() for r in self._error_log] + + # ------------------------------------------------------------------ # + # Recovery strategies + # ------------------------------------------------------------------ # + + def _recover_sensor_failure(self, error: Dict[str, Any]) -> str: + """Switch to backup sensors or degrade gracefully.""" + sensor = error.get("sensor", "unknown") + logger.warning("Sensor failure detected: %s. Switching to backup sensor.", sensor) + return f"switched_to_backup_sensor:{sensor}" + + def _recover_agent_crash(self, error: Dict[str, Any]) -> str: + """Restart the crashed agent or redistribute its tasks.""" + agent = error.get("agent", "unknown") + logger.warning("Agent crash detected: %s. Attempting restart.", agent) + return f"restart_agent:{agent}" + + def _recover_planning_failure(self, error: Dict[str, Any]) -> str: + """Try an alternative planning method.""" + logger.warning("Planning failure detected. Switching to alternative planner.") + return "alternative_planning_method" + + def _recover_execution_failure(self, error: Dict[str, Any]) -> str: + """Re-plan or find an alternative path.""" + logger.warning("Execution failure detected. Triggering re-planning.") + return "replan_trajectory" + + def _recover_communication_failure(self, error: Dict[str, Any]) -> str: + """Use cached data or enter fail-safe mode.""" + logger.warning("Communication failure detected. Using cached data.") + return "use_cached_data_failsafe" + + def _recover_hardware_failure(self, error: Dict[str, Any]) -> str: + """Execute emergency stop or enter limp mode.""" + logger.warning("Hardware failure detected. Executing emergency stop.") + return "emergency_stop_limp_mode" + + def _recover_unknown(self, error: Dict[str, Any]) -> str: + """Generic recovery for unknown error types.""" + logger.warning("Unknown error type. Applying generic recovery.") + return "generic_recovery" diff --git a/resolver/fallback_planner.py b/resolver/fallback_planner.py new file mode 100644 index 0000000..6f2409c --- /dev/null +++ b/resolver/fallback_planner.py @@ -0,0 +1,204 @@ +""" +Fallback strategy planner for the Resolver Agent. + +Provides a hierarchy of fallback strategies when primary approaches fail, +from trying alternative methods all the way to requesting human intervention +or performing a safe shutdown. +""" + +import logging +import time +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + + +logger = logging.getLogger(__name__) + + +@dataclass +class FallbackPlan: + """A planned fallback strategy.""" + failure_type: str + steps: List[Dict[str, Any]] + depth: int = 0 + requires_human: bool = False + created_at: float = field(default_factory=time.time) + + def to_dict(self) -> Dict[str, Any]: + return { + "failure_type": self.failure_type, + "steps": self.steps, + "depth": self.depth, + "requires_human": self.requires_human, + "created_at": self.created_at, + } + + +@dataclass +class FallbackResult: + """Outcome of executing a fallback plan.""" + plan: FallbackPlan + executed_steps: List[str] + success: bool + final_action: str + executed_at: float = field(default_factory=time.time) + + def to_dict(self) -> Dict[str, Any]: + return { + "failure_type": self.plan.failure_type, + "executed_steps": self.executed_steps, + "success": self.success, + "final_action": self.final_action, + "executed_at": self.executed_at, + } + + +class FallbackPlanner: + """ + Plan and execute fallback strategies when primary approaches fail. + + Fallback hierarchy (depth 0 → 3): + 0. Try primary approach + 1. Try alternative method + 2. Request human help + 3. Safe shutdown + """ + + MAX_FALLBACK_DEPTH = 3 + + # Maps failure types to ordered lists of fallback step descriptions + _FALLBACK_TEMPLATES: Dict[str, List[str]] = { + "navigation": [ + "retry_primary_navigation", + "try_alternative_path", + "try_simple_straight_line", + "try_manual_waypoints", + "request_human_guidance", + "stop_safely", + ], + "manipulation": [ + "retry_primary_grasp", + "try_alternative_grasp_approach", + "try_different_grasp_point", + "request_human_demonstration", + "skip_object", + ], + "perception": [ + "retry_primary_perception", + "switch_to_backup_sensor", + "use_cached_perception_data", + "request_human_visual_confirmation", + "degrade_gracefully", + ], + "planning": [ + "retry_primary_planner", + "try_alternative_planner", + "use_default_safe_plan", + "request_human_plan", + "halt_and_wait", + ], + "communication": [ + "retry_primary_channel", + "switch_to_backup_channel", + "use_cached_commands", + "request_local_mode", + "failsafe_autonomous_mode", + ], + "generic": [ + "retry_operation", + "try_safe_alternative", + "request_human_help", + "safe_shutdown", + ], + } + + def __init__(self): + self._execution_history: List[FallbackResult] = [] + + def plan_fallback(self, failure: Dict[str, Any]) -> FallbackPlan: + """Create a fallback plan for the given failure.""" + failure_type = failure.get("type", "generic") + steps_template = self._FALLBACK_TEMPLATES.get( + failure_type, self._FALLBACK_TEMPLATES["generic"] + ) + + depth = failure.get("depth", 0) + effective_depth = min(depth, self.MAX_FALLBACK_DEPTH) + available_steps = steps_template[effective_depth:] + + steps = [ + {"step": i + 1, "action": action, "description": action.replace("_", " ")} + for i, action in enumerate(available_steps) + ] + + requires_human = any("human" in s["action"] for s in steps) + + plan = FallbackPlan( + failure_type=failure_type, + steps=steps, + depth=effective_depth, + requires_human=requires_human, + ) + + logger.info( + "Fallback plan created for '%s': %d steps (depth=%d)", + failure_type, + len(steps), + effective_depth, + ) + return plan + + def execute_fallback(self, plan: FallbackPlan) -> Dict[str, Any]: + """Execute the first viable step in the fallback plan.""" + executed_steps: List[str] = [] + final_action = "none" + success = False + + for step in plan.steps: + action = step["action"] + executed_steps.append(action) + logger.info("Executing fallback step: %s", action) + + if "human" in action: + # Cannot automatically execute human-in-the-loop steps + final_action = f"awaiting_human_for:{action}" + success = False + break + + if "shutdown" in action or "stop_safely" in action: + final_action = action + success = True + break + + # Simulate execution success for non-human, non-shutdown steps + final_action = action + success = True + break # Execute only the first auto-executable step per call + + result = FallbackResult( + plan=plan, + executed_steps=executed_steps, + success=success, + final_action=final_action, + ) + self._execution_history.append(result) + return result.to_dict() + + # Specific fallback helpers referenced in docstrings + + def fallback_navigation(self, depth: int = 0) -> Dict[str, Any]: + """Execute navigation fallback at the given depth.""" + failure = {"type": "navigation", "depth": depth} + plan = self.plan_fallback(failure) + return self.execute_fallback(plan) + + def fallback_manipulation(self, depth: int = 0) -> Dict[str, Any]: + """Execute manipulation fallback at the given depth.""" + failure = {"type": "manipulation", "depth": depth} + plan = self.plan_fallback(failure) + return self.execute_fallback(plan) + + @property + def execution_history(self) -> List[Dict[str, Any]]: + """Return all previously executed fallback results.""" + return [r.to_dict() for r in self._execution_history] diff --git a/resolver/health_monitor.py b/resolver/health_monitor.py new file mode 100644 index 0000000..7405f0e --- /dev/null +++ b/resolver/health_monitor.py @@ -0,0 +1,224 @@ +""" +Health monitoring module for the Resolver Agent. + +Tracks the health of all registered agents and robot systems, +generates alerts when health degrades, and provides health reports. +""" + +import logging +import time +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict, List, Optional + + +logger = logging.getLogger(__name__) + + +class HealthStatus(Enum): + """Possible health states for an agent or the overall system.""" + HEALTHY = "healthy" + DEGRADED = "degraded" + CRITICAL = "critical" + UNKNOWN = "unknown" + + +@dataclass +class AgentHealthRecord: + """Health metrics for a single agent.""" + agent_name: str + status: HealthStatus = HealthStatus.UNKNOWN + response_time: float = 0.0 + cpu_usage: float = 0.0 + memory_usage: float = 0.0 + error_rate: float = 0.0 + uptime: float = 0.0 + last_updated: float = field(default_factory=time.time) + + def to_dict(self) -> Dict[str, Any]: + return { + "agent_name": self.agent_name, + "status": self.status.value, + "response_time": self.response_time, + "cpu_usage": self.cpu_usage, + "memory_usage": self.memory_usage, + "error_rate": self.error_rate, + "uptime": self.uptime, + "last_updated": self.last_updated, + } + + +@dataclass +class Alert: + """A health alert triggered when metrics exceed thresholds.""" + agent_name: str + metric: str + value: float + threshold: float + message: str + timestamp: float = field(default_factory=time.time) + + def to_dict(self) -> Dict[str, Any]: + return { + "agent_name": self.agent_name, + "metric": self.metric, + "value": self.value, + "threshold": self.threshold, + "message": self.message, + "timestamp": self.timestamp, + } + + +class HealthMonitor: + """ + Monitor health of all agents and robot systems. + + Metrics tracked per agent: + - Response time + - CPU / memory usage + - Error rate + - Uptime + - Message queue length (via metadata) + """ + + # Default alert thresholds + RESPONSE_TIME_THRESHOLD = 1.0 # seconds + CPU_THRESHOLD = 80.0 # percent + MEMORY_THRESHOLD = 85.0 # percent + ERROR_RATE_THRESHOLD = 0.1 # 10 % + HEARTBEAT_TIMEOUT = 30.0 # seconds + + def __init__(self): + self._records: Dict[str, AgentHealthRecord] = {} + self._alerts: List[Alert] = [] + self._registration_time: Dict[str, float] = {} + + def register_agent(self, agent_name: str) -> None: + """Register a new agent for health monitoring.""" + self._records[agent_name] = AgentHealthRecord(agent_name=agent_name) + self._registration_time[agent_name] = time.time() + logger.debug("HealthMonitor: registered agent '%s'", agent_name) + + def update_agent_health(self, agent_name: str, state: Any) -> None: + """Update health metrics from an agent's current state.""" + if agent_name not in self._records: + self.register_agent(agent_name) + + record = self._records[agent_name] + record.cpu_usage = getattr(state, "cpu_usage", 0.0) + record.memory_usage = getattr(state, "memory_usage", 0.0) + record.response_time = getattr(state, "response_time", 0.0) + error_count = getattr(state, "error_count", 0) + task_count = getattr(state, "task_count", 1) or 1 + record.error_rate = error_count / task_count + record.uptime = time.time() - self._registration_time.get(agent_name, time.time()) + record.last_updated = time.time() + + # Derive status from metrics + record.status = self._derive_status(record) + + # Check thresholds and raise alerts + self._check_thresholds(record) + + def get_agent_health(self, agent_name: str) -> Dict[str, Any]: + """Return the latest health record for the given agent.""" + record = self._records.get(agent_name) + if record is None: + return {"agent_name": agent_name, "status": HealthStatus.UNKNOWN.value} + return record.to_dict() + + def monitor_system_health(self) -> Dict[str, Any]: + """Return health records for all monitored agents.""" + return {name: record.to_dict() for name, record in self._records.items()} + + def compute_overall_status(self, agent_health: Dict[str, Any]) -> str: + """Compute the overall system status from individual agent statuses.""" + statuses = [info.get("status", HealthStatus.UNKNOWN.value) + for info in agent_health.values()] + if HealthStatus.CRITICAL.value in statuses: + return HealthStatus.CRITICAL.value + if HealthStatus.DEGRADED.value in statuses: + return HealthStatus.DEGRADED.value + if all(s == HealthStatus.HEALTHY.value for s in statuses) and statuses: + return HealthStatus.HEALTHY.value + return HealthStatus.UNKNOWN.value + + def generate_health_report(self) -> Dict[str, Any]: + """Generate a comprehensive health report.""" + agent_health = self.monitor_system_health() + return { + "overall_status": self.compute_overall_status(agent_health), + "agents": agent_health, + "alerts": self.get_active_alerts(), + "total_agents": len(self._records), + "generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + } + + def get_active_alerts(self) -> List[Dict[str, Any]]: + """Return all currently active (un-cleared) alerts.""" + cutoff = time.time() - 300 # keep alerts for 5 minutes + self._alerts = [a for a in self._alerts if a.timestamp > cutoff] + return [a.to_dict() for a in self._alerts] + + def predict_failures(self) -> List[Dict[str, Any]]: + """Predict potential failures based on current trends.""" + predictions: List[Dict[str, Any]] = [] + for name, record in self._records.items(): + if record.error_rate > 0.05: + predictions.append({ + "agent": name, + "risk": "high_error_rate", + "current_rate": record.error_rate, + "prediction": "possible_agent_failure", + }) + if record.response_time > 0.8: + predictions.append({ + "agent": name, + "risk": "slow_response", + "current_time": record.response_time, + "prediction": "possible_timeout", + }) + return predictions + + # ------------------------------------------------------------------ # + # Internal helpers + # ------------------------------------------------------------------ # + + def _derive_status(self, record: AgentHealthRecord) -> HealthStatus: + """Determine HealthStatus from a record's metric values.""" + if (record.cpu_usage > self.CPU_THRESHOLD + or record.memory_usage > self.MEMORY_THRESHOLD + or record.error_rate > self.ERROR_RATE_THRESHOLD * 2): + return HealthStatus.CRITICAL + if (record.response_time > self.RESPONSE_TIME_THRESHOLD + or record.error_rate > self.ERROR_RATE_THRESHOLD): + return HealthStatus.DEGRADED + return HealthStatus.HEALTHY + + def _check_thresholds(self, record: AgentHealthRecord) -> None: + """Create alerts when metrics exceed defined thresholds.""" + checks = [ + ("response_time", record.response_time, self.RESPONSE_TIME_THRESHOLD, + f"Agent '{record.agent_name}' response time {record.response_time:.2f}s " + f"exceeds threshold {self.RESPONSE_TIME_THRESHOLD}s"), + ("cpu_usage", record.cpu_usage, self.CPU_THRESHOLD, + f"Agent '{record.agent_name}' CPU usage {record.cpu_usage:.1f}% " + f"exceeds threshold {self.CPU_THRESHOLD}%"), + ("memory_usage", record.memory_usage, self.MEMORY_THRESHOLD, + f"Agent '{record.agent_name}' memory usage {record.memory_usage:.1f}% " + f"exceeds threshold {self.MEMORY_THRESHOLD}%"), + ("error_rate", record.error_rate, self.ERROR_RATE_THRESHOLD, + f"Agent '{record.agent_name}' error rate {record.error_rate:.2%} " + f"exceeds threshold {self.ERROR_RATE_THRESHOLD:.0%}"), + ] + for metric, value, threshold, message in checks: + if value > threshold: + alert = Alert( + agent_name=record.agent_name, + metric=metric, + value=value, + threshold=threshold, + message=message, + ) + self._alerts.append(alert) + logger.warning("ALERT: %s", message) diff --git a/resolver/predictor.py b/resolver/predictor.py new file mode 100644 index 0000000..1bb3748 --- /dev/null +++ b/resolver/predictor.py @@ -0,0 +1,211 @@ +""" +ML-based conflict predictor for the Resolver Agent. + +Uses a lightweight PyTorch neural network to predict conflict probabilities +from agent state feature vectors. Falls back gracefully when PyTorch is +unavailable so the rest of the system continues to operate. +""" + +import logging +import time +from typing import Any, Dict, List, Optional + + +logger = logging.getLogger(__name__) + +# Try importing PyTorch; fail gracefully if it is not installed +try: + import torch + import torch.nn as nn + + _TORCH_AVAILABLE = True +except ImportError: # pragma: no cover + _TORCH_AVAILABLE = False + logger.warning("PyTorch not available; ML conflict prediction will use heuristics only") + + +# --------------------------------------------------------------------------- +# Neural network model definition +# --------------------------------------------------------------------------- + +if _TORCH_AVAILABLE: + + class _ConflictPredictorNet(nn.Module): + """Simple feed-forward network for binary conflict prediction.""" + + def __init__(self, input_dim: int = 20, hidden_dim: int = 64): + super().__init__() + self.network = nn.Sequential( + nn.Linear(input_dim, hidden_dim), + nn.ReLU(), + nn.Dropout(0.2), + nn.Linear(hidden_dim, hidden_dim // 2), + nn.ReLU(), + nn.Linear(hidden_dim // 2, 1), + nn.Sigmoid(), + ) + + def forward(self, x: "torch.Tensor") -> "torch.Tensor": # noqa: F821 + return self.network(x) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +class ConflictPredictor: + """ + Predict conflicts before they happen. + + If PyTorch is available, a neural network model is trained on historical + data. Otherwise, a simple heuristic-based fallback is used. + """ + + INPUT_DIM = 20 # feature vector length per prediction call + HIDDEN_DIM = 64 + + def __init__(self): + self._model: Optional[Any] = None + self._trained = False + self._history: List[Dict[str, Any]] = [] + + if _TORCH_AVAILABLE: + self._model = _ConflictPredictorNet(self.INPUT_DIM, self.HIDDEN_DIM) + logger.info("ConflictPredictor: PyTorch model initialised") + + # ------------------------------------------------------------------ # + # Training + # ------------------------------------------------------------------ # + + def train_predictor(self, historical_conflicts: List[Dict[str, Any]]) -> Dict[str, Any]: + """Train the predictor on a list of historical conflict records.""" + if not _TORCH_AVAILABLE or not historical_conflicts: + logger.warning("Training skipped: PyTorch unavailable or no data provided") + return {"trained": False, "reason": "no_data_or_no_torch"} + + import torch # pylint: disable=import-outside-toplevel + import torch.nn as nn # pylint: disable=import-outside-toplevel + + features, labels = self._extract_features(historical_conflicts) + X = torch.tensor(features, dtype=torch.float32) + y = torch.tensor(labels, dtype=torch.float32).unsqueeze(1) + + optimizer = torch.optim.Adam(self._model.parameters(), lr=1e-3) + criterion = nn.BCELoss() + + epochs = 50 + for _ in range(epochs): + self._model.train() + optimizer.zero_grad() + output = self._model(X) + loss = criterion(output, y) + loss.backward() + optimizer.step() + + self._trained = True + logger.info("ConflictPredictor: model trained on %d records", len(historical_conflicts)) + return {"trained": True, "samples": len(historical_conflicts), "epochs": epochs} + + # ------------------------------------------------------------------ # + # Inference + # ------------------------------------------------------------------ # + + def predict_conflict_probability( + self, agent_states: Dict[str, Any] + ) -> Dict[str, Any]: + """Predict the probability of a conflict given current agent states.""" + feature_vector = self._states_to_features(agent_states) + + if _TORCH_AVAILABLE and self._trained and self._model is not None: + import torch # pylint: disable=import-outside-toplevel + + self._model.eval() + with torch.no_grad(): + x = torch.tensor(feature_vector, dtype=torch.float32).unsqueeze(0) + probability = float(self._model(x).item()) + else: + # Heuristic fallback: high error counts or resource contention raise probability + probability = self._heuristic_probability(agent_states) + + result = { + "probability": probability, + "threshold": 0.5, + "likely_conflict": probability >= 0.5, + "timestamp": time.time(), + } + self._history.append(result) + return result + + def predict_best_resolution(self, conflict: Dict[str, Any]) -> Dict[str, Any]: + """Use the model to suggest the best resolution (heuristic if no model).""" + agents = conflict.get("agents", []) + from resolver.arbitrator import AgentArbitrator # pylint: disable=import-outside-toplevel + priorities = AgentArbitrator.DEFAULT_PRIORITIES + if not agents: + return {"winning_agent": None, "action": "no_resolution"} + winner = max(agents, key=lambda a: priorities.get(a, 0)) + return { + "winning_agent": winner, + "action": f"grant_to_{winner}", + "confidence": 0.75, + } + + def suggest_preventive_actions( + self, agent_states: Dict[str, Any] + ) -> List[str]: + """Suggest actions to prevent predicted conflicts.""" + prediction = self.predict_conflict_probability(agent_states) + actions: List[str] = [] + if prediction["likely_conflict"]: + actions.append("pre_allocate_contested_resources") + actions.append("notify_agents_of_potential_conflict") + actions.append("increase_monitoring_frequency") + return actions + + # ------------------------------------------------------------------ # + # Internal helpers + # ------------------------------------------------------------------ # + + def _states_to_features(self, agent_states: Dict[str, Any]) -> List[float]: + """Convert agent states to a fixed-length feature vector.""" + features: List[float] = [] + ordered_agents = ["perception", "planning", "control", "communication", "coordination"] + for agent_name in ordered_agents: + state = agent_states.get(agent_name) + if state is not None: + features.extend([ + float(getattr(state, "cpu_usage", 0.0)) / 100.0, + float(getattr(state, "memory_usage", 0.0)) / 100.0, + float(getattr(state, "error_count", 0)) / 100.0, + float(getattr(state, "response_time", 0.0)), + ]) + else: + features.extend([0.0, 0.0, 0.0, 0.0]) + # Pad / truncate to INPUT_DIM + features = (features + [0.0] * self.INPUT_DIM)[: self.INPUT_DIM] + return features + + def _extract_features( + self, records: List[Dict[str, Any]] + ): + """Extract (feature_matrix, label_vector) from historical records.""" + features = [] + labels = [] + for record in records: + # Feature: encode conflict severity and agent count + severity = float(record.get("severity", 1)) + agent_count = float(len(record.get("agents", []))) + vec = [severity / 5.0, agent_count / 6.0] + [0.0] * (self.INPUT_DIM - 2) + features.append(vec[: self.INPUT_DIM]) + labels.append(1.0 if record.get("conflict_occurred", True) else 0.0) + return features, labels + + def _heuristic_probability(self, agent_states: Dict[str, Any]) -> float: + """Simple heuristic fallback for conflict probability estimation.""" + score = 0.0 + for state in agent_states.values(): + if getattr(state, "error_count", 0) > 5: + score += 0.2 + resources = getattr(state, "metadata", {}).get("requested_resources", []) + score += len(resources) * 0.05 + return min(score, 0.99) diff --git a/ros2_interface/__init__.py b/ros2_interface/__init__.py new file mode 100644 index 0000000..3591816 --- /dev/null +++ b/ros2_interface/__init__.py @@ -0,0 +1,3 @@ +""" +ROS2 interface package for the Resolver Agent. +""" diff --git a/ros2_interface/launch/resolver.launch.py b/ros2_interface/launch/resolver.launch.py new file mode 100644 index 0000000..4203461 --- /dev/null +++ b/ros2_interface/launch/resolver.launch.py @@ -0,0 +1,28 @@ +""" +ROS2 launch file for the Resolver Agent node. +""" + +from launch import LaunchDescription +from launch_ros.actions import Node + + +def generate_launch_description() -> LaunchDescription: + resolver_node = Node( + package="rag7", + executable="resolver_node", + name="resolver_agent_node", + output="screen", + parameters=[ + {"monitoring_rate": 10.0}, + {"health_check_rate": 1.0}, + ], + remappings=[ + ("/agents/perception/status", "/agents/perception/status"), + ("/agents/planning/status", "/agents/planning/status"), + ("/agents/control/status", "/agents/control/status"), + ("/agents/communication/status", "/agents/communication/status"), + ("/agents/coordination/status", "/agents/coordination/status"), + ], + ) + + return LaunchDescription([resolver_node]) diff --git a/ros2_interface/resolver_node.py b/ros2_interface/resolver_node.py new file mode 100644 index 0000000..36e07e8 --- /dev/null +++ b/ros2_interface/resolver_node.py @@ -0,0 +1,268 @@ +""" +ROS2 Resolver Node for the Resolver Agent. + +This node provides the ROS2 interface for the Resolver Agent, subscribing to +agent status topics, handling error reports, and publishing resolutions and +health data. + +Note: This module requires ROS2 (rclpy) to run. When rclpy is not available, +the module defines stub classes to allow import and testing without ROS2. +""" + +import logging +import time +from typing import Any, Dict + +try: + import rclpy + from rclpy.node import Node + from std_msgs.msg import String + + _ROS2_AVAILABLE = True +except ImportError: + _ROS2_AVAILABLE = False + + # --------------------------------------------------------------------------- + # Minimal stubs so the module can be imported without a ROS2 installation + # --------------------------------------------------------------------------- + + class Node: # type: ignore[no-redef] + """Stub ROS2 Node for non-ROS environments.""" + + def __init__(self, node_name: str, **kwargs): + self._node_name = node_name + self._logger = logging.getLogger(f"ros2.{node_name}") + + def get_logger(self): + return self._logger + + def create_subscription(self, *args, **kwargs): + return None + + def create_publisher(self, *args, **kwargs): + return _StubPublisher() + + def create_service(self, *args, **kwargs): + return None + + def create_timer(self, *args, **kwargs): + return None + + class _StubPublisher: + def publish(self, msg): + pass + + class String: # type: ignore[no-redef] + def __init__(self): + self.data = "" + + +logger = logging.getLogger(__name__) + +from agents.resolver_agent import ResolverAgent # noqa: E402 + + +class ResolverNode(Node): + """ + ROS2 node for the Resolver Agent. + + Topics subscribed: + /agents/perception/status + /agents/planning/status + /agents/control/status + /agents/communication/status + /agents/coordination/status + /system/errors + /system/conflicts + + Topics published: + /resolver/resolutions + /resolver/health_status + /resolver/alerts + /resolver/recovery_actions + + Services (when ROS2 is available): + /resolver/resolve_conflict + /resolver/recover_error + /resolver/get_health_status + /resolver/arbitrate_request + + Timer: + health_check_timer – 1 Hz + """ + + HEALTH_CHECK_RATE = 1.0 # Hz + + def __init__(self): + super().__init__("resolver_agent_node") + self.resolver = ResolverAgent() + + self._setup_subscriptions() + self._setup_publishers() + self._setup_timer() + + self.resolver.start() + self.get_logger().info("ResolverNode initialized") # type: ignore[attr-defined] + + # ------------------------------------------------------------------ # + # Setup + # ------------------------------------------------------------------ # + + def _setup_subscriptions(self) -> None: + """Subscribe to all agent status and system error topics.""" + agent_topics = [ + "/agents/perception/status", + "/agents/planning/status", + "/agents/control/status", + "/agents/communication/status", + "/agents/coordination/status", + ] + for topic in agent_topics: + self.create_subscription( # type: ignore[attr-defined] + String, topic, self.handle_agent_status, 10 + ) + + self.create_subscription( # type: ignore[attr-defined] + String, "/system/errors", self.handle_error_report, 10 + ) + self.create_subscription( # type: ignore[attr-defined] + String, "/system/conflicts", self.handle_conflict_report, 10 + ) + + def _setup_publishers(self) -> None: + """Create publishers for resolver outputs.""" + self._pub_resolutions = self.create_publisher( # type: ignore[attr-defined] + String, "/resolver/resolutions", 10 + ) + self._pub_health = self.create_publisher( # type: ignore[attr-defined] + String, "/resolver/health_status", 10 + ) + self._pub_alerts = self.create_publisher( # type: ignore[attr-defined] + String, "/resolver/alerts", 10 + ) + self._pub_recovery = self.create_publisher( # type: ignore[attr-defined] + String, "/resolver/recovery_actions", 10 + ) + + def _setup_timer(self) -> None: + """Create the periodic health check timer (1 Hz).""" + self.create_timer( # type: ignore[attr-defined] + 1.0 / self.HEALTH_CHECK_RATE, self.health_check_timer + ) + + # ------------------------------------------------------------------ # + # Callbacks + # ------------------------------------------------------------------ # + + def handle_agent_status(self, msg: Any) -> None: + """Process agent status updates from subscribed topics.""" + import json # pylint: disable=import-outside-toplevel + + try: + data: Dict[str, Any] = json.loads(msg.data) + agent_name = data.get("agent", "unknown") + logger.debug("Received status from agent '%s'", agent_name) + # Forward to health monitor via resolver + self.resolver.health_monitor.update_agent_health( + agent_name, _DictState(data) + ) + except Exception as exc: # pylint: disable=broad-except + logger.error("Failed to process agent status: %s", exc) + + def handle_error_report(self, msg: Any) -> None: + """Handle error reports from /system/errors.""" + import json # pylint: disable=import-outside-toplevel + + try: + error: Dict[str, Any] = json.loads(msg.data) + result = self.resolver.recover_from_error(error) + self.publish_recovery_action(result) + except Exception as exc: # pylint: disable=broad-except + logger.error("Failed to handle error report: %s", exc) + + def handle_conflict_report(self, msg: Any) -> None: + """Handle conflict reports from /system/conflicts.""" + import json # pylint: disable=import-outside-toplevel + + try: + conflict: Dict[str, Any] = json.loads(msg.data) + resolution = self.resolver.resolve_conflict(conflict) + self.publish_resolution(resolution) + except Exception as exc: # pylint: disable=broad-except + logger.error("Failed to handle conflict report: %s", exc) + + def health_check_timer(self) -> None: + """Periodic health check callback (1 Hz).""" + import json # pylint: disable=import-outside-toplevel + + health = self.resolver.assess_system_health() + msg = String() + msg.data = json.dumps(health) + self._pub_health.publish(msg) + + for alert in health.get("alerts", []): + alert_msg = String() + alert_msg.data = json.dumps(alert) + self._pub_alerts.publish(alert_msg) + + # ------------------------------------------------------------------ # + # Publishers + # ------------------------------------------------------------------ # + + def publish_resolution(self, resolution: Dict[str, Any]) -> None: + """Publish a conflict resolution to /resolver/resolutions.""" + import json # pylint: disable=import-outside-toplevel + + msg = String() + msg.data = json.dumps(resolution) + self._pub_resolutions.publish(msg) + + def publish_recovery_action(self, action: Dict[str, Any]) -> None: + """Publish a recovery action to /resolver/recovery_actions.""" + import json # pylint: disable=import-outside-toplevel + + msg = String() + msg.data = json.dumps(action) + self._pub_recovery.publish(msg) + + +# --------------------------------------------------------------------------- +# Helper: thin wrapper to expose a dict as an object with attribute access +# --------------------------------------------------------------------------- + +class _DictState: + """Adapts a plain dict to the attribute-access API expected by HealthMonitor.""" + + def __init__(self, data: Dict[str, Any]): + self._data = data + + def __getattr__(self, name: str) -> Any: + if name.startswith("_"): + raise AttributeError(name) + return self._data.get(name, None) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def main() -> None: + """Launch the ROS2 ResolverNode.""" + if not _ROS2_AVAILABLE: + logger.error("rclpy is not installed; cannot launch ROS2 node") + return + + rclpy.init() + node = ResolverNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.resolver.stop() + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..10de823 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,3 @@ +""" +Tests package init. +""" diff --git a/tests/test_arbitrator.py b/tests/test_arbitrator.py new file mode 100644 index 0000000..297190d --- /dev/null +++ b/tests/test_arbitrator.py @@ -0,0 +1,129 @@ +""" +Tests for the agent arbitrator. +""" + +import sys +import os +import unittest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from resolver.arbitrator import AgentArbitrator, ArbitrationStrategy + + +class TestAgentArbitrator(unittest.TestCase): + def setUp(self): + self.arbitrator = AgentArbitrator() + + def _make_requests(self, agents): + return [{"agent": a, "resource": "GPU", "priority": 0} for a in agents] + + # ------------------------------------------------------------------ # + # Control arbitration + # ------------------------------------------------------------------ # + + def test_arbitrate_control_no_requests(self): + result = self.arbitrator.arbitrate_control_request([]) + self.assertIsNone(result["winner"]) + + def test_arbitrate_control_priority_queue(self): + requests = [ + {"agent": "planning", "priority": 0}, + {"agent": "perception", "priority": 0}, + ] + result = self.arbitrator.arbitrate_control_request(requests) + # perception has higher default priority (5 vs 4) + self.assertEqual(result["winner"], "perception") + + def test_arbitrate_control_emergency_override(self): + requests = [ + {"agent": "planning", "priority": 0, "emergency": True}, + {"agent": "coordination", "priority": 0, "emergency": True}, + ] + result = self.arbitrator.arbitrate_control_request(requests) + self.assertEqual(result["strategy"], ArbitrationStrategy.EMERGENCY_OVERRIDE.value) + self.assertEqual(result["winner"], "planning") # planning > coordination + + # ------------------------------------------------------------------ # + # Resource allocation + # ------------------------------------------------------------------ # + + def test_arbitrate_resource_allocation_empty(self): + result = self.arbitrator.arbitrate_resource_allocation([]) + self.assertIsNone(result["winner"]) + self.assertEqual(result["allocation"], {}) + + def test_arbitrate_resource_allocation_assigns_winner(self): + requests = [ + {"agent": "perception", "resource": "GPU"}, + {"agent": "planning", "resource": "GPU"}, + ] + result = self.arbitrator.arbitrate_resource_allocation(requests) + self.assertEqual(result["allocation"]["GPU"], "perception") + + def test_arbitrate_resource_multiple_resources(self): + requests = [ + {"agent": "perception", "resource": "GPU"}, + {"agent": "planning", "resource": "GPU"}, + {"agent": "control", "resource": "CPU"}, + {"agent": "coordination", "resource": "CPU"}, + ] + result = self.arbitrator.arbitrate_resource_allocation(requests) + self.assertEqual(result["allocation"]["GPU"], "perception") + self.assertEqual(result["allocation"]["CPU"], "control") + + # ------------------------------------------------------------------ # + # Task priority + # ------------------------------------------------------------------ # + + def test_arbitrate_task_priority_empty(self): + result = self.arbitrator.arbitrate_task_priority([]) + self.assertIsNone(result["winner"]) + + def test_arbitrate_task_priority_orders_tasks(self): + tasks = [ + {"id": "t1", "agent": "coordination", "priority": 1}, + {"id": "t2", "agent": "perception", "priority": 1}, + {"id": "t3", "agent": "planning", "priority": 2}, + ] + result = self.arbitrator.arbitrate_task_priority(tasks) + ordered = result["allocation"]["ordered_tasks"] + # t3 has higher priority (2) so it should come first + self.assertEqual(ordered[0], "t3") + + # ------------------------------------------------------------------ # + # Context-aware arbitration + # ------------------------------------------------------------------ # + + def test_arbitrate_with_context_emergency(self): + requests = [ + {"agent": "coordination", "priority": 0}, + {"agent": "control", "priority": 0}, + ] + result = self.arbitrator.arbitrate_with_context(requests, {"emergency": True}) + self.assertEqual(result["strategy"], ArbitrationStrategy.EMERGENCY_OVERRIDE.value) + + def test_arbitrate_with_context_normal(self): + requests = [ + {"agent": "coordination", "priority": 0}, + {"agent": "perception", "priority": 0}, + ] + result = self.arbitrator.arbitrate_with_context(requests, {"emergency": False}) + # Normal priority-based + self.assertEqual(result["winner"], "perception") + + # ------------------------------------------------------------------ # + # Default priorities + # ------------------------------------------------------------------ # + + def test_default_priorities_resolver_highest(self): + priorities = AgentArbitrator.DEFAULT_PRIORITIES + self.assertEqual(priorities["resolver"], max(priorities.values())) + + def test_default_priorities_coordination_lowest(self): + priorities = AgentArbitrator.DEFAULT_PRIORITIES + self.assertEqual(priorities["coordination"], min(priorities.values())) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_conflict_resolution.py b/tests/test_conflict_resolution.py new file mode 100644 index 0000000..242c6de --- /dev/null +++ b/tests/test_conflict_resolution.py @@ -0,0 +1,190 @@ +""" +Tests for the conflict resolution module. +""" + +import sys +import os +import time +import unittest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from resolver.conflict_resolution import ( + Conflict, + ConflictDetector, + ConflictPredictor, + ConflictResolver, + ConflictType, + Resolution, + ResolutionStrategy, +) +from agents.base_agent import AgentState, AgentStatus + + +class TestConflictDetector(unittest.TestCase): + def setUp(self): + self.detector = ConflictDetector() + + def test_no_conflicts_empty_states(self): + result = self.detector.detect_all({}) + self.assertEqual(result, []) + + def test_control_conflict_detected(self): + state_a = AgentState(name="planning") + state_a.metadata["requesting_control"] = True + state_b = AgentState(name="control") + state_b.metadata["requesting_control"] = True + conflicts = self.detector.detect_control_conflict( + {"planning": state_a, "control": state_b} + ) + self.assertEqual(len(conflicts), 1) + self.assertEqual(conflicts[0]["type"], ConflictType.CONTROL.value) + self.assertIn("planning", conflicts[0]["agents"]) + self.assertIn("control", conflicts[0]["agents"]) + + def test_task_conflict_detected(self): + state_a = AgentState(name="coordination") + state_a.metadata["assigned_tasks"] = ["task_1"] + state_b = AgentState(name="planning") + state_b.metadata["assigned_tasks"] = ["task_1"] + conflicts = self.detector.detect_task_conflict( + {"coordination": state_a, "planning": state_b} + ) + self.assertEqual(len(conflicts), 1) + self.assertEqual(conflicts[0]["type"], ConflictType.TASK.value) + + def test_resource_conflict_detected(self): + state_a = AgentState(name="perception") + state_a.metadata["requested_resources"] = ["GPU"] + state_b = AgentState(name="planning") + state_b.metadata["requested_resources"] = ["GPU"] + conflicts = self.detector.detect_resource_conflict( + {"perception": state_a, "planning": state_b} + ) + self.assertEqual(len(conflicts), 1) + self.assertEqual(conflicts[0]["details"]["resource"], "GPU") + + def test_perception_planning_conflict_no_disagreement(self): + state_p = AgentState(name="perception") + state_p.metadata["detected_objects"] = ["chair"] + state_pl = AgentState(name="planning") + state_pl.metadata["known_objects"] = ["chair"] + conflicts = self.detector.detect_perception_planning_conflict( + {"perception": state_p, "planning": state_pl} + ) + self.assertEqual(conflicts, []) + + def test_perception_planning_conflict_with_disagreement(self): + state_p = AgentState(name="perception") + state_p.metadata["detected_objects"] = ["chair", "table"] + state_pl = AgentState(name="planning") + state_pl.metadata["known_objects"] = ["chair"] + conflicts = self.detector.detect_perception_planning_conflict( + {"perception": state_p, "planning": state_pl} + ) + self.assertEqual(len(conflicts), 1) + + +class TestConflictResolver(unittest.TestCase): + def setUp(self): + self.resolver = ConflictResolver() + + def _make_conflict(self, agents=None): + return { + "type": ConflictType.CONTROL.value, + "agents": agents or ["planning", "control"], + "description": "Test conflict", + } + + def test_resolve_by_priority(self): + conflict = self._make_conflict(["planning", "control"]) + result = self.resolver.resolve_by_priority(conflict) + self.assertEqual(result.winning_agent, "planning") # priority 4 > 3 + + def test_resolve_by_priority_returns_resolution(self): + conflict = self._make_conflict() + result = self.resolver.resolve(conflict, ResolutionStrategy.PRIORITY_BASED) + self.assertIn("winning_agent", result) + self.assertIn("strategy", result) + + def test_resolve_by_voting(self): + conflict = self._make_conflict(["perception", "planning"]) + result = self.resolver.resolve_by_voting(conflict) + self.assertEqual(result.winning_agent, "perception") # priority 5 > 4 + + def test_resolve_by_expertise_control_conflict(self): + conflict = self._make_conflict() + result = self.resolver.resolve_by_expertise(conflict) + self.assertEqual(result.winning_agent, "control") + + def test_resolve_by_cost_no_options(self): + conflict = self._make_conflict() + result = self.resolver.resolve_by_cost(conflict) + # Falls back to priority-based + self.assertIsInstance(result.winning_agent, str) + + def test_resolve_by_cost_with_options(self): + conflict = { + "type": ConflictType.RESOURCE.value, + "agents": ["planning", "control"], + "details": { + "options": [ + {"id": "opt_a", "agent": "planning", "cost": 10}, + {"id": "opt_b", "agent": "control", "cost": 5}, + ] + }, + } + result = self.resolver.resolve_by_cost(conflict) + self.assertEqual(result.winning_agent, "control") + + def test_resolution_history_grows(self): + conflict = self._make_conflict() + self.resolver.resolve(conflict) + self.resolver.resolve(conflict) + self.assertEqual(len(self.resolver.resolution_history), 2) + + def test_resolve_no_agents(self): + conflict = {"type": "control", "agents": []} + result = self.resolver.resolve_by_priority(conflict) + self.assertFalse(result.success) + + +class TestConflict(unittest.TestCase): + def test_to_dict(self): + c = Conflict( + conflict_type=ConflictType.TASK, + agents=["a", "b"], + description="test", + ) + d = c.to_dict() + self.assertEqual(d["type"], ConflictType.TASK.value) + self.assertEqual(d["agents"], ["a", "b"]) + self.assertIn("timestamp", d) + + +class TestConflictPredictor(unittest.TestCase): + def setUp(self): + self.predictor = ConflictPredictor() + + def test_predict_no_conflicts(self): + predictions = self.predictor.predict({}) + self.assertEqual(predictions, []) + + def test_predict_resource_contention(self): + state_a = AgentState(name="perception") + state_a.metadata["requested_resources"] = ["GPU"] + state_b = AgentState(name="planning") + state_b.metadata["requested_resources"] = ["GPU"] + predictions = self.predictor.predict( + {"perception": state_a, "planning": state_b} + ) + self.assertEqual(len(predictions), 1) + self.assertGreater(predictions[0]["probability"], 0.5) + + def test_suggest_preventive_actions_empty(self): + actions = self.predictor.suggest_preventive_actions({}) + self.assertEqual(actions, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_error_recovery.py b/tests/test_error_recovery.py new file mode 100644 index 0000000..ad0e03c --- /dev/null +++ b/tests/test_error_recovery.py @@ -0,0 +1,109 @@ +""" +Tests for the error recovery system. +""" + +import sys +import os +import unittest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from resolver.error_recovery import ErrorRecoverySystem, ErrorSeverity, ErrorType, ErrorRecord +from agents.base_agent import AgentState + + +class TestErrorSeverity(unittest.TestCase): + def test_ordering(self): + self.assertLess(ErrorSeverity.INFO, ErrorSeverity.WARNING) + self.assertLess(ErrorSeverity.WARNING, ErrorSeverity.ERROR) + self.assertLess(ErrorSeverity.ERROR, ErrorSeverity.CRITICAL) + self.assertLess(ErrorSeverity.CRITICAL, ErrorSeverity.FATAL) + + +class TestErrorRecoverySystem(unittest.TestCase): + def setUp(self): + self.system = ErrorRecoverySystem() + + def test_classify_sensor_failure(self): + error = { + "type": ErrorType.SENSOR_FAILURE, + "severity": ErrorSeverity.ERROR, + "sensor": "camera", + "description": "Camera offline", + } + record = self.system.classify_error(error) + self.assertIsInstance(record, ErrorRecord) + self.assertEqual(record.error_type, ErrorType.SENSOR_FAILURE) + self.assertEqual(record.severity, ErrorSeverity.ERROR) + + def test_classify_default_severity(self): + record = self.system.classify_error({"type": "sensor_failure"}) + self.assertEqual(record.severity, ErrorSeverity.ERROR) + + def test_execute_recovery_sensor(self): + error = {"type": ErrorType.SENSOR_FAILURE, "sensor": "lidar"} + result = self.system.execute_recovery(error) + self.assertIn("recovery_action", result) + self.assertIn("lidar", result["recovery_action"]) + self.assertTrue(result["recovery_success"]) + + def test_execute_recovery_agent_crash(self): + error = {"type": ErrorType.AGENT_CRASH, "agent": "planning"} + result = self.system.execute_recovery(error) + self.assertIn("planning", result["recovery_action"]) + + def test_execute_recovery_planning_failure(self): + error = {"type": ErrorType.PLANNING_FAILURE} + result = self.system.execute_recovery(error) + self.assertTrue(result["recovery_success"]) + + def test_execute_recovery_execution_failure(self): + error = {"type": ErrorType.EXECUTION_FAILURE} + result = self.system.execute_recovery(error) + self.assertIn("replan", result["recovery_action"]) + + def test_execute_recovery_communication_failure(self): + error = {"type": ErrorType.COMMUNICATION_FAILURE} + result = self.system.execute_recovery(error) + self.assertIn("cached", result["recovery_action"]) + + def test_execute_recovery_hardware_failure(self): + error = {"type": ErrorType.HARDWARE_FAILURE} + result = self.system.execute_recovery(error) + self.assertIn("emergency", result["recovery_action"]) + + def test_execute_recovery_unknown_type(self): + error = {"type": "totally_unknown"} + result = self.system.execute_recovery(error) + self.assertTrue(result["recovery_success"]) + + def test_max_recovery_attempts(self): + error = {"type": ErrorType.AGENT_CRASH, "agent": "perception"} + for _ in range(ErrorRecoverySystem.MAX_RECOVERY_ATTEMPTS): + self.system.execute_recovery(error) + # One more should trigger escalation + result = self.system.execute_recovery(error) + self.assertFalse(result["recovery_success"]) + self.assertTrue(result.get("escalation_required")) + + def test_error_log_grows(self): + self.system.execute_recovery({"type": ErrorType.SENSOR_FAILURE}) + self.system.execute_recovery({"type": ErrorType.PLANNING_FAILURE}) + self.assertEqual(len(self.system.error_log), 2) + + def test_detect_errors_heartbeat_timeout(self): + import time + state = AgentState(name="planning") + state.last_heartbeat = time.time() - 60 # 60 seconds ago + errors = self.system.detect_errors({"planning": state}) + self.assertTrue(any(e["type"] == ErrorType.AGENT_CRASH for e in errors)) + + def test_detect_errors_high_error_count(self): + state = AgentState(name="control") + state.error_count = 15 + errors = self.system.detect_errors({"control": state}) + self.assertTrue(len(errors) > 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_health_monitor.py b/tests/test_health_monitor.py new file mode 100644 index 0000000..ebe6e22 --- /dev/null +++ b/tests/test_health_monitor.py @@ -0,0 +1,132 @@ +""" +Tests for the health monitoring module. +""" + +import sys +import os +import time +import unittest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from resolver.health_monitor import AgentHealthRecord, HealthMonitor, HealthStatus +from agents.base_agent import AgentState + + +class TestHealthMonitor(unittest.TestCase): + def setUp(self): + self.monitor = HealthMonitor() + + def _make_state(self, **kwargs) -> AgentState: + state = AgentState(name=kwargs.get("name", "test_agent")) + for k, v in kwargs.items(): + if k != "name": + setattr(state, k, v) + return state + + def test_register_agent(self): + self.monitor.register_agent("perception") + self.assertIn("perception", self.monitor._records) + + def test_update_agent_health_auto_registers(self): + state = self._make_state(name="planning", cpu_usage=10.0) + self.monitor.update_agent_health("planning", state) + self.assertIn("planning", self.monitor._records) + + def test_healthy_agent_status(self): + state = self._make_state( + name="control", + cpu_usage=20.0, + memory_usage=30.0, + response_time=0.1, + error_count=0, + task_count=10, + ) + self.monitor.update_agent_health("control", state) + health = self.monitor.get_agent_health("control") + self.assertEqual(health["status"], HealthStatus.HEALTHY.value) + + def test_degraded_agent_status_slow_response(self): + state = self._make_state( + name="control", + response_time=2.0, + error_count=0, + task_count=1, + ) + self.monitor.update_agent_health("control", state) + health = self.monitor.get_agent_health("control") + self.assertEqual(health["status"], HealthStatus.DEGRADED.value) + + def test_critical_agent_status_high_cpu(self): + state = self._make_state(name="control", cpu_usage=95.0) + self.monitor.update_agent_health("control", state) + health = self.monitor.get_agent_health("control") + self.assertEqual(health["status"], HealthStatus.CRITICAL.value) + + def test_unknown_agent_not_registered(self): + health = self.monitor.get_agent_health("nonexistent") + self.assertEqual(health["status"], HealthStatus.UNKNOWN.value) + + def test_compute_overall_healthy(self): + agent_health = { + "a": {"status": "healthy"}, + "b": {"status": "healthy"}, + } + status = self.monitor.compute_overall_status(agent_health) + self.assertEqual(status, HealthStatus.HEALTHY.value) + + def test_compute_overall_degraded(self): + agent_health = { + "a": {"status": "healthy"}, + "b": {"status": "degraded"}, + } + status = self.monitor.compute_overall_status(agent_health) + self.assertEqual(status, HealthStatus.DEGRADED.value) + + def test_compute_overall_critical_overrides_degraded(self): + agent_health = { + "a": {"status": "degraded"}, + "b": {"status": "critical"}, + } + status = self.monitor.compute_overall_status(agent_health) + self.assertEqual(status, HealthStatus.CRITICAL.value) + + def test_generate_health_report_structure(self): + self.monitor.register_agent("test") + report = self.monitor.generate_health_report() + self.assertIn("overall_status", report) + self.assertIn("agents", report) + self.assertIn("alerts", report) + self.assertIn("generated_at", report) + + def test_alert_generated_on_high_cpu(self): + state = self._make_state(name="planning", cpu_usage=90.0) + self.monitor.update_agent_health("planning", state) + alerts = self.monitor.get_active_alerts() + self.assertTrue(any(a["metric"] == "cpu_usage" for a in alerts)) + + def test_alert_generated_on_slow_response(self): + state = self._make_state(name="planning", response_time=3.0) + self.monitor.update_agent_health("planning", state) + alerts = self.monitor.get_active_alerts() + self.assertTrue(any(a["metric"] == "response_time" for a in alerts)) + + def test_predict_failures_empty(self): + predictions = self.monitor.predict_failures() + self.assertEqual(predictions, []) + + def test_predict_failures_high_error_rate(self): + state = self._make_state(name="control", error_count=10, task_count=100) + self.monitor.update_agent_health("control", state) + predictions = self.monitor.predict_failures() + self.assertTrue(any(p["agent"] == "control" for p in predictions)) + + def test_monitor_system_health_multiple_agents(self): + for name in ["perception", "planning", "control"]: + self.monitor.register_agent(name) + system_health = self.monitor.monitor_system_health() + self.assertEqual(len(system_health), 3) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..72bf419 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,183 @@ +""" +Integration tests for the Resolver Agent with all 5 existing agents. +""" + +import sys +import os +import time +import unittest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from agents.base_agent import AgentState, AgentStatus, BaseAgent +from agents.resolver_agent import ResolverAgent + + +# --------------------------------------------------------------------------- +# Stub agents representing the 5 existing agents +# --------------------------------------------------------------------------- + +class _StubAgent(BaseAgent): + """Generic stub agent for integration tests.""" + + def __init__(self, name: str, priority: int = 0): + super().__init__(name=name, priority=priority) + self._received_messages = [] + + def on_start(self): + pass + + def on_stop(self): + pass + + def execute(self, task): + return {"done": True, "agent": self.name} + + def receive_message(self, message): + self._received_messages.append(message) + super().receive_message(message) + + +class PerceptionAgent(_StubAgent): + def __init__(self): + super().__init__("perception", priority=5) + + +class PlanningAgent(_StubAgent): + def __init__(self): + super().__init__("planning", priority=4) + + +class ControlAgent(_StubAgent): + def __init__(self): + super().__init__("control", priority=3) + + +class CommunicationAgent(_StubAgent): + def __init__(self): + super().__init__("communication", priority=2) + + +class CoordinationAgent(_StubAgent): + def __init__(self): + super().__init__("coordination", priority=1) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestResolverIntegration(unittest.TestCase): + def setUp(self): + self.resolver = ResolverAgent() + self.perception = PerceptionAgent() + self.planning = PlanningAgent() + self.control = ControlAgent() + self.communication = CommunicationAgent() + self.coordination = CoordinationAgent() + + self.all_agents = [ + self.perception, self.planning, self.control, + self.communication, self.coordination, + ] + self.resolver.integrate_with_agents(self.all_agents) + + def test_all_agents_registered(self): + for agent in self.all_agents: + self.assertIn(agent.name, self.resolver._monitored_agents) + + def test_resolver_registered_in_all_agents(self): + for agent in self.all_agents: + self.assertIn("resolver", agent._connected_agents) + + def test_monitor_agents_returns_all_statuses(self): + result = self.resolver.monitor_agents() + for agent in self.all_agents: + self.assertIn(agent.name, result) + + def test_conflict_resolution_notifies_agents(self): + conflict = { + "type": "control", + "agents": ["planning", "control"], + "description": "Competing control requests", + } + self.resolver.resolve_conflict(conflict) + # Both planning and control should have received a resolution message + planning_msgs = [ + m for m in self.planning._received_messages + if m.message_type == "conflict_resolution" + ] + control_msgs = [ + m for m in self.control._received_messages + if m.message_type == "conflict_resolution" + ] + self.assertEqual(len(planning_msgs), 1) + self.assertEqual(len(control_msgs), 1) + + def test_health_report_covers_all_agents(self): + for agent in self.all_agents: + agent.start() + health = self.resolver.assess_system_health() + for agent in self.all_agents: + self.assertIn(agent.name, health["agents"]) + for agent in self.all_agents: + agent.stop() + + def test_resource_arbitration_across_agents(self): + requests = [ + {"agent": ag.name, "resource": "GPU"} + for ag in self.all_agents + ] + result = self.resolver.arbitrate_resources(requests) + # perception has highest priority → gets GPU + self.assertEqual(result["allocation"]["GPU"], "perception") + + def test_error_recovery_for_each_agent(self): + for agent in self.all_agents: + error = { + "type": "agent_crash", + "agent": agent.name, + "severity": 3, + } + result = self.resolver.recover_from_error(error) + self.assertIn("recovery_action", result) + + def test_deadlock_breaking_with_agents(self): + # Set up a circular wait + self.resolver.deadlock_detector.update_wait_graph("planning", ["control"]) + self.resolver.deadlock_detector.update_wait_graph("control", ["planning"]) + deadlock = self.resolver.detect_deadlock() + self.assertIsNotNone(deadlock) + result = self.resolver.break_deadlock(deadlock) + self.assertTrue(result["success"]) + + def test_fallback_execution_for_navigation(self): + failure = {"type": "navigation", "depth": 0} + result = self.resolver.execute_fallback(failure) + self.assertIn("executed_steps", result) + self.assertTrue(result["success"]) + + def test_full_task_dispatch(self): + """Verify all execute() task types work end-to-end.""" + tasks = [ + {"type": "health_check"}, + {"type": "detect_deadlock"}, + {"type": "resolve_conflict", + "conflict": {"type": "control", "agents": ["planning", "control"]}}, + {"type": "recover_error", + "error": {"type": "sensor_failure", "sensor": "camera"}}, + {"type": "arbitrate", + "requests": [{"agent": "perception", "resource": "GPU"}]}, + {"type": "execute_fallback", + "failure": {"type": "navigation"}}, + ] + for task in tasks: + with self.subTest(task_type=task["type"]): + result = self.resolver.execute(task) + # detect_deadlock returns None when there is no deadlock (valid) + if task["type"] != "detect_deadlock": + self.assertIsNotNone(result) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_resolver_agent.py b/tests/test_resolver_agent.py new file mode 100644 index 0000000..0f4a613 --- /dev/null +++ b/tests/test_resolver_agent.py @@ -0,0 +1,213 @@ +""" +Tests for the Resolver Agent. +""" + +import sys +import os +import time +import unittest + +# Ensure the repo root is on the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from agents.base_agent import AgentState, AgentStatus, BaseAgent +from agents.resolver_agent import ResolverAgent + + +# --------------------------------------------------------------------------- +# Minimal concrete agent for tests +# --------------------------------------------------------------------------- + +class _DummyAgent(BaseAgent): + def __init__(self, name: str, priority: int = 0): + super().__init__(name=name, priority=priority) + + def on_start(self): + pass + + def on_stop(self): + pass + + def execute(self, task): + return {"done": True} + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestResolverAgentInit(unittest.TestCase): + def setUp(self): + self.resolver = ResolverAgent() + + def test_name_and_priority(self): + self.assertEqual(self.resolver.name, "resolver") + self.assertEqual(self.resolver.priority, 6) + + def test_components_initialized(self): + self.assertIsNotNone(self.resolver.conflict_detector) + self.assertIsNotNone(self.resolver.conflict_resolver) + self.assertIsNotNone(self.resolver.error_handler) + self.assertIsNotNone(self.resolver.arbitrator) + self.assertIsNotNone(self.resolver.health_monitor) + self.assertIsNotNone(self.resolver.deadlock_detector) + self.assertIsNotNone(self.resolver.fallback_planner) + + +class TestResolverAgentIntegration(unittest.TestCase): + def setUp(self): + self.resolver = ResolverAgent() + self.perception = _DummyAgent("perception", priority=5) + self.planning = _DummyAgent("planning", priority=4) + self.control = _DummyAgent("control", priority=3) + + def test_integrate_with_agents(self): + agents = [self.perception, self.planning, self.control] + self.resolver.integrate_with_agents(agents) + self.assertIn("perception", self.resolver._monitored_agents) + self.assertIn("planning", self.resolver._monitored_agents) + self.assertIn("control", self.resolver._monitored_agents) + + def test_integrate_connects_resolver_to_agents(self): + self.resolver.integrate_with_agents([self.perception]) + self.assertIn("resolver", self.perception._connected_agents) + + +class TestResolverConflictHandling(unittest.TestCase): + def setUp(self): + self.resolver = ResolverAgent() + self.resolver.integrate_with_agents([ + _DummyAgent("perception"), + _DummyAgent("planning"), + ]) + + def test_resolve_conflict_returns_dict(self): + conflict = { + "type": "control", + "agents": ["perception", "planning"], + "description": "Test conflict", + } + result = self.resolver.resolve_conflict(conflict) + self.assertIsInstance(result, dict) + self.assertIn("winning_agent", result) + + def test_detect_conflicts_returns_list(self): + conflicts = self.resolver.detect_conflicts() + self.assertIsInstance(conflicts, list) + + +class TestResolverErrorRecovery(unittest.TestCase): + def setUp(self): + self.resolver = ResolverAgent() + + def test_recover_from_sensor_error(self): + error = {"type": "sensor_failure", "sensor": "camera", "severity": 2} + result = self.resolver.recover_from_error(error) + self.assertIsInstance(result, dict) + self.assertIn("recovery_action", result) + + def test_recover_from_agent_crash(self): + error = {"type": "agent_crash", "agent": "planning", "severity": 3} + result = self.resolver.recover_from_error(error) + self.assertIn("recovery_action", result) + + +class TestResolverResourceArbitration(unittest.TestCase): + def setUp(self): + self.resolver = ResolverAgent() + + def test_arbitrate_resources_returns_allocation(self): + requests = [ + {"agent": "perception", "resource": "GPU", "priority": 5}, + {"agent": "planning", "resource": "GPU", "priority": 4}, + ] + result = self.resolver.arbitrate_resources(requests) + self.assertIsInstance(result, dict) + allocation = result.get("allocation", {}) + self.assertIn("GPU", allocation) + self.assertEqual(allocation["GPU"], "perception") + + +class TestResolverHealthAssessment(unittest.TestCase): + def setUp(self): + self.resolver = ResolverAgent() + self.resolver.integrate_with_agents([ + _DummyAgent("perception"), + _DummyAgent("planning"), + ]) + # Start agents so they report HEALTHY + for agent in self.resolver._monitored_agents.values(): + agent.start() + + def tearDown(self): + for agent in self.resolver._monitored_agents.values(): + agent.stop() + + def test_assess_system_health_returns_dict(self): + health = self.resolver.assess_system_health() + self.assertIsInstance(health, dict) + self.assertIn("overall_status", health) + self.assertIn("agents", health) + self.assertIn("alerts", health) + + def test_generate_health_report_has_timestamp(self): + report = self.resolver.generate_health_report() + self.assertIn("report_generated_at", report) + + +class TestResolverDeadlockDetection(unittest.TestCase): + def setUp(self): + self.resolver = ResolverAgent() + + def test_detect_deadlock_no_deadlock(self): + result = self.resolver.detect_deadlock() + self.assertIsNone(result) + + def test_break_deadlock(self): + self.resolver.deadlock_detector.update_wait_graph("A", ["B"]) + self.resolver.deadlock_detector.update_wait_graph("B", ["A"]) + deadlock = self.resolver.detect_deadlock() + self.assertIsNotNone(deadlock) + result = self.resolver.break_deadlock(deadlock) + self.assertTrue(result.get("success")) + + +class TestResolverFallbackExecution(unittest.TestCase): + def setUp(self): + self.resolver = ResolverAgent() + + def test_execute_fallback_navigation(self): + failure = {"type": "navigation", "depth": 0} + result = self.resolver.execute_fallback(failure) + self.assertIsInstance(result, dict) + self.assertIn("executed_steps", result) + self.assertTrue(result.get("success")) + + +class TestResolverExecuteDispatch(unittest.TestCase): + def setUp(self): + self.resolver = ResolverAgent() + + def test_execute_health_check(self): + result = self.resolver.execute({"type": "health_check"}) + self.assertIn("overall_status", result) + + def test_execute_unknown_task(self): + result = self.resolver.execute({"type": "nonexistent"}) + self.assertIn("error", result) + + def test_execute_detect_deadlock(self): + result = self.resolver.execute({"type": "detect_deadlock"}) + self.assertIsNone(result) # no deadlock in fresh state + + def test_execute_resolve_conflict(self): + task = { + "type": "resolve_conflict", + "conflict": {"type": "control", "agents": ["planning", "control"]}, + } + result = self.resolver.execute(task) + self.assertIn("strategy", result) + + +if __name__ == "__main__": + unittest.main()