diff --git a/codecov.yml b/codecov.yml index 9f319886..4820ae34 100644 --- a/codecov.yml +++ b/codecov.yml @@ -1,4 +1,6 @@ coverage: + ignore: + - "pydoll/browser/interfaces.py" status: project: default: diff --git a/pydoll/browser/chromium/base.py b/pydoll/browser/chromium/base.py index 7939fa1b..1d56b14f 100644 --- a/pydoll/browser/chromium/base.py +++ b/pydoll/browser/chromium/base.py @@ -1,4 +1,5 @@ import asyncio +import concurrent.futures import json import os import shutil @@ -8,7 +9,7 @@ from functools import partial from random import randint from tempfile import TemporaryDirectory -from typing import Any, Awaitable, Callable, Optional, overload +from typing import Any, Awaitable, Callable, Coroutine, Optional, TypeVar, overload from pydoll.browser.interfaces import BrowserOptionsManager from pydoll.browser.managers import ( @@ -32,6 +33,7 @@ InvalidWebSocketAddress, MissingTargetOrWebSocket, NoValidTabFound, + RunInParallelError, ) from pydoll.protocol.base import Command, Response, T_CommandParams, T_CommandResponse from pydoll.protocol.browser.methods import ( @@ -58,6 +60,8 @@ ) from pydoll.protocol.target.types import TargetInfo +T = TypeVar("T") + class Browser(ABC): # noqa: PLR0904 """ @@ -93,6 +97,8 @@ def __init__( self._connection_handler = ConnectionHandler(self._connection_port) self._backup_preferences_dir = '' self._tabs_opened: dict[str, Tab] = {} + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._semaphore: Optional[asyncio.Semaphore] = None async def __aenter__(self) -> 'Browser': """Async context manager entry.""" @@ -144,6 +150,7 @@ async def start(self, headless: bool = False) -> Tab: Raises: FailedToStartBrowser: If the browser fails to start or connect. """ + self._loop = asyncio.get_running_loop() if headless: warnings.warn( "The 'headless' parameter is deprecated and will be removed in a future version. " @@ -422,6 +429,80 @@ async def reset_permissions(self, browser_context_id: Optional[str] = None): """Reset all permissions to defaults and restore prompting behavior.""" return await self._execute_command(BrowserCommands.reset_permissions(browser_context_id)) + async def run_in_parallel(self, *coroutines: Coroutine[Any, Any, T]) -> list[T]: + """ + Run coroutines in parallel with optional concurrency limiting. + + Args: + *coroutines: Variable number of coroutines to execute in parallel. + + Returns: + List of results from all coroutines in the same order as input. + + Raises: + RuntimeError: If browser loop is not initialized or not running. + + Note: + Respects max_parallel_tasks option for concurrency control. + """ + if self._loop is None: + raise RuntimeError("Browser loop not initialized. Call start() first.") + + # Thread-safe semaphore initialization with double-checked locking pattern + if self.options.max_parallel_tasks and self._semaphore is None: + # Use a lock to prevent race conditions in semaphore creation + if not hasattr(self, '_semaphore_lock'): + self._semaphore_lock = asyncio.Lock() + + async with self._semaphore_lock: + # Double-check pattern to avoid creating multiple semaphores + if self._semaphore is None: + self._semaphore = asyncio.Semaphore(self.options.max_parallel_tasks) + + wrapped = [self._limited_coroutine(coro) for coro in coroutines] + + async def run_gather(): + return await asyncio.gather(*wrapped, return_exceptions=False) + + # Check if we're in the same event loop + with suppress(RuntimeError): + current_loop = asyncio.get_running_loop() + if current_loop is self._loop: + # Same loop - execute directly + return await run_gather() + + # Different loop or no current loop - use threadsafe execution + if not self._loop.is_running(): + raise RunInParallelError("Browser loop is not running. Cannot execute coroutines.") + + future = asyncio.run_coroutine_threadsafe(run_gather(), self._loop) + + try: + # Use timeout to prevent indefinite blocking + return future.result(timeout=60) + except concurrent.futures.TimeoutError: + future.cancel() + raise RunInParallelError("Coroutine execution timed out after 60 seconds") + except concurrent.futures.CancelledError: + raise RunInParallelError("Coroutine execution was cancelled") + except Exception as e: + raise RunInParallelError(f"Coroutine execution failed: {e}") from e + + async def _limited_coroutine(self, coroutine: Coroutine[Any, Any, T]) -> T: + """ + Execute coroutine with semaphore limiting if configured. + + Args: + coroutine: The coroutine to execute. + + Returns: + The result of the coroutine execution. + """ + if self.options.max_parallel_tasks and self._semaphore is not None: + async with self._semaphore: + return await coroutine + return await coroutine + @overload async def on( self, event_name: str, callback: Callable[[Any], Any], temporary: bool = False diff --git a/pydoll/browser/interfaces.py b/pydoll/browser/interfaces.py index 48fff1da..bccee8a8 100644 --- a/pydoll/browser/interfaces.py +++ b/pydoll/browser/interfaces.py @@ -36,6 +36,16 @@ def headless(self) -> bool: def headless(self, headless: bool): pass + @property + @abstractmethod + def max_parallel_tasks(self) -> int: + pass + + @max_parallel_tasks.setter + @abstractmethod + def max_parallel_tasks(self, max_parallel_tasks: int): + pass + class BrowserOptionsManager(ABC): @abstractmethod diff --git a/pydoll/browser/options.py b/pydoll/browser/options.py index bd545bfe..f6b88aa3 100644 --- a/pydoll/browser/options.py +++ b/pydoll/browser/options.py @@ -28,6 +28,7 @@ def __init__(self): self._start_timeout = 10 self._browser_preferences = {} self._headless = False + self._max_parallel_tasks = 2 @property def arguments(self) -> list[str]: @@ -264,11 +265,13 @@ def block_notifications(self, block: bool): @property def allow_automatic_downloads(self) -> bool: return ( - self._get_pref_path([ - 'profile', - 'default_content_setting_values', - 'automatic_downloads', - ]) + self._get_pref_path( + [ + 'profile', + 'default_content_setting_values', + 'automatic_downloads', + ] + ) == 1 ) @@ -316,3 +319,13 @@ def headless(self, headless: bool): if headless == has_argument: return methods_map[headless]('--headless') + + @property + def max_parallel_tasks(self) -> int: + return self._max_parallel_tasks + + @max_parallel_tasks.setter + def max_parallel_tasks(self, max_parallel_tasks: int): + if max_parallel_tasks < 1: + raise ValueError('max_parallel_tasks must be greater than 0') + self._max_parallel_tasks = max_parallel_tasks diff --git a/pydoll/exceptions.py b/pydoll/exceptions.py index 08e8e409..c302b19f 100644 --- a/pydoll/exceptions.py +++ b/pydoll/exceptions.py @@ -305,3 +305,9 @@ class ElementPreconditionError(ElementException): """Raised when invalid or missing preconditions are provided for element operations.""" message = 'Invalid element preconditions' + + +class RunInParallelError(PydollException): + """Raised when run_in_parallel fails.""" + + message = 'Failed to run coroutines in parallel' diff --git a/tests/test_browser/test_browser_base.py b/tests/test_browser/test_browser_base.py index 8e9a48cc..53ace933 100644 --- a/tests/test_browser/test_browser_base.py +++ b/tests/test_browser/test_browser_base.py @@ -1,5 +1,8 @@ import asyncio import base64 +import concurrent.futures +import threading +import time from unittest.mock import ANY, AsyncMock, MagicMock, patch import pytest @@ -1206,3 +1209,339 @@ async def test_headless_parameter_deprecation_warning(mock_browser): assert mock_browser.options.headless is True assert '--headless' in mock_browser.options.arguments + + +# Tests for run_in_parallel and _limited_coroutine methods +@pytest.mark.asyncio +async def test_run_in_parallel_no_semaphore(mock_browser): + """Test run_in_parallel without semaphore (by mocking the condition).""" + # Setup browser with initialized loop + mock_browser._loop = asyncio.get_event_loop() + mock_browser.options.max_parallel_tasks = 2 + mock_browser._semaphore = None + + # Mock the condition to prevent semaphore creation + # Create test coroutines that return different values + async def coro1(): + await asyncio.sleep(0.01) + return "result1" + + async def coro2(): + await asyncio.sleep(0.01) + return "result2" + + async def coro3(): + await asyncio.sleep(0.01) + return "result3" + + # Execute run_in_parallel + results = await mock_browser.run_in_parallel(coro1(), coro2(), coro3()) + + # Verify results are returned in correct order + assert results == ["result1", "result2", "result3"] + assert mock_browser._semaphore is not None + + +@pytest.mark.asyncio +async def test_run_in_parallel_with_semaphore(mock_browser): + """Test run_in_parallel with semaphore (max_parallel_tasks set).""" + # Setup browser with initialized loop + mock_browser._loop = asyncio.get_event_loop() + mock_browser.options.max_parallel_tasks = 2 # Enable semaphore with limit 2 + mock_browser._semaphore = None # Will be created by run_in_parallel + + execution_order = [] + + async def coro_with_tracking(name, delay=0.01): + execution_order.append(f"{name}_start") + await asyncio.sleep(delay) + execution_order.append(f"{name}_end") + return name + + # Execute run_in_parallel with more coroutines than semaphore limit + results = await mock_browser.run_in_parallel( + coro_with_tracking("coro1"), + coro_with_tracking("coro2"), + coro_with_tracking("coro3") + ) + + # Verify results are correct + assert results == ["coro1", "coro2", "coro3"] + # Verify semaphore was created + assert mock_browser._semaphore is not None + assert mock_browser._semaphore._value == 2 + + +@pytest.mark.asyncio +async def test_run_in_parallel_loop_not_initialized(mock_browser): + """Test run_in_parallel raises RuntimeError when loop not initialized.""" + # Setup browser without initialized loop + mock_browser._loop = None + + async def dummy_coro(): + return "test" + + # Should raise RuntimeError + with pytest.raises(RuntimeError, match="Browser loop not initialized. Call start\\(\\) first."): + await mock_browser.run_in_parallel(dummy_coro()) + + +@pytest.mark.asyncio +async def test_run_in_parallel_from_different_loop(mock_browser): + """Test run_in_parallel when called from different loop (same thread).""" + + # Create a separate event loop for the browser and run it in a thread + browser_loop = asyncio.new_event_loop() + mock_browser._loop = browser_loop + mock_browser.options.max_parallel_tasks = 1 + mock_browser._semaphore = None + + results = [] + exception_container = [] + + def browser_loop_thread(): + """Run the browser loop in a separate thread.""" + try: + asyncio.set_event_loop(browser_loop) + browser_loop.run_forever() + except Exception as e: + exception_container.append(e) + + # Start browser loop in separate thread + thread = threading.Thread(target=browser_loop_thread, daemon=True) + thread.start() + + # Wait a bit for the loop to start + await asyncio.sleep(0.01) + + try: + # This should use run_coroutine_threadsafe since we're calling from main thread + # while browser loop runs in different thread + def run_sync(): + # Create fresh coroutines for the browser loop + async def fresh_coro1(): + await asyncio.sleep(0.01) + return "thread_result1" + + async def fresh_coro2(): + await asyncio.sleep(0.01) + return "thread_result2" + + # Create the run_in_parallel coroutine + coro = mock_browser.run_in_parallel(fresh_coro1(), fresh_coro2()) + + # Run it using run_coroutine_threadsafe directly + future = asyncio.run_coroutine_threadsafe(coro, browser_loop) + return future.result(timeout=30) + + result = await asyncio.get_running_loop().run_in_executor(None, run_sync) + + results.append(result) + + finally: + # Stop the browser loop + browser_loop.call_soon_threadsafe(browser_loop.stop) + thread.join(timeout=1.0) + if not browser_loop.is_closed(): + browser_loop.close() + + # Verify no exceptions and correct results + assert not exception_container + assert len(results) == 1 + assert results[0] == ["thread_result1", "thread_result2"] + + +@pytest.mark.asyncio +async def test_run_in_parallel_from_same_thread(mock_browser): + """Test run_in_parallel when called from same thread as browser loop.""" + # Setup browser with current event loop + current_loop = asyncio.get_running_loop() + mock_browser._loop = current_loop + mock_browser.options.max_parallel_tasks = 1 + mock_browser._semaphore = None + + async def coro1(): + await asyncio.sleep(0.01) + return "same_thread_result1" + + async def coro2(): + await asyncio.sleep(0.01) + return "same_thread_result2" + + # Execute run_in_parallel (should use direct await path) + results = await mock_browser.run_in_parallel(coro1(), coro2()) + + # Verify results + assert results == ["same_thread_result1", "same_thread_result2"] + + +@pytest.mark.asyncio +async def test_limited_coroutine_without_semaphore(mock_browser): + """Test _limited_coroutine without semaphore.""" + mock_browser.options.max_parallel_tasks = 1 + mock_browser._semaphore = None + + async def test_coro(): + await asyncio.sleep(0.01) + return "no_semaphore_result" + + # Execute _limited_coroutine + result = await mock_browser._limited_coroutine(test_coro()) + + # Verify result + assert result == "no_semaphore_result" + + +@pytest.mark.asyncio +async def test_limited_coroutine_with_semaphore(mock_browser): + """Test _limited_coroutine with semaphore.""" + mock_browser.options.max_parallel_tasks = 1 + mock_browser._semaphore = asyncio.Semaphore(1) + + execution_times = [] + + async def timed_coro(name): + start_time = time.time() + execution_times.append(f"{name}_start_{start_time}") + await asyncio.sleep(0.02) # Small delay to ensure ordering + end_time = time.time() + execution_times.append(f"{name}_end_{end_time}") + return f"semaphore_result_{name}" + + # Execute multiple _limited_coroutine calls concurrently + results = await asyncio.gather( + mock_browser._limited_coroutine(timed_coro("1")), + mock_browser._limited_coroutine(timed_coro("2")) + ) + + # Verify results + assert results == ["semaphore_result_1", "semaphore_result_2"] + # Verify execution was serialized (one should complete before other starts) + assert len(execution_times) == 4 + + +@pytest.mark.asyncio +async def test_run_in_parallel_semaphore_limiting(mock_browser): + """Test that semaphore properly limits concurrent execution.""" + mock_browser._loop = asyncio.get_event_loop() + mock_browser.options.max_parallel_tasks = 2 + mock_browser._semaphore = None # Will be created + + concurrent_count = 0 + max_concurrent = 0 + execution_log = [] + + async def monitored_coro(name): + nonlocal concurrent_count, max_concurrent + concurrent_count += 1 + max_concurrent = max(max_concurrent, concurrent_count) + execution_log.append(f"{name}_start_concurrent_{concurrent_count}") + + await asyncio.sleep(0.02) # Simulate work + + concurrent_count -= 1 + execution_log.append(f"{name}_end_concurrent_{concurrent_count + 1}") + return f"limited_result_{name}" + + # Run 4 coroutines with limit of 2 + results = await mock_browser.run_in_parallel( + monitored_coro("A"), + monitored_coro("B"), + monitored_coro("C"), + monitored_coro("D") + ) + + # Verify results + assert results == ["limited_result_A", "limited_result_B", "limited_result_C", "limited_result_D"] + # Verify concurrency was limited to 2 + assert max_concurrent <= 2 + assert mock_browser._semaphore._value == 2 + + +@pytest.mark.asyncio +async def test_run_in_parallel_empty_coroutines(mock_browser): + """Test run_in_parallel with no coroutines.""" + mock_browser._loop = asyncio.get_event_loop() + mock_browser.options.max_parallel_tasks = 1 + mock_browser._semaphore = None + + # Execute with no coroutines + results = await mock_browser.run_in_parallel() + + # Should return empty list + assert results == [] + + +@pytest.mark.asyncio +async def test_run_in_parallel_single_coroutine(mock_browser): + """Test run_in_parallel with single coroutine.""" + mock_browser._loop = asyncio.get_event_loop() + mock_browser.options.max_parallel_tasks = 1 + mock_browser._semaphore = None + + async def single_coro(): + await asyncio.sleep(0.01) + return "single_result" + + # Execute with single coroutine + results = await mock_browser.run_in_parallel(single_coro()) + + # Should return list with single result + assert results == ["single_result"] + + +@pytest.mark.asyncio +async def test_run_in_parallel_coroutine_exception(mock_browser): + """Test run_in_parallel handles coroutine exceptions properly.""" + mock_browser._loop = asyncio.get_event_loop() + mock_browser.options.max_parallel_tasks = 1 + mock_browser._semaphore = None + + async def failing_coro(): + await asyncio.sleep(0.01) + raise ValueError("Test exception") + + async def success_coro(): + await asyncio.sleep(0.01) + return "success" + + # Should propagate the exception + with pytest.raises(ValueError, match="Test exception"): + await mock_browser.run_in_parallel(success_coro(), failing_coro()) + + +@pytest.mark.asyncio +async def test_limited_coroutine_exception(mock_browser): + """Test _limited_coroutine handles exceptions properly.""" + mock_browser.options.max_parallel_tasks = 1 + mock_browser._semaphore = asyncio.Semaphore(1) + + async def failing_coro(): + await asyncio.sleep(0.01) + raise RuntimeError("Limited coroutine exception") + + # Should propagate the exception + with pytest.raises(RuntimeError, match="Limited coroutine exception"): + await mock_browser._limited_coroutine(failing_coro()) + + +@pytest.mark.asyncio +async def test_run_in_parallel_semaphore_creation_once(mock_browser): + """Test that semaphore is created only once when max_parallel_tasks is set.""" + mock_browser._loop = asyncio.get_event_loop() + mock_browser.options.max_parallel_tasks = 3 + mock_browser._semaphore = None + + async def dummy_coro(value): + await asyncio.sleep(0.01) + return value + + # First call should create semaphore + await mock_browser.run_in_parallel(dummy_coro(1)) + first_semaphore = mock_browser._semaphore + assert first_semaphore is not None + assert first_semaphore._value == 3 + + # Second call should reuse existing semaphore + await mock_browser.run_in_parallel(dummy_coro(2)) + assert mock_browser._semaphore is first_semaphore diff --git a/tests/test_browser/test_browser_options.py b/tests/test_browser/test_browser_options.py index 3e15a45d..eae6aa37 100644 --- a/tests/test_browser/test_browser_options.py +++ b/tests/test_browser/test_browser_options.py @@ -43,17 +43,20 @@ def test_add_duplicate_argument(): with pytest.raises(ArgumentAlreadyExistsInOptions, match='Argument already exists: --headless'): options.add_argument('--headless') + def test_remove_argument(): options = Options() options.add_argument('--headless') options.remove_argument('--headless') assert options.arguments == [] + def test_remove_argument_not_exists(): options = Options() with pytest.raises(ArgumentNotFoundInOptions, match='Argument not found: --headless'): options.remove_argument('--headless') + def test_add_multiple_arguments(): options = Options() options.add_argument('--headless') @@ -175,11 +178,13 @@ def test_wrong_dict_prefs_error(): } } + def test_set_arguments(): options = Options() options.arguments = ['--headless'] assert options.arguments == ['--headless'] + def test_get_pref_path(): options = Options() options.set_default_download_directory('/tmp/downloads') @@ -225,14 +230,20 @@ def browser_preferences(self): def headless(self): return False + @property + def max_parallel_tasks(self): + return 1 + CompleteOptions() + def test_set_headless(): options = Options() options.headless = True assert options.headless is True assert options.arguments == ['--headless'] + def test_set_headless_false(): options = Options() options.headless = True @@ -242,6 +253,7 @@ def test_set_headless_false(): assert options.headless is False assert options.arguments == [] + def test_set_headless_true_twice(): options = Options() options.headless = True @@ -251,6 +263,7 @@ def test_set_headless_true_twice(): assert options.headless is True assert options.arguments == ['--headless'] + def test_set_headless_false_twice(): options = Options() options.headless = False @@ -259,3 +272,15 @@ def test_set_headless_false_twice(): options.headless = False assert options.headless is False assert options.arguments == [] + + +def test_set_max_parallel_tasks(): + options = Options() + options.max_parallel_tasks = 2 + assert options.max_parallel_tasks == 2 + + +def test_set_max_parallel_tasks_error(): + options = Options() + with pytest.raises(ValueError, match='max_parallel_tasks must be greater than 0'): + options.max_parallel_tasks = 0