diff --git a/assets/templates/browserbase/cua/README.md b/assets/templates/browserbase/cua/README.md index 4d668beab..6d797f3d2 100644 --- a/assets/templates/browserbase/cua/README.md +++ b/assets/templates/browserbase/cua/README.md @@ -66,6 +66,8 @@ npm install @browserbasehq/stagehand fastify |----------|---------|-------------| | `CUA_SERVER_PORT` | `3000` | Server port | | `CUA_SERVER_HOST` | `0.0.0.0` | Server host | +| `CUA_SESSION_CREATE_MAX_CONCURRENT` | `2` | Max concurrent session-create inits | +| `CUA_SESSION_CREATE_MAX_PENDING` | `200` | Max queued session-create requests before 503 | ## API Endpoints @@ -271,4 +273,3 @@ cua-server/ ├── tsconfig.json # TypeScript configuration └── README.md # This file ``` - diff --git a/assets/templates/browserbase/cua/server.ts b/assets/templates/browserbase/cua/server.ts index c70446796..e51bf4370 100644 --- a/assets/templates/browserbase/cua/server.ts +++ b/assets/templates/browserbase/cua/server.ts @@ -1,5 +1,5 @@ import Fastify, { FastifyInstance, FastifyRequest, FastifyReply } from "fastify"; -import { sessionManager } from "./sessionManager"; +import { SessionCreateError, sessionManager } from "./sessionManager"; import { executeAction } from "./actionExecutor"; import { captureBrowserState } from "./stateCapture"; import { @@ -46,6 +46,15 @@ export function createServer(): FastifyInstance { state, }; } catch (error) { + if (error instanceof SessionCreateError) { + reply.status(error.statusCode); + return { + error: error.message, + code: error.code, + retryable: error.retryable, + statusCode: error.statusCode, + }; + } const errorMessage = error instanceof Error ? error.message : String(error); reply.status(500); return { error: errorMessage, code: "SESSION_CREATE_FAILED" }; @@ -165,4 +174,3 @@ export function createServer(): FastifyInstance { return server; } - diff --git a/assets/templates/browserbase/cua/sessionManager.ts b/assets/templates/browserbase/cua/sessionManager.ts index 7cda22ff2..f4f5dd8c1 100644 --- a/assets/templates/browserbase/cua/sessionManager.ts +++ b/assets/templates/browserbase/cua/sessionManager.ts @@ -2,6 +2,9 @@ import { Stagehand } from "@browserbasehq/stagehand"; import type { Page } from "@browserbasehq/stagehand"; import { BrowserSession, SessionCreateRequest } from "./types"; +const DEFAULT_MAX_CONCURRENT_CREATES = 2; +const DEFAULT_MAX_PENDING_CREATES = 200; + /** * Generates a unique session ID */ @@ -9,6 +12,107 @@ function generateSessionId(): string { return `session_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`; } +function parsePositiveInt(value: string | undefined, fallback: number): number { + if (!value) { + return fallback; + } + const parsed = Number.parseInt(value, 10); + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback; +} + +function describeError(error: unknown): string { + if (error instanceof Error) { + return error.message || error.name; + } + if (typeof error === "string") { + return error; + } + return "unknown session creation error"; +} + +function inferStatusCode(error: unknown): number | undefined { + if (typeof error !== "object" || error === null) { + return undefined; + } + const obj = error as Record; + const statusCandidate = obj.status ?? obj.statusCode; + if (typeof statusCandidate === "number" && Number.isFinite(statusCandidate)) { + return statusCandidate; + } + const response = obj.response; + if (typeof response === "object" && response !== null) { + const responseStatus = (response as Record).status; + if (typeof responseStatus === "number" && Number.isFinite(responseStatus)) { + return responseStatus; + } + } + return undefined; +} + +export class SessionCreateError extends Error { + readonly code: string; + readonly statusCode: number; + readonly retryable: boolean; + + constructor( + message: string, + opts?: { code?: string; statusCode?: number; retryable?: boolean }, + ) { + super(message); + this.name = "SessionCreateError"; + this.code = opts?.code ?? "SESSION_CREATE_FAILED"; + this.statusCode = opts?.statusCode ?? 500; + this.retryable = opts?.retryable ?? true; + } +} + +function classifySessionCreateError(error: unknown): SessionCreateError { + const statusCode = inferStatusCode(error); + const message = describeError(error); + + if (statusCode === 429) { + return new SessionCreateError(message, { + code: "SESSION_RATE_LIMITED", + statusCode: 429, + retryable: true, + }); + } + if (statusCode === 401 || statusCode === 403) { + return new SessionCreateError(message, { + code: "SESSION_AUTH_FAILED", + statusCode, + retryable: false, + }); + } + if (statusCode === 502 || statusCode === 503 || statusCode === 504) { + return new SessionCreateError(message, { + code: "SESSION_PROVIDER_UNAVAILABLE", + statusCode, + retryable: true, + }); + } + if (typeof statusCode === "number" && statusCode >= 500) { + return new SessionCreateError(message, { + code: "SESSION_PROVIDER_ERROR", + statusCode, + retryable: true, + }); + } + if (typeof statusCode === "number" && statusCode >= 400) { + return new SessionCreateError(message, { + code: "SESSION_CREATE_INVALID_REQUEST", + statusCode, + retryable: false, + }); + } + + return new SessionCreateError(message, { + code: "SESSION_CREATE_FAILED", + statusCode: 503, + retryable: true, + }); +} + /** * BrowserSessionManager * @@ -17,6 +121,47 @@ function generateSessionId(): string { */ export class BrowserSessionManager { private sessions: Map = new Map(); + private inFlightCreates = 0; + private pendingCreateResolvers: Array<() => void> = []; + private readonly maxConcurrentCreates = parsePositiveInt( + process.env.CUA_SESSION_CREATE_MAX_CONCURRENT, + DEFAULT_MAX_CONCURRENT_CREATES, + ); + private readonly maxPendingCreates = parsePositiveInt( + process.env.CUA_SESSION_CREATE_MAX_PENDING, + DEFAULT_MAX_PENDING_CREATES, + ); + + private async acquireCreateSlot(): Promise { + if (this.inFlightCreates < this.maxConcurrentCreates) { + this.inFlightCreates += 1; + return; + } + + if (this.pendingCreateResolvers.length >= this.maxPendingCreates) { + throw new SessionCreateError( + `Session creation queue is full (pending=${this.pendingCreateResolvers.length})`, + { + code: "SESSION_CREATE_QUEUE_FULL", + statusCode: 503, + retryable: true, + }, + ); + } + + await new Promise((resolve) => { + this.pendingCreateResolvers.push(resolve); + }); + this.inFlightCreates += 1; + } + + private releaseCreateSlot(): void { + this.inFlightCreates = Math.max(0, this.inFlightCreates - 1); + const next = this.pendingCreateResolvers.shift(); + if (next) { + next(); + } + } /** * Create a new browser session @@ -25,63 +170,92 @@ export class BrowserSessionManager { const sessionId = generateSessionId(); const startTime = Date.now(); const envType = options?.env ?? "LOCAL"; - - console.log(`[Session] Creating ${sessionId} with env: ${envType}, proxies: ${options?.proxies ?? false}`); - - // TODO: Update to accept modelApiKey from client request (MODEL_API_KEY) instead of - // hardcoding OPENAI_API_KEY. This will allow using different model providers. - // See: SessionCreateRequest in types.ts, cua_mode.py session_config - const stagehand = new Stagehand({ - env: envType, - apiKey: options?.browserbaseApiKey, - projectId: options?.browserbaseProjectId, - modelApiKey: process.env.OPENAI_API_KEY, - verbose: 1, - disablePino: true, // Disable pino logging to avoid pino-pretty transport issues in SEA binaries - browserbaseSessionCreateParams: envType === "BROWSERBASE" - ? { - projectId: options?.browserbaseProjectId, - proxies: options?.proxies ?? false, - browserSettings: { - viewport: options?.viewport - ? { - width: options.viewport.width, - height: options.viewport.height, - } - : { width: 1024, height: 768 }, - }, - } - : undefined, - // Only provide localBrowserLaunchOptions for LOCAL mode to avoid Chrome validation in BROWSERBASE mode - localBrowserLaunchOptions: envType === "LOCAL" - ? { - viewport: options?.viewport - ? { - width: options.viewport.width, - height: options.viewport.height, - } - : { width: 1024, height: 768 }, - } - : undefined, - }); + await this.acquireCreateSlot(); + console.log( + `[Session] Creating ${sessionId} with env: ${envType}, proxies: ${options?.proxies ?? false}, in_flight_creates: ${this.inFlightCreates}, queued_creates: ${this.pendingCreateResolvers.length}`, + ); + + let stagehand: Stagehand | null = null; + try { + // TODO: Update to accept modelApiKey from client request (MODEL_API_KEY) instead of + // hardcoding OPENAI_API_KEY. This will allow using different model providers. + // See: SessionCreateRequest in types.ts, cua_mode.py session_config + // Stagehand runtime accepts modelApiKey, but some published typings omit it. + // Keep runtime behavior while avoiding type drift failures. + stagehand = new Stagehand({ + env: envType, + apiKey: options?.browserbaseApiKey, + projectId: options?.browserbaseProjectId, + modelApiKey: process.env.OPENAI_API_KEY, + verbose: 1, + disablePino: true, // Disable pino logging to avoid pino-pretty transport issues in SEA binaries + browserbaseSessionCreateParams: + envType === "BROWSERBASE" + ? { + projectId: options?.browserbaseProjectId, + proxies: options?.proxies ?? false, + browserSettings: { + viewport: options?.viewport + ? { + width: options.viewport.width, + height: options.viewport.height, + } + : { width: 1024, height: 768 }, + }, + } + : undefined, + // Only provide localBrowserLaunchOptions for LOCAL mode to avoid Chrome validation in BROWSERBASE mode + localBrowserLaunchOptions: + envType === "LOCAL" + ? { + viewport: options?.viewport + ? { + width: options.viewport.width, + height: options.viewport.height, + } + : { width: 1024, height: 768 }, + } + : undefined, + } as any); - await stagehand.init(); + await stagehand.init(); - const page = stagehand.context.pages()[0]; + const page = stagehand.context.pages()[0]; - const session: BrowserSession = { - id: sessionId, - stagehand, - page, - createdAt: new Date(), - }; + const session: BrowserSession = { + id: sessionId, + stagehand, + page, + createdAt: new Date(), + }; - this.sessions.set(sessionId, session); - - const duration = Date.now() - startTime; - console.log(`[Session] Created ${sessionId} in ${duration}ms (env: ${envType}, active sessions: ${this.sessions.size})`); + this.sessions.set(sessionId, session); - return session; + const duration = Date.now() - startTime; + console.log( + `[Session] Created ${sessionId} in ${duration}ms (env: ${envType}, active sessions: ${this.sessions.size})`, + ); + + return session; + } catch (error) { + const classified = + error instanceof SessionCreateError + ? error + : classifySessionCreateError(error); + console.error( + `[Session] Failed to create ${sessionId}: ${classified.code} (${classified.statusCode}) - ${classified.message}`, + ); + if (stagehand) { + try { + await stagehand.close(); + } catch { + // no-op: best effort cleanup + } + } + throw classified; + } finally { + this.releaseCreateSlot(); + } } /** diff --git a/assets/templates/browserbase/cua/types.ts b/assets/templates/browserbase/cua/types.ts index ca284ad69..136e8c00b 100644 --- a/assets/templates/browserbase/cua/types.ts +++ b/assets/templates/browserbase/cua/types.ts @@ -113,4 +113,6 @@ export interface BrowserSession { export interface ErrorResponse { error: string; code: string; + retryable?: boolean; + statusCode?: number; } diff --git a/environments/browser_dom_example/browser_dom_example.py b/environments/browser_dom_example/browser_dom_example.py index b15b0afd4..23d1e98b9 100644 --- a/environments/browser_dom_example/browser_dom_example.py +++ b/environments/browser_dom_example/browser_dom_example.py @@ -96,7 +96,7 @@ async def judge_answer( def load_environment( - project_id: str, + project_id: str | None = None, max_turns: int = 10, judge_model: str = "gpt-4o-mini", system_prompt: str = DOM_SYSTEM_PROMPT, @@ -118,7 +118,7 @@ def load_environment( Args: max_turns: Maximum conversation turns (default: 10) judge_model: Model for judging task completion - project_id: Browserbase project ID (required) + project_id: Browserbase project ID (or set BROWSERBASE_PROJECT_ID env var) browserbase_api_key_var: Env var name for Browserbase API key stagehand_model: Model for Stagehand operations (default: openai/gpt-4o-mini) model_api_key_var: Env var name for model API key @@ -131,22 +131,6 @@ def load_environment( Example: >>> env = load_environment() """ - import os - - # Check required env vars upfront - missing = [] - if not os.getenv(browserbase_api_key_var): - missing.append(browserbase_api_key_var) - if not os.getenv(model_api_key_var): - missing.append(model_api_key_var) - - if missing: - raise ValueError( - f"Missing required environment variables for browser-dom-example:\n" - f" {', '.join(missing)}\n\n" - f"Set these in your environment or .env file before running." - ) - # Create inline dataset dataset = create_example_dataset() diff --git a/pyproject.toml b/pyproject.toml index cc23cc1d7..f20a4c828 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ dependencies = [ "gepa", "pyzmq>=27.1.0", "msgpack>=1.1.2", + "wandb>=0.25.0", ] [dependency-groups] diff --git a/tests/test_browser_env.py b/tests/test_browser_env.py index 2a7b738b3..595031cd9 100644 --- a/tests/test_browser_env.py +++ b/tests/test_browser_env.py @@ -6,7 +6,8 @@ import os import pytest -from unittest.mock import MagicMock, patch +import tenacity as tc +from unittest.mock import AsyncMock, MagicMock, patch from datasets import Dataset # Skip all tests in this module if browser dependencies are not installed @@ -97,6 +98,35 @@ def test_use_sandbox_false_creates_local_execution_mode(self): assert isinstance(env._mode_impl, CUAMode) assert env._mode_impl._execution_mode == "local" + def test_forwards_session_create_retry_config(self): + """Test BrowserEnv forwards dedicated session creation retry settings.""" + from verifiers.envs.integrations.browser_env.browser_env import BrowserEnv + from verifiers.envs.integrations.browser_env.modes.cua_mode import CUAMode + + with patch.dict(os.environ, {}, clear=True): + with patch( + "verifiers.envs.integrations.browser_env.modes.cua_mode.CUAMode.verify_server_connection" + ): + env = BrowserEnv( + mode="cua", + use_sandbox=False, + env="LOCAL", + session_create_max_retries=7, + session_create_base_delay=1.0, + session_create_backoff_factor=3.0, + session_create_max_backoff_seconds=45.0, + session_create_jitter=0.01, + dataset=Dataset.from_dict( + {"question": ["test"], "answer": ["test"]} + ), + ) + assert isinstance(env._mode_impl, CUAMode) + assert env._mode_impl.session_create_max_retries == 7 + assert env._mode_impl.session_create_base_delay == 1.0 + assert env._mode_impl.session_create_backoff_factor == 3.0 + assert env._mode_impl.session_create_max_backoff_seconds == 45.0 + assert env._mode_impl.session_create_jitter == 0.01 + class TestCUASandboxModeBackwardsCompat: """Tests for backwards compatibility with CUASandboxMode.""" @@ -123,6 +153,178 @@ def test_deprecated_cua_sandbox_mode_import(self): assert mode._execution_mode == "sandbox" +class TestBrowserEnvCUAEnvResponse: + """Tests for BrowserEnv CUA env_response post-processing.""" + + @staticmethod + def _part_type(part): + if isinstance(part, dict): + return part.get("type") + return getattr(part, "type", None) + + @classmethod + def _image_urls(cls, content): + urls = [] + if not isinstance(content, list): + return urls + + for part in content: + if cls._part_type(part) != "image_url": + continue + if isinstance(part, dict): + image_url = part.get("image_url", {}) + urls.append(image_url.get("url")) + else: + image_source = getattr(part, "image_url", None) + urls.append(getattr(image_source, "url", None)) + return urls + + @staticmethod + def _part_field(part, key): + if isinstance(part, dict): + return part.get(key) + return getattr(part, key, None) + + @pytest.mark.asyncio + async def test_env_response_moves_tool_images_only_and_preserves_non_image_parts( + self, + ): + """Test CUA relocation only touches tool messages and keeps unknown non-image parts.""" + from verifiers.envs.integrations.browser_env.browser_env import BrowserEnv + from verifiers.envs.stateful_tool_env import StatefulToolEnv + from verifiers.types import AssistantMessage, ToolCall, ToolMessage, UserMessage + + with patch.dict(os.environ, {}, clear=True): + with patch( + "verifiers.envs.integrations.browser_env.modes.cua_mode.CUAMode.verify_server_connection" + ): + env = BrowserEnv( + mode="cua", + use_sandbox=False, + env="LOCAL", + dataset=Dataset.from_dict( + {"question": ["test"], "answer": ["test"]} + ), + ) + + assistant_msg = AssistantMessage( + role="assistant", + content="", + tool_calls=[ + ToolCall(id="tc_1", name="screenshot", arguments=" "), + ], + ) + + tool_message = ToolMessage( + role="tool", + tool_call_id="tc_1", + content=[ + {"type": "text", "text": "Status: Success"}, + {"type": "custom_meta", "foo": "bar"}, + { + "type": "image_url", + "image_url": {"url": "data:image/png;base64,abc"}, + }, + ], + ) + user_message = UserMessage( + role="user", + content=[ + {"type": "text", "text": "keep me"}, + { + "type": "image_url", + "image_url": {"url": "data:image/png;base64,should_not_move"}, + }, + ], + ) + + with patch.object( + StatefulToolEnv, + "env_response", + new=AsyncMock(return_value=[tool_message, user_message]), + ): + result = await env.env_response([assistant_msg], {}) + + # Empty args are normalized for zero-arg tool calls. + assert assistant_msg.tool_calls is not None + assert assistant_msg.tool_calls[0].arguments == "{}" + + assert len(result) == 3 + + # Tool message keeps all non-image parts as structured content. + tool_result = result[0] + assert isinstance(tool_result.content, list) + assert [self._part_type(p) for p in tool_result.content] == [ + "text", + "custom_meta", + ] + assert self._part_field(tool_result.content[1], "foo") == "bar" + + # Non-tool message is untouched. + passthrough_user = result[1] + assert isinstance(passthrough_user.content, list) + assert "data:image/png;base64,should_not_move" in self._image_urls( + passthrough_user.content + ) + + # Relocated screenshots are appended as a trailing user message. + relocated_user = result[2] + assert relocated_user.role == "user" + assert isinstance(relocated_user.content, list) + assert self._image_urls(relocated_user.content) == ["data:image/png;base64,abc"] + + @pytest.mark.asyncio + async def test_env_response_without_images_does_not_append_user_message(self): + """Test no relocation user message is appended when tool output has no images.""" + from verifiers.envs.integrations.browser_env.browser_env import BrowserEnv + from verifiers.envs.stateful_tool_env import StatefulToolEnv + from verifiers.types import AssistantMessage, ToolCall, ToolMessage + + with patch.dict(os.environ, {}, clear=True): + with patch( + "verifiers.envs.integrations.browser_env.modes.cua_mode.CUAMode.verify_server_connection" + ): + env = BrowserEnv( + mode="cua", + use_sandbox=False, + env="LOCAL", + dataset=Dataset.from_dict( + {"question": ["test"], "answer": ["test"]} + ), + ) + + assistant_msg = AssistantMessage( + role="assistant", + content="", + tool_calls=[ + ToolCall(id="tc_1", name="wait", arguments="{}"), + ], + ) + + tool_message = ToolMessage( + role="tool", + tool_call_id="tc_1", + content=[ + {"type": "text", "text": "Status: Success"}, + {"type": "custom_meta", "foo": "bar"}, + ], + ) + + with patch.object( + StatefulToolEnv, + "env_response", + new=AsyncMock(return_value=[tool_message]), + ): + result = await env.env_response([assistant_msg], {}) + + assert len(result) == 1 + assert isinstance(result[0].content, list) + assert [self._part_type(p) for p in result[0].content] == [ + "text", + "custom_meta", + ] + + class TestCUAModeScreenshotFilter: """Tests for screenshot filtering in CUAMode.""" @@ -381,6 +583,118 @@ def test_format_response_no_screenshot(self): assert formatted[0]["type"] == "text" +class TestCUAModeSessionCreateRetries: + """Tests for session creation retry behavior.""" + + def test_retryable_status_code_detection(self): + """Test transient status code detection for session creation.""" + from verifiers.envs.integrations.browser_env.modes.cua_mode import CUAMode + + mode = CUAMode(execution_mode="local", save_screenshots=False) + assert mode._is_retryable_status_code(429) + assert mode._is_retryable_status_code(503) + assert not mode._is_retryable_status_code(400) + assert not mode._is_retryable_status_code(401) + + def test_session_create_exception_retry_predicate(self): + """Test session creation retry predicate respects error retryability.""" + from verifiers.envs.integrations.browser_env.modes.cua_mode import ( + CUAMode, + CUASessionCreateError, + ) + + retryable_error = CUASessionCreateError("retry me", retryable=True) + non_retryable_error = CUASessionCreateError("do not retry", retryable=False) + assert CUAMode._should_retry_session_create_exception(retryable_error) + assert not CUAMode._should_retry_session_create_exception(non_retryable_error) + + def test_extract_error_message_handles_json_payload(self): + """Test extraction of human-readable error messages from JSON response bodies.""" + from verifiers.envs.integrations.browser_env.modes.cua_mode import CUAMode + + json_body = '{"error":"rate limited","code":"SESSION_RATE_LIMITED"}' + assert CUAMode._extract_error_message(json_body) == "rate limited" + assert CUAMode._extract_error_message("plain error") == "plain error" + + +class TestCUAModeSandboxSetupCleanup: + """Tests for CUA sandbox setup failure cleanup and error normalization.""" + + @staticmethod + def _single_attempt_retrying() -> tc.AsyncRetrying: + return tc.AsyncRetrying(stop=tc.stop_after_attempt(1), reraise=True) + + @classmethod + def _build_test_mode(cls): + from verifiers.envs.integrations.browser_env.modes.cua_mode import CUAMode + + mode = CUAMode(execution_mode="local", save_screenshots=False) + mode._execution_mode = "sandbox" + mode.use_prebuilt_image = True + mode.retrying = cls._single_attempt_retrying() + mode.session_create_retrying = cls._single_attempt_retrying() + mode.logger = MagicMock() + return mode + + @pytest.mark.asyncio + async def test_setup_state_cleans_sandbox_and_wraps_runtime_errors(self): + """Test setup failures clean created sandbox and wrap non-vf errors.""" + import verifiers as vf + + mode = self._build_test_mode() + mode._create_sandbox = AsyncMock(return_value="sandbox-1") + mode._wait_for_sandbox_ready = AsyncMock(side_effect=RuntimeError("boom")) + mode._delete_sandbox = AsyncMock() + + state = {} + with pytest.raises(vf.SandboxError) as exc_info: + await mode.setup_state(state) + + assert isinstance(exc_info.value.__cause__, RuntimeError) + mode._delete_sandbox.assert_awaited_once_with("sandbox-1") + assert "cua_sandbox_id" not in state + + @pytest.mark.asyncio + async def test_setup_state_cleans_sandbox_on_session_create_error(self): + """Test sandbox is cleaned when session creation fails after setup.""" + from verifiers.envs.integrations.browser_env.modes.cua_mode import ( + CUASessionCreateError, + ) + + mode = self._build_test_mode() + mode._create_sandbox = AsyncMock(return_value="sandbox-2") + mode._wait_for_sandbox_ready = AsyncMock() + mode._wait_for_server = AsyncMock() + mode._create_session_curl = AsyncMock( + side_effect=CUASessionCreateError("rate limited", retryable=True) + ) + mode._delete_sandbox = AsyncMock() + + state = {} + with pytest.raises(CUASessionCreateError): + await mode.setup_state(state) + + mode._delete_sandbox.assert_awaited_once_with("sandbox-2") + assert "cua_sandbox_id" not in state + + @pytest.mark.asyncio + async def test_setup_state_keeps_state_when_cleanup_delete_fails(self): + """Test setup preserves sandbox id for downstream cleanup when delete fails.""" + import verifiers as vf + + mode = self._build_test_mode() + mode._create_sandbox = AsyncMock(return_value="sandbox-3") + mode._wait_for_sandbox_ready = AsyncMock(side_effect=RuntimeError("not ready")) + mode._delete_sandbox = AsyncMock(side_effect=RuntimeError("delete failed")) + + state = {} + with pytest.raises(vf.SandboxError): + await mode.setup_state(state) + + mode._delete_sandbox.assert_awaited_once_with("sandbox-3") + assert state.get("cua_sandbox_id") == "sandbox-3" + + # ============================================================================ # DOMMode Tests # ============================================================================ diff --git a/verifiers/envs/integrations/browser_env/browser_env.py b/verifiers/envs/integrations/browser_env/browser_env.py index 4fc93ddf1..0fcbdba50 100644 --- a/verifiers/envs/integrations/browser_env/browser_env.py +++ b/verifiers/envs/integrations/browser_env/browser_env.py @@ -2,7 +2,7 @@ import logging import os -from typing import Any, Literal +from typing import Any, Literal, cast import verifiers as vf @@ -61,6 +61,11 @@ def __init__( env: Literal["LOCAL", "BROWSERBASE"] = "BROWSERBASE", viewport_width: int = 1024, viewport_height: int = 768, + session_create_max_retries: int | None = None, + session_create_base_delay: float | None = None, + session_create_backoff_factor: float | None = None, + session_create_max_backoff_seconds: float | None = None, + session_create_jitter: float | None = None, save_screenshots: bool = True, keep_recent_screenshots: int | None = 2, proxies: bool = False, @@ -97,6 +102,11 @@ def __init__( env: Browser execution environment - "LOCAL" or "BROWSERBASE" viewport_width: Browser viewport width (default: 1024) viewport_height: Browser viewport height (default: 768) + session_create_max_retries: Retry attempts for browser session creation only + session_create_base_delay: Base delay for session creation retry backoff + session_create_backoff_factor: Exponential factor for session creation retries + session_create_max_backoff_seconds: Max backoff for session creation retries + session_create_jitter: Jitter for session creation retries save_screenshots: Save screenshots to disk (default: True) keep_recent_screenshots: Number of recent screenshots to keep in context (default: 2) proxies: Enable Browserbase proxies (default: False) @@ -139,6 +149,11 @@ def __init__( browserbase_project_id=project_id, viewport_width=viewport_width, viewport_height=viewport_height, + session_create_max_retries=session_create_max_retries, + session_create_base_delay=session_create_base_delay, + session_create_backoff_factor=session_create_backoff_factor, + session_create_max_backoff_seconds=session_create_max_backoff_seconds, + session_create_jitter=session_create_jitter, save_screenshots=save_screenshots, keep_recent_screenshots=keep_recent_screenshots, proxies=proxies, @@ -184,6 +199,75 @@ def update_tool_args( tool_name, tool_args, messages, state, **kwargs ) + @staticmethod + def _content_part_type(part: Any) -> str | None: + if isinstance(part, dict): + return part.get("type") + return getattr(part, "type", None) + + @staticmethod + def _content_part_as_dict(part: Any) -> dict[str, Any] | None: + if isinstance(part, dict): + return part + if hasattr(part, "model_dump"): + dumped = part.model_dump() + if isinstance(dumped, dict): + return dumped + return None + + async def env_response( + self, messages: vf.Messages, state: vf.State, **kwargs + ) -> vf.Messages: + # Some providers send empty tool args for zero-arg tools. + if self.mode == "cua": + last_msg = messages[-1] + tool_calls = getattr(last_msg, "tool_calls", None) + if isinstance(tool_calls, list): + for tool_call in tool_calls: + arguments = getattr(tool_call, "arguments", "") + if isinstance(arguments, str) and not arguments.strip(): + tool_call.arguments = "{}" + + result = await super().env_response(messages, state, **kwargs) + + if self.mode != "cua": + return result + + # OpenAI/Anthropic adapters flatten tool message multimodal content to text. + # Keep non-image parts in tool messages and re-attach screenshots as a user message. + screenshots: list[dict[str, Any]] = [] + for msg in result: + if not isinstance(msg, vf.ToolMessage): + continue + + content = getattr(msg, "content", None) + if not isinstance(content, list): + continue + + remaining_parts: list[Any] = [] + has_image = False + for part in content: + part_type = self._content_part_type(part) + if part_type == "image_url": + has_image = True + part_dict = self._content_part_as_dict(part) + if part_dict is not None: + screenshots.append(part_dict) + continue + + remaining_parts.append(part) + + if has_image: + msg.content = remaining_parts + + if screenshots: + return [ + *result, + vf.UserMessage(role="user", content=cast(Any, screenshots)), + ] + + return result + async def get_prompt_messages(self, state: vf.State) -> vf.Messages: """Get prompt messages, filtering screenshots in CUA mode.""" messages = await super().get_prompt_messages(state) diff --git a/verifiers/envs/integrations/browser_env/modes/cua_mode.py b/verifiers/envs/integrations/browser_env/modes/cua_mode.py index b5240b99d..2ce23fdd9 100644 --- a/verifiers/envs/integrations/browser_env/modes/cua_mode.py +++ b/verifiers/envs/integrations/browser_env/modes/cua_mode.py @@ -37,6 +37,24 @@ APIClient = None # type: ignore[misc, assignment] +_RETRYABLE_STATUS_CODES = frozenset({408, 425, 429, 500, 502, 503, 504}) + + +class CUASessionCreateError(vf.InfraError): + """Session creation failed in a way that may or may not be retryable.""" + + def __init__( + self, + message: str, + *, + status_code: int | None = None, + retryable: bool = True, + ): + super().__init__(message) + self.status_code = status_code + self.retryable = retryable + + class CUAMode: """ CUA-based browser mode supporting both local HTTP and sandbox execution. @@ -80,6 +98,11 @@ def __init__( backoff_factor: float = 2.0, max_backoff_seconds: float = 30.0, jitter: float = 1e-3, + session_create_max_retries: int | None = None, + session_create_base_delay: float | None = None, + session_create_backoff_factor: float | None = None, + session_create_max_backoff_seconds: float | None = None, + session_create_jitter: float | None = None, screenshot_dir: str | None = None, save_screenshots: bool = True, keep_recent_screenshots: int | None = 2, @@ -149,7 +172,31 @@ def __init__( self.backoff_factor = backoff_factor self.max_backoff_seconds = max_backoff_seconds self.jitter = jitter + self.session_create_max_retries = ( + max_retries + if session_create_max_retries is None + else session_create_max_retries + ) + self.session_create_base_delay = ( + base_delay + if session_create_base_delay is None + else session_create_base_delay + ) + self.session_create_backoff_factor = ( + backoff_factor + if session_create_backoff_factor is None + else session_create_backoff_factor + ) + self.session_create_max_backoff_seconds = ( + max_backoff_seconds + if session_create_max_backoff_seconds is None + else session_create_max_backoff_seconds + ) + self.session_create_jitter = ( + jitter if session_create_jitter is None else session_create_jitter + ) self.retrying: AsyncRetrying | None = None + self.session_create_retrying: AsyncRetrying | None = None # Local mode specific self.server_url = server_url.rstrip("/") @@ -230,6 +277,18 @@ def register_tools(self, env) -> None: before_sleep=tc.before_sleep_log(self.logger, logging.ERROR), reraise=True, ) + self.session_create_retrying = tc.AsyncRetrying( + retry=tc.retry_if_exception(self._should_retry_session_create_exception), + stop=tc.stop_after_attempt(max(1, self.session_create_max_retries)), + wait=tc.wait_exponential_jitter( + initial=self.session_create_base_delay, + exp_base=self.session_create_backoff_factor, + max=self.session_create_max_backoff_seconds, + jitter=self.session_create_jitter, + ), + before_sleep=tc.before_sleep_log(self.logger, logging.ERROR), + reraise=True, + ) # Hide internal args from tool schema _skip = ["session_id", "sandbox_id", "tool_call_id"] @@ -248,6 +307,43 @@ def register_tools(self, env) -> None: if self._execution_mode == "local": self.verify_server_connection() + @staticmethod + def _is_retryable_status_code(status_code: int | None) -> bool: + """Return True for transient HTTP statuses that should be retried.""" + return status_code in _RETRYABLE_STATUS_CODES + + @staticmethod + def _extract_error_message(raw_body: str) -> str: + """Extract a concise error message from a JSON or plain-text body.""" + try: + payload = json.loads(raw_body) + if isinstance(payload, dict): + error = payload.get("error") + message = payload.get("message") + if isinstance(error, str) and error.strip(): + return error.strip() + if isinstance(message, str) and message.strip(): + return message.strip() + except json.JSONDecodeError: + pass + body = raw_body.strip() + return body if body else "unknown error" + + @staticmethod + def _should_retry_session_create_exception(exc: BaseException) -> bool: + """Retry transient create-session failures only.""" + if isinstance(exc, CUASessionCreateError): + return exc.retryable + return isinstance( + exc, + ( + aiohttp.ClientError, + asyncio.TimeoutError, + TimeoutError, + OSError, + ), + ) + # ==================== Server Health Check (Local Mode) ==================== async def _check_server_health(self) -> None: @@ -311,14 +407,43 @@ async def _get_http_client(self) -> aiohttp.ClientSession: async def _create_session_http(self) -> dict: """Create a new browser session via the CUA server (HTTP).""" client = await self._get_http_client() - async with client.post( - f"{self.server_url}/sessions", - json=self.session_config, - ) as resp: - if resp.status != 200: - error_text = await resp.text() - raise RuntimeError(f"Failed to create browser session: {error_text}") - return await resp.json() + try: + async with client.post( + f"{self.server_url}/sessions", + json=self.session_config, + ) as resp: + raw_body = await resp.text() + if resp.status != 200: + message = self._extract_error_message(raw_body) + raise CUASessionCreateError( + f"Failed to create browser session (HTTP {resp.status}): {message}", + status_code=resp.status, + retryable=self._is_retryable_status_code(resp.status), + ) + + try: + payload = json.loads(raw_body) + except json.JSONDecodeError as e: + raise CUASessionCreateError( + f"Failed to parse browser session response: {raw_body[:500]}", + status_code=resp.status, + retryable=False, + ) from e + + if not isinstance(payload, dict): + raise CUASessionCreateError( + "Browser session response was not a JSON object", + status_code=resp.status, + retryable=False, + ) + return payload + except CUASessionCreateError: + raise + except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError, OSError) as e: + raise CUASessionCreateError( + f"Failed to create browser session due to network error: {e}", + retryable=True, + ) from e async def _destroy_session_http(self, session_id: str) -> None: """Destroy a browser session via the CUA server (HTTP).""" @@ -618,14 +743,63 @@ async def _create_session_curl(self, sandbox_id: str) -> dict: ) body, http_code = self._parse_curl_response(stdout) + status_code = int(http_code) if http_code.isdigit() else None + + if status_code != 200: + retryable_from_payload: bool | None = None + error_message = self._extract_error_message(body) + try: + payload_obj = json.loads(body) + if isinstance(payload_obj, dict): + if isinstance(payload_obj.get("error"), str): + error_message = payload_obj["error"] + if isinstance(payload_obj.get("retryable"), bool): + retryable_from_payload = payload_obj["retryable"] + except json.JSONDecodeError: + pass + raise CUASessionCreateError( + f"Failed to create browser session in sandbox (HTTP {http_code}): {error_message}", + status_code=status_code, + retryable=( + retryable_from_payload + if retryable_from_payload is not None + else ( + status_code is None + or self._is_retryable_status_code(status_code) + ) + ), + ) try: - return json.loads(body) + payload_obj = json.loads(body) except json.JSONDecodeError as e: - raise RuntimeError( + raise CUASessionCreateError( f"Failed to parse session creation response (HTTP {http_code}): {body[:500]}" ) from e + if not isinstance(payload_obj, dict): + raise CUASessionCreateError( + "Session creation response is not a JSON object", + retryable=False, + ) + + if "sessionId" not in payload_obj: + retryable = status_code is None or self._is_retryable_status_code( + status_code + ) + if isinstance(payload_obj.get("retryable"), bool): + retryable = payload_obj["retryable"] + error_message = "sessionId missing from response" + if isinstance(payload_obj.get("error"), str): + error_message = payload_obj["error"] + raise CUASessionCreateError( + f"Failed to create browser session in sandbox: {error_message}", + status_code=status_code, + retryable=retryable, + ) + + return payload_obj + async def _destroy_session_curl(self, session_id: str, sandbox_id: str) -> None: """Destroy a browser session via curl inside the sandbox.""" try: @@ -769,12 +943,22 @@ def filter_screenshots_in_messages(self, messages: list) -> list: if self.keep_recent_screenshots is None: return messages + def _get_item_type(item): + if hasattr(item, "get"): + return item.get("type") + return getattr(item, "type", None) + + def _get_message_content(msg): + if hasattr(msg, "get"): + return msg.get("content") + return getattr(msg, "content", None) + screenshot_positions: list[tuple[int, int]] = [] for msg_idx, msg in enumerate(messages): - content = msg.get("content") + content = _get_message_content(msg) if isinstance(content, list): for content_idx, item in enumerate(content): - if isinstance(item, dict) and item.get("type") == "image_url": + if _get_item_type(item) == "image_url": screenshot_positions.append((msg_idx, content_idx)) if len(screenshot_positions) <= self.keep_recent_screenshots: @@ -788,7 +972,7 @@ def filter_screenshots_in_messages(self, messages: list) -> list: filtered_messages = copy.deepcopy(messages) for msg_idx, content_idx in positions_to_replace: - content_list = filtered_messages[msg_idx]["content"] + content_list = _get_message_content(filtered_messages[msg_idx]) if isinstance(content_list, list) and content_idx < len(content_list): content_list[content_idx] = { "type": "text", @@ -799,16 +983,57 @@ def filter_screenshots_in_messages(self, messages: list) -> list: # ==================== Lifecycle Methods ==================== + async def _cleanup_failed_sandbox_setup( + self, + sandbox_id: str | None, + session_id: str | None, + state: vf.State, + ) -> None: + """Best-effort cleanup for partially initialized sandbox-mode setup.""" + if session_id and sandbox_id: + try: + async for attempt in self.retrying: # type: ignore[union-attr] + with attempt: + await self._destroy_session_curl(session_id, sandbox_id) + with self._sessions_lock: + self.active_sessions.discard(session_id) + state.pop("session_id", None) + except Exception as e: + if self.logger: + self.logger.warning( + f"Failed to destroy session {session_id} during setup cleanup: {e}" + ) + + if sandbox_id: + sandbox_deleted = False + try: + async for attempt in self.retrying: # type: ignore[union-attr] + with attempt: + await self._delete_sandbox(sandbox_id) + sandbox_deleted = True + except Exception as e: + if self.logger: + self.logger.warning( + f"Failed to delete sandbox {sandbox_id} during setup cleanup: {e}" + ) + if sandbox_deleted: + state.pop("cua_sandbox_id", None) + async def setup_state(self, state: vf.State, **kwargs: Any) -> vf.State: """Create a browser session (and sandbox if in sandbox mode).""" + session_retrying = self.session_create_retrying or self.retrying + if self._execution_mode == "local": # Local mode: create session via HTTP - async for attempt in self.retrying: # type: ignore[union-attr] + async for attempt in session_retrying: # type: ignore[union-attr] with attempt: result = await self._create_session_http() session_id = result.get("sessionId") if not session_id: - raise RuntimeError("Failed to get session ID from server response") + raise CUASessionCreateError( + "Failed to get session ID from local CUA server response", + retryable=False, + ) with self._sessions_lock: self.active_sessions.add(session_id) @@ -817,44 +1042,69 @@ async def setup_state(self, state: vf.State, **kwargs: Any) -> vf.State: state["browser_state"] = result.get("state", {}) else: # Sandbox mode: create sandbox, set up server, create session - if self.use_prebuilt_image: - if self.logger: - self.logger.debug(f"Using prebuilt image: {self.prebuilt_image}") + sandbox_id: str | None = None + session_id: str | None = None + try: + if self.use_prebuilt_image: + if self.logger: + self.logger.debug( + f"Using prebuilt image: {self.prebuilt_image}" + ) - async for attempt in self.retrying: # type: ignore[union-attr] - with attempt: - sandbox_id = await self._create_sandbox() - await self._wait_for_sandbox_ready(sandbox_id) - state["cua_sandbox_id"] = sandbox_id - await self._wait_for_server(sandbox_id) - else: - if self.use_binary: - await self._ensure_binary_exists() + async for attempt in self.retrying: # type: ignore[union-attr] + with attempt: + sandbox_id = await self._create_sandbox() + if sandbox_id is None: + raise vf.SandboxError( + "Failed to create CUA sandbox: no sandbox ID returned" + ) + state["cua_sandbox_id"] = sandbox_id + await self._wait_for_sandbox_ready(sandbox_id) + await self._wait_for_server(sandbox_id) + else: + if self.use_binary: + await self._ensure_binary_exists() - async for attempt in self.retrying: # type: ignore[union-attr] + async for attempt in self.retrying: # type: ignore[union-attr] + with attempt: + sandbox_id = await self._create_sandbox() + if sandbox_id is None: + raise vf.SandboxError( + "Failed to create CUA sandbox: no sandbox ID returned" + ) + state["cua_sandbox_id"] = sandbox_id + await self._wait_for_sandbox_ready(sandbox_id) + await self._upload_server_files(sandbox_id) + await self._start_server(sandbox_id) + await self._wait_for_server(sandbox_id) + + if sandbox_id is None: + raise vf.SandboxError( + "Failed to create CUA sandbox: no sandbox ID returned" + ) + async for attempt in session_retrying: # type: ignore[union-attr] with attempt: - sandbox_id = await self._create_sandbox() - await self._wait_for_sandbox_ready(sandbox_id) - state["cua_sandbox_id"] = sandbox_id - await self._upload_server_files(sandbox_id) - await self._start_server(sandbox_id) - await self._wait_for_server(sandbox_id) - - async for attempt in self.retrying: # type: ignore[union-attr] - with attempt: - result = await self._create_session_curl(sandbox_id) - session_id = result.get("sessionId") - if not session_id: - raise RuntimeError( - f"Failed to get session ID from server response. " - f"Response keys: {list(result.keys())}, Response: {str(result)[:500]}" - ) + result = await self._create_session_curl(sandbox_id) + session_id = result.get("sessionId") + if not session_id: + raise CUASessionCreateError( + "Failed to get session ID from sandbox CUA server response " + f"(keys: {list(result.keys())}, response: {str(result)[:500]})", + retryable=False, + ) - with self._sessions_lock: - self.active_sessions.add(session_id) + with self._sessions_lock: + self.active_sessions.add(session_id) - state["session_id"] = session_id - state["browser_state"] = result.get("state", {}) + state["session_id"] = session_id + state["browser_state"] = result.get("state", {}) + except Exception as e: + await self._cleanup_failed_sandbox_setup(sandbox_id, session_id, state) + if isinstance(e, vf.Error): + raise + raise vf.SandboxError( + f"Failed to set up CUA sandbox session: {e}" + ) from e return state