Python Scrapy Actor uses Request Queue (#184)
Closes #183
vdusek authored Sep 13, 2023
1 parent 22b2157 commit 7762e1a
Showing 7 changed files with 356 additions and 22 deletions.
8 changes: 2 additions & 6 deletions templates/python-scrapy/requirements.txt
@@ -1,10 +1,6 @@
# Add your dependencies here.
# See https://pip.pypa.io/en/latest/reference/requirements-file-format/
# for how to format them
apify ~= 1.1.3
apify ~= 1.1.4
nest-asyncio ~= 1.5.7
scrapy ~= 2.10.0

# Version >= 23 causes the throwing of the following exception:
# AttributeError: 'AsyncioSelectorReactor' object has no attribute '_handleSignals'
Twisted < 23.0.0
scrapy ~= 2.10.1
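
With Scrapy bumped to 2.10.1, the Twisted < 23 pin and its workaround comment are dropped. The ApifyScheduler added below requires the asyncio Twisted reactor, which Scrapy selects through the TWISTED_REACTOR setting; a minimal sketch, assuming the template's src/settings.py (unchanged, so not shown in this diff) is where that setting lives:

# src/settings.py (sketch only; this file is not part of the diff)
TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'
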
38 changes: 29 additions & 9 deletions templates/python-scrapy/src/main.py
@@ -1,25 +1,45 @@
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from scrapy.settings import Settings

from apify import Actor

from .pipelines import ActorDatasetPushPipeline
from .spiders.title_spider import TitleSpider


def get_scrapy_settings(max_depth: int) -> Settings:
"""
Get Scrapy project settings.
"""
settings = get_project_settings()
settings['ITEM_PIPELINES'] = {
'src.pipelines.ActorDatasetPushPipeline': 1,
}
settings['SCHEDULER'] = 'src.scheduler.ApifyScheduler'
settings['DOWNLOADER_MIDDLEWARES'] = {
'scrapy.downloadermiddlewares.retry.RetryMiddleware': None,
'src.middlewares.ApifyRetryMiddleware': 999, # we want top priority in process_response
}
settings['DEPTH_LIMIT'] = max_depth
return settings


async def main():
    async with Actor:
        Actor.log.info('Actor is being executed...')

        # Process Actor input
        actor_input = await Actor.get_input() or {}
        max_depth = actor_input.get('max_depth', 1)
        start_urls = [start_url.get('url') for start_url in actor_input.get('start_urls', [{ 'url': 'https://apify.com' }])]
        start_urls = [start_url.get('url') for start_url in actor_input.get('start_urls', [{'url': 'https://apify.com'}])]
        settings = get_scrapy_settings(max_depth)

        settings = get_project_settings()
        settings['ITEM_PIPELINES'] = { ActorDatasetPushPipeline: 1 }
        settings['DEPTH_LIMIT'] = max_depth

        process = CrawlerProcess(settings, install_root_handler=False)
        # Add start URLs to the request queue
        rq = await Actor.open_request_queue()
        for url in start_urls:
            await rq.add_request({'url': url})

        # If you want to run multiple spiders, call `process.crawl` for each of them here
        process.crawl(TitleSpider, start_urls=start_urls)

        process = CrawlerProcess(settings, install_root_handler=False)
        process.crawl(TitleSpider)
        process.start()
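
The new main.py wiring relies on helpers imported from src/utils.py (nested_event_loop, open_queue_with_custom_client, to_apify_request, to_scrapy_request) that are not visible in this view. Below is only a sketch of what such helpers might look like, inferred from how scheduler.py and middlewares.py call them; the serialization format and the custom-client detail are assumptions, not the committed implementation:

# Sketch of src/utils.py (not shown in this view); names match the imports above,
# the bodies are assumptions.
import asyncio
import codecs
import pickle

from scrapy import Request
from scrapy.utils.request import request_from_dict

from apify import Actor
from apify.storages import RequestQueue

# A dedicated event loop so synchronous Scrapy hooks can drive async Apify SDK calls
# via run_until_complete().
nested_event_loop = asyncio.new_event_loop()


async def open_queue_with_custom_client() -> RequestQueue:
    # Assumption: opens the Actor's default request queue; whatever the real
    # "custom client" tweak is, it is not visible in this diff.
    return await Actor.open_request_queue()


def to_apify_request(scrapy_request: Request) -> dict:
    # Assumption: the Scrapy request is serialized into userData so that it can be
    # rebuilt after a round trip through the Request Queue.
    serialized = codecs.encode(pickle.dumps(scrapy_request.to_dict()), 'base64').decode()
    return {
        'url': scrapy_request.url,
        'uniqueKey': scrapy_request.url,
        'userData': {'scrapy_request': serialized},
    }


def to_scrapy_request(apify_request: dict) -> Request:
    # Inverse of to_apify_request(); falls back to a bare Request when no serialized
    # Scrapy request is attached (e.g. the start URLs added in main.py).
    serialized = apify_request.get('userData', {}).get('scrapy_request')
    if serialized is None:
        return Request(url=apify_request['url'])
    return request_from_dict(pickle.loads(codecs.decode(serialized.encode(), 'base64')))
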
91 changes: 91 additions & 0 deletions templates/python-scrapy/src/middlewares.py
@@ -0,0 +1,91 @@
import traceback

from scrapy import Spider
from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.http import Request, Response
from scrapy.utils.response import response_status_message
from scrapy.exceptions import IgnoreRequest

from apify.storages import RequestQueue

from .utils import nested_event_loop, open_queue_with_custom_client, to_apify_request


class ApifyRetryMiddleware(RetryMiddleware):
"""
Basically the default Scrapy retry middleware enriched with Apify's Request Queue interaction.
"""

def __init__(self, *args: list, **kwargs: dict) -> None:
super().__init__(*args, **kwargs)
try:
self._rq: RequestQueue = nested_event_loop.run_until_complete(open_queue_with_custom_client())
except BaseException:
traceback.print_exc()

def __del__(self):
nested_event_loop.stop()
nested_event_loop.close()

def process_response(self, request: Request, response: Response, spider: Spider) -> Request | Response:
"""
Process the response and decide whether the request should be retried.
Args:
request: The request that was sent.
response: The response that was received.
spider: The Spider that sent the request.
Returns:
The response, or a new request if the request should be retried.
"""
        # Robots.txt requests bypass the Scrapy scheduler, and therefore our Request Queue as well.
        # See scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware for details.
        assert isinstance(request.url, str)
        if request.url.endswith('robots.txt'):
            return response

        try:
            returned = nested_event_loop.run_until_complete(self._handle_retry_logic(request, response, spider))
        except BaseException:
            traceback.print_exc()

        return returned

    def process_exception(
        self,
        request: Request,
        exception: BaseException,
        spider: Spider,
    ) -> None | Response | Request:
        apify_request = to_apify_request(request)

        if isinstance(exception, IgnoreRequest):
            try:
                nested_event_loop.run_until_complete(self._rq.mark_request_as_handled(apify_request))
            except BaseException:
                traceback.print_exc()
        else:
            nested_event_loop.run_until_complete(self._rq.reclaim_request(apify_request))

        return super().process_exception(request, exception, spider)

    async def _handle_retry_logic(
        self,
        request: Request,
        response: Response,
        spider: Spider
    ) -> Request | Response:
        apify_request = to_apify_request(request)

        if request.meta.get('dont_retry', False):
            await self._rq.mark_request_as_handled(apify_request)
            return response

        if response.status in self.retry_http_codes:
            await self._rq.reclaim_request(apify_request)
            reason = response_status_message(response.status)
            return self._retry(request, reason, spider) or response

        await self._rq.mark_request_as_handled(apify_request)
        return response
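
Both this middleware and the scheduler call the async Apify client from synchronous Scrapy hooks by running coroutines to completion on a dedicated event loop; the nest-asyncio pin in requirements.txt presumably exists so that this works even while Scrapy's own asyncio loop is running. A standalone sketch of that bridge pattern, not taken from this commit:

import asyncio

import nest_asyncio

# A private loop, patched to be re-entrant so run_until_complete() also works while
# another asyncio loop is running (as it is under Scrapy's asyncio reactor).
nested_loop = asyncio.new_event_loop()
nest_asyncio.apply(nested_loop)


async def mark_handled(request_id: str) -> str:
    # Stand-in for an async Apify SDK call such as RequestQueue.mark_request_as_handled().
    await asyncio.sleep(0)
    return f'request {request_id} handled'


def process_response_hook(request_id: str) -> str:
    # A synchronous Scrapy hook (e.g. process_response) driving the coroutine to completion.
    return nested_loop.run_until_complete(mark_handled(request_id))


if __name__ == '__main__':
    print(process_response_hook('abc123'))
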
7 changes: 5 additions & 2 deletions templates/python-scrapy/src/pipelines.py
@@ -2,14 +2,17 @@
#
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html

from apify import Actor
from itemadapter import ItemAdapter

from scrapy import Item, Spider

from apify import Actor


# Used to output the items into the actor's default dataset
# Enabled only when the project is run as an actor
class ActorDatasetPushPipeline:
    async def process_item(self, item, spider):
    async def process_item(self, item: Item, spider: Spider) -> dict:
        item_dict = ItemAdapter(item).asdict()
        await Actor.push_data(item_dict)
        return item
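
The pipeline pushes every scraped item into the Actor's default dataset. A small usage sketch, not part of this commit, for reading those items back with the same SDK (for example, to inspect a local run):

import asyncio

from apify import Actor


async def dump_scraped_items() -> None:
    # Open the same default dataset the pipeline pushes to and log its contents.
    async with Actor:
        dataset = await Actor.open_dataset()
        async for item in dataset.iterate_items():
            Actor.log.info(f"{item.get('title')} ({item.get('url')})")


if __name__ == '__main__':
    asyncio.run(dump_scraped_items())
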
102 changes: 102 additions & 0 deletions templates/python-scrapy/src/scheduler.py
@@ -0,0 +1,102 @@
import traceback

from scrapy import Spider
from scrapy.core.scheduler import BaseScheduler
from scrapy.http.request import Request
from scrapy.utils.reactor import is_asyncio_reactor_installed

from apify.storages import RequestQueue

from .utils import nested_event_loop, open_queue_with_custom_client, to_apify_request, to_scrapy_request


class ApifyScheduler(BaseScheduler):
"""
A Scrapy scheduler that uses the Apify Request Queue to manage requests.
This scheduler requires the asyncio Twisted reactor to be installed.
"""

def __init__(self) -> None:
if not is_asyncio_reactor_installed():
raise ValueError(
f'{ApifyScheduler.__qualname__} requires the asyncio Twisted reactor. '
'Make sure you have it configured in the TWISTED_REACTOR setting. See the asyncio '
'documentation of Scrapy for more information.'
)
self._rq: RequestQueue | None = None
self.spider: Spider | None = None

def open(self, spider: Spider) -> None:
"""
Open the scheduler.
Args:
spider: The spider that the scheduler is associated with.
"""
self.spider = spider

try:
self._rq = nested_event_loop.run_until_complete(open_queue_with_custom_client())
except BaseException:
traceback.print_exc()

def close(self, reason: str) -> None:
"""
Close the scheduler.
Args:
reason: The reason for closing the scheduler.
"""
nested_event_loop.stop()
nested_event_loop.close()

def has_pending_requests(self) -> bool:
"""
Check if the scheduler has any pending requests.
Returns:
True if the scheduler has any pending requests, False otherwise.
"""
try:
is_finished = nested_event_loop.run_until_complete(self._rq.is_finished())
except BaseException:
traceback.print_exc()

return not is_finished

def enqueue_request(self, request: Request) -> bool:
"""
Add a request to the scheduler.
Args:
request: The request to add to the scheduler.
Returns:
True if the request was successfully enqueued, False otherwise.
"""
apify_request = to_apify_request(request)

try:
result = nested_event_loop.run_until_complete(self._rq.add_request(apify_request))
except BaseException:
traceback.print_exc()

return bool(result['wasAlreadyPresent'])

def next_request(self) -> Request | None:
"""
Fetch the next request from the scheduler.
Returns:
The next request, or None if there are no more requests.
"""
try:
apify_request = nested_event_loop.run_until_complete(self._rq.fetch_next_request())
except BaseException:
traceback.print_exc()

if apify_request is None:
return None

return to_scrapy_request(apify_request)
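
Together, the scheduler and the retry middleware implement the standard Apify Request Queue lifecycle: add a request, fetch it, then either mark it as handled or reclaim it for another attempt. A standalone sketch of that lifecycle using the same SDK calls that appear in the diff:

import asyncio

from apify import Actor


async def request_queue_lifecycle() -> None:
    async with Actor:
        rq = await Actor.open_request_queue()

        # Producer side: main.py seeds the start URLs, ApifyScheduler.enqueue_request adds the rest.
        await rq.add_request({'url': 'https://apify.com'})

        # Consumer side: ApifyScheduler.next_request hands the fetched request to Scrapy.
        request = await rq.fetch_next_request()
        if request is None:
            return

        # In the real crawler, downloading and parsing happen between the fetch and the calls below.
        succeeded = True  # pretend the download succeeded
        if succeeded:
            # Successes are marked as handled and never served again.
            await rq.mark_request_as_handled(request)
        else:
            # Failures are reclaimed so they are served again later (see ApifyRetryMiddleware).
            await rq.reclaim_request(request)


if __name__ == '__main__':
    asyncio.run(request_queue_lifecycle())
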
16 changes: 11 additions & 5 deletions templates/python-scrapy/src/spiders/title_spider.py
@@ -1,16 +1,22 @@
from typing import Generator
from urllib.parse import urljoin

import scrapy
from scrapy.responsetypes import Response

from apify import Actor


# Scrapes titles pages and enqueues all links it finds on the page
class TitleSpider(scrapy.Spider):
"""
Scrapes titles pages and enqueues all links it finds on the page.
"""

    name = 'title_spider'

    def __init__(self, start_urls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_urls = start_urls
    def parse(self, response: Response) -> Generator[dict, None, None]:
        Actor.log.info(f'TitleSpider is parsing {response}...')

    def parse(self, response):
        yield {
            'url': response.url,
            'title': response.css('title::text').extract_first(),