From 2273aed00146850a283a69c3c469a95db793b5ae Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Tue, 30 Jan 2024 11:04:02 +0100 Subject: [PATCH] Add the first version of `EventManager` (#25) --- .../workflows/check_version_availability.yaml | 2 +- .github/workflows/lint_and_type_checks.yaml | 2 +- .github/workflows/release.yaml | 2 +- .github/workflows/unit_tests.yaml | 2 +- CONTRIBUTING.md | 2 +- pyproject.toml | 9 +- src/crawlee/autoscaling/system_status.py | 37 ++--- src/crawlee/events/__init__.py | 1 + src/crawlee/events/event_manager.py | 157 ++++++++++++++++++ src/crawlee/events/py.typed | 0 src/crawlee/events/types.py | 55 ++++++ src/crawlee/py.typed | 0 12 files changed, 240 insertions(+), 29 deletions(-) create mode 100644 src/crawlee/events/__init__.py create mode 100644 src/crawlee/events/event_manager.py create mode 100644 src/crawlee/events/py.typed create mode 100644 src/crawlee/events/types.py create mode 100644 src/crawlee/py.typed diff --git a/.github/workflows/check_version_availability.yaml b/.github/workflows/check_version_availability.yaml index bf37ad4ba..33f66d3fc 100644 --- a/.github/workflows/check_version_availability.yaml +++ b/.github/workflows/check_version_availability.yaml @@ -19,7 +19,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: "3.8" + python-version: "3.9" - name: Install dependencies run: make install-dev diff --git a/.github/workflows/lint_and_type_checks.yaml b/.github/workflows/lint_and_type_checks.yaml index 760d401f0..1384f5529 100644 --- a/.github/workflows/lint_and_type_checks.yaml +++ b/.github/workflows/lint_and_type_checks.yaml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] steps: # We need to check out the head commit in case of PRs, and the default ref otherwise (during release). 
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index e54bce66a..05866515c 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -66,7 +66,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: 3.8 + python-version: 3.9 - name: Install dependencies run: make install-dev diff --git a/.github/workflows/unit_tests.yaml b/.github/workflows/unit_tests.yaml index 274bae9c5..02828eed2 100644 --- a/.github/workflows/unit_tests.yaml +++ b/.github/workflows/unit_tests.yaml @@ -9,7 +9,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, windows-latest] - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] runs-on: ${{ matrix.os }} steps: diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4193dae4b..43262f8e1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -2,7 +2,7 @@ ## Environment -For local development, it is required to have Python 3.8 (or a later version) installed. +For local development, it is required to have Python 3.9 (or a later version) installed. 
It is recommended to set up a virtual environment while developing this package to isolate your development environment, however, due to the many varied ways Python can be installed and virtual environments can be set up, this is left up to diff --git a/pyproject.toml b/pyproject.toml index adca33c43..29c770914 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,6 @@ classifiers = [ "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -20,7 +19,7 @@ classifiers = [ "Topic :: Software Development :: Libraries", ] -requires-python = ">=3.8" +requires-python = ">=3.9" # We use inclusive ordered comparison clause for non-Apify packages intentionally in order to enhance the Apify SDK's # compatibility with a wide range of external packages. This decision was discussed in detail in the following PR: @@ -28,6 +27,7 @@ requires-python = ">=3.8" dependencies = [ "colorama >= 0.4.6", "more_itertools >= 10.2.0", + "pyee >= 11.1.0", "typing-extensions >= 4.1.0", ] @@ -38,7 +38,7 @@ dev = [ "mypy ~= 1.8.0", "pre-commit ~= 3.4.0", "pydoc-markdown ~= 4.8.2", - "pytest ~= 8.0.0", + "pytest ~= 7.4.4", "pytest-asyncio ~= 0.23.4", "pytest-cov ~= 4.1.0", "pytest-only ~= 2.0.0", @@ -79,6 +79,7 @@ ignore = [ "COM812", # This rule may cause conflicts when used with the formatter "D100", # Missing docstring in public module "D104", # Missing docstring in public package + "D107", # Missing docstring in `__init__` "EM", # flake8-errmsg "G004", # Logging statement uses f-string "ISC001", # This rule may cause conflicts when used with the formatter @@ -135,7 +136,7 @@ asyncio_mode = "auto" timeout = 1200 [tool.mypy] -python_version = "3.8" +python_version = "3.9" files = ["scripts", "src", "tests"] check_untyped_defs = true 
disallow_incomplete_defs = true diff --git a/src/crawlee/autoscaling/system_status.py b/src/crawlee/autoscaling/system_status.py index 022b108cd..80dc57f95 100644 --- a/src/crawlee/autoscaling/system_status.py +++ b/src/crawlee/autoscaling/system_status.py @@ -70,6 +70,23 @@ class SystemStatus: `SystemStatus.get_historical_status` returns a boolean that represents the long-term status of the system. It considers the full snapshot history available in the `Snapshotter` instance. + + Attributes: + snapshotter: The `Snapshotter` instance to be queried for `SystemStatus`. + + current_history_secs: Defines max age of snapshots used in the `SystemStatus.get_current_status` measurement. + + max_memory_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a memory sample. + If the sample exceeds this ratio, the system will be overloaded. + + max_event_loop_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in an event loop sample. + If the sample exceeds this ratio, the system will be overloaded. + + max_cpu_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a CPU sample. If the sample + exceeds this ratio, the system will be overloaded. + + max_client_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a Client sample. + If the sample exceeds this ratio, the system will be overloaded. """ def __init__( @@ -81,26 +98,6 @@ def __init__( max_cpu_overloaded_ratio: float = 0.4, max_client_overloaded_ratio: float = 0.3, ) -> None: - """Create a new instance. - - Args: - snapshotter: The `Snapshotter` instance to be queried for `SystemStatus`. - - current_history_secs: Defines max age of snapshots used in the `SystemStatus.get_current_status` - measurement. Defaults to 5. - - max_memory_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a memory sample. - If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.2. 
- - max_event_loop_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in an event loop sample. - If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.6. - - max_cpu_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a CPU sample. If the sample - exceeds this ratio, the system will be overloaded. Defaults to 0.4. - - max_client_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a Client sample. - If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.3. - """ self.snapshotter = snapshotter self.current_history_secs = current_history_secs self.max_memory_overloaded_ratio = max_memory_overloaded_ratio diff --git a/src/crawlee/events/__init__.py b/src/crawlee/events/__init__.py new file mode 100644 index 000000000..5a8735d85 --- /dev/null +++ b/src/crawlee/events/__init__.py @@ -0,0 +1 @@ +from .event_manager import EventManager diff --git a/src/crawlee/events/event_manager.py b/src/crawlee/events/event_manager.py new file mode 100644 index 000000000..f5272579c --- /dev/null +++ b/src/crawlee/events/event_manager.py @@ -0,0 +1,157 @@ +from __future__ import annotations + +import asyncio +from collections import defaultdict +from contextlib import suppress +from functools import wraps +from inspect import iscoroutinefunction +from logging import getLogger +from typing import TYPE_CHECKING + +from pyee.asyncio import AsyncIOEventEmitter + +if TYPE_CHECKING: + from datetime import timedelta + + from crawlee.events.types import Event, EventData, Listener, WrappedListener + +logger = getLogger(__name__) + + +class EventManager: + """Event manager for registering, emitting, and managing event listeners. + + Event manager allows you to register event listeners, emit events, and wait for event listeners to complete + their execution. It is built on top of the `pyee.asyncio.AsyncIOEventEmitter` class. 
+ + Attributes: + _event_emitter: The event emitter which is used to emit events and call the event listeners. + + _listener_tasks: The set of asyncio tasks which are currently executing the event listeners. + + _listeners_to_wrappers: The mapping between events and listeners, and the wrapped listeners which are + registered with the event emitter. + """ + + def __init__(self: EventManager) -> None: + logger.debug('Calling LocalEventManager.__init__()...') + self._event_emitter = AsyncIOEventEmitter() + + # Listeners are wrapped in an asyncio.Task, store their references here so that we can wait for them to finish + self._listener_tasks: set[asyncio.Task] = set() + + # Store the mapping between events and listeners like this: + # event -> listener -> [wrapped_listener_1, wrapped_listener_2, ...] + self._listeners_to_wrappers: dict[Event, dict[Listener, list[WrappedListener]]] = defaultdict( + lambda: defaultdict(list), + ) + + async def close(self: EventManager, *, timeout: timedelta | None = None) -> None: + """Close the event manager. + + This will stop listening for the events, and it will wait for all the event listeners to finish. + + Args: + timeout: Optional timeout after which the pending event listeners are canceled. + """ + logger.debug('Calling LocalEventManager.close()...') + await self.wait_for_all_listeners_to_complete(timeout=timeout) + self._event_emitter.remove_all_listeners() + + def on(self: EventManager, *, event: Event, listener: Listener) -> None: + """Add an event listener to the event manager. + + Args: + event: The Actor event to listen to. + listener: The function (sync or async) which is to be called when the event is emitted. 
+ """ + logger.debug('Calling LocalEventManager.on()...') + + @wraps(listener) + async def listener_wrapper(event_data: EventData) -> None: + logger.debug('Calling LocalEventManager.on.listener_wrapper()...') + + # If the listener is a coroutine function, just call it, otherwise, run it in a separate thread + # to avoid blocking the event loop + coroutine = ( + listener(event_data) if iscoroutinefunction(listener) else asyncio.to_thread(listener, event_data) + ) + + listener_task = asyncio.create_task(coroutine, name=f'Task-{event.value}-{listener.__name__}') + self._listener_tasks.add(listener_task) + + try: + await listener_task + except Exception: + # We need to swallow the exception and just log it here, otherwise it could break the event emitter + logger.exception( + 'Exception in the event listener', + extra={'event_name': event.value, 'listener_name': listener.__name__}, + ) + finally: + logger.debug('LocalEventManager.on.listener_wrapper(): Removing listener task from the set...') + self._listener_tasks.remove(listener_task) + + self._listeners_to_wrappers[event][listener].append(listener_wrapper) + self._event_emitter.add_listener(event.value, listener_wrapper) + + def off(self: EventManager, *, event: Event, listener: Listener | None = None) -> None: + """Remove a listener, or all listeners, from an Actor event. + + Args: + event: The Actor event for which to remove listeners. + listener: The listener which is supposed to be removed. If not passed, all listeners of this event + are removed. 
+ """ + logger.debug('Calling LocalEventManager.off()...') + + if listener: + for listener_wrapper in self._listeners_to_wrappers[event][listener]: + self._event_emitter.remove_listener(event.value, listener_wrapper) + self._listeners_to_wrappers[event][listener] = [] + else: + self._listeners_to_wrappers[event] = defaultdict(list) + self._event_emitter.remove_all_listeners(event.value) + + def emit(self: EventManager, *, event: Event, event_data: EventData) -> None: + """Emit an event. + + Args: + event: The event which will be emitted. + event_data: The data which will be passed to the event listeners. + """ + logger.debug('Calling LocalEventManager.emit()...') + self._event_emitter.emit(event.value, event_data) + + async def wait_for_all_listeners_to_complete(self: EventManager, *, timeout: timedelta | None = None) -> None: + """Wait for all currently executing event listeners to complete. + + Args: + timeout: The maximum time to wait for the event listeners to finish. If they do not complete within + the specified timeout, they will be canceled. 
+ """ + logger.debug('Calling LocalEventManager.wait_for_all_listeners_to_complete()...') + + async def wait_for_listeners() -> None: + """Gathers all listener tasks and awaits their completion, logging any exceptions encountered.""" + results = await asyncio.gather(*self._listener_tasks, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + logger.exception('Event listener raised an exception.', exc_info=result) + + tasks = [asyncio.create_task(wait_for_listeners(), name=f'Task-{wait_for_listeners.__name__}')] + timeout_secs = timeout.total_seconds() if timeout else None + + try: + _, pending = await asyncio.wait(tasks, timeout=timeout_secs) + if pending: + logger.warning('Waiting timeout reached; canceling unfinished event listeners.') + except asyncio.CancelledError: + logger.warning('Asyncio wait was cancelled; canceling unfinished event listeners.') + raise + finally: + for task in tasks: + if not task.done(): + task.cancel() + with suppress(asyncio.CancelledError): + await task diff --git a/src/crawlee/events/py.typed b/src/crawlee/events/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/src/crawlee/events/types.py b/src/crawlee/events/types.py new file mode 100644 index 000000000..44ba2d0e4 --- /dev/null +++ b/src/crawlee/events/types.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from collections.abc import Callable, Coroutine +from dataclasses import dataclass +from enum import Enum +from typing import TYPE_CHECKING, Any, Union + +if TYPE_CHECKING: + from crawlee.autoscaling.system_status import SystemInfo + + +class Event(Enum): + """Enum of all possible events that can be emitted.""" + + PERSIST_STATE = 'persistState' + SYSTEM_INFO = 'systemInfo' + MIGRATING = 'migrating' + ABORTING = 'aborting' + EXIT = 'exit' + + +@dataclass +class EventPersistStateData: + """Data for the persist state event.""" + + is_migrating: bool + + +@dataclass +class EventSystemInfoData: + """Data for the system 
info event.""" + + system_info: SystemInfo + + +@dataclass +class EventMigratingData: + """Data for the migrating event.""" + + +@dataclass +class EventAbortingData: + """Data for the aborting event.""" + + +@dataclass +class EventExitData: + """Data for the exit event.""" + + +EventData = Union[EventPersistStateData, EventSystemInfoData, EventMigratingData, EventAbortingData, EventExitData] +SyncListener = Callable[..., None] +AsyncListener = Callable[..., Coroutine[Any, Any, None]] +Listener = Union[SyncListener, AsyncListener] +WrappedListener = Callable[..., Coroutine[Any, Any, None]] diff --git a/src/crawlee/py.typed b/src/crawlee/py.typed new file mode 100644 index 000000000..e69de29bb