Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
coverage:
ignore:
- "pydoll/browser/interfaces.py"
status:
project:
default:
Expand Down
83 changes: 82 additions & 1 deletion pydoll/browser/chromium/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import concurrent.futures
import json
import os
import shutil
Expand All @@ -8,7 +9,7 @@
from functools import partial
from random import randint
from tempfile import TemporaryDirectory
from typing import Any, Awaitable, Callable, Optional, overload
from typing import Any, Awaitable, Callable, Coroutine, Optional, TypeVar, overload

from pydoll.browser.interfaces import BrowserOptionsManager
from pydoll.browser.managers import (
Expand All @@ -32,6 +33,7 @@
InvalidWebSocketAddress,
MissingTargetOrWebSocket,
NoValidTabFound,
RunInParallelError,
)
from pydoll.protocol.base import Command, Response, T_CommandParams, T_CommandResponse
from pydoll.protocol.browser.methods import (
Expand All @@ -58,6 +60,8 @@
)
from pydoll.protocol.target.types import TargetInfo

T = TypeVar("T")


class Browser(ABC): # noqa: PLR0904
"""
Expand Down Expand Up @@ -93,6 +97,8 @@ def __init__(
self._connection_handler = ConnectionHandler(self._connection_port)
self._backup_preferences_dir = ''
self._tabs_opened: dict[str, Tab] = {}
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._semaphore: Optional[asyncio.Semaphore] = None

async def __aenter__(self) -> 'Browser':
"""Async context manager entry."""
Expand Down Expand Up @@ -144,6 +150,7 @@ async def start(self, headless: bool = False) -> Tab:
Raises:
FailedToStartBrowser: If the browser fails to start or connect.
"""
self._loop = asyncio.get_running_loop()
if headless:
warnings.warn(
"The 'headless' parameter is deprecated and will be removed in a future version. "
Expand Down Expand Up @@ -422,6 +429,80 @@ async def reset_permissions(self, browser_context_id: Optional[str] = None):
"""Reset all permissions to defaults and restore prompting behavior."""
return await self._execute_command(BrowserCommands.reset_permissions(browser_context_id))

async def run_in_parallel(self, *coroutines: Coroutine[Any, Any, T]) -> list[T]:
"""
Run coroutines in parallel with optional concurrency limiting.

Args:
*coroutines: Variable number of coroutines to execute in parallel.

Returns:
List of results from all coroutines in the same order as input.

Raises:
RuntimeError: If browser loop is not initialized or not running.

Note:
Respects max_parallel_tasks option for concurrency control.
"""
if self._loop is None:
raise RuntimeError("Browser loop not initialized. Call start() first.")

# Thread-safe semaphore initialization with double-checked locking pattern
if self.options.max_parallel_tasks and self._semaphore is None:
# Use a lock to prevent race conditions in semaphore creation
if not hasattr(self, '_semaphore_lock'):
self._semaphore_lock = asyncio.Lock()

async with self._semaphore_lock:
# Double-check pattern to avoid creating multiple semaphores
if self._semaphore is None:
self._semaphore = asyncio.Semaphore(self.options.max_parallel_tasks)

wrapped = [self._limited_coroutine(coro) for coro in coroutines]

async def run_gather():
return await asyncio.gather(*wrapped, return_exceptions=False)

# Check if we're in the same event loop
with suppress(RuntimeError):
current_loop = asyncio.get_running_loop()
if current_loop is self._loop:
# Same loop - execute directly
return await run_gather()

# Different loop or no current loop - use threadsafe execution
if not self._loop.is_running():
raise RunInParallelError("Browser loop is not running. Cannot execute coroutines.")

future = asyncio.run_coroutine_threadsafe(run_gather(), self._loop)

try:
# Use timeout to prevent indefinite blocking
return future.result(timeout=60)
except concurrent.futures.TimeoutError:
future.cancel()
raise RunInParallelError("Coroutine execution timed out after 60 seconds")
except concurrent.futures.CancelledError:
raise RunInParallelError("Coroutine execution was cancelled")
except Exception as e:
raise RunInParallelError(f"Coroutine execution failed: {e}") from e

async def _limited_coroutine(self, coroutine: Coroutine[Any, Any, T]) -> T:
"""
Execute coroutine with semaphore limiting if configured.

Args:
coroutine: The coroutine to execute.

Returns:
The result of the coroutine execution.
"""
if self.options.max_parallel_tasks and self._semaphore is not None:
async with self._semaphore:
return await coroutine
return await coroutine

@overload
async def on(
self, event_name: str, callback: Callable[[Any], Any], temporary: bool = False
Expand Down
10 changes: 10 additions & 0 deletions pydoll/browser/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ def headless(self) -> bool:
def headless(self, headless: bool):
pass

@property
@abstractmethod
def max_parallel_tasks(self) -> int:
pass

@max_parallel_tasks.setter
@abstractmethod
def max_parallel_tasks(self, max_parallel_tasks: int):
pass


class BrowserOptionsManager(ABC):
@abstractmethod
Expand Down
23 changes: 18 additions & 5 deletions pydoll/browser/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(self):
self._start_timeout = 10
self._browser_preferences = {}
self._headless = False
self._max_parallel_tasks = 2

@property
def arguments(self) -> list[str]:
Expand Down Expand Up @@ -264,11 +265,13 @@ def block_notifications(self, block: bool):
@property
def allow_automatic_downloads(self) -> bool:
return (
self._get_pref_path([
'profile',
'default_content_setting_values',
'automatic_downloads',
])
self._get_pref_path(
[
'profile',
'default_content_setting_values',
'automatic_downloads',
]
)
== 1
)

Expand Down Expand Up @@ -316,3 +319,13 @@ def headless(self, headless: bool):
if headless == has_argument:
return
methods_map[headless]('--headless')

@property
def max_parallel_tasks(self) -> int:
return self._max_parallel_tasks

@max_parallel_tasks.setter
def max_parallel_tasks(self, max_parallel_tasks: int):
if max_parallel_tasks < 1:
raise ValueError('max_parallel_tasks must be greater than 0')
self._max_parallel_tasks = max_parallel_tasks
6 changes: 6 additions & 0 deletions pydoll/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,9 @@ class ElementPreconditionError(ElementException):
"""Raised when invalid or missing preconditions are provided for element operations."""

message = 'Invalid element preconditions'


class RunInParallelError(PydollException):
"""Raised when run_in_parallel fails."""

message = 'Failed to run coroutines in parallel'
Loading
Loading