From 5dd8e84f617fec9a585772f17d039bf4dec8e0d7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 26 Nov 2025 13:53:16 +0000 Subject: [PATCH 1/4] Initial plan From 5f3606155178b17ded8abe4d71e0b409ceed476f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:01:26 +0000 Subject: [PATCH 2/4] Add event bus emitter with streaming support and tests Co-authored-by: Sathursan-S <84266926+Sathursan-S@users.noreply.github.com> --- browser_ai/event_bus/__init__.py | 219 ++++++++++ browser_ai/event_bus/emitter.py | 380 +++++++++++++++++ browser_ai/event_bus/handlers/__init__.py | 14 + .../event_bus/handlers/logging_handler.py | 163 +++++++ test_event_bus.py | 403 ++++++++++++++++++ 5 files changed, 1179 insertions(+) create mode 100644 browser_ai/event_bus/emitter.py create mode 100644 browser_ai/event_bus/handlers/logging_handler.py create mode 100644 test_event_bus.py diff --git a/browser_ai/event_bus/__init__.py b/browser_ai/event_bus/__init__.py index e69de29..3e78486 100644 --- a/browser_ai/event_bus/__init__.py +++ b/browser_ai/event_bus/__init__.py @@ -0,0 +1,219 @@ +""" +Browser.AI Event Bus - Real-time Event Streaming System + +A versatile, extensible, and loosely-coupled event streaming emitter +for real-time logging, status updates, LLM outputs, and agent execution +tracking. Designed for UI integration and various subscriber levels. + +Usage: + from browser_ai.event_bus import emit, emit_async, subscribe, EventLevel + from browser_ai.event_bus import AgentStartedEvent, AgentCompletedEvent + + # Subscribe to events + subscribe("agent", my_handler) + + # Emit events (sync or async) + emit(AgentStartedEvent(task="my task")) + await emit_async(AgentCompletedEvent(task="my task", success=True, total_steps=5)) +""" + +from browser_ai.event_bus.core import EventHandler, EventManager +from browser_ai.event_bus.emitter import ( + AsyncEventHandler, + EventLevel, + FilteredEventHandler, + StreamingEventHandler, + emit, + emit_async, + get_event_manager, + subscribe, + unsubscribe, +) +from browser_ai.event_bus.events import ( + # Base + BaseEvent, + # Agent Events + AgentCompletedEvent, + AgentFailedEvent, + AgentRetryEvent, + AgentStartedEvent, + AgentStepCompletedEvent, + AgentStepFailedEvent, + AgentStepStartedEvent, + # Browser Events + BrowserClosedEvent, + BrowserContextClosedEvent, + BrowserContextCreatedEvent, + BrowserInitializedEvent, + PageLoadedEvent, + PageNavigationCompletedEvent, + PageNavigationFailedEvent, + PageNavigationStartedEvent, + ScreenshotCapturedEvent, + TabClosedEvent, + TabCreatedEvent, + TabSwitchedEvent, + # Controller Events + ActionExecutionCompletedEvent, + ActionExecutionFailedEvent, + ActionExecutionStartedEvent, + ActionRegisteredEvent, + ControllerInitializedEvent, + MultipleActionsExecutedEvent, + # DOM Events + DOMElementHighlightedEvent, + DOMProcessingCompletedEvent, + DOMProcessingFailedEvent, + DOMProcessingStartedEvent, + DOMTreeBuiltEvent, + # Error Events + ErrorOccurredEvent, + RecoveryAttemptedEvent, + RecoveryFailedEvent, + RecoverySuccessEvent, + # LLM Events + LLMRateLimitEvent, + LLMRequestCompletedEvent, + LLMRequestFailedEvent, + LLMRequestStartedEvent, + LLMTokenLimitWarningEvent, + # Message Events + ConversationSavedEvent, + MessageAddedEvent, + MessageHistoryClearedEvent, + MessageTrimmedEvent, + ToolCallCreatedEvent, + ToolResponseReceivedEvent, + # Metrics Events + PerformanceMetricEvent, + ResourceUsageEvent, + StepDurationEvent, + TotalExecutionTimeEvent, + # Planning Events + PlanningCompletedEvent, + PlanningFailedEvent, + PlanningStartedEvent, + PlanUpdatedEvent, + # State Events + HistoryRecordedEvent, + MemoryUpdatedEvent, + StateRestoredEvent, + StateSnapshotCreatedEvent, + # User Interaction Events + UserConfirmationRequestedEvent, + UserHelpRequestedEvent, + UserInputReceivedEvent, + # Validation Events + ActionParamsValidationFailedEvent, + OutputValidationFailedEvent, + OutputValidationStartedEvent, + OutputValidationSuccessEvent, + # Extension Events + CDPConnectionClosedEvent, + CDPConnectionEstablishedEvent, + ExtensionConnectedEvent, + ExtensionDisconnectedEvent, + WebSocketMessageReceivedEvent, + WebSocketMessageSentEvent, +) + +__all__ = [ + # Core + "EventManager", + "EventHandler", + "BaseEvent", + # Emitter functions + "emit", + "emit_async", + "subscribe", + "unsubscribe", + "get_event_manager", + "EventLevel", + # Handler classes + "AsyncEventHandler", + "StreamingEventHandler", + "FilteredEventHandler", + # Agent Events + "AgentStartedEvent", + "AgentStepStartedEvent", + "AgentStepCompletedEvent", + "AgentStepFailedEvent", + "AgentCompletedEvent", + "AgentFailedEvent", + "AgentRetryEvent", + # Browser Events + "BrowserInitializedEvent", + "BrowserClosedEvent", + "BrowserContextCreatedEvent", + "BrowserContextClosedEvent", + "PageNavigationStartedEvent", + "PageNavigationCompletedEvent", + "PageNavigationFailedEvent", + "PageLoadedEvent", + "TabCreatedEvent", + "TabSwitchedEvent", + "TabClosedEvent", + "ScreenshotCapturedEvent", + # DOM Events + "DOMTreeBuiltEvent", + "DOMElementHighlightedEvent", + "DOMProcessingStartedEvent", + "DOMProcessingCompletedEvent", + "DOMProcessingFailedEvent", + # Controller Events + "ControllerInitializedEvent", + "ActionRegisteredEvent", + "ActionExecutionStartedEvent", + "ActionExecutionCompletedEvent", + "ActionExecutionFailedEvent", + "MultipleActionsExecutedEvent", + # LLM Events + "LLMRequestStartedEvent", + "LLMRequestCompletedEvent", + "LLMRequestFailedEvent", + "LLMRateLimitEvent", + "LLMTokenLimitWarningEvent", + # Message Events + "MessageAddedEvent", + "MessageTrimmedEvent", + "MessageHistoryClearedEvent", + "ConversationSavedEvent", + "ToolCallCreatedEvent", + "ToolResponseReceivedEvent", + # Validation Events + "OutputValidationStartedEvent", + "OutputValidationSuccessEvent", + "OutputValidationFailedEvent", + "ActionParamsValidationFailedEvent", + # Planning Events + "PlanningStartedEvent", + "PlanningCompletedEvent", + "PlanningFailedEvent", + "PlanUpdatedEvent", + # State Events + "StateSnapshotCreatedEvent", + "StateRestoredEvent", + "MemoryUpdatedEvent", + "HistoryRecordedEvent", + # Error Events + "ErrorOccurredEvent", + "RecoveryAttemptedEvent", + "RecoverySuccessEvent", + "RecoveryFailedEvent", + # Metrics Events + "PerformanceMetricEvent", + "ResourceUsageEvent", + "StepDurationEvent", + "TotalExecutionTimeEvent", + # User Interaction Events + "UserHelpRequestedEvent", + "UserInputReceivedEvent", + "UserConfirmationRequestedEvent", + # Extension Events + "ExtensionConnectedEvent", + "ExtensionDisconnectedEvent", + "CDPConnectionEstablishedEvent", + "CDPConnectionClosedEvent", + "WebSocketMessageReceivedEvent", + "WebSocketMessageSentEvent", +] diff --git a/browser_ai/event_bus/emitter.py b/browser_ai/event_bus/emitter.py new file mode 100644 index 0000000..2fd7edc --- /dev/null +++ b/browser_ai/event_bus/emitter.py @@ -0,0 +1,380 @@ +""" +Browser.AI Event Emitter - Convenience Functions for Event Streaming + +This module provides easy-to-use functions for emitting events from anywhere +in the codebase, supporting both synchronous and asynchronous contexts. + +Features: +- Type-safe event emission with Pydantic validation +- Automatic topic detection from event type +- Support for different subscription levels (DEBUG, INFO, WARNING, ERROR, CRITICAL) +- Async iterator support for real-time streaming +- Thread-safe singleton EventManager access + +Usage: + from browser_ai.event_bus.emitter import emit, emit_async, subscribe + + # Emit an event synchronously + emit(AgentStartedEvent(task="my task")) + + # Emit an event asynchronously + await emit_async(AgentStartedEvent(task="my task")) + + # Subscribe to events + subscribe("agent", my_handler) + + # Subscribe to all events + subscribe("*", my_handler) +""" + +import asyncio +import logging +from enum import Enum +from typing import AsyncIterator, Callable, Optional, Union + +from browser_ai.event_bus.core import EventHandler, EventManager +from browser_ai.event_bus.events import BaseEvent + +logger = logging.getLogger(__name__) + + +class EventLevel(str, Enum): + """ + Event subscription levels for filtering events. + + Subscribers can filter events by importance level: + - DEBUG: All events including detailed debugging info + - INFO: Standard operational events + - WARNING: Events indicating potential issues + - ERROR: Error events and failures + - CRITICAL: Critical system events only + """ + + DEBUG = "debug" + INFO = "info" + WARNING = "warning" + ERROR = "error" + CRITICAL = "critical" + + +# Singleton instance +_event_manager: Optional[EventManager] = None + + +def get_event_manager() -> EventManager: + """ + Get the singleton EventManager instance. + + Returns: + EventManager: The global event manager instance + """ + global _event_manager + if _event_manager is None: + _event_manager = EventManager() + return _event_manager + + +def emit(event: BaseEvent) -> None: + """ + Emit an event synchronously. + + The event will be published to all subscribers of its topic. + Async handlers will be scheduled as fire-and-forget tasks. + + Args: + event: The event to emit (must be a BaseEvent subclass) + + Example: + from browser_ai.event_bus import emit, AgentStartedEvent + + emit(AgentStartedEvent(task="Search for Python tutorials")) + """ + manager = get_event_manager() + manager.publish(event.topic, event) + + +async def emit_async(event: BaseEvent) -> None: + """ + Emit an event asynchronously. + + The event will be published to all subscribers of its topic. + All handlers (sync and async) will be awaited. + + Args: + event: The event to emit (must be a BaseEvent subclass) + + Example: + from browser_ai.event_bus import emit_async, AgentStartedEvent + + await emit_async(AgentStartedEvent(task="Search for Python tutorials")) + """ + manager = get_event_manager() + await manager.publish_async(event.topic, event) + + +def subscribe( + topic: str, + handler: Union[EventHandler, Callable[[BaseEvent], None]], +) -> EventHandler: + """ + Subscribe a handler to a topic. + + Args: + topic: The topic to subscribe to (e.g., "agent", "browser", "llm") + Use "*" to subscribe to all topics + handler: Either an EventHandler instance or a callable that takes a BaseEvent + + Returns: + EventHandler: The registered handler (useful for later unsubscription) + + Example: + from browser_ai.event_bus import subscribe, BaseEvent + + def my_handler(event: BaseEvent): + print(f"Received: {event.name}") + + subscribe("agent", my_handler) + subscribe("*", my_handler) # Subscribe to all events + """ + manager = get_event_manager() + + # Wrap callable in an EventHandler if needed + if not isinstance(handler, EventHandler): + if asyncio.iscoroutinefunction(handler): + handler = _AsyncCallableHandler(handler) + else: + handler = _CallableHandler(handler) + + manager.subscribe(topic, handler) + return handler + + +def unsubscribe(topic: str, handler: EventHandler) -> None: + """ + Unsubscribe a handler from a topic. + + Args: + topic: The topic to unsubscribe from + handler: The handler to remove + + Example: + handler = subscribe("agent", my_handler) + # Later... + unsubscribe("agent", handler) + """ + manager = get_event_manager() + manager.unsubscribe(topic, handler) + + +class _CallableHandler(EventHandler): + """Internal wrapper to convert a sync callable into an EventHandler.""" + + def __init__(self, callback: Callable[[BaseEvent], None]): + self._callback = callback + + @property + def name(self) -> str: + return getattr(self._callback, "__name__", "anonymous_handler") + + def handle(self, event: BaseEvent) -> None: + """Synchronous handle.""" + self._callback(event) + + +class _AsyncCallableHandler(EventHandler): + """Internal wrapper to convert an async callable into an EventHandler.""" + + def __init__(self, callback: Callable[[BaseEvent], None]): + self._callback = callback + + @property + def name(self) -> str: + return getattr(self._callback, "__name__", "async_handler") + + async def handle(self, event: BaseEvent) -> None: + """Asynchronous handle.""" + await self._callback(event) + + +class AsyncEventHandler(EventHandler): + """ + Async event handler that wraps an async callback. + + This is useful when you want to create an async handler from a coroutine function. + """ + + def __init__(self, callback: Callable[[BaseEvent], None]): + self._callback = callback + + @property + def name(self) -> str: + return getattr(self._callback, "__name__", "async_handler") + + async def handle(self, event: BaseEvent) -> None: + await self._callback(event) + + +class StreamingEventHandler(EventHandler): + """ + A handler that collects events for streaming via async iteration. + + This handler is designed for real-time event streaming to WebSocket + clients or Server-Sent Events (SSE) endpoints. + + Usage: + handler = StreamingEventHandler() + subscribe("*", handler) + + async for event in handler.stream(): + await websocket.send(event.model_dump_json()) + """ + + def __init__(self, max_queue_size: int = 1000): + """ + Initialize the streaming handler. + + Args: + max_queue_size: Maximum number of events to buffer + """ + self._queue: asyncio.Queue[BaseEvent] = asyncio.Queue(maxsize=max_queue_size) + self._closed = False + + @property + def name(self) -> str: + return "StreamingEventHandler" + + def handle(self, event: BaseEvent) -> None: + """Queue the event for streaming (non-blocking).""" + if self._closed: + return + try: + self._queue.put_nowait(event) + except asyncio.QueueFull: + # Drop oldest event and add new one + try: + self._queue.get_nowait() + self._queue.put_nowait(event) + except (asyncio.QueueEmpty, asyncio.QueueFull): + pass + + async def stream(self, timeout: Optional[float] = None) -> AsyncIterator[BaseEvent]: + """ + Async iterator that yields events as they arrive. + + Args: + timeout: Optional timeout for waiting on new events (seconds) + + Yields: + BaseEvent: Events as they are received + + Example: + async for event in handler.stream(): + print(event.model_dump_json()) + """ + while not self._closed: + try: + if timeout: + event = await asyncio.wait_for(self._queue.get(), timeout=timeout) + else: + event = await self._queue.get() + yield event + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + break + + def close(self) -> None: + """Close the streaming handler.""" + self._closed = True + + def is_closed(self) -> bool: + """Check if the handler is closed.""" + return self._closed + + +class FilteredEventHandler(EventHandler): + """ + An event handler that filters events based on level or custom predicates. + + This allows subscribers to receive only events matching certain criteria. + + Usage: + # Filter by event level + handler = FilteredEventHandler( + inner_handler, + level_filter=EventLevel.WARNING + ) + + # Filter with custom predicate + handler = FilteredEventHandler( + inner_handler, + predicate=lambda e: e.topic == "agent" + ) + """ + + def __init__( + self, + inner_handler: EventHandler, + level_filter: Optional[EventLevel] = None, + predicate: Optional[Callable[[BaseEvent], bool]] = None, + ): + """ + Initialize the filtered handler. + + Args: + inner_handler: The handler to delegate to if filter passes + level_filter: Only pass events at or above this level + predicate: Custom filter function returning True to pass event + """ + self._inner = inner_handler + self._level_filter = level_filter + self._predicate = predicate + + @property + def name(self) -> str: + return f"FilteredEventHandler({self._inner.name})" + + def handle(self, event: BaseEvent) -> None: + """Handle the event if it passes the filter.""" + if self._should_pass(event): + self._inner.handle(event) + + def _should_pass(self, event: BaseEvent) -> bool: + """Check if the event should pass through the filter.""" + # Check level filter + if self._level_filter: + event_level = self._get_event_level(event) + if not self._level_meets_threshold(event_level, self._level_filter): + return False + + # Check custom predicate + if self._predicate and not self._predicate(event): + return False + + return True + + def _get_event_level(self, event: BaseEvent) -> EventLevel: + """Determine the level of an event based on its type.""" + name_lower = event.name.lower() + + if "error" in name_lower or "failed" in name_lower: + return EventLevel.ERROR + elif "warning" in name_lower or "limit" in name_lower: + return EventLevel.WARNING + elif "debug" in name_lower: + return EventLevel.DEBUG + else: + return EventLevel.INFO + + def _level_meets_threshold( + self, event_level: EventLevel, threshold: EventLevel + ) -> bool: + """Check if event level meets the threshold.""" + level_order = { + EventLevel.DEBUG: 0, + EventLevel.INFO: 1, + EventLevel.WARNING: 2, + EventLevel.ERROR: 3, + EventLevel.CRITICAL: 4, + } + return level_order.get(event_level, 1) >= level_order.get(threshold, 0) diff --git a/browser_ai/event_bus/handlers/__init__.py b/browser_ai/event_bus/handlers/__init__.py index e69de29..9e02859 100644 --- a/browser_ai/event_bus/handlers/__init__.py +++ b/browser_ai/event_bus/handlers/__init__.py @@ -0,0 +1,14 @@ +""" +Browser.AI Event Bus Handlers + +Pre-built event handlers for common use cases. +""" + +from browser_ai.event_bus.handlers.console import ConsoleHandler +from browser_ai.event_bus.handlers.logging_handler import LogEvent, LoggingEventHandler + +__all__ = [ + "ConsoleHandler", + "LoggingEventHandler", + "LogEvent", +] diff --git a/browser_ai/event_bus/handlers/logging_handler.py b/browser_ai/event_bus/handlers/logging_handler.py new file mode 100644 index 0000000..48e2cab --- /dev/null +++ b/browser_ai/event_bus/handlers/logging_handler.py @@ -0,0 +1,163 @@ +""" +Browser.AI Event Bus Logging Handler + +This module provides a logging handler that emits log records as events +through the event bus, enabling real-time log streaming to UI components. + +Usage: + from browser_ai.event_bus.handlers.logging_handler import LoggingEventHandler + from browser_ai.event_bus import subscribe + + # Enable log event emission + handler = LoggingEventHandler.enable() + + # Subscribe to log events + subscribe("log", my_handler) +""" + +import logging +from datetime import datetime +from typing import Any, Dict, Optional + +from browser_ai.event_bus.events import BaseEvent + + +class LogEvent(BaseEvent): + """ + Event emitted for each log record. + + Attributes: + topic: Always "log" for log events + name: Log level name (e.g., "log_info", "log_error") + level: Numeric log level + level_name: String log level name + logger_name: Name of the logger + message: Formatted log message + module: Module where the log was emitted + function: Function where the log was emitted + line_number: Line number where the log was emitted + exception_info: Exception information if present + metadata: Additional metadata + """ + + topic: str = "log" + name: str = "log_record" + level: int + level_name: str + logger_name: str + message: str + module: Optional[str] = None + function: Optional[str] = None + line_number: Optional[int] = None + exception_info: Optional[str] = None + metadata: Dict[str, Any] = {} + + +class LoggingEventHandler(logging.Handler): + """ + A logging handler that emits log records as events through the event bus. + + This allows UI components and other subscribers to receive real-time + log updates through the event streaming system. + + Usage: + # Enable globally for browser_ai logger + handler = LoggingEventHandler.enable() + + # Or attach to specific logger + handler = LoggingEventHandler() + logging.getLogger("my_logger").addHandler(handler) + """ + + _instance: Optional["LoggingEventHandler"] = None + + def __init__(self, level: int = logging.DEBUG): + """ + Initialize the logging event handler. + + Args: + level: Minimum log level to emit as events + """ + super().__init__(level) + self.setFormatter(logging.Formatter("%(message)s")) + + def emit(self, record: logging.LogRecord) -> None: + """ + Emit a log record as an event. + + Args: + record: The log record to emit + """ + try: + # Import here to avoid circular imports + from browser_ai.event_bus.emitter import emit + + # Format exception info if present + exception_info = None + if record.exc_info: + exception_info = self.formatter.formatException(record.exc_info) if self.formatter else str(record.exc_info) + + # Create log event + event = LogEvent( + name=f"log_{record.levelname.lower()}", + level=record.levelno, + level_name=record.levelname, + logger_name=record.name, + message=self.format(record), + module=record.module, + function=record.funcName, + line_number=record.lineno, + exception_info=exception_info, + metadata={ + "pathname": record.pathname, + "process": record.process, + "thread": record.thread, + }, + ) + + # Emit the event + emit(event) + + except Exception: + # Don't let event emission errors break logging + self.handleError(record) + + @classmethod + def enable( + cls, + logger_name: str = "browser_ai", + level: int = logging.DEBUG, + ) -> "LoggingEventHandler": + """ + Enable log event emission for a logger. + + Args: + logger_name: Name of the logger to attach to + level: Minimum log level to emit + + Returns: + LoggingEventHandler: The handler instance + """ + if cls._instance is not None: + return cls._instance + + handler = cls(level=level) + logger = logging.getLogger(logger_name) + logger.addHandler(handler) + cls._instance = handler + return handler + + @classmethod + def disable(cls, logger_name: str = "browser_ai") -> None: + """ + Disable log event emission. + + Args: + logger_name: Name of the logger to detach from + """ + if cls._instance is None: + return + + logger = logging.getLogger(logger_name) + logger.removeHandler(cls._instance) + cls._instance = None diff --git a/test_event_bus.py b/test_event_bus.py new file mode 100644 index 0000000..c8f4312 --- /dev/null +++ b/test_event_bus.py @@ -0,0 +1,403 @@ +#!/usr/bin/env python3 +""" +Tests for the Browser.AI Event Bus / Event Streaming System + +This test suite validates: +- Event emission (sync and async) +- Event subscription and filtering +- Streaming event handlers +- Logging integration +- Type safety with Pydantic models +""" + +import asyncio +import logging +import pytest +from typing import List +from unittest.mock import MagicMock, AsyncMock + +# Import event bus components +from browser_ai.event_bus import ( + emit, + emit_async, + subscribe, + unsubscribe, + get_event_manager, + EventLevel, + EventManager, + EventHandler, + BaseEvent, + StreamingEventHandler, + FilteredEventHandler, + # Event types + AgentStartedEvent, + AgentCompletedEvent, + AgentStepStartedEvent, + AgentStepCompletedEvent, + AgentFailedEvent, + LLMRequestStartedEvent, + LLMRequestCompletedEvent, + ErrorOccurredEvent, +) +from browser_ai.event_bus.handlers import LoggingEventHandler, LogEvent + + +class TestEventEmission: + """Tests for basic event emission functionality.""" + + def test_emit_sync(self): + """Test synchronous event emission.""" + received_events: List[BaseEvent] = [] + + def handler(event: BaseEvent): + received_events.append(event) + + # Subscribe and emit + h = subscribe("agent", handler) + emit(AgentStartedEvent(task="test task", use_vision=True)) + + assert len(received_events) == 1 + assert isinstance(received_events[0], AgentStartedEvent) + assert received_events[0].task == "test task" + assert received_events[0].use_vision is True + + # Cleanup + unsubscribe("agent", h) + + @pytest.mark.asyncio + async def test_emit_async(self): + """Test asynchronous event emission.""" + received_events: List[BaseEvent] = [] + + async def async_handler(event: BaseEvent): + received_events.append(event) + + # Subscribe and emit + h = subscribe("agent", async_handler) + await emit_async(AgentCompletedEvent( + task="test task", + total_steps=5, + success=True, + final_result="Task completed" + )) + + assert len(received_events) == 1 + assert isinstance(received_events[0], AgentCompletedEvent) + assert received_events[0].success is True + assert received_events[0].total_steps == 5 + + # Cleanup + unsubscribe("agent", h) + + def test_emit_to_multiple_subscribers(self): + """Test event emission to multiple subscribers.""" + received_by_1: List[BaseEvent] = [] + received_by_2: List[BaseEvent] = [] + + def handler1(event: BaseEvent): + received_by_1.append(event) + + def handler2(event: BaseEvent): + received_by_2.append(event) + + h1 = subscribe("agent", handler1) + h2 = subscribe("agent", handler2) + + emit(AgentStartedEvent(task="test")) + + assert len(received_by_1) == 1 + assert len(received_by_2) == 1 + + # Cleanup + unsubscribe("agent", h1) + unsubscribe("agent", h2) + + def test_wildcard_subscription(self): + """Test subscribing to all events with wildcard.""" + received_events: List[BaseEvent] = [] + + def handler(event: BaseEvent): + received_events.append(event) + + h = subscribe("*", handler) + + # Emit events to different topics + emit(AgentStartedEvent(task="test")) + emit(LLMRequestStartedEvent(purpose="action", model_name="gpt-4")) + emit(ErrorOccurredEvent( + error_type="test", + error_message="test error", + component="test" + )) + + assert len(received_events) == 3 + + # Cleanup + unsubscribe("*", h) + + +class TestEventManager: + """Tests for the EventManager singleton.""" + + def test_singleton_pattern(self): + """Test that EventManager follows singleton pattern.""" + manager1 = get_event_manager() + manager2 = get_event_manager() + assert manager1 is manager2 + + def test_subscribe_and_unsubscribe(self): + """Test subscription and unsubscription.""" + manager = get_event_manager() + received: List[BaseEvent] = [] + + def handler(event: BaseEvent): + received.append(event) + + h = subscribe("test_topic", handler) + emit(AgentStartedEvent(task="test")) # Different topic, shouldn't receive + + assert len(received) == 0 + + unsubscribe("test_topic", h) + + +class TestStreamingEventHandler: + """Tests for the StreamingEventHandler.""" + + @pytest.mark.asyncio + async def test_streaming_handler(self): + """Test streaming handler collects and yields events.""" + handler = StreamingEventHandler(max_queue_size=10) + subscribe("agent", handler) + + # Emit some events + emit(AgentStartedEvent(task="task1")) + emit(AgentStepStartedEvent(step_number=1)) + emit(AgentStepCompletedEvent(step_number=1, actions_taken=[])) + + # Collect events with timeout + collected = [] + async def collect(): + async for event in handler.stream(timeout=0.1): + collected.append(event) + if len(collected) >= 3: + break + + await asyncio.wait_for(collect(), timeout=1.0) + + assert len(collected) == 3 + assert isinstance(collected[0], AgentStartedEvent) + assert isinstance(collected[1], AgentStepStartedEvent) + assert isinstance(collected[2], AgentStepCompletedEvent) + + handler.close() + unsubscribe("agent", handler) + + @pytest.mark.asyncio + async def test_streaming_handler_overflow(self): + """Test streaming handler handles overflow gracefully.""" + handler = StreamingEventHandler(max_queue_size=2) + subscribe("agent", handler) + + # Emit more events than queue can hold + for i in range(5): + emit(AgentStepStartedEvent(step_number=i)) + + # Should only have the most recent events + # (queue drops oldest when full) + handler.close() + unsubscribe("agent", handler) + + +class TestFilteredEventHandler: + """Tests for the FilteredEventHandler.""" + + def test_level_filter(self): + """Test filtering by event level.""" + received: List[BaseEvent] = [] + + class InnerHandler(EventHandler): + def handle(self, event: BaseEvent): + received.append(event) + + inner = InnerHandler() + filtered = FilteredEventHandler( + inner, + level_filter=EventLevel.ERROR + ) + + subscribe("agent", filtered) + subscribe("error", filtered) + + # Info-level event (should be filtered) + emit(AgentStartedEvent(task="test")) + + # Error-level event (should pass) + emit(ErrorOccurredEvent( + error_type="test", + error_message="test error", + component="agent" + )) + + # Only error event should have passed + assert len(received) == 1 + assert isinstance(received[0], ErrorOccurredEvent) + + unsubscribe("agent", filtered) + unsubscribe("error", filtered) + + def test_predicate_filter(self): + """Test filtering with custom predicate.""" + received: List[BaseEvent] = [] + + class InnerHandler(EventHandler): + def handle(self, event: BaseEvent): + received.append(event) + + inner = InnerHandler() + filtered = FilteredEventHandler( + inner, + predicate=lambda e: getattr(e, 'task', '') == 'important' + ) + + subscribe("agent", filtered) + + emit(AgentStartedEvent(task="unimportant")) + emit(AgentStartedEvent(task="important")) + + assert len(received) == 1 + assert received[0].task == "important" + + unsubscribe("agent", filtered) + + +class TestEventTypeSafety: + """Tests for Pydantic type safety.""" + + def test_event_validation(self): + """Test that events validate their fields.""" + # Valid event + event = AgentStartedEvent(task="test task") + assert event.task == "test task" + assert event.topic == "agent" + assert event.name == "agent_started" + + def test_event_serialization(self): + """Test event serialization to JSON.""" + event = AgentCompletedEvent( + task="test", + total_steps=10, + success=True, + final_result="Done" + ) + + json_str = event.model_dump_json() + assert "test" in json_str + assert "true" in json_str.lower() + + def test_event_deserialization(self): + """Test event deserialization from dict.""" + data = { + "task": "my task", + "agent_id": "abc123", + "use_vision": False + } + event = AgentStartedEvent(**data) + assert event.task == "my task" + assert event.agent_id == "abc123" + assert event.use_vision is False + + +class TestLoggingEventHandler: + """Tests for the LoggingEventHandler.""" + + def test_log_event_emission(self): + """Test that log records are emitted as events.""" + received_events: List[LogEvent] = [] + + def handler(event: BaseEvent): + if isinstance(event, LogEvent): + received_events.append(event) + + # Subscribe to log events + h = subscribe("log", handler) + + # Enable logging handler + log_handler = LoggingEventHandler.enable( + logger_name="test_logger", + level=logging.DEBUG + ) + + # Log something + logger = logging.getLogger("test_logger") + logger.info("Test log message") + + assert len(received_events) >= 1 + # Find our log message + found = any( + "Test log message" in e.message + for e in received_events + ) + assert found + + # Cleanup + LoggingEventHandler.disable("test_logger") + unsubscribe("log", h) + + +class TestEventLevels: + """Tests for EventLevel enum.""" + + def test_event_level_values(self): + """Test EventLevel enum values.""" + assert EventLevel.DEBUG.value == "debug" + assert EventLevel.INFO.value == "info" + assert EventLevel.WARNING.value == "warning" + assert EventLevel.ERROR.value == "error" + assert EventLevel.CRITICAL.value == "critical" + + def test_event_level_ordering(self): + """Test that event levels have logical ordering.""" + level_order = { + EventLevel.DEBUG: 0, + EventLevel.INFO: 1, + EventLevel.WARNING: 2, + EventLevel.ERROR: 3, + EventLevel.CRITICAL: 4, + } + + # Verify ordering is preserved + levels = list(EventLevel) + for i, level in enumerate(levels[:-1]): + assert level_order[level] < level_order[levels[i + 1]] + + +class TestConcurrency: + """Tests for concurrent event handling.""" + + @pytest.mark.asyncio + async def test_concurrent_emission(self): + """Test concurrent event emission.""" + received: List[BaseEvent] = [] + lock = asyncio.Lock() + + async def handler(event: BaseEvent): + async with lock: + received.append(event) + + h = subscribe("agent", handler) + + # Emit events concurrently + tasks = [ + emit_async(AgentStepStartedEvent(step_number=i)) + for i in range(10) + ] + await asyncio.gather(*tasks) + + assert len(received) == 10 + + unsubscribe("agent", h) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From e1a4aa119b90c277727ea7e4682e00731b6f2e67 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:07:59 +0000 Subject: [PATCH 3/4] Integrate event emission into Agent service and add example Co-authored-by: Sathursan-S <84266926+Sathursan-S@users.noreply.github.com> --- browser_ai/agent/service.py | 118 ++++++++++++++++++ .../event_bus/handlers/logging_handler.py | 4 + test_event_bus.py | 12 +- 3 files changed, 128 insertions(+), 6 deletions(-) diff --git a/browser_ai/agent/service.py b/browser_ai/agent/service.py index 8b3fd66..37a0fcd 100644 --- a/browser_ai/agent/service.py +++ b/browser_ai/agent/service.py @@ -41,6 +41,22 @@ DOMHistoryElement, HistoryTreeProcessor, ) +from browser_ai.event_bus import ( + emit_async, + AgentCompletedEvent, + AgentFailedEvent, + AgentRetryEvent, + AgentStartedEvent, + AgentStepCompletedEvent, + AgentStepFailedEvent, + AgentStepStartedEvent, + LLMRequestCompletedEvent, + LLMRequestStartedEvent, + LLMRateLimitEvent, + PlanningCompletedEvent, + PlanningStartedEvent, + UserHelpRequestedEvent, +) from browser_ai.utils import time_execution_async from browser_ai.agent.media import create_history_gif @@ -295,6 +311,12 @@ async def step(self, step_info: Optional[AgentStepInfo] = None) -> None: model_output = None result: list[ActionResult] = [] + # Emit step started event + await emit_async(AgentStepStartedEvent( + step_number=self.n_steps, + agent_id=self.agent_id, + )) + try: state = await self.browser_context.get_state() @@ -350,6 +372,12 @@ async def step(self, step_info: Optional[AgentStepInfo] = None) -> None: "šŸ™‹ā€ā™‚ļø Task requires user intervention - pausing execution" ) + # Emit user help requested event + await emit_async(UserHelpRequestedEvent( + request_message="Task requires user intervention (e.g., CAPTCHA, login)", + step_number=self.n_steps, + )) + # Store the current page URL to detect when user completes the intervention current_page = await self.browser_context.get_current_page() original_url = current_page.url @@ -415,6 +443,25 @@ async def step(self, step_info: Optional[AgentStepInfo] = None) -> None: if state: self._make_history_item(model_output, state, result) + # Emit step completed event + step_result = result[-1].extracted_content if result and result[-1].extracted_content else None + has_error = any(r.error for r in result) if result else False + if has_error: + error_msg = next((r.error for r in result if r.error), "Unknown error") + await emit_async(AgentStepFailedEvent( + step_number=self.n_steps - 1, # n_steps was incremented in get_next_action + agent_id=self.agent_id, + error_message=str(error_msg)[:500], + error_type=type(error_msg).__name__ if not isinstance(error_msg, str) else "StepError", + )) + else: + await emit_async(AgentStepCompletedEvent( + step_number=self.n_steps - 1, + agent_id=self.agent_id, + actions_taken=actions, + result=step_result, + )) + async def _handle_step_error(self, error: Exception) -> list[ActionResult]: """Handle all types of errors that can occur during a step""" include_trace = logger.isEnabledFor(logging.DEBUG) @@ -437,6 +484,11 @@ async def _handle_step_error(self, error: Exception) -> list[ActionResult]: self.consecutive_failures += 1 elif isinstance(error, RateLimitError) or isinstance(error, ResourceExhausted): logger.warning(f"{prefix}{error_msg}") + # Emit rate limit event + await emit_async(LLMRateLimitEvent( + model_name=self.model_name, + retry_after_seconds=self.retry_delay, + )) await asyncio.sleep(self.retry_delay) self.consecutive_failures += 1 else: @@ -506,10 +558,20 @@ def _convert_input_messages( @time_execution_async("--get_next_action") async def get_next_action(self, input_messages: list[BaseMessage]) -> AgentOutput: """Get next action from LLM based on current state""" + # Emit LLM request started event + await emit_async(LLMRequestStartedEvent( + model_name=self.model_name, + purpose="action", + input_tokens_estimate=len(str(input_messages)) // 4, # Rough estimate + )) + converted_input_messages = self._convert_input_messages( input_messages, self.model_name ) + import time + start_time = time.time() + if ( self.model_name == "deepseek-reasoner" or self.model_name.startswith("deepseek-r1") @@ -539,9 +601,18 @@ async def get_next_action(self, input_messages: list[BaseMessage]) -> AgentOutpu response: dict[str, Any] = await structured_llm.ainvoke(input_messages) # type: ignore parsed: AgentOutput | None = response["parsed"] + response_time_ms = (time.time() - start_time) * 1000 + if parsed is None: raise ValueError("Could not parse response.") + # Emit LLM request completed event + await emit_async(LLMRequestCompletedEvent( + model_name=self.model_name, + purpose="action", + response_time_ms=response_time_ms, + )) + # cut the number of actions to max_actions_per_step parsed.action = parsed.action[: self.max_actions_per_step] self._log_response(parsed) @@ -620,6 +691,14 @@ def _log_agent_run(self) -> None: @observe(name="agent.run", ignore_output=True) async def run(self, max_steps: int = 100) -> AgentHistoryList: """Execute the task with maximum number of steps""" + # Emit agent started event + await emit_async(AgentStartedEvent( + task=self.task, + agent_id=self.agent_id, + use_vision=self.use_vision, + )) + + task_success = False try: self._log_agent_run() @@ -651,6 +730,7 @@ async def run(self, max_steps: int = 100) -> AgentHistoryList: continue logger.info("āœ… Task completed successfully") + task_success = True if self.register_done_callback: self.register_done_callback(self.history) break @@ -659,6 +739,24 @@ async def run(self, max_steps: int = 100) -> AgentHistoryList: return self.history finally: + # Emit agent completed/failed event + final_result = self.history.final_result() if self.history.history else None + if task_success: + await emit_async(AgentCompletedEvent( + task=self.task, + agent_id=self.agent_id, + total_steps=self.n_steps, + success=True, + final_result=final_result, + )) + else: + await emit_async(AgentFailedEvent( + task=self.task, + agent_id=self.agent_id, + error_message="Task did not complete successfully", + total_steps=self.n_steps, + )) + if not self.injected_browser_context: await self.browser_context.close() @@ -1048,6 +1146,12 @@ async def _run_planner(self) -> Optional[str]: if not self.planner_llm: return None + # Emit planning started event + await emit_async(PlanningStartedEvent( + task=self.task, + agent_id=self.agent_id, + )) + # Create planner message history using full message history planner_messages = [ PlannerPrompt(self.action_descriptions).get_system_message(), @@ -1080,15 +1184,29 @@ async def _run_planner(self) -> Optional[str]: # if deepseek-reasoner, remove think tags if self.planner_model_name == "deepseek-reasoner": plan = self._remove_think_tags(plan) + + # Parse and emit planning completed event + plan_steps = [] try: plan_json = json.loads(plan) logger.info(f"Planning Analysis:\n{json.dumps(plan_json, indent=4)}") + # Try to extract steps if plan is structured + if isinstance(plan_json, dict) and "steps" in plan_json: + plan_steps = plan_json["steps"] + elif isinstance(plan_json, list): + plan_steps = [str(s) for s in plan_json] except json.JSONDecodeError: logger.info(f"Planning Analysis:\n{plan}") + plan_steps = [plan[:200]] if plan else [] except Exception as e: logger.debug(f"Error parsing planning analysis: {e}") logger.info(f"Plan: {plan}") + await emit_async(PlanningCompletedEvent( + plan_steps=plan_steps, + agent_id=self.agent_id, + )) + return plan # endregion diff --git a/browser_ai/event_bus/handlers/logging_handler.py b/browser_ai/event_bus/handlers/logging_handler.py index 48e2cab..e0d23d2 100644 --- a/browser_ai/event_bus/handlers/logging_handler.py +++ b/browser_ai/event_bus/handlers/logging_handler.py @@ -89,6 +89,10 @@ def emit(self, record: logging.LogRecord) -> None: record: The log record to emit """ try: + # Avoid infinite recursion: don't emit events for event_bus logs + if record.name.startswith("browser_ai.event_bus"): + return + # Import here to avoid circular imports from browser_ai.event_bus.emitter import emit diff --git a/test_event_bus.py b/test_event_bus.py index c8f4312..c52ba64 100644 --- a/test_event_bus.py +++ b/test_event_bus.py @@ -322,26 +322,26 @@ def handler(event: BaseEvent): # Subscribe to log events h = subscribe("log", handler) - # Enable logging handler + # Enable logging handler for a test logger (not browser_ai to avoid recursion) log_handler = LoggingEventHandler.enable( - logger_name="test_logger", + logger_name="test_log_handler", level=logging.DEBUG ) # Log something - logger = logging.getLogger("test_logger") - logger.info("Test log message") + logger = logging.getLogger("test_log_handler") + logger.info("Test log message for event emission") assert len(received_events) >= 1 # Find our log message found = any( - "Test log message" in e.message + "Test log message for event emission" in e.message for e in received_events ) assert found # Cleanup - LoggingEventHandler.disable("test_logger") + LoggingEventHandler.disable("test_log_handler") unsubscribe("log", h) From 22cfd1117ffe234849ad750dad2d3524ff389de3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 26 Nov 2025 14:12:41 +0000 Subject: [PATCH 4/4] Address code review feedback and add example documentation Co-authored-by: Sathursan-S <84266926+Sathursan-S@users.noreply.github.com> --- browser_ai/agent/service.py | 16 +- docs/event_streaming_example.py | 264 ++++++++++++++++++++++++++++++++ test_event_bus.py | 1 - 3 files changed, 275 insertions(+), 6 deletions(-) create mode 100644 docs/event_streaming_example.py diff --git a/browser_ai/agent/service.py b/browser_ai/agent/service.py index 37a0fcd..7c6cf28 100644 --- a/browser_ai/agent/service.py +++ b/browser_ai/agent/service.py @@ -5,6 +5,7 @@ import logging import os import re +import time import uuid from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Type, TypeVar @@ -65,6 +66,10 @@ T = TypeVar("T", bound=BaseModel) +# Constants for event emission +CHARS_PER_TOKEN_ESTIMATE = 4 # Rough estimate for token calculation +ERROR_MESSAGE_MAX_LENGTH = 500 # Maximum length for error messages in events + class Agent: # region Initialization @@ -444,19 +449,21 @@ async def step(self, step_info: Optional[AgentStepInfo] = None) -> None: self._make_history_item(model_output, state, result) # Emit step completed event + # Note: n_steps was incremented in get_next_action, so use n_steps - 1 for the completed step + completed_step_number = self.n_steps - 1 step_result = result[-1].extracted_content if result and result[-1].extracted_content else None has_error = any(r.error for r in result) if result else False if has_error: error_msg = next((r.error for r in result if r.error), "Unknown error") await emit_async(AgentStepFailedEvent( - step_number=self.n_steps - 1, # n_steps was incremented in get_next_action + step_number=completed_step_number, agent_id=self.agent_id, - error_message=str(error_msg)[:500], + error_message=str(error_msg)[:ERROR_MESSAGE_MAX_LENGTH], error_type=type(error_msg).__name__ if not isinstance(error_msg, str) else "StepError", )) else: await emit_async(AgentStepCompletedEvent( - step_number=self.n_steps - 1, + step_number=completed_step_number, agent_id=self.agent_id, actions_taken=actions, result=step_result, @@ -562,14 +569,13 @@ async def get_next_action(self, input_messages: list[BaseMessage]) -> AgentOutpu await emit_async(LLMRequestStartedEvent( model_name=self.model_name, purpose="action", - input_tokens_estimate=len(str(input_messages)) // 4, # Rough estimate + input_tokens_estimate=len(str(input_messages)) // CHARS_PER_TOKEN_ESTIMATE, )) converted_input_messages = self._convert_input_messages( input_messages, self.model_name ) - import time start_time = time.time() if ( diff --git a/docs/event_streaming_example.py b/docs/event_streaming_example.py new file mode 100644 index 0000000..f9dec46 --- /dev/null +++ b/docs/event_streaming_example.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python3 +""" +Example: Event Streaming with Browser.AI + +This example demonstrates how to use the event bus for real-time +event streaming during agent execution. Perfect for: +- UI integration (WebSocket, SSE) +- Real-time logging dashboards +- Progress tracking +- Custom monitoring systems + +Usage: + python examples/event_streaming_example.py +""" + +import asyncio +from datetime import datetime + +from browser_ai.event_bus import ( + # Core functions + subscribe, + unsubscribe, + emit, + emit_async, + EventLevel, + # Handler classes for streaming + StreamingEventHandler, + FilteredEventHandler, + # Event types + BaseEvent, + AgentStartedEvent, + AgentStepStartedEvent, + AgentStepCompletedEvent, + AgentCompletedEvent, + AgentFailedEvent, + LLMRequestStartedEvent, + LLMRequestCompletedEvent, + ErrorOccurredEvent, +) +from browser_ai.event_bus.handlers import LoggingEventHandler, ConsoleHandler + + +def simple_subscriber_example(): + """Example 1: Simple event subscription with a callback function.""" + print("\n" + "=" * 60) + print("Example 1: Simple Event Subscription") + print("=" * 60) + + received_events = [] + + def my_event_handler(event: BaseEvent): + """Handle incoming events.""" + timestamp = datetime.fromtimestamp(event.timestamp).strftime("%H:%M:%S") + print(f"[{timestamp}] šŸ“Ø {event.topic}/{event.name}") + received_events.append(event) + + # Subscribe to agent events + handler = subscribe("agent", my_event_handler) + + # Emit some test events + emit(AgentStartedEvent(task="Search for Python tutorials", use_vision=True)) + emit(AgentStepStartedEvent(step_number=1)) + emit(AgentStepCompletedEvent(step_number=1, actions_taken=[{"click": {"index": 5}}])) + + print(f"\nāœ… Received {len(received_events)} events") + + # Clean up + unsubscribe("agent", handler) + + +def wildcard_subscriber_example(): + """Example 2: Subscribe to all events with wildcard.""" + print("\n" + "=" * 60) + print("Example 2: Wildcard Subscription (All Events)") + print("=" * 60) + + event_counts = {"agent": 0, "llm": 0, "error": 0} + + def count_events(event: BaseEvent): + topic = event.topic + if topic in event_counts: + event_counts[topic] += 1 + print(f" šŸ“Š {event.topic}/{event.name}") + + # Subscribe to ALL events + handler = subscribe("*", count_events) + + # Emit events to different topics + emit(AgentStartedEvent(task="Test task")) + emit(LLMRequestStartedEvent(purpose="action", model_name="gpt-4")) + emit(LLMRequestCompletedEvent(purpose="action", response_time_ms=1500)) + emit(ErrorOccurredEvent( + error_type="test", + error_message="Test error", + component="test" + )) + + print(f"\nšŸ“ˆ Event counts: {event_counts}") + + unsubscribe("*", handler) + + +def filtered_handler_example(): + """Example 3: Filter events by level.""" + print("\n" + "=" * 60) + print("Example 3: Filtered Event Handler (Errors Only)") + print("=" * 60) + + from browser_ai.event_bus.core import EventHandler + + class PrintHandler(EventHandler): + def handle(self, event: BaseEvent): + print(f" āš ļø FILTERED: {event.topic}/{event.name}") + + # Create filtered handler that only receives error-level events + inner = PrintHandler() + filtered = FilteredEventHandler(inner, level_filter=EventLevel.ERROR) + + subscribe("agent", filtered) + subscribe("error", filtered) + + print("Emitting info-level event (should be filtered)...") + emit(AgentStartedEvent(task="Test")) + + print("Emitting error-level event (should pass through)...") + emit(ErrorOccurredEvent( + error_type="TestError", + error_message="Something went wrong", + component="test" + )) + + unsubscribe("agent", filtered) + unsubscribe("error", filtered) + + +async def async_streaming_example(): + """Example 4: Async streaming for real-time UI updates.""" + print("\n" + "=" * 60) + print("Example 4: Streaming Event Handler (For WebSocket/SSE)") + print("=" * 60) + + # Create a streaming handler for real-time event delivery + stream_handler = StreamingEventHandler(max_queue_size=100) + subscribe("*", stream_handler) + + # Simulate background event emission (like agent running) + async def emit_events(): + await emit_async(AgentStartedEvent(task="Background task")) + await asyncio.sleep(0.1) + await emit_async(AgentStepStartedEvent(step_number=1)) + await asyncio.sleep(0.1) + await emit_async(AgentStepCompletedEvent(step_number=1, actions_taken=[])) + await asyncio.sleep(0.1) + await emit_async(AgentCompletedEvent( + task="Background task", + total_steps=1, + success=True, + final_result="Done!" + )) + + # Start event emission in background + asyncio.create_task(emit_events()) + + # Stream events as they arrive (like sending to WebSocket clients) + print("Streaming events (timeout after 1 second):") + count = 0 + async for event in stream_handler.stream(timeout=0.5): + print(f" šŸ”„ Stream: {event.topic}/{event.name}") + count += 1 + if count >= 4: + break + + stream_handler.close() + unsubscribe("*", stream_handler) + print(f"\nāœ… Streamed {count} events") + + +def logging_integration_example(): + """Example 5: Integrate logging with event bus.""" + print("\n" + "=" * 60) + print("Example 5: Logging Integration") + print("=" * 60) + + import logging + + log_events = [] + + def capture_logs(event: BaseEvent): + if hasattr(event, 'message'): + log_events.append(event) + # Only print if not too many (avoid recursion in output) + if len(log_events) <= 5: + print(f" šŸ“ Log: {event.message[:50]}...") + + # Subscribe to log events + handler = subscribe("log", capture_logs) + + # Enable logging event emission for a test logger (not browser_ai to avoid noise) + log_handler = LoggingEventHandler.enable( + logger_name="test_app", + level=logging.INFO + ) + + # Log something + logger = logging.getLogger("test_app") + logger.info("This is a test log message") + logger.warning("This is a warning") + logger.error("This is an error") + + print(f"\nāœ… Captured {len(log_events)} log events") + + # Clean up + LoggingEventHandler.disable("test_app") + unsubscribe("log", handler) + + +def json_serialization_example(): + """Example 6: JSON serialization for API/WebSocket.""" + print("\n" + "=" * 60) + print("Example 6: JSON Serialization (For APIs)") + print("=" * 60) + + # Create an event + event = AgentStepCompletedEvent( + step_number=5, + agent_id="agent-123", + actions_taken=[ + {"click": {"index": 10}}, + {"input_text": {"index": 20, "text": "Hello"}} + ], + result="Successfully clicked and typed" + ) + + # Serialize to JSON (perfect for WebSocket/API responses) + json_str = event.model_dump_json(indent=2) + print(f"JSON Output:\n{json_str}") + + # Can also get as dict + event_dict = event.model_dump() + print(f"\nDict keys: {list(event_dict.keys())}") + + +async def main(): + """Run all examples.""" + print("\nšŸš€ Browser.AI Event Streaming Examples") + print("=" * 60) + + # Synchronous examples + simple_subscriber_example() + wildcard_subscriber_example() + filtered_handler_example() + json_serialization_example() + logging_integration_example() + + # Async example + await async_streaming_example() + + print("\n" + "=" * 60) + print("āœ… All examples completed!") + print("=" * 60) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/test_event_bus.py b/test_event_bus.py index c52ba64..16d5f5f 100644 --- a/test_event_bus.py +++ b/test_event_bus.py @@ -14,7 +14,6 @@ import logging import pytest from typing import List -from unittest.mock import MagicMock, AsyncMock # Import event bus components from browser_ai.event_bus import (