124 changes: 124 additions & 0 deletions browser_ai/agent/service.py
@@ -5,6 +5,7 @@
import logging
import os
import re
import time
import uuid
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar
@@ -41,6 +42,22 @@
    DOMHistoryElement,
    HistoryTreeProcessor,
)
from browser_ai.event_bus import (
    AgentCompletedEvent,
    AgentFailedEvent,
    AgentRetryEvent,
    AgentStartedEvent,
    AgentStepCompletedEvent,
    AgentStepFailedEvent,
    AgentStepStartedEvent,
    LLMRateLimitEvent,
    LLMRequestCompletedEvent,
    LLMRequestStartedEvent,
    PlanningCompletedEvent,
    PlanningStartedEvent,
    UserHelpRequestedEvent,
    emit_async,
)
from browser_ai.utils import time_execution_async
from browser_ai.agent.media import create_history_gif

@@ -49,6 +66,10 @@

T = TypeVar("T", bound=BaseModel)

# Constants for event emission
CHARS_PER_TOKEN_ESTIMATE = 4 # Rough estimate for token calculation
ERROR_MESSAGE_MAX_LENGTH = 500 # Maximum length for error messages in events


class Agent:
# region Initialization
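
For context, the `browser_ai.event_bus` module itself is not part of this diff. A minimal sketch of the API these additions appear to assume — a per-type subscriber registry with an async `emit_async` — might look like the following; the field names mirror the emits below, but everything here is an assumption, not the actual implementation:

from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional

@dataclass
class AgentStepStartedEvent:
    step_number: int
    agent_id: str

@dataclass
class AgentStepCompletedEvent:
    step_number: int
    agent_id: str
    actions_taken: list[dict[str, Any]]
    result: Optional[str] = None

# ...the remaining event classes would follow the same dataclass pattern.

_subscribers: dict[type, list[Callable[[Any], Awaitable[None]]]] = {}

def subscribe(event_type: type, handler: Callable[[Any], Awaitable[None]]) -> None:
    """Register an async handler for one event type."""
    _subscribers.setdefault(event_type, []).append(handler)

async def emit_async(event: Any) -> None:
    # Deliver the event to every handler registered for its type; handler
    # errors are swallowed so observers cannot break the agent loop.
    for handler in _subscribers.get(type(event), []):
        try:
            await handler(event)
        except Exception:
            pass  # deliberately ignore handler failures
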
@@ -295,6 +316,12 @@ async def step(self, step_info: Optional[AgentStepInfo] = None) -> None:
model_output = None
result: list[ActionResult] = []

# Emit step started event
await emit_async(AgentStepStartedEvent(
    step_number=self.n_steps,
    agent_id=self.agent_id,
))

try:
    state = await self.browser_context.get_state()

@@ -350,6 +377,12 @@ async def step(self, step_info: Optional[AgentStepInfo] = None) -> None:
"🙋‍♂️ Task requires user intervention - pausing execution"
)

# Emit user help requested event
await emit_async(UserHelpRequestedEvent(
    request_message="Task requires user intervention (e.g., CAPTCHA, login)",
    step_number=self.n_steps,
))

# Store the current page URL to detect when user completes the intervention
current_page = await self.browser_context.get_current_page()
original_url = current_page.url
@@ -415,6 +448,27 @@ async def step(self, step_info: Optional[AgentStepInfo] = None) -> None:
if state:
    self._make_history_item(model_output, state, result)

# Emit step completed event
# Note: n_steps was incremented in get_next_action, so use n_steps - 1 for the completed step
completed_step_number = self.n_steps - 1
step_result = result[-1].extracted_content if result and result[-1].extracted_content else None
has_error = any(r.error for r in result) if result else False
if has_error:
    error_msg = next((r.error for r in result if r.error), "Unknown error")
    await emit_async(AgentStepFailedEvent(
        step_number=completed_step_number,
        agent_id=self.agent_id,
        error_message=str(error_msg)[:ERROR_MESSAGE_MAX_LENGTH],
        error_type=type(error_msg).__name__ if not isinstance(error_msg, str) else "StepError",
    ))
else:
    # `actions` was not defined anywhere in this diff, so it is derived here
    # from model_output; treat this reconstruction as an assumption.
    actions = (
        [a.model_dump(exclude_unset=True) for a in model_output.action]
        if model_output and model_output.action
        else []
    )
    await emit_async(AgentStepCompletedEvent(
        step_number=completed_step_number,
        agent_id=self.agent_id,
        actions_taken=actions,
        result=step_result,
    ))

async def _handle_step_error(self, error: Exception) -> list[ActionResult]:
"""Handle all types of errors that can occur during a step"""
include_trace = logger.isEnabledFor(logging.DEBUG)
@@ -437,6 +491,11 @@ async def _handle_step_error(self, error: Exception) -> list[ActionResult]:
self.consecutive_failures += 1
elif isinstance(error, (RateLimitError, ResourceExhausted)):
    logger.warning(f"{prefix}{error_msg}")
    # Emit rate limit event
    await emit_async(LLMRateLimitEvent(
        model_name=self.model_name,
        retry_after_seconds=self.retry_delay,
    ))
    await asyncio.sleep(self.retry_delay)
    self.consecutive_failures += 1
else:
@@ -506,10 +565,19 @@ def _convert_input_messages(
@time_execution_async("--get_next_action")
async def get_next_action(self, input_messages: list[BaseMessage]) -> AgentOutput:
"""Get next action from LLM based on current state"""
# Emit LLM request started event
await emit_async(LLMRequestStartedEvent(
    model_name=self.model_name,
    purpose="action",
    input_tokens_estimate=len(str(input_messages)) // CHARS_PER_TOKEN_ESTIMATE,
))

converted_input_messages = self._convert_input_messages(
    input_messages, self.model_name
)

start_time = time.time()
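# (time.time() measures wall-clock time; time.monotonic() would be immune to
# system clock adjustments, though either works for a coarse latency metric.)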

if (
    self.model_name == "deepseek-reasoner"
    or self.model_name.startswith("deepseek-r1")
@@ -539,9 +607,18 @@ async def get_next_action(self, input_messages: list[BaseMessage]) -> AgentOutput:
response: dict[str, Any] = await structured_llm.ainvoke(input_messages) # type: ignore
parsed: AgentOutput | None = response["parsed"]

response_time_ms = (time.time() - start_time) * 1000

if parsed is None:
    raise ValueError("Could not parse response.")

# Emit LLM request completed event
await emit_async(LLMRequestCompletedEvent(
    model_name=self.model_name,
    purpose="action",
    response_time_ms=response_time_ms,
))

# cut the number of actions to max_actions_per_step
parsed.action = parsed.action[: self.max_actions_per_step]
self._log_response(parsed)
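
As a usage illustration, a subscriber could pair these request events to watch model latency. The sketch below assumes the hypothetical `subscribe` helper from the event-bus sketch above, a module-level `logger`, and the field names used in the emits; none of this is part of the diff itself:

SLOW_CALL_MS = 5_000  # hypothetical threshold

async def log_slow_llm_calls(event: LLMRequestCompletedEvent) -> None:
    # Flag unusually slow LLM responses for later inspection.
    if event.response_time_ms > SLOW_CALL_MS:
        logger.warning(
            f"Slow {event.purpose} call on {event.model_name}: "
            f"{event.response_time_ms:.0f} ms"
        )

subscribe(LLMRequestCompletedEvent, log_slow_llm_calls)
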
@@ -620,6 +697,14 @@ def _log_agent_run(self) -> None:
@observe(name="agent.run", ignore_output=True)
async def run(self, max_steps: int = 100) -> AgentHistoryList:
"""Execute the task with maximum number of steps"""
# Emit agent started event
await emit_async(AgentStartedEvent(
    task=self.task,
    agent_id=self.agent_id,
    use_vision=self.use_vision,
))

task_success = False
try:
    self._log_agent_run()

@@ -651,6 +736,7 @@ async def run(self, max_steps: int = 100) -> AgentHistoryList:
continue

logger.info("✅ Task completed successfully")
task_success = True
if self.register_done_callback:
    self.register_done_callback(self.history)
break
@@ -659,6 +745,24 @@

return self.history
finally:
    # Emit agent completed/failed event
    final_result = self.history.final_result() if self.history.history else None
    if task_success:
        await emit_async(AgentCompletedEvent(
            task=self.task,
            agent_id=self.agent_id,
            total_steps=self.n_steps,
            success=True,
            final_result=final_result,
        ))
    else:
        await emit_async(AgentFailedEvent(
            task=self.task,
            agent_id=self.agent_id,
            error_message="Task did not complete successfully",
            total_steps=self.n_steps,
        ))

    if not self.injected_browser_context:
        await self.browser_context.close()
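
For completeness, a hypothetical consumer of the run-level events emitted in this finally block — again assuming the `subscribe` helper sketched earlier:

async def on_agent_completed(event: AgentCompletedEvent) -> None:
    # React to a successful run, e.g. persist metrics or notify a dashboard.
    print(f"agent {event.agent_id} succeeded in {event.total_steps} steps: {event.final_result}")

async def on_agent_failed(event: AgentFailedEvent) -> None:
    print(f"agent {event.agent_id} failed after {event.total_steps} steps: {event.error_message}")

subscribe(AgentCompletedEvent, on_agent_completed)
subscribe(AgentFailedEvent, on_agent_failed)
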

@@ -1048,6 +1152,12 @@ async def _run_planner(self) -> Optional[str]:
if not self.planner_llm:
    return None

# Emit planning started event
await emit_async(PlanningStartedEvent(
    task=self.task,
    agent_id=self.agent_id,
))

# Create planner message history using full message history
planner_messages = [
    PlannerPrompt(self.action_descriptions).get_system_message(),
@@ -1080,15 +1190,29 @@ async def _run_planner(self) -> Optional[str]:
# if deepseek-reasoner, remove think tags
if self.planner_model_name == "deepseek-reasoner":
    plan = self._remove_think_tags(plan)

# Parse and emit planning completed event
plan_steps = []
try:
    plan_json = json.loads(plan)
    logger.info(f"Planning Analysis:\n{json.dumps(plan_json, indent=4)}")
    # Try to extract steps if the plan is structured
    if isinstance(plan_json, dict) and "steps" in plan_json:
        plan_steps = [str(s) for s in plan_json["steps"]]
    elif isinstance(plan_json, list):
        plan_steps = [str(s) for s in plan_json]
except json.JSONDecodeError:
    # Plain-text plan: log it as-is and keep a truncated preview as the single step
    logger.info(f"Planning Analysis:\n{plan}")
    plan_steps = [plan[:200]] if plan else []
except Exception as e:
    logger.debug(f"Error parsing planning analysis: {e}")
    logger.info(f"Plan: {plan}")

await emit_async(PlanningCompletedEvent(
    plan_steps=plan_steps,
    agent_id=self.agent_id,
))

return plan

# endregion
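
The parsing in _run_planner accepts either a JSON object with a "steps" key, a bare JSON list, or free text. A hypothetical structured payload that would populate plan_steps (the real planner prompt and schema are outside this diff):

import json

plan = '{"state_analysis": "on login page", "steps": ["enter credentials", "submit form", "verify dashboard"]}'
plan_steps = [str(s) for s in json.loads(plan)["steps"]]
# -> ['enter credentials', 'submit form', 'verify dashboard']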