Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/agentguard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .evaluation import AssertionResult, EvalResult, EvalSuite, summarize_trace
from .guards import (
AgentGuardError,
AgentKilled,
BaseGuard,
BudgetExceeded,
BudgetGuard,
Expand All @@ -17,6 +18,7 @@
LoopDetected,
LoopGuard,
RateLimitGuard,
RemoteGuard,
TimeoutExceeded,
TimeoutGuard,
)
Expand Down Expand Up @@ -81,6 +83,7 @@ def _show_first_run_prompt() -> None:

__all__ = [
"AgentGuardError",
"AgentKilled",
"AssertionResult",
"AsyncTraceContext",
"AsyncTracer",
Expand All @@ -96,6 +99,7 @@ def _show_first_run_prompt() -> None:
"LoopDetected",
"LoopGuard",
"RateLimitGuard",
"RemoteGuard",
"StdoutSink",
"TimeoutExceeded",
"TimeoutGuard",
Expand Down
246 changes: 246 additions & 0 deletions sdk/agentguard/guards.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from collections import Counter, deque
from dataclasses import dataclass
from typing import Any, Callable, Deque, Dict, Optional, Tuple
from urllib.parse import urlparse


class AgentGuardError(Exception):
Expand Down Expand Up @@ -57,6 +58,22 @@ class TimeoutExceeded(AgentGuardError, RuntimeError):
pass


class AgentKilled(AgentGuardError, RuntimeError):
"""Raised when an agent is killed via a remote dashboard signal.

This exception is raised by ``RemoteGuard`` when the dashboard
sends a kill signal for the running agent::

try:
guard.check()
except AgentKilled:
print("Agent was stopped remotely")
"""
pass




class BaseGuard:
"""Base class for all guards.

Expand Down Expand Up @@ -316,6 +333,77 @@ def _check_warning(self) -> None:
if self._on_warning:
self._on_warning(msg)

@classmethod
def from_remote(
cls,
api_key: str,
name: str = "default",
dashboard_url: str = "https://app.agentguard47.com",
fallback_max_cost_usd: Optional[float] = None,
fallback_max_tokens: Optional[int] = None,
fallback_max_calls: Optional[int] = None,
) -> "BudgetGuard":
"""Create a BudgetGuard with limits fetched from the dashboard.

Attempts to fetch budget configuration from the dashboard API.
Falls back to the provided local defaults if the fetch fails.

Usage::

# Fetch limits from dashboard, fall back to $10 max:
guard = BudgetGuard.from_remote(
api_key="ag_live_abc123",
name="production",
fallback_max_cost_usd=10.00,
)

Args:
api_key: Dashboard API key.
name: Budget configuration name on the dashboard.
dashboard_url: Base URL of the dashboard API.
fallback_max_cost_usd: Local fallback if fetch fails.
fallback_max_tokens: Local fallback if fetch fails.
fallback_max_calls: Local fallback if fetch fails.

Returns:
A configured BudgetGuard instance.

Raises:
ValueError: If no limits are available (neither remote nor fallback).
"""
import urllib.request as _urllib_request
from urllib.error import HTTPError as _HTTPError

parsed = urlparse(dashboard_url)
if parsed.scheme not in ("http", "https"):
raise ValueError(f"dashboard_url scheme must be http or https, got {parsed.scheme!r}")

url = f"{dashboard_url.rstrip('/')}/api/v1/budgets?name={name}"
headers = {"Authorization": f"Bearer {api_key}"}

max_cost_usd = fallback_max_cost_usd
max_tokens = fallback_max_tokens
max_calls = fallback_max_calls

try:
req = _urllib_request.Request(url, headers=headers, method="GET")
with _urllib_request.urlopen(req, timeout=10) as resp: # nosec B310 — scheme validated above
data = json.loads(resp.read().decode("utf-8"))
if "max_cost_usd" in data:
max_cost_usd = float(data["max_cost_usd"])
if "max_tokens" in data:
max_tokens = int(data["max_tokens"])
if "max_calls" in data:
max_calls = int(data["max_calls"])
except (_HTTPError, OSError, json.JSONDecodeError, ValueError, KeyError):
pass # Fall back to local defaults

return cls(
max_cost_usd=max_cost_usd,
max_tokens=max_tokens,
max_calls=max_calls,
)

def reset(self) -> None:
"""Reset all usage counters to zero."""
with self._lock:
Expand Down Expand Up @@ -547,5 +635,163 @@ def __repr__(self) -> str:
return f"RateLimitGuard(max_calls_per_minute={self._max_calls})"


class RemoteGuard(BaseGuard):
"""Poll the dashboard for remote kill signals and budget updates.

Background thread checks ``/api/v1/status`` periodically. On kill signal,
``check()`` raises ``AgentKilled``. On budget update, linked BudgetGuard
limits are modified. Falls back gracefully if network is unavailable.

Usage::

guard = RemoteGuard(api_key="ag_live_abc123")
guard.start()
guard.check() # raises AgentKilled if dashboard sent kill signal

Args:
api_key: Dashboard API key (``ag_...`` prefix).
poll_interval: Seconds between status polls. Default 30.
dashboard_url: Base URL of the dashboard API.
budget_guard: Optional BudgetGuard to update with remote limits.
agent_id: Optional agent identifier. Defaults to a random UUID.
"""

def __init__(
self,
api_key: str,
poll_interval: float = 30.0,
dashboard_url: str = "https://app.agentguard47.com",
budget_guard: Optional["BudgetGuard"] = None,
agent_id: Optional[str] = None,
) -> None:
import uuid as _uuid

if not api_key:
raise ValueError("api_key is required for RemoteGuard")
if poll_interval <= 0:
raise ValueError("poll_interval must be > 0")
parsed = urlparse(dashboard_url)
if parsed.scheme not in ("http", "https"):
raise ValueError(f"dashboard_url scheme must be http or https, got {parsed.scheme!r}")

self._api_key = api_key
self._poll_interval = poll_interval
self._dashboard_url = dashboard_url.rstrip("/")
self._budget_guard = budget_guard
self._agent_id = agent_id or _uuid.uuid4().hex
self._killed = False
self._kill_reason: Optional[str] = None
self._paused = False
self._lock = threading.Lock()
self._stop = threading.Event()
self._thread: Optional[threading.Thread] = None
self._last_poll: Optional[float] = None
self._started = False

def start(self) -> None:
"""Start the background polling thread. Idempotent."""
with self._lock:
if self._started:
return
self._started = True
self._stop.clear()
self._thread = threading.Thread(
target=self._poll_loop, daemon=True
)
self._thread.start()

def stop(self) -> None:
"""Stop the background polling thread."""
self._stop.set()
if self._thread is not None:
self._thread.join(timeout=5)

def check(self) -> None:
"""Check if the agent has been killed remotely. Raises AgentKilled."""
with self._lock:
if self._killed:
reason = self._kill_reason or "Agent killed via remote dashboard signal"
raise AgentKilled(reason)

def auto_check(self, event_name: str, event_data: Optional[Dict[str, Any]] = None) -> None:
"""Auto-check: delegates to check()."""
self.check()

def reset(self) -> None:
"""Reset the kill/pause state."""
with self._lock:
self._killed = False
self._kill_reason = None
self._paused = False

@property
def agent_id(self) -> str:
"""The unique identifier for this agent instance."""
return self._agent_id

@property
def is_killed(self) -> bool:
"""Whether a remote kill signal has been received."""
with self._lock:
return self._killed

@property
def is_started(self) -> bool:
"""Whether the polling thread is running."""
with self._lock:
return self._started

def _poll_loop(self) -> None:
"""Background loop: poll the dashboard for status updates."""
while not self._stop.wait(self._poll_interval):
self._poll_once()

def _poll_once(self) -> None:
"""Execute a single poll to the dashboard status API."""
import urllib.request as _urllib_request
from urllib.error import HTTPError as _HTTPError

url = f"{self._dashboard_url}/api/v1/status?agent_id={self._agent_id}"
headers = {"Authorization": f"Bearer {self._api_key}"}

try:
req = _urllib_request.Request(url, headers=headers, method="GET")
with _urllib_request.urlopen(req, timeout=10) as resp: # nosec B310 — scheme validated in __init__
data = json.loads(resp.read().decode("utf-8"))
except (_HTTPError, OSError, json.JSONDecodeError, ValueError):
# Network failure — fall back gracefully, agent keeps running
return

with self._lock:
self._last_poll = time.monotonic()

# Handle kill signal
action = data.get("action")
if action == "kill":
self._killed = True
self._kill_reason = data.get(
"reason", "Agent killed via remote dashboard signal"
)

# Handle budget update
budget_update = data.get("budget")
if budget_update and self._budget_guard is not None:
bg = self._budget_guard
with bg._lock:
if "max_cost_usd" in budget_update:
bg._max_cost_usd = float(budget_update["max_cost_usd"])
if "max_tokens" in budget_update:
bg._max_tokens = int(budget_update["max_tokens"])
if "max_calls" in budget_update:
bg._max_calls = int(budget_update["max_calls"])

def __repr__(self) -> str:
status = "killed" if self._killed else ("running" if self._started else "idle")
return (
f"RemoteGuard(agent_id={self._agent_id!r}, "
f"poll_interval={self._poll_interval}, status={status})"
)


def _stable_json(data: Dict[str, Any]) -> str:
return json.dumps(data, sort_keys=True, separators=(",", ":"))
46 changes: 45 additions & 1 deletion sdk/agentguard/sinks/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ class HttpSink(TraceSink):
compress: Enable gzip compression. Default True.
max_retries: Maximum retry attempts on failure. Default 3.
max_buffer_size: Maximum events to buffer before dropping oldest. Default 10000.
heartbeat_interval: Seconds between heartbeat events. None disables heartbeats.
heartbeat_guards: List of guards whose state to include in heartbeats.
"""

def __init__(
Expand All @@ -193,6 +195,8 @@ def __init__(
compress: bool = True,
max_retries: int = 3,
max_buffer_size: int = 10_000,
heartbeat_interval: Optional[float] = None,
heartbeat_guards: Optional[List[Any]] = None,
_allow_private: bool = False,
) -> None:
_validate_url(url, allow_private=_allow_private)
Expand Down Expand Up @@ -222,12 +226,23 @@ def __init__(
self._max_buffer_size = max_buffer_size
self._dropped_count = 0

self._heartbeat_interval = heartbeat_interval
self._heartbeat_guards = heartbeat_guards or []

self._buffer: List[Dict[str, Any]] = []
self._lock = threading.Lock()
self._stop = threading.Event()

self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()

self._heartbeat_thread: Optional[threading.Thread] = None
if heartbeat_interval is not None and heartbeat_interval > 0:
self._heartbeat_thread = threading.Thread(
target=self._heartbeat_loop, daemon=True
)
self._heartbeat_thread.start()

atexit.register(self.shutdown)

def emit(self, event: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -338,11 +353,40 @@ def _send(self, batch: List[Dict[str, Any]]) -> None:
exc_info=True,
)

def _heartbeat_loop(self) -> None:
"""Background loop: emit heartbeat events at the configured interval."""
while not self._stop.wait(self._heartbeat_interval):
self._emit_heartbeat()

def _emit_heartbeat(self) -> None:
"""Emit a single heartbeat event with current guard state."""
guard_state = {}
for guard in self._heartbeat_guards:
name = type(guard).__name__
state = getattr(guard, "state", None)
if state is not None:
guard_state[name] = {
k: v for k, v in state.__dict__.items()
if not k.startswith("_")
}
elif hasattr(guard, "is_killed"):
guard_state[name] = {"is_killed": guard.is_killed}

event = {
"kind": "heartbeat",
"name": "agent.heartbeat",
"ts": time.time(),
"data": {"guards": guard_state},
}
self.emit(event)

def shutdown(self) -> None:
"""Flush remaining events and stop the background thread."""
"""Flush remaining events and stop the background threads."""
self._stop.set()
self._flush()
self._thread.join(timeout=5)
if self._heartbeat_thread is not None:
self._heartbeat_thread.join(timeout=5)

def __repr__(self) -> str:
return f"HttpSink(url={self._url!r})"
1 change: 1 addition & 0 deletions sdk/tests/test_architecture.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"FuzzyLoopGuard",
"BudgetGuard",
"RateLimitGuard",
"RemoteGuard",
"JsonlFileSink",
"HttpSink",
]
Expand Down
Loading