Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: RequestQueue and service management rehaul #429

Merged
merged 10 commits into from
Aug 22, 2024
2 changes: 1 addition & 1 deletion src/crawlee/_utils/lru_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
T = TypeVar('T')


class LRUCache(MutableMapping, Generic[T]):
class LRUCache(MutableMapping[str, T], Generic[T]):
"""Attempt to reimplement LRUCache from `@apify/datastructures` using `OrderedDict`."""

def __init__(self, max_length: int) -> None:
Expand Down
15 changes: 0 additions & 15 deletions src/crawlee/base_storage_client/base_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
ProcessedRequest,
ProlongRequestLockResponse,
Request,
RequestListResponse,
RequestQueueHead,
RequestQueueHeadWithLocks,
RequestQueueMetadata,
Expand Down Expand Up @@ -185,17 +184,3 @@ async def batch_delete_requests(self, requests: list[Request]) -> BatchRequestsO
Args:
requests: The requests to delete from the queue.
"""

@abstractmethod
async def list_requests(
janbuchar marked this conversation as resolved.
Show resolved Hide resolved
self,
*,
limit: int | None = None,
exclusive_start_id: str | None = None,
) -> RequestListResponse:
"""List requests from the queue.

Args:
limit: How many requests to retrieve.
exclusive_start_id: All requests up to this one (including) are skipped from the result.
"""
14 changes: 8 additions & 6 deletions src/crawlee/basic_crawler/basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
from tldextract import TLDExtract
from typing_extensions import NotRequired, TypedDict, TypeVar, Unpack, assert_never

from crawlee import Glob
from crawlee import Glob, service_container
from crawlee._utils.urls import convert_to_absolute_url, is_url_absolute
from crawlee._utils.wait import wait_for
from crawlee.autoscaling import AutoscaledPool, ConcurrencySettings
from crawlee.autoscaling.snapshotter import Snapshotter
from crawlee.autoscaling.system_status import SystemStatus
from crawlee.basic_crawler.context_pipeline import ContextPipeline
from crawlee.basic_crawler.router import Router
from crawlee.configuration import Configuration
from crawlee.enqueue_strategy import EnqueueStrategy
from crawlee.errors import (
ContextPipelineInitializationError,
Expand All @@ -35,7 +34,6 @@
SessionError,
UserDefinedErrorHandlerError,
)
from crawlee.events import LocalEventManager
from crawlee.http_clients import HttpxHttpClient
from crawlee.log_config import CrawleeLogFormatter
from crawlee.models import BaseRequestData, DatasetItemsListPage, Request, RequestState
Expand All @@ -47,6 +45,8 @@
if TYPE_CHECKING:
import re

from crawlee.configuration import Configuration
from crawlee.events.event_manager import EventManager
from crawlee.http_clients import BaseHttpClient, HttpResponse
from crawlee.proxy_configuration import ProxyConfiguration, ProxyInfo
from crawlee.sessions import Session
Expand Down Expand Up @@ -77,6 +77,7 @@ class BasicCrawlerOptions(TypedDict, Generic[TCrawlingContext]):
retry_on_blocked: NotRequired[bool]
proxy_configuration: NotRequired[ProxyConfiguration]
statistics: NotRequired[Statistics[StatisticsState]]
event_manager: NotRequired[EventManager]
configure_logging: NotRequired[bool]
_context_pipeline: NotRequired[ContextPipeline[TCrawlingContext]]
_additional_context_managers: NotRequired[Sequence[AsyncContextManager]]
Expand Down Expand Up @@ -111,6 +112,7 @@ def __init__(
retry_on_blocked: bool = True,
proxy_configuration: ProxyConfiguration | None = None,
statistics: Statistics | None = None,
event_manager: EventManager | None = None,
configure_logging: bool = True,
_context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
_additional_context_managers: Sequence[AsyncContextManager] | None = None,
Expand Down Expand Up @@ -138,6 +140,7 @@ def __init__(
retry_on_blocked: If set to True, the crawler will try to automatically bypass any detected bot protection
proxy_configuration: A HTTP proxy configuration to be used for making requests
statistics: A preconfigured `Statistics` instance if you wish to use non-default configuration
event_manager: A custom `EventManager` instance if you wish to use a non-default one
configure_logging: If set to True, the crawler will configure the logging infrastructure
_context_pipeline: Allows extending the request lifecycle and modifying the crawling context.
This parameter is meant to be used by child classes, not when BasicCrawler is instantiated directly.
Expand All @@ -164,7 +167,7 @@ def __init__(
self._max_session_rotations = max_session_rotations

self._request_provider = request_provider
self._configuration = configuration or Configuration.get_global_configuration()
self._configuration = configuration or service_container.get_configuration()

self._request_handler_timeout = request_handler_timeout
self._internal_timeout = (
Expand All @@ -175,8 +178,7 @@ def __init__(

self._tld_extractor = TLDExtract(cache_dir=tempfile.TemporaryDirectory().name)

self._event_manager = LocalEventManager() # TODO: switch based on configuration
# https://github.com/apify/crawlee-py/issues/83
self._event_manager = event_manager or service_container.get_event_manager()
self._snapshotter = Snapshotter(self._event_manager)
self._pool = AutoscaledPool(
system_status=SystemStatus(self._snapshotter),
Expand Down
21 changes: 13 additions & 8 deletions src/crawlee/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

from datetime import timedelta
from typing import Annotated, ClassVar, cast
from typing import Annotated

from pydantic import AliasChoices, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
Expand All @@ -22,8 +22,6 @@ class Configuration(BaseSettings):
purge_on_start: Whether to purge the storage on start.
"""

_default_instance: ClassVar[Self | None] = None

model_config = SettingsConfigDict(populate_by_name=True)

internal_timeout: Annotated[timedelta | None, Field(alias='crawlee_internal_timeout')] = None
Expand Down Expand Up @@ -206,12 +204,19 @@ class Configuration(BaseSettings):
),
] = False

in_cloud: Annotated[bool, Field(alias='crawlee_in_cloud')] = False

@classmethod
def get_global_configuration(cls) -> Self:
"""Retrieve the global instance of the configuration."""
if Configuration._default_instance is None:
Configuration._default_instance = cls()
from crawlee import service_container

if service_container.get_configuration_if_set() is None:
service_container.set_configuration(cls())

global_instance = service_container.get_configuration()

if not isinstance(global_instance, cls):
raise TypeError(
f'Requested global configuration object of type {cls}, but {global_instance.__class__} was found'
)

return cast(Self, Configuration._default_instance)
return global_instance
23 changes: 10 additions & 13 deletions src/crawlee/memory_storage_client/request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
ProcessedRequest,
ProlongRequestLockResponse,
Request,
RequestListResponse,
RequestQueueHead,
RequestQueueHeadWithLocks,
RequestQueueMetadata,
Expand Down Expand Up @@ -215,7 +214,14 @@ async def list_head(self, *, limit: int | None = None) -> RequestQueueHead:

@override
async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> RequestQueueHeadWithLocks:
raise NotImplementedError('This method is not supported in memory storage.')
result = await self.list_head(limit=limit)
return RequestQueueHeadWithLocks(
lock_secs=lock_secs,
limit=result.limit,
had_multiple_clients=result.had_multiple_clients,
queue_modified_at=result.queue_modified_at,
items=result.items,
)

@override
async def add_request(
Expand Down Expand Up @@ -380,7 +386,7 @@ async def prolong_request_lock(
forefront: bool = False,
lock_secs: int,
) -> ProlongRequestLockResponse:
raise NotImplementedError('This method is not supported in memory storage.')
return ProlongRequestLockResponse(lock_expires_at=datetime.now(timezone.utc))

@override
async def delete_request_lock(
Expand All @@ -389,7 +395,7 @@ async def delete_request_lock(
*,
forefront: bool = False,
) -> None:
raise NotImplementedError('This method is not supported in memory storage.')
return None

@override
async def batch_add_requests(
Expand Down Expand Up @@ -431,15 +437,6 @@ async def batch_add_requests(
async def batch_delete_requests(self, requests: list[Request]) -> BatchRequestsOperationResponse:
raise NotImplementedError('This method is not supported in memory storage.')

@override
async def list_requests(
self,
*,
limit: int | None = None,
exclusive_start_id: str | None = None,
) -> RequestListResponse:
raise NotImplementedError('This method is not supported in memory storage.')

async def update_timestamps(self, *, has_been_modified: bool) -> None:
"""Update the timestamps of the request queue."""
self._accessed_at = datetime.now(timezone.utc)
Expand Down
11 changes: 0 additions & 11 deletions src/crawlee/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ class RequestQueueHeadWithLocks(RequestQueueHead):
"""Model for request queue head with locks."""

lock_secs: Annotated[int, Field(alias='lockSecs')]
items: Annotated[list[Request], Field(alias='items', default_factory=list)]


class BaseListPage(BaseModel):
Expand Down Expand Up @@ -449,13 +448,3 @@ class BatchRequestsOperationResponse(BaseModel):

processed_requests: Annotated[list[ProcessedRequest], Field(alias='processedRequests')]
unprocessed_requests: Annotated[list[UnprocessedRequest], Field(alias='unprocessedRequests')]


class RequestListResponse(BaseModel):
"""Response to a request list call."""

model_config = ConfigDict(populate_by_name=True)

limit: Annotated[int, Field()]
exclusive_start_key: Annotated[str | None, Field(alias='exclusiveStartId')]
items: Annotated[list[Request], Field()]
126 changes: 126 additions & 0 deletions src/crawlee/service_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from __future__ import annotations
vdusek marked this conversation as resolved.
Show resolved Hide resolved

vdusek marked this conversation as resolved.
Show resolved Hide resolved
from typing import TYPE_CHECKING, Literal

from typing_extensions import NotRequired, TypedDict

from crawlee.configuration import Configuration
from crawlee.events.local_event_manager import LocalEventManager
from crawlee.memory_storage_client.memory_storage_client import MemoryStorageClient

if TYPE_CHECKING:
from crawlee.base_storage_client.base_storage_client import BaseStorageClient
from crawlee.events.event_manager import EventManager


StorageClientType = Literal['cloud', 'local']


class _Services(TypedDict):
local_storage_client: NotRequired[BaseStorageClient]
cloud_storage_client: NotRequired[BaseStorageClient]
configuration: NotRequired[Configuration]
event_manager: NotRequired[EventManager]


_services = _Services()
_default_storage_client_type: StorageClientType = 'local'


class ServiceConflictError(RuntimeError):
"""Thrown when a service is getting reconfigured."""

def __init__(self, service_name: str, new_value: object, old_value: object) -> None:
super().__init__(
f"Service '{service_name}' was already set (existing value is '{old_value}', new value is '{new_value}')."
)


def get_storage_client(*, client_type: StorageClientType | None = None) -> BaseStorageClient:
"""Get the storage client instance for the current environment.

Args:
client_type: Allows retrieving a specific storage client type, regardless of where we are running.

Returns:
The current storage client instance.
"""
if client_type is None:
client_type = _default_storage_client_type

if client_type == 'cloud':
if 'cloud_storage_client' not in _services:
raise RuntimeError('Cloud client was not provided.')
return _services['cloud_storage_client']

if 'local_storage_client' not in _services:
_services['local_storage_client'] = MemoryStorageClient()

return _services['local_storage_client']


def set_local_storage_client(local_client: BaseStorageClient) -> None:
"""Set the local storage client instance.

Args:
local_client: The local storage client instance.
"""
if (existing_service := _services.get('local_storage_client')) and existing_service is not local_client:
raise ServiceConflictError('local_storage_client', local_client, existing_service)

_services['local_storage_client'] = local_client


def set_cloud_storage_client(cloud_client: BaseStorageClient) -> None:
"""Set the cloud storage client instance.

Args:
cloud_client: The cloud storage client instance.
"""
if (existing_service := _services.get('cloud_storage_client')) and existing_service is not cloud_client:
raise ServiceConflictError('cloud_storage_client', cloud_client, existing_service)

_services['cloud_storage_client'] = cloud_client


def set_default_storage_client_type(client_type: StorageClientType) -> None:
"""Set the default storage client type."""
global _default_storage_client_type # noqa: PLW0603
_default_storage_client_type = client_type


def get_configuration() -> Configuration:
"""Get the configuration object."""
if 'configuration' not in _services:
_services['configuration'] = Configuration()

return _services['configuration']


def get_configuration_if_set() -> Configuration | None:
"""Get the configuration object, or None if it hasn't been set yet."""
return _services.get('configuration')


def set_configuration(configuration: Configuration) -> None:
"""Set the configuration object."""
if (existing_service := _services.get('configuration')) and existing_service is not configuration:
raise ServiceConflictError('configuration', configuration, existing_service)

_services['configuration'] = configuration


def get_event_manager() -> EventManager:
"""Get the event manager."""
if 'event_manager' not in _services:
_services['event_manager'] = LocalEventManager()

return _services['event_manager']


def set_event_manager(event_manager: EventManager) -> None:
"""Set the event manager."""
if (existing_service := _services.get('event_manager')) and existing_service is not event_manager:
raise ServiceConflictError('event_manager', event_manager, existing_service)

_services['event_manager'] = event_manager
4 changes: 2 additions & 2 deletions src/crawlee/statistics/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

from typing_extensions import Self, TypeVar

import crawlee.service_container
from crawlee._utils.recurring_task import RecurringTask
from crawlee.events import LocalEventManager
from crawlee.events.types import Event, EventPersistStateData
from crawlee.statistics import FinalStatistics, StatisticsPersistedState, StatisticsState
from crawlee.statistics.error_tracker import ErrorTracker
Expand Down Expand Up @@ -85,7 +85,7 @@ def __init__(
self.error_tracker = ErrorTracker()
self.error_tracker_retry = ErrorTracker()

self._events = event_manager or LocalEventManager()
self._events = event_manager or crawlee.service_container.get_event_manager()

self._requests_in_progress = dict[str, RequestProcessingRecord]()

Expand Down
Loading
Loading