
Merge pull request #175 from scrapinghub/item-provider-fixes
Switch from ItemProvider to custom builders.
wRAR authored Dec 26, 2023
2 parents 4f122b4 + de6105c commit 459ca30
Showing 9 changed files with 185 additions and 288 deletions.
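
What changed, in short: ItemProvider used to resolve item dependencies by recursively re-entering the injector, which required the caching, deadlock detection, and prev_instances plumbing deleted below. The injector now passes andi a custom_builder_fn that maps an item class to an async factory awaiting the page object's to_item(), so page objects and items are resolved in a single flat plan. Spider-facing behavior is unchanged; a minimal sketch of that behavior (Product and ProductPage are hypothetical examples, not code from this PR):

    import attrs
    import scrapy
    from scrapy_poet import DummyResponse
    from web_poet import ItemPage, field, handle_urls


    @attrs.define
    class Product:
        name: str


    @handle_urls("example.com")
    class ProductPage(ItemPage[Product]):
        @field
        def name(self) -> str:
            return "Chocolate"


    class MySpider(scrapy.Spider):
        name = "example"

        # Asking for the item still works: the registry maps Product to
        # ProductPage, and andi now plans an item factory that awaits
        # ProductPage.to_item() instead of going through ItemProvider.
        def parse(self, response: DummyResponse, product: Product):
            yield product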
5 changes: 2 additions & 3 deletions scrapy_poet/commands.py
@@ -1,7 +1,7 @@
 import datetime
 import logging
 from pathlib import Path
-from typing import Dict, Optional, Type
+from typing import Optional, Type

 import andi
 import scrapy
@@ -38,10 +38,9 @@ def build_instances_from_providers(
         request: Request,
         response: Response,
         plan: andi.Plan,
-        prev_instances: Optional[Dict] = None,
     ):
         instances = yield super().build_instances_from_providers(
-            request, response, plan, prev_instances
+            request, response, plan
         )
         if request.meta.get("savefixture", False):
             saved_dependencies.extend(instances.values())
2 changes: 0 additions & 2 deletions scrapy_poet/downloadermiddlewares.py
@@ -18,7 +18,6 @@
 from .page_input_providers import (
     HttpClientProvider,
     HttpResponseProvider,
-    ItemProvider,
     PageParamsProvider,
     RequestUrlProvider,
     ResponseUrlProvider,
@@ -36,7 +35,6 @@
     RequestUrlProvider: 800,
     ResponseUrlProvider: 900,
     StatsProvider: 1000,
-    ItemProvider: 2000,
 }

 InjectionMiddlewareTV = TypeVar("InjectionMiddlewareTV", bound="InjectionMiddleware")
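
With ItemProvider removed from the default provider priorities above, projects that registered it explicitly in their settings can simply drop the entry. A sketch of the corresponding settings cleanup, assuming the standard SCRAPY_POET_PROVIDERS setting (MyExtraProvider is hypothetical):

    # settings.py
    from myproject.providers import MyExtraProvider  # hypothetical custom provider

    SCRAPY_POET_PROVIDERS = {
        # ItemProvider: 2000,  # obsolete: items are now built via andi custom builders
        MyExtraProvider: 1100,
    }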
63 changes: 40 additions & 23 deletions scrapy_poet/injection.py
@@ -1,9 +1,10 @@
+import functools
 import inspect
 import logging
 import os
 import pprint
 import warnings
-from typing import Any, Callable, Dict, List, Mapping, Optional, Set, cast
+from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Type, cast

 import andi
 from andi.typeutils import issubclass_safe
@@ -13,12 +14,12 @@
 from scrapy.settings import Settings
 from scrapy.statscollectors import MemoryStatsCollector, StatsCollector
 from scrapy.utils.conf import build_component_list
-from scrapy.utils.defer import maybeDeferred_coro
+from scrapy.utils.defer import deferred_from_coro, maybeDeferred_coro
 from scrapy.utils.misc import load_object
 from twisted.internet.defer import inlineCallbacks
 from web_poet import RulesRegistry
 from web_poet.page_inputs.http import request_fingerprint
-from web_poet.pages import is_injectable
+from web_poet.pages import ItemPage, is_injectable
 from web_poet.serialization.api import deserialize_leaf, load_class, serialize
 from web_poet.utils import get_fq_class_name
@@ -147,30 +148,60 @@ def build_plan(self, request: Request) -> andi.Plan:
             # Callable[[Callable], Optional[Callable]] but the registry
             # returns the typing for ``dict.get()`` method.
             overrides=self.registry.overrides_for(request.url).get,  # type: ignore[arg-type]
+            custom_builder_fn=self._get_item_builder(request),
         )

+    def _get_item_builder(
+        self, request: Request
+    ) -> Callable[[Callable], Optional[Callable]]:
+        """Return a function suitable for passing as ``custom_builder_fn`` to ``andi.plan``.
+
+        The returned function can map an item to a factory for that item based
+        on the registry.
+        """
+
+        @functools.lru_cache(maxsize=None)  # to minimize the registry queries
+        def mapping_fn(item_cls: Callable) -> Optional[Callable]:
+            page_object_cls: Optional[Type[ItemPage]] = self.registry.page_cls_for_item(
+                request.url, cast(type, item_cls)
+            )
+            if not page_object_cls:
+                return None
+
+            async def item_factory(page: page_object_cls) -> item_cls:  # type: ignore[valid-type]
+                return await page.to_item()  # type: ignore[attr-defined]
+
+            return item_factory
+
+        return mapping_fn
+
     @inlineCallbacks
     def build_instances(
         self,
         request: Request,
         response: Response,
         plan: andi.Plan,
-        prev_instances: Optional[Dict] = None,
     ):
         """Build the instances dict from a plan including external dependencies."""
         # First we build the external dependencies using the providers
         instances = yield from self.build_instances_from_providers(
             request,
             response,
             plan,
-            prev_instances,
         )
         # All the remaining dependencies are internal so they can be built just
         # following the andi plan.
         for cls, kwargs_spec in plan.dependencies:
             if cls not in instances.keys():
-                instances[cls] = cls(**kwargs_spec.kwargs(instances))
-                cls_fqn = get_fq_class_name(cast(type, cls))
+                result_cls: type = cast(type, cls)
+                if isinstance(cls, andi.CustomBuilder):
+                    result_cls = cls.result_class_or_fn
+                    instances[result_cls] = yield deferred_from_coro(
+                        cls.factory(**kwargs_spec.kwargs(instances))
+                    )
+                else:
+                    instances[result_cls] = cls(**kwargs_spec.kwargs(instances))
+                cls_fqn = get_fq_class_name(result_cls)
                 self.crawler.stats.inc_value(f"poet/injector/{cls_fqn}")

         return instances
@@ -181,10 +212,9 @@ def build_instances_from_providers(
         request: Request,
         response: Response,
         plan: andi.Plan,
-        prev_instances: Optional[Dict] = None,
     ):
         """Build dependencies handled by registered providers"""
-        instances: Dict[Callable, Any] = prev_instances or {}
+        instances: Dict[Callable, Any] = {}
         scrapy_provided_dependencies = self.available_dependencies_for_providers(
             request, response
         )
@@ -194,22 +224,11 @@
             provided_classes = {
                 cls for cls in dependencies_set if provider.is_provided(cls)
             }
-
-            # ignore already provided types if provider doesn't need to use them
-            if not provider.allow_prev_instances:
-                provided_classes -= instances.keys()
+            provided_classes -= instances.keys()  # ignore already provided types

             if not provided_classes:
                 continue

-            # If dependency instances were already made by previously invoked
-            # providers, don't try to build them again since it may result in
-            # incorrect values (e.g. PO modifying an item > 2 times).
-            required_deps = set(plan.dependencies[-1][1].values())
-            built_deps = set(instances.keys())
-            if required_deps and required_deps == built_deps:
-                continue
-
             objs, fingerprint = [], None
             cache_hit = False
             if self.cache:
@@ -245,8 +264,6 @@ def build_instances_from_providers(
                     externally_provided=scrapy_provided_dependencies,
                     full_final_kwargs=False,
                 ).final_kwargs(scrapy_provided_dependencies)
-                if provider.allow_prev_instances:
-                    kwargs.update({"prev_instances": instances})
                 try:
                     # Invoke the provider to get the data
                     objs = yield maybeDeferred_coro(
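
The effect of these injection.py changes: instead of ItemProvider re-entering injector.build_instances at runtime, andi now emits a plan in which the page object precedes the item factory, so everything is built once, in dependency order, with no recursion. A condensed sketch of what a plan with an item dependency looks like (assumes andi >= 0.6.0; Product and ProductPage hypothetical):

    import andi

    plan = injector.build_plan(request)  # build_plan() passes custom_builder_fn, as above
    # plan.dependencies is ordered roughly like:
    #   HttpResponse              <- built by HttpResponseProvider
    #   ProductPage               <- built from its declared inputs
    #   CustomBuilder for Product <- its .factory awaits ProductPage.to_item()
    for dep, kwargs_spec in plan.dependencies:
        if isinstance(dep, andi.CustomBuilder):
            item_cls, factory = dep.result_class_or_fn, dep.factory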
140 changes: 6 additions & 134 deletions scrapy_poet/page_input_providers.py
@@ -8,29 +8,12 @@
 different providers in order to acquire data from multiple external sources,
 for example, from scrapy-playwright or from an API for automatic extraction.
 """
-import asyncio
-from dataclasses import make_dataclass
-from inspect import isclass
-from typing import (
-    Any,
-    Callable,
-    ClassVar,
-    Dict,
-    FrozenSet,
-    List,
-    Optional,
-    Set,
-    Type,
-    Union,
-)
+from typing import Any, Callable, ClassVar, FrozenSet, List, Set, Union
 from warnings import warn
-from weakref import WeakKeyDictionary

-import andi
 from scrapy import Request
 from scrapy.crawler import Crawler
 from scrapy.http import Response
-from scrapy.utils.defer import maybe_deferred_to_future
 from web_poet import (
     HttpClient,
     HttpRequest,
@@ -43,13 +26,9 @@
     Stats,
 )
 from web_poet.page_inputs.stats import StatCollector, StatNum
-from web_poet.pages import is_injectable

 from scrapy_poet.downloader import create_scrapy_downloader
-from scrapy_poet.injection_errors import (
-    MalformedProvidedClassesError,
-    ProviderDependencyDeadlockError,
-)
+from scrapy_poet.injection_errors import MalformedProvidedClassesError


 class PageObjectInputProvider:
@@ -119,12 +98,6 @@ def __call__(self, to_provide, response: Response):
     provided_classes: Union[Set[Callable], Callable[[Callable], bool]]
     name: ClassVar[str] = ""  # It must be a unique name. Used by the cache mechanism

-    # If set to True, the Injector will not skip the Provider when the dependency has
-    # been built. Instead, the Injector will pass the previously built instances (by
-    # the other providers) to the Provider. The Provider can then choose to modify
-    # these previous instances before returning them to the Injector.
-    allow_prev_instances: bool = False
-
     def is_provided(self, type_: Callable) -> bool:
         """
         Return ``True`` if the given type is provided by this provider based
@@ -265,122 +238,21 @@ def __call__(self, to_provide: Set[Callable], response: Response):


 class ItemProvider(PageObjectInputProvider):
+    provided_classes = set()
     name = "item"

-    template_deadlock_msg = (
-        "Deadlock detected! A loop has been detected to "
-        "trying to resolve this plan: {plan}"
-    )
-
-    allow_prev_instances: bool = True
-
     def __init__(self, injector):
         super().__init__(injector)
-        self.registry = self.injector.registry
-
-        # The key that's used here is the ``scrapy.Request`` instance to ensure
-        # that the cached instances under it are properly garbage collected
-        # after processing such request.
-        self._cached_instances = WeakKeyDictionary()
-
-        # This is only used when the reactor is ``AsyncioSelectorReactor`` since
-        # the ``asyncio.Future`` that it uses doesn't trigger a RecursionError
-        # unlike Twisted's Deferred. So we use this as a soft-proxy to recursion
-        # depth to check how many calls to ``self.injector.build_instances`` are
-        # made.
-        # Similar to ``_cached_instances`` above, the key is ``scrapy.Request``.
-        self._build_instances_call_counter = WeakKeyDictionary()
-
-    def provided_classes(self, cls):
-        """If the item is in any of the ``to_return`` in the rules, then it can
-        be provided by using the corresponding page object in ``use``.
-        """
-        return isclass(cls) and self.registry.search(to_return=cls)
-
-    def update_cache(self, request: Request, mapping: Dict[Type, Any]) -> None:
-        if request not in self._cached_instances:
-            self._cached_instances[request] = {}
-        self._cached_instances[request].update(mapping)
-
-    def get_from_cache(self, request: Request, cls: Callable) -> Optional[Any]:
-        return self._cached_instances.get(request, {}).get(cls)
-
-    def check_if_deadlock(self, request: Request) -> bool:
-        """Should only be used when ``AsyncioSelectorReactor`` is the reactor."""
-        if request not in self._build_instances_call_counter:
-            self._build_instances_call_counter[request] = 0
-        self._build_instances_call_counter[request] += 1

-        # If there are more than 100 calls to ``injector.build_instances()``
-        # for a given request, it might be a deadlock. This limit is large
-        # enough since the dependency tree for item dependencies needing page
-        # objects and/or items wouldn't reach this far.
-        if self._build_instances_call_counter[request] > 100:
-            return True
-        return False
+        msg = "The ItemProvider now does nothing and you should disable it."
+        warn(msg, DeprecationWarning, stacklevel=2)

     async def __call__(
         self,
         to_provide: Set[Callable],
         request: Request,
         response: Response,
-        prev_instances: Dict,
     ) -> List[Any]:
-        results = []
-        for cls in to_provide:
-            if item := self.get_from_cache(request, cls):
-                results.append(item)
-                continue
-
-            page_object_cls = self.registry.page_cls_for_item(request.url, cls)
-            if not page_object_cls:
-                warn(
-                    f"Can't find appropriate page object for {cls} item for "
-                    f"url: '{request.url}'. Check the ApplyRules you're using."
-                )
-                continue
-
-            # https://github.com/scrapinghub/andi/issues/23#issuecomment-1331682180
-            fake_call_signature = make_dataclass(
-                "FakeCallSignature", [("page_object", page_object_cls)]
-            )
-            plan = andi.plan(
-                fake_call_signature,
-                is_injectable=is_injectable,
-                externally_provided=self.injector.is_class_provided_by_any_provider,
-            )
-
-            try:
-                deferred_or_future = maybe_deferred_to_future(
-                    self.injector.build_instances(
-                        request, response, plan, prev_instances
-                    )
-                )
-                # RecursionError NOT raised when ``AsyncioSelectorReactor`` is used.
-                # Could be related: https://github.com/python/cpython/issues/93837
-
-                # Need to check before awaiting on the ``asyncio.Future``
-                # before it gets stuck on a potential deadlock.
-                if asyncio.isfuture(deferred_or_future):
-                    if self.check_if_deadlock(request):
-                        raise ProviderDependencyDeadlockError(
-                            self.template_deadlock_msg.format(plan=plan)
-                        )
-
-                po_instances = await deferred_or_future
-            except RecursionError:
-                raise ProviderDependencyDeadlockError(
-                    self.template_deadlock_msg.format(plan=plan)
-                )
-
-            page_object = po_instances[page_object_cls]
-            item = await page_object.to_item()
-
-            self.update_cache(request, po_instances)
-            self.update_cache(request, {type(item): item})
-
-            results.append(item)
-        return results
+        return []


 class ScrapyPoetStatCollector(StatCollector):
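
ItemProvider therefore survives only as a deprecated no-op kept for backward compatibility: it provides no classes, warns on instantiation, and returns an empty list. A test-style sketch of that contract (pytest assumed; ``injector`` is a hypothetical fixture providing an Injector instance):

    import pytest

    from scrapy_poet.page_input_providers import ItemProvider


    def test_item_provider_is_a_noop(injector):
        # Instantiation warns that the provider should be disabled.
        with pytest.warns(DeprecationWarning, match="now does nothing"):
            provider = ItemProvider(injector)
        # It no longer claims to provide any item class.
        assert provider.provided_classes == set()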
2 changes: 1 addition & 1 deletion setup.py
@@ -21,7 +21,7 @@
     package_data={"scrapy_poet": ["VERSION"]},
     python_requires=">=3.8",
     install_requires=[
-        "andi >= 0.5.0",
+        "andi >= 0.6.0",
         "attrs >= 21.3.0",
         "parsel >= 1.5.0",
         "scrapy >= 2.6.0",
[Diffs for the remaining 4 changed files are not shown.]
