Add the first version of EventManager (#25)
vdusek authored Jan 30, 2024
1 parent e53e345 commit 2273aed
Showing 12 changed files with 240 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check_version_availability.yaml
@@ -19,7 +19,7 @@ jobs:
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.8"
          python-version: "3.9"

      - name: Install dependencies
        run: make install-dev
2 changes: 1 addition & 1 deletion .github/workflows/lint_and_type_checks.yaml
@@ -9,7 +9,7 @@ jobs:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
        python-version: ["3.9", "3.10", "3.11", "3.12"]

    steps:
      # We need to check out the head commit in case of PRs, and the default ref otherwise (during release).
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
@@ -66,7 +66,7 @@ jobs:
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: 3.8
          python-version: 3.9

      - name: Install dependencies
        run: make install-dev
2 changes: 1 addition & 1 deletion .github/workflows/unit_tests.yaml
@@ -9,7 +9,7 @@ jobs:
    strategy:
      matrix:
        os: [ubuntu-latest, windows-latest]
        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
        python-version: ["3.9", "3.10", "3.11", "3.12"]
    runs-on: ${{ matrix.os }}

    steps:
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -2,7 +2,7 @@

## Environment

For local development, it is required to have Python 3.8 (or a later version) installed.
For local development, it is required to have Python 3.9 (or a later version) installed.

It is recommended to set up a virtual environment while developing this package to isolate your development environment,
however, due to the many varied ways Python can be installed and virtual environments can be set up, this is left up to
9 changes: 5 additions & 4 deletions pyproject.toml
@@ -12,22 +12,22 @@ classifiers = [
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries",
]

requires-python = ">=3.8"
requires-python = ">=3.9"

# We use inclusive ordered comparison clause for non-Apify packages intentionally in order to enhance the Apify SDK's
# compatibility with a wide range of external packages. This decision was discussed in detail in the following PR:
# https://github.com/apify/apify-sdk-python/pull/154
dependencies = [
    "colorama >= 0.4.6",
    "more_itertools >= 10.2.0",
    "pyee >= 11.1.0",
    "typing-extensions >= 4.1.0",
]

@@ -38,7 +38,7 @@ dev = [
"mypy ~= 1.8.0",
"pre-commit ~= 3.4.0",
"pydoc-markdown ~= 4.8.2",
"pytest ~= 8.0.0",
"pytest ~= 7.4.4",
"pytest-asyncio ~= 0.23.4",
"pytest-cov ~= 4.1.0",
"pytest-only ~= 2.0.0",
@@ -79,6 +79,7 @@ ignore = [
"COM812", # This rule may cause conflicts when used with the formatter
"D100", # Missing docstring in public module
"D104", # Missing docstring in public package
"D107", # Missing docstring in `__init__`
"EM", # flake8-errmsg
"G004", # Logging statement uses f-string
"ISC001", # This rule may cause conflicts when used with the formatter
@@ -135,7 +136,7 @@ asyncio_mode = "auto"
timeout = 1200

[tool.mypy]
python_version = "3.8"
python_version = "3.9"
files = ["scripts", "src", "tests"]
check_untyped_defs = true
disallow_incomplete_defs = true
37 changes: 17 additions & 20 deletions src/crawlee/autoscaling/system_status.py
@@ -70,6 +70,23 @@ class SystemStatus:
    `SystemStatus.get_historical_status` returns a boolean that represents the long-term status of the system. It
    considers the full snapshot history available in the `Snapshotter` instance.

    Attributes:
        snapshotter: The `Snapshotter` instance to be queried for `SystemStatus`.
        current_history_secs: Defines max age of snapshots used in the `SystemStatus.get_current_status` measurement.
        max_memory_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a memory sample.
            If the sample exceeds this ratio, the system will be overloaded.
        max_event_loop_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in an event loop sample.
            If the sample exceeds this ratio, the system will be overloaded.
        max_cpu_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a CPU sample. If the sample
            exceeds this ratio, the system will be overloaded.
        max_client_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a Client sample.
            If the sample exceeds this ratio, the system will be overloaded.
    """

    def __init__(
@@ -81,26 +98,6 @@ def __init__(
        max_cpu_overloaded_ratio: float = 0.4,
        max_client_overloaded_ratio: float = 0.3,
    ) -> None:
"""Create a new instance.
Args:
snapshotter: The `Snapshotter` instance to be queried for `SystemStatus`.
current_history_secs: Defines max age of snapshots used in the `SystemStatus.get_current_status`
measurement. Defaults to 5.
max_memory_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a memory sample.
If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.2.
max_event_loop_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in an event loop sample.
If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.6.
max_cpu_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a CPU sample. If the sample
exceeds this ratio, the system will be overloaded. Defaults to 0.4.
max_client_overloaded_ratio: Sets the maximum ratio of overloaded snapshots in a Client sample.
If the sample exceeds this ratio, the system will be overloaded. Defaults to 0.3.
"""
self.snapshotter = snapshotter
self.current_history_secs = current_history_secs
self.max_memory_overloaded_ratio = max_memory_overloaded_ratio
1 change: 1 addition & 0 deletions src/crawlee/events/__init__.py
@@ -0,0 +1 @@
from .event_manager import EventManager
157 changes: 157 additions & 0 deletions src/crawlee/events/event_manager.py
@@ -0,0 +1,157 @@
from __future__ import annotations

import asyncio
from collections import defaultdict
from contextlib import suppress
from functools import wraps
from inspect import iscoroutinefunction
from logging import getLogger
from typing import TYPE_CHECKING

from pyee.asyncio import AsyncIOEventEmitter

if TYPE_CHECKING:
from datetime import timedelta

from crawlee.events.types import Event, EventData, Listener, WrappedListener

logger = getLogger(__name__)


class EventManager:
    """Event manager for registering, emitting, and managing event listeners.

    Event manager allows you to register event listeners, emit events, and wait for event listeners to complete
    their execution. It is built on top of the `pyee.asyncio.AsyncIOEventEmitter` class.

    Attributes:
        _event_emitter: The event emitter which is used to emit events and call the event listeners.
        _listener_tasks: The set of asyncio tasks which are currently executing the event listeners.
        _listeners_to_wrappers: The mapping between events and listeners, and the wrapped listeners which are
            registered with the event emitter.
    """

    def __init__(self: EventManager) -> None:
        logger.debug('Calling LocalEventManager.__init__()...')
        self._event_emitter = AsyncIOEventEmitter()

        # Listeners are wrapped in an asyncio.Task; store their references here so that we can wait for them to finish.
        self._listener_tasks: set[asyncio.Task] = set()

        # Store the mapping between events and listeners like this:
        # event -> listener -> [wrapped_listener_1, wrapped_listener_2, ...]
        self._listeners_to_wrappers: dict[Event, dict[Listener, list[WrappedListener]]] = defaultdict(
            lambda: defaultdict(list),
        )

    async def close(self: EventManager, *, timeout: timedelta | None = None) -> None:
        """Close the event manager.

        This will stop listening for the events, and it will wait for all the event listeners to finish.

        Args:
            timeout: Optional timeout after which the pending event listeners are canceled.
        """
        logger.debug('Calling LocalEventManager.close()...')
        await self.wait_for_all_listeners_to_complete(timeout=timeout)
        self._event_emitter.remove_all_listeners()

    def on(self: EventManager, *, event: Event, listener: Listener) -> None:
        """Add an event listener to the event manager.

        Args:
            event: The Actor event to listen for.
            listener: The function (sync or async) which is to be called when the event is emitted.
        """
        logger.debug('Calling LocalEventManager.on()...')

        @wraps(listener)
        async def listener_wrapper(event_data: EventData) -> None:
            logger.debug('Calling LocalEventManager.on.listener_wrapper()...')

            # If the listener is a coroutine function, just call it; otherwise, run it in a separate thread
            # to avoid blocking the event loop.
            coroutine = (
                listener(event_data) if iscoroutinefunction(listener) else asyncio.to_thread(listener, event_data)
            )

            listener_task = asyncio.create_task(coroutine, name=f'Task-{event.value}-{listener.__name__}')
            self._listener_tasks.add(listener_task)

            try:
                await listener_task
            except Exception:
                # We need to swallow the exception and just log it here, otherwise it could break the event emitter.
                logger.exception(
                    'Exception in the event listener',
                    extra={'event_name': event.value, 'listener_name': listener.__name__},
                )
            finally:
                logger.debug('LocalEventManager.on.listener_wrapper(): Removing listener task from the set...')
                self._listener_tasks.remove(listener_task)

        self._listeners_to_wrappers[event][listener].append(listener_wrapper)
        self._event_emitter.add_listener(event.value, listener_wrapper)

    def off(self: EventManager, *, event: Event, listener: Listener | None = None) -> None:
        """Remove a listener, or all listeners, from an Actor event.

        Args:
            event: The Actor event for which to remove listeners.
            listener: The listener which is supposed to be removed. If not passed, all listeners of this event
                are removed.
        """
        logger.debug('Calling LocalEventManager.off()...')

        if listener:
            for listener_wrapper in self._listeners_to_wrappers[event][listener]:
                self._event_emitter.remove_listener(event.value, listener_wrapper)
            self._listeners_to_wrappers[event][listener] = []
        else:
            self._listeners_to_wrappers[event] = defaultdict(list)
            self._event_emitter.remove_all_listeners(event.value)

    def emit(self: EventManager, *, event: Event, event_data: EventData) -> None:
        """Emit an event.

        Args:
            event: The event which will be emitted.
            event_data: The data which will be passed to the event listeners.
        """
        logger.debug('Calling LocalEventManager.emit()...')
        self._event_emitter.emit(event.value, event_data)

    async def wait_for_all_listeners_to_complete(self: EventManager, *, timeout: timedelta | None = None) -> None:
        """Wait for all currently executing event listeners to complete.

        Args:
            timeout: The maximum time to wait for the event listeners to finish. If they do not complete within
                the specified timeout, they will be canceled.
        """
        logger.debug('Calling LocalEventManager.wait_for_all_listeners_to_complete()...')

        async def wait_for_listeners() -> None:
            """Gathers all listener tasks and awaits their completion, logging any exceptions encountered."""
            results = await asyncio.gather(*self._listener_tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    logger.exception('Event listener raised an exception.', exc_info=result)

        tasks = [asyncio.create_task(wait_for_listeners(), name=f'Task-{wait_for_listeners.__name__}')]
        timeout_secs = timeout.total_seconds() if timeout else None

        try:
            _, pending = await asyncio.wait(tasks, timeout=timeout_secs)
            if pending:
                logger.warning('Waiting timeout reached; canceling unfinished event listeners.')
        except asyncio.CancelledError:
            logger.warning('Asyncio wait was cancelled; canceling unfinished event listeners.')
            raise
        finally:
            for task in tasks:
                if not task.done():
                    task.cancel()
                    with suppress(asyncio.CancelledError):
                        await task
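
For reference, a minimal usage sketch of the API added in this file; the chosen event and listener are illustrative, and the short sleep only gives the emitted listener task a chance to be scheduled before shutdown:

import asyncio

from crawlee.events import EventManager
from crawlee.events.types import Event, EventPersistStateData


async def main() -> None:
    event_manager = EventManager()

    # Listeners may be plain functions or coroutine functions; coroutine functions are awaited
    # directly, plain functions are executed in a worker thread via asyncio.to_thread.
    async def on_persist_state(event_data: EventPersistStateData) -> None:
        print(f'persist state requested, is_migrating={event_data.is_migrating}')

    event_manager.on(event=Event.PERSIST_STATE, listener=on_persist_state)

    # emit() is synchronous; the wrapped listener runs as a background asyncio task.
    event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=False))

    # Give the emitted listener task a moment to start before shutting down.
    await asyncio.sleep(0.1)

    # close() waits for listener tasks to finish and removes all registered listeners.
    await event_manager.close()


asyncio.run(main())
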
Empty file added src/crawlee/events/py.typed
55 changes: 55 additions & 0 deletions src/crawlee/events/types.py
@@ -0,0 +1,55 @@
from __future__ import annotations

from collections.abc import Callable, Coroutine
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any, Union

if TYPE_CHECKING:
from crawlee.autoscaling.system_status import SystemInfo


class Event(Enum):
    """Enum of all possible events that can be emitted."""

    PERSIST_STATE = 'persistState'
    SYSTEM_INFO = 'systemInfo'
    MIGRATING = 'migrating'
    ABORTING = 'aborting'
    EXIT = 'exit'


@dataclass
class EventPersistStateData:
    """Data for the persist state event."""

    is_migrating: bool


@dataclass
class EventSystemInfoData:
    """Data for the system info event."""

    system_info: SystemInfo


@dataclass
class EventMigratingData:
    """Data for the migrating event."""


@dataclass
class EventAbortingData:
    """Data for the aborting event."""


@dataclass
class EventExitData:
    """Data for the exit event."""


EventData = Union[EventPersistStateData, EventSystemInfoData, EventMigratingData, EventAbortingData, EventExitData]
SyncListener = Callable[..., None]
AsyncListener = Callable[..., Coroutine[Any, Any, None]]
Listener = Union[SyncListener, AsyncListener]
WrappedListener = Callable[..., Coroutine[Any, Any, None]]
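
A hedged sketch of how these types might be used with the EventManager above: both listener styles admitted by the Listener alias are registered for the same event, one is later removed with off(), and the timeout is passed as a datetime.timedelta. The event choice and the short sleep are illustrative only:

import asyncio
from datetime import timedelta

from crawlee.events import EventManager
from crawlee.events.types import Event, EventMigratingData


def sync_listener(event_data: EventMigratingData) -> None:
    # A SyncListener: EventManager runs it in a worker thread via asyncio.to_thread.
    print('migrating (sync listener)')


async def async_listener(event_data: EventMigratingData) -> None:
    # An AsyncListener: EventManager awaits it directly.
    print('migrating (async listener)')


async def main() -> None:
    event_manager = EventManager()
    event_manager.on(event=Event.MIGRATING, listener=sync_listener)
    event_manager.on(event=Event.MIGRATING, listener=async_listener)

    event_manager.emit(event=Event.MIGRATING, event_data=EventMigratingData())

    # Give the scheduled listener wrappers a chance to start.
    await asyncio.sleep(0.1)

    # Remove a single listener; omitting `listener` would remove all MIGRATING listeners.
    event_manager.off(event=Event.MIGRATING, listener=sync_listener)

    # Wait at most two seconds for any listener tasks that are still running.
    await event_manager.wait_for_all_listeners_to_complete(timeout=timedelta(seconds=2))


asyncio.run(main())
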
Empty file added src/crawlee/py.typed
