diff --git a/sdk/agentguard/__init__.py b/sdk/agentguard/__init__.py index 0539eb7..04bcdc5 100644 --- a/sdk/agentguard/__init__.py +++ b/sdk/agentguard/__init__.py @@ -9,6 +9,7 @@ from .evaluation import AssertionResult, EvalResult, EvalSuite, summarize_trace from .guards import ( AgentGuardError, + AgentKilled, BaseGuard, BudgetExceeded, BudgetGuard, @@ -17,6 +18,7 @@ LoopDetected, LoopGuard, RateLimitGuard, + RemoteGuard, TimeoutExceeded, TimeoutGuard, ) @@ -81,6 +83,7 @@ def _show_first_run_prompt() -> None: __all__ = [ "AgentGuardError", + "AgentKilled", "AssertionResult", "AsyncTraceContext", "AsyncTracer", @@ -96,6 +99,7 @@ def _show_first_run_prompt() -> None: "LoopDetected", "LoopGuard", "RateLimitGuard", + "RemoteGuard", "StdoutSink", "TimeoutExceeded", "TimeoutGuard", diff --git a/sdk/agentguard/guards.py b/sdk/agentguard/guards.py index 51c5af4..8c33684 100644 --- a/sdk/agentguard/guards.py +++ b/sdk/agentguard/guards.py @@ -22,6 +22,7 @@ from collections import Counter, deque from dataclasses import dataclass from typing import Any, Callable, Deque, Dict, Optional, Tuple +from urllib.parse import urlparse class AgentGuardError(Exception): @@ -57,6 +58,22 @@ class TimeoutExceeded(AgentGuardError, RuntimeError): pass +class AgentKilled(AgentGuardError, RuntimeError): + """Raised when an agent is killed via a remote dashboard signal. + + This exception is raised by ``RemoteGuard`` when the dashboard + sends a kill signal for the running agent:: + + try: + guard.check() + except AgentKilled: + print("Agent was stopped remotely") + """ + pass + + + + class BaseGuard: """Base class for all guards. @@ -316,6 +333,77 @@ def _check_warning(self) -> None: if self._on_warning: self._on_warning(msg) + @classmethod + def from_remote( + cls, + api_key: str, + name: str = "default", + dashboard_url: str = "https://app.agentguard47.com", + fallback_max_cost_usd: Optional[float] = None, + fallback_max_tokens: Optional[int] = None, + fallback_max_calls: Optional[int] = None, + ) -> "BudgetGuard": + """Create a BudgetGuard with limits fetched from the dashboard. + + Attempts to fetch budget configuration from the dashboard API. + Falls back to the provided local defaults if the fetch fails. + + Usage:: + + # Fetch limits from dashboard, fall back to $10 max: + guard = BudgetGuard.from_remote( + api_key="ag_live_abc123", + name="production", + fallback_max_cost_usd=10.00, + ) + + Args: + api_key: Dashboard API key. + name: Budget configuration name on the dashboard. + dashboard_url: Base URL of the dashboard API. + fallback_max_cost_usd: Local fallback if fetch fails. + fallback_max_tokens: Local fallback if fetch fails. + fallback_max_calls: Local fallback if fetch fails. + + Returns: + A configured BudgetGuard instance. + + Raises: + ValueError: If no limits are available (neither remote nor fallback). + """ + import urllib.request as _urllib_request + from urllib.error import HTTPError as _HTTPError + + parsed = urlparse(dashboard_url) + if parsed.scheme not in ("http", "https"): + raise ValueError(f"dashboard_url scheme must be http or https, got {parsed.scheme!r}") + + url = f"{dashboard_url.rstrip('/')}/api/v1/budgets?name={name}" + headers = {"Authorization": f"Bearer {api_key}"} + + max_cost_usd = fallback_max_cost_usd + max_tokens = fallback_max_tokens + max_calls = fallback_max_calls + + try: + req = _urllib_request.Request(url, headers=headers, method="GET") + with _urllib_request.urlopen(req, timeout=10) as resp: # nosec B310 — scheme validated above + data = json.loads(resp.read().decode("utf-8")) + if "max_cost_usd" in data: + max_cost_usd = float(data["max_cost_usd"]) + if "max_tokens" in data: + max_tokens = int(data["max_tokens"]) + if "max_calls" in data: + max_calls = int(data["max_calls"]) + except (_HTTPError, OSError, json.JSONDecodeError, ValueError, KeyError): + pass # Fall back to local defaults + + return cls( + max_cost_usd=max_cost_usd, + max_tokens=max_tokens, + max_calls=max_calls, + ) + def reset(self) -> None: """Reset all usage counters to zero.""" with self._lock: @@ -547,5 +635,163 @@ def __repr__(self) -> str: return f"RateLimitGuard(max_calls_per_minute={self._max_calls})" +class RemoteGuard(BaseGuard): + """Poll the dashboard for remote kill signals and budget updates. + + Background thread checks ``/api/v1/status`` periodically. On kill signal, + ``check()`` raises ``AgentKilled``. On budget update, linked BudgetGuard + limits are modified. Falls back gracefully if network is unavailable. + + Usage:: + + guard = RemoteGuard(api_key="ag_live_abc123") + guard.start() + guard.check() # raises AgentKilled if dashboard sent kill signal + + Args: + api_key: Dashboard API key (``ag_...`` prefix). + poll_interval: Seconds between status polls. Default 30. + dashboard_url: Base URL of the dashboard API. + budget_guard: Optional BudgetGuard to update with remote limits. + agent_id: Optional agent identifier. Defaults to a random UUID. + """ + + def __init__( + self, + api_key: str, + poll_interval: float = 30.0, + dashboard_url: str = "https://app.agentguard47.com", + budget_guard: Optional["BudgetGuard"] = None, + agent_id: Optional[str] = None, + ) -> None: + import uuid as _uuid + + if not api_key: + raise ValueError("api_key is required for RemoteGuard") + if poll_interval <= 0: + raise ValueError("poll_interval must be > 0") + parsed = urlparse(dashboard_url) + if parsed.scheme not in ("http", "https"): + raise ValueError(f"dashboard_url scheme must be http or https, got {parsed.scheme!r}") + + self._api_key = api_key + self._poll_interval = poll_interval + self._dashboard_url = dashboard_url.rstrip("/") + self._budget_guard = budget_guard + self._agent_id = agent_id or _uuid.uuid4().hex + self._killed = False + self._kill_reason: Optional[str] = None + self._paused = False + self._lock = threading.Lock() + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + self._last_poll: Optional[float] = None + self._started = False + + def start(self) -> None: + """Start the background polling thread. Idempotent.""" + with self._lock: + if self._started: + return + self._started = True + self._stop.clear() + self._thread = threading.Thread( + target=self._poll_loop, daemon=True + ) + self._thread.start() + + def stop(self) -> None: + """Stop the background polling thread.""" + self._stop.set() + if self._thread is not None: + self._thread.join(timeout=5) + + def check(self) -> None: + """Check if the agent has been killed remotely. Raises AgentKilled.""" + with self._lock: + if self._killed: + reason = self._kill_reason or "Agent killed via remote dashboard signal" + raise AgentKilled(reason) + + def auto_check(self, event_name: str, event_data: Optional[Dict[str, Any]] = None) -> None: + """Auto-check: delegates to check().""" + self.check() + + def reset(self) -> None: + """Reset the kill/pause state.""" + with self._lock: + self._killed = False + self._kill_reason = None + self._paused = False + + @property + def agent_id(self) -> str: + """The unique identifier for this agent instance.""" + return self._agent_id + + @property + def is_killed(self) -> bool: + """Whether a remote kill signal has been received.""" + with self._lock: + return self._killed + + @property + def is_started(self) -> bool: + """Whether the polling thread is running.""" + with self._lock: + return self._started + + def _poll_loop(self) -> None: + """Background loop: poll the dashboard for status updates.""" + while not self._stop.wait(self._poll_interval): + self._poll_once() + + def _poll_once(self) -> None: + """Execute a single poll to the dashboard status API.""" + import urllib.request as _urllib_request + from urllib.error import HTTPError as _HTTPError + + url = f"{self._dashboard_url}/api/v1/status?agent_id={self._agent_id}" + headers = {"Authorization": f"Bearer {self._api_key}"} + + try: + req = _urllib_request.Request(url, headers=headers, method="GET") + with _urllib_request.urlopen(req, timeout=10) as resp: # nosec B310 — scheme validated in __init__ + data = json.loads(resp.read().decode("utf-8")) + except (_HTTPError, OSError, json.JSONDecodeError, ValueError): + # Network failure — fall back gracefully, agent keeps running + return + + with self._lock: + self._last_poll = time.monotonic() + + # Handle kill signal + action = data.get("action") + if action == "kill": + self._killed = True + self._kill_reason = data.get( + "reason", "Agent killed via remote dashboard signal" + ) + + # Handle budget update + budget_update = data.get("budget") + if budget_update and self._budget_guard is not None: + bg = self._budget_guard + with bg._lock: + if "max_cost_usd" in budget_update: + bg._max_cost_usd = float(budget_update["max_cost_usd"]) + if "max_tokens" in budget_update: + bg._max_tokens = int(budget_update["max_tokens"]) + if "max_calls" in budget_update: + bg._max_calls = int(budget_update["max_calls"]) + + def __repr__(self) -> str: + status = "killed" if self._killed else ("running" if self._started else "idle") + return ( + f"RemoteGuard(agent_id={self._agent_id!r}, " + f"poll_interval={self._poll_interval}, status={status})" + ) + + def _stable_json(data: Dict[str, Any]) -> str: return json.dumps(data, sort_keys=True, separators=(",", ":")) diff --git a/sdk/agentguard/sinks/http.py b/sdk/agentguard/sinks/http.py index 7e27eb3..daa255a 100644 --- a/sdk/agentguard/sinks/http.py +++ b/sdk/agentguard/sinks/http.py @@ -182,6 +182,8 @@ class HttpSink(TraceSink): compress: Enable gzip compression. Default True. max_retries: Maximum retry attempts on failure. Default 3. max_buffer_size: Maximum events to buffer before dropping oldest. Default 10000. + heartbeat_interval: Seconds between heartbeat events. None disables heartbeats. + heartbeat_guards: List of guards whose state to include in heartbeats. """ def __init__( @@ -193,6 +195,8 @@ def __init__( compress: bool = True, max_retries: int = 3, max_buffer_size: int = 10_000, + heartbeat_interval: Optional[float] = None, + heartbeat_guards: Optional[List[Any]] = None, _allow_private: bool = False, ) -> None: _validate_url(url, allow_private=_allow_private) @@ -222,12 +226,23 @@ def __init__( self._max_buffer_size = max_buffer_size self._dropped_count = 0 + self._heartbeat_interval = heartbeat_interval + self._heartbeat_guards = heartbeat_guards or [] + self._buffer: List[Dict[str, Any]] = [] self._lock = threading.Lock() self._stop = threading.Event() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() + + self._heartbeat_thread: Optional[threading.Thread] = None + if heartbeat_interval is not None and heartbeat_interval > 0: + self._heartbeat_thread = threading.Thread( + target=self._heartbeat_loop, daemon=True + ) + self._heartbeat_thread.start() + atexit.register(self.shutdown) def emit(self, event: Dict[str, Any]) -> None: @@ -338,11 +353,40 @@ def _send(self, batch: List[Dict[str, Any]]) -> None: exc_info=True, ) + def _heartbeat_loop(self) -> None: + """Background loop: emit heartbeat events at the configured interval.""" + while not self._stop.wait(self._heartbeat_interval): + self._emit_heartbeat() + + def _emit_heartbeat(self) -> None: + """Emit a single heartbeat event with current guard state.""" + guard_state = {} + for guard in self._heartbeat_guards: + name = type(guard).__name__ + state = getattr(guard, "state", None) + if state is not None: + guard_state[name] = { + k: v for k, v in state.__dict__.items() + if not k.startswith("_") + } + elif hasattr(guard, "is_killed"): + guard_state[name] = {"is_killed": guard.is_killed} + + event = { + "kind": "heartbeat", + "name": "agent.heartbeat", + "ts": time.time(), + "data": {"guards": guard_state}, + } + self.emit(event) + def shutdown(self) -> None: - """Flush remaining events and stop the background thread.""" + """Flush remaining events and stop the background threads.""" self._stop.set() self._flush() self._thread.join(timeout=5) + if self._heartbeat_thread is not None: + self._heartbeat_thread.join(timeout=5) def __repr__(self) -> str: return f"HttpSink(url={self._url!r})" diff --git a/sdk/tests/test_architecture.py b/sdk/tests/test_architecture.py index 71ae592..81aa0e1 100644 --- a/sdk/tests/test_architecture.py +++ b/sdk/tests/test_architecture.py @@ -54,6 +54,7 @@ "FuzzyLoopGuard", "BudgetGuard", "RateLimitGuard", + "RemoteGuard", "JsonlFileSink", "HttpSink", ] diff --git a/sdk/tests/test_exports.py b/sdk/tests/test_exports.py index 59ca2c8..c3ef417 100644 --- a/sdk/tests/test_exports.py +++ b/sdk/tests/test_exports.py @@ -66,9 +66,9 @@ def test_all_list_complete(self): "__version__", "init", "shutdown", "get_tracer", "get_budget_guard", "Tracer", "JsonlFileSink", "StdoutSink", "TraceSink", - "AgentGuardError", + "AgentGuardError", "AgentKilled", "BaseGuard", "LoopGuard", "BudgetGuard", "TimeoutGuard", - "FuzzyLoopGuard", "RateLimitGuard", + "FuzzyLoopGuard", "RateLimitGuard", "RemoteGuard", "LoopDetected", "BudgetExceeded", "BudgetWarning", "TimeoutExceeded", "estimate_cost", "HttpSink", diff --git a/sdk/tests/test_guards.py b/sdk/tests/test_guards.py index 81a1e93..06f359e 100644 --- a/sdk/tests/test_guards.py +++ b/sdk/tests/test_guards.py @@ -1,11 +1,14 @@ -"""Tests for guards: LoopGuard, BudgetGuard, TimeoutGuard, FuzzyLoopGuard, RateLimitGuard.""" +"""Tests for guards: LoopGuard, BudgetGuard, TimeoutGuard, FuzzyLoopGuard, RateLimitGuard, RemoteGuard.""" import json import os import tempfile +import threading import time import unittest +from http.server import BaseHTTPRequestHandler, HTTPServer from agentguard.guards import ( + AgentKilled, BaseGuard, BudgetExceeded, BudgetGuard, @@ -13,6 +16,7 @@ LoopDetected, LoopGuard, RateLimitGuard, + RemoteGuard, TimeoutExceeded, TimeoutGuard, ) @@ -592,5 +596,239 @@ def test_fails_with_warnings(self): self.assertFalse(result.passed) +# --------------------------------------------------------------------------- +# RemoteGuard +# --------------------------------------------------------------------------- + + +class _MockStatusHandler(BaseHTTPRequestHandler): + """Mock dashboard status API handler.""" + response_data: dict = {"action": "continue"} + + def do_GET(self): + body = json.dumps(self.__class__.response_data).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(body) + + def log_message(self, *args): + pass + + +class _MockBudgetHandler(BaseHTTPRequestHandler): + """Mock dashboard budget API handler.""" + response_data: dict = {"max_cost_usd": 25.0, "max_calls": 500} + + def do_GET(self): + body = json.dumps(self.__class__.response_data).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(body) + + def log_message(self, *args): + pass + + +class _Mock500Handler(BaseHTTPRequestHandler): + """Always returns 500.""" + def do_GET(self): + self.send_response(500) + self.end_headers() + + def log_message(self, *args): + pass + + +class TestRemoteGuard(unittest.TestCase): + @classmethod + def setUpClass(cls): + _MockStatusHandler.response_data = {"action": "continue"} + cls.server = HTTPServer(("127.0.0.1", 0), _MockStatusHandler) + cls.port = cls.server.server_address[1] + cls.server_thread = threading.Thread(target=cls.server.serve_forever, daemon=True) + cls.server_thread.start() + + @classmethod + def tearDownClass(cls): + cls.server.shutdown() + + def test_no_kill_signal(self): + _MockStatusHandler.response_data = {"action": "continue"} + guard = RemoteGuard( + api_key="ag_test", + dashboard_url=f"http://127.0.0.1:{self.port}", + poll_interval=0.1, + ) + guard.start() + time.sleep(0.3) + guard.check() # should not raise + guard.stop() + + def test_kill_signal_raises(self): + _MockStatusHandler.response_data = { + "action": "kill", + "reason": "Stopped by admin", + } + guard = RemoteGuard( + api_key="ag_test", + dashboard_url=f"http://127.0.0.1:{self.port}", + poll_interval=0.1, + ) + guard.start() + time.sleep(0.3) + with self.assertRaises(AgentKilled) as ctx: + guard.check() + self.assertIn("Stopped by admin", str(ctx.exception)) + guard.stop() + + def test_kill_signal_default_reason(self): + _MockStatusHandler.response_data = {"action": "kill"} + guard = RemoteGuard( + api_key="ag_test", + dashboard_url=f"http://127.0.0.1:{self.port}", + poll_interval=0.1, + ) + guard.start() + time.sleep(0.3) + with self.assertRaises(AgentKilled) as ctx: + guard.check() + self.assertIn("remote dashboard", str(ctx.exception)) + guard.stop() + + def test_budget_update(self): + budget = BudgetGuard(max_cost_usd=10.00) + _MockStatusHandler.response_data = { + "action": "continue", + "budget": {"max_cost_usd": 25.0}, + } + guard = RemoteGuard( + api_key="ag_test", + dashboard_url=f"http://127.0.0.1:{self.port}", + poll_interval=0.1, + budget_guard=budget, + ) + guard.start() + time.sleep(0.3) + self.assertAlmostEqual(budget.max_cost_usd, 25.0) + guard.stop() + + def test_auto_check_delegates(self): + _MockStatusHandler.response_data = {"action": "kill"} + guard = RemoteGuard( + api_key="ag_test", + dashboard_url=f"http://127.0.0.1:{self.port}", + poll_interval=0.1, + ) + guard.start() + time.sleep(0.3) + with self.assertRaises(AgentKilled): + guard.auto_check("tool.search", {"q": "test"}) + guard.stop() + + def test_reset_clears_kill(self): + _MockStatusHandler.response_data = {"action": "kill"} + guard = RemoteGuard( + api_key="ag_test", + dashboard_url=f"http://127.0.0.1:{self.port}", + poll_interval=0.1, + ) + guard.start() + time.sleep(0.3) + self.assertTrue(guard.is_killed) + guard.reset() + self.assertFalse(guard.is_killed) + guard.check() # should not raise after reset + guard.stop() + + def test_repr(self): + guard = RemoteGuard(api_key="ag_test", agent_id="test-agent-1") + r = repr(guard) + self.assertIn("RemoteGuard", r) + self.assertIn("test-agent-1", r) + self.assertIn("idle", r) + + def test_agent_id_auto_generated(self): + guard = RemoteGuard(api_key="ag_test") + self.assertTrue(len(guard.agent_id) > 0) + + def test_invalid_api_key(self): + with self.assertRaises(ValueError): + RemoteGuard(api_key="") + + def test_invalid_poll_interval(self): + with self.assertRaises(ValueError): + RemoteGuard(api_key="ag_test", poll_interval=0) + + def test_start_is_idempotent(self): + guard = RemoteGuard( + api_key="ag_test", + dashboard_url=f"http://127.0.0.1:{self.port}", + poll_interval=60, + ) + guard.start() + guard.start() # second call should be safe + guard.stop() + + def test_check_without_start_does_not_raise(self): + guard = RemoteGuard(api_key="ag_test") + guard.check() # no poll thread = no kill signal = no raise + + def test_isinstance_base_guard(self): + guard = RemoteGuard(api_key="ag_test") + self.assertIsInstance(guard, BaseGuard) + + +class TestRemoteGuardNetworkFailure(unittest.TestCase): + def test_network_failure_falls_back(self): + guard = RemoteGuard( + api_key="ag_test", + dashboard_url="http://127.0.0.1:1", # nothing listening + poll_interval=0.1, + ) + guard.start() + time.sleep(0.3) + guard.check() # should not raise — graceful fallback + guard.stop() + + +class TestBudgetGuardFromRemote(unittest.TestCase): + @classmethod + def setUpClass(cls): + _MockBudgetHandler.response_data = {"max_cost_usd": 25.0, "max_calls": 500} + cls.server = HTTPServer(("127.0.0.1", 0), _MockBudgetHandler) + cls.port = cls.server.server_address[1] + cls.server_thread = threading.Thread(target=cls.server.serve_forever, daemon=True) + cls.server_thread.start() + + @classmethod + def tearDownClass(cls): + cls.server.shutdown() + + def test_fetches_remote_config(self): + guard = BudgetGuard.from_remote( + api_key="ag_test", + dashboard_url=f"http://127.0.0.1:{self.port}", + ) + self.assertAlmostEqual(guard.max_cost_usd, 25.0) + self.assertEqual(guard.max_calls, 500) + + def test_fallback_on_network_failure(self): + guard = BudgetGuard.from_remote( + api_key="ag_test", + dashboard_url="http://127.0.0.1:1", # nothing listening + fallback_max_cost_usd=10.0, + ) + self.assertAlmostEqual(guard.max_cost_usd, 10.0) + + def test_fallback_no_limits_raises(self): + with self.assertRaises(ValueError): + BudgetGuard.from_remote( + api_key="ag_test", + dashboard_url="http://127.0.0.1:1", + ) + + if __name__ == "__main__": unittest.main() diff --git a/sdk/tests/test_http_sink.py b/sdk/tests/test_http_sink.py index 6855d58..0dd0a2b 100644 --- a/sdk/tests/test_http_sink.py +++ b/sdk/tests/test_http_sink.py @@ -259,5 +259,91 @@ def test_allow_private_flag(self): sink.shutdown() +class _HeartbeatCollectorHandler(BaseHTTPRequestHandler): + received: list = [] + + def do_POST(self): + import gzip as _gzip + + length = int(self.headers.get("Content-Length", 0)) + raw = self.rfile.read(length) + encoding = self.headers.get("Content-Encoding", "") + if encoding == "gzip": + raw = _gzip.decompress(raw) + body = raw.decode("utf-8") + self.__class__.received.append(body) + self.send_response(200) + self.end_headers() + self.wfile.write(b"ok") + + def log_message(self, *args): + pass + + +class TestHttpSinkHeartbeat(unittest.TestCase): + @classmethod + def setUpClass(cls): + _HeartbeatCollectorHandler.received = [] + cls.server = HTTPServer(("127.0.0.1", 0), _HeartbeatCollectorHandler) + cls.port = cls.server.server_address[1] + cls.server_thread = threading.Thread(target=cls.server.serve_forever, daemon=True) + cls.server_thread.start() + + @classmethod + def tearDownClass(cls): + cls.server.shutdown() + + def setUp(self): + _HeartbeatCollectorHandler.received.clear() + + def test_heartbeat_emits_events(self): + from agentguard.guards import BudgetGuard + + budget = BudgetGuard(max_cost_usd=10.0) + budget.consume(cost_usd=2.50) + + sink = HttpSink( + url=f"http://127.0.0.1:{self.port}/ingest", _allow_private=True, + batch_size=1, + flush_interval=60, + heartbeat_interval=0.2, + heartbeat_guards=[budget], + ) + time.sleep(0.5) + sink.shutdown() + + # Should have received at least one heartbeat + self.assertGreaterEqual(len(_HeartbeatCollectorHandler.received), 1) + # Parse the first heartbeat + first_batch = _HeartbeatCollectorHandler.received[0] + events = [json.loads(line) for line in first_batch.strip().split("\n")] + heartbeats = [e for e in events if e.get("kind") == "heartbeat"] + self.assertGreaterEqual(len(heartbeats), 1) + hb = heartbeats[0] + self.assertEqual(hb["name"], "agent.heartbeat") + self.assertIn("BudgetGuard", hb["data"]["guards"]) + self.assertAlmostEqual( + hb["data"]["guards"]["BudgetGuard"]["cost_used"], 2.50 + ) + + def test_heartbeat_disabled_by_default(self): + sink = HttpSink( + url=f"http://127.0.0.1:{self.port}/ingest", _allow_private=True, + batch_size=100, + flush_interval=60, + ) + time.sleep(0.3) + sink.shutdown() + # No heartbeat events should be emitted + heartbeat_found = False + for batch in _HeartbeatCollectorHandler.received: + for line in batch.strip().split("\n"): + if line: + event = json.loads(line) + if event.get("kind") == "heartbeat": + heartbeat_found = True + self.assertFalse(heartbeat_found) + + if __name__ == "__main__": unittest.main()