Skip to content

Commit

Permalink
refactor!: RequestQueue and service management rehaul (#429)
Browse files Browse the repository at this point in the history
- closes #423
- closes #174
- closes #203
- related to #354 - let's investigate this, but I believe that it won't
go away until #433 is resolved
- closes #83 

Locking in memory storage was not implemented - see #433

---------

Co-authored-by: Vlada Dusek <v.dusek96@gmail.com>
  • Loading branch information
janbuchar and vdusek authored Aug 22, 2024
1 parent ada0990 commit b155a9f
Show file tree
Hide file tree
Showing 17 changed files with 498 additions and 340 deletions.
2 changes: 1 addition & 1 deletion src/crawlee/_utils/lru_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
T = TypeVar('T')


class LRUCache(MutableMapping, Generic[T]):
class LRUCache(MutableMapping[str, T], Generic[T]):
"""Attempt to reimplement LRUCache from `@apify/datastructures` using `OrderedDict`."""

def __init__(self, max_length: int) -> None:
Expand Down
15 changes: 0 additions & 15 deletions src/crawlee/base_storage_client/base_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
ProcessedRequest,
ProlongRequestLockResponse,
Request,
RequestListResponse,
RequestQueueHead,
RequestQueueHeadWithLocks,
RequestQueueMetadata,
Expand Down Expand Up @@ -185,17 +184,3 @@ async def batch_delete_requests(self, requests: list[Request]) -> BatchRequestsO
Args:
requests: The requests to delete from the queue.
"""

@abstractmethod
async def list_requests(
self,
*,
limit: int | None = None,
exclusive_start_id: str | None = None,
) -> RequestListResponse:
"""List requests from the queue.
Args:
limit: How many requests to retrieve.
exclusive_start_id: All requests up to this one (including) are skipped from the result.
"""
14 changes: 8 additions & 6 deletions src/crawlee/basic_crawler/basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
from tldextract import TLDExtract
from typing_extensions import NotRequired, TypedDict, TypeVar, Unpack, assert_never

from crawlee import Glob
from crawlee import Glob, service_container
from crawlee._utils.urls import convert_to_absolute_url, is_url_absolute
from crawlee._utils.wait import wait_for
from crawlee.autoscaling import AutoscaledPool, ConcurrencySettings
from crawlee.autoscaling.snapshotter import Snapshotter
from crawlee.autoscaling.system_status import SystemStatus
from crawlee.basic_crawler.context_pipeline import ContextPipeline
from crawlee.basic_crawler.router import Router
from crawlee.configuration import Configuration
from crawlee.enqueue_strategy import EnqueueStrategy
from crawlee.errors import (
ContextPipelineInitializationError,
Expand All @@ -35,7 +34,6 @@
SessionError,
UserDefinedErrorHandlerError,
)
from crawlee.events import LocalEventManager
from crawlee.http_clients import HttpxHttpClient
from crawlee.log_config import CrawleeLogFormatter
from crawlee.models import BaseRequestData, DatasetItemsListPage, Request, RequestState
Expand All @@ -47,6 +45,8 @@
if TYPE_CHECKING:
import re

from crawlee.configuration import Configuration
from crawlee.events.event_manager import EventManager
from crawlee.http_clients import BaseHttpClient, HttpResponse
from crawlee.proxy_configuration import ProxyConfiguration, ProxyInfo
from crawlee.sessions import Session
Expand Down Expand Up @@ -77,6 +77,7 @@ class BasicCrawlerOptions(TypedDict, Generic[TCrawlingContext]):
retry_on_blocked: NotRequired[bool]
proxy_configuration: NotRequired[ProxyConfiguration]
statistics: NotRequired[Statistics[StatisticsState]]
event_manager: NotRequired[EventManager]
configure_logging: NotRequired[bool]
_context_pipeline: NotRequired[ContextPipeline[TCrawlingContext]]
_additional_context_managers: NotRequired[Sequence[AsyncContextManager]]
Expand Down Expand Up @@ -111,6 +112,7 @@ def __init__(
retry_on_blocked: bool = True,
proxy_configuration: ProxyConfiguration | None = None,
statistics: Statistics | None = None,
event_manager: EventManager | None = None,
configure_logging: bool = True,
_context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
_additional_context_managers: Sequence[AsyncContextManager] | None = None,
Expand Down Expand Up @@ -138,6 +140,7 @@ def __init__(
retry_on_blocked: If set to True, the crawler will try to automatically bypass any detected bot protection
proxy_configuration: A HTTP proxy configuration to be used for making requests
statistics: A preconfigured `Statistics` instance if you wish to use non-default configuration
event_manager: A custom `EventManager` instance if you wish to use a non-default one
configure_logging: If set to True, the crawler will configure the logging infrastructure
_context_pipeline: Allows extending the request lifecycle and modifying the crawling context.
This parameter is meant to be used by child classes, not when BasicCrawler is instantiated directly.
Expand All @@ -164,7 +167,7 @@ def __init__(
self._max_session_rotations = max_session_rotations

self._request_provider = request_provider
self._configuration = configuration or Configuration.get_global_configuration()
self._configuration = configuration or service_container.get_configuration()

self._request_handler_timeout = request_handler_timeout
self._internal_timeout = (
Expand All @@ -175,8 +178,7 @@ def __init__(

self._tld_extractor = TLDExtract(cache_dir=tempfile.TemporaryDirectory().name)

self._event_manager = LocalEventManager() # TODO: switch based on configuration
# https://github.com/apify/crawlee-py/issues/83
self._event_manager = event_manager or service_container.get_event_manager()
self._snapshotter = Snapshotter(self._event_manager)
self._pool = AutoscaledPool(
system_status=SystemStatus(self._snapshotter),
Expand Down
21 changes: 13 additions & 8 deletions src/crawlee/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

from datetime import timedelta
from typing import Annotated, ClassVar, cast
from typing import Annotated

from pydantic import AliasChoices, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
Expand All @@ -22,8 +22,6 @@ class Configuration(BaseSettings):
purge_on_start: Whether to purge the storage on start.
"""

_default_instance: ClassVar[Self | None] = None

model_config = SettingsConfigDict(populate_by_name=True)

internal_timeout: Annotated[timedelta | None, Field(alias='crawlee_internal_timeout')] = None
Expand Down Expand Up @@ -206,12 +204,19 @@ class Configuration(BaseSettings):
),
] = False

in_cloud: Annotated[bool, Field(alias='crawlee_in_cloud')] = False

@classmethod
def get_global_configuration(cls) -> Self:
"""Retrieve the global instance of the configuration."""
if Configuration._default_instance is None:
Configuration._default_instance = cls()
from crawlee import service_container

if service_container.get_configuration_if_set() is None:
service_container.set_configuration(cls())

global_instance = service_container.get_configuration()

if not isinstance(global_instance, cls):
raise TypeError(
f'Requested global configuration object of type {cls}, but {global_instance.__class__} was found'
)

return cast(Self, Configuration._default_instance)
return global_instance
23 changes: 10 additions & 13 deletions src/crawlee/memory_storage_client/request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
ProcessedRequest,
ProlongRequestLockResponse,
Request,
RequestListResponse,
RequestQueueHead,
RequestQueueHeadWithLocks,
RequestQueueMetadata,
Expand Down Expand Up @@ -215,7 +214,14 @@ async def list_head(self, *, limit: int | None = None) -> RequestQueueHead:

@override
async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> RequestQueueHeadWithLocks:
raise NotImplementedError('This method is not supported in memory storage.')
result = await self.list_head(limit=limit)
return RequestQueueHeadWithLocks(
lock_secs=lock_secs,
limit=result.limit,
had_multiple_clients=result.had_multiple_clients,
queue_modified_at=result.queue_modified_at,
items=result.items,
)

@override
async def add_request(
Expand Down Expand Up @@ -380,7 +386,7 @@ async def prolong_request_lock(
forefront: bool = False,
lock_secs: int,
) -> ProlongRequestLockResponse:
raise NotImplementedError('This method is not supported in memory storage.')
return ProlongRequestLockResponse(lock_expires_at=datetime.now(timezone.utc))

@override
async def delete_request_lock(
Expand All @@ -389,7 +395,7 @@ async def delete_request_lock(
*,
forefront: bool = False,
) -> None:
raise NotImplementedError('This method is not supported in memory storage.')
return None

@override
async def batch_add_requests(
Expand Down Expand Up @@ -431,15 +437,6 @@ async def batch_add_requests(
async def batch_delete_requests(self, requests: list[Request]) -> BatchRequestsOperationResponse:
raise NotImplementedError('This method is not supported in memory storage.')

@override
async def list_requests(
self,
*,
limit: int | None = None,
exclusive_start_id: str | None = None,
) -> RequestListResponse:
raise NotImplementedError('This method is not supported in memory storage.')

async def update_timestamps(self, *, has_been_modified: bool) -> None:
"""Update the timestamps of the request queue."""
self._accessed_at = datetime.now(timezone.utc)
Expand Down
11 changes: 0 additions & 11 deletions src/crawlee/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ class RequestQueueHeadWithLocks(RequestQueueHead):
"""Model for request queue head with locks."""

lock_secs: Annotated[int, Field(alias='lockSecs')]
items: Annotated[list[Request], Field(alias='items', default_factory=list)]


class BaseListPage(BaseModel):
Expand Down Expand Up @@ -449,13 +448,3 @@ class BatchRequestsOperationResponse(BaseModel):

processed_requests: Annotated[list[ProcessedRequest], Field(alias='processedRequests')]
unprocessed_requests: Annotated[list[UnprocessedRequest], Field(alias='unprocessedRequests')]


class RequestListResponse(BaseModel):
"""Response to a request list call."""

model_config = ConfigDict(populate_by_name=True)

limit: Annotated[int, Field()]
exclusive_start_key: Annotated[str | None, Field(alias='exclusiveStartId')]
items: Annotated[list[Request], Field()]
126 changes: 126 additions & 0 deletions src/crawlee/service_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Literal

from typing_extensions import NotRequired, TypedDict

from crawlee.configuration import Configuration
from crawlee.events.local_event_manager import LocalEventManager
from crawlee.memory_storage_client.memory_storage_client import MemoryStorageClient

if TYPE_CHECKING:
from crawlee.base_storage_client.base_storage_client import BaseStorageClient
from crawlee.events.event_manager import EventManager


StorageClientType = Literal['cloud', 'local']


class _Services(TypedDict):
local_storage_client: NotRequired[BaseStorageClient]
cloud_storage_client: NotRequired[BaseStorageClient]
configuration: NotRequired[Configuration]
event_manager: NotRequired[EventManager]


_services = _Services()
_default_storage_client_type: StorageClientType = 'local'


class ServiceConflictError(RuntimeError):
"""Thrown when a service is getting reconfigured."""

def __init__(self, service_name: str, new_value: object, old_value: object) -> None:
super().__init__(
f"Service '{service_name}' was already set (existing value is '{old_value}', new value is '{new_value}')."
)


def get_storage_client(*, client_type: StorageClientType | None = None) -> BaseStorageClient:
"""Get the storage client instance for the current environment.
Args:
client_type: Allows retrieving a specific storage client type, regardless of where we are running.
Returns:
The current storage client instance.
"""
if client_type is None:
client_type = _default_storage_client_type

if client_type == 'cloud':
if 'cloud_storage_client' not in _services:
raise RuntimeError('Cloud client was not provided.')
return _services['cloud_storage_client']

if 'local_storage_client' not in _services:
_services['local_storage_client'] = MemoryStorageClient()

return _services['local_storage_client']


def set_local_storage_client(local_client: BaseStorageClient) -> None:
"""Set the local storage client instance.
Args:
local_client: The local storage client instance.
"""
if (existing_service := _services.get('local_storage_client')) and existing_service is not local_client:
raise ServiceConflictError('local_storage_client', local_client, existing_service)

_services['local_storage_client'] = local_client


def set_cloud_storage_client(cloud_client: BaseStorageClient) -> None:
"""Set the cloud storage client instance.
Args:
cloud_client: The cloud storage client instance.
"""
if (existing_service := _services.get('cloud_storage_client')) and existing_service is not cloud_client:
raise ServiceConflictError('cloud_storage_client', cloud_client, existing_service)

_services['cloud_storage_client'] = cloud_client


def set_default_storage_client_type(client_type: StorageClientType) -> None:
"""Set the default storage client type."""
global _default_storage_client_type # noqa: PLW0603
_default_storage_client_type = client_type


def get_configuration() -> Configuration:
"""Get the configuration object."""
if 'configuration' not in _services:
_services['configuration'] = Configuration()

return _services['configuration']


def get_configuration_if_set() -> Configuration | None:
"""Get the configuration object, or None if it hasn't been set yet."""
return _services.get('configuration')


def set_configuration(configuration: Configuration) -> None:
"""Set the configuration object."""
if (existing_service := _services.get('configuration')) and existing_service is not configuration:
raise ServiceConflictError('configuration', configuration, existing_service)

_services['configuration'] = configuration


def get_event_manager() -> EventManager:
"""Get the event manager."""
if 'event_manager' not in _services:
_services['event_manager'] = LocalEventManager()

return _services['event_manager']


def set_event_manager(event_manager: EventManager) -> None:
"""Set the event manager."""
if (existing_service := _services.get('event_manager')) and existing_service is not event_manager:
raise ServiceConflictError('event_manager', event_manager, existing_service)

_services['event_manager'] = event_manager
4 changes: 2 additions & 2 deletions src/crawlee/statistics/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

from typing_extensions import Self, TypeVar

import crawlee.service_container
from crawlee._utils.recurring_task import RecurringTask
from crawlee.events import LocalEventManager
from crawlee.events.types import Event, EventPersistStateData
from crawlee.statistics import FinalStatistics, StatisticsPersistedState, StatisticsState
from crawlee.statistics.error_tracker import ErrorTracker
Expand Down Expand Up @@ -85,7 +85,7 @@ def __init__(
self.error_tracker = ErrorTracker()
self.error_tracker_retry = ErrorTracker()

self._events = event_manager or LocalEventManager()
self._events = event_manager or crawlee.service_container.get_event_manager()

self._requests_in_progress = dict[str, RequestProcessingRecord]()

Expand Down
Loading

0 comments on commit b155a9f

Please sign in to comment.