Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for “post requests and data” when integrating scrapy. #317

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions frontera/contrib/backends/remote/codecs/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def _prepare_request_message(request):
'method': request.method,
'headers': request.headers,
'cookies': request.cookies,
'meta': request.meta}
'meta': request.meta,
'body':request.body}


def _prepare_links_message(links):
Expand All @@ -71,7 +72,11 @@ def _prepare_response_message(response, send_body):
return {'url': response.url,
'status_code': response.status_code,
'meta': response.meta,
'body': b64encode(response.body) if send_body else None}
'response_headers':response.headers,
'body': b64encode(response.body) if send_body else None,
'request_method':response.request.method,
'request_headers':response.request.headers,
'request_cookies':response.request.cookies}


class CrawlFrontierJSONEncoder(json.JSONEncoder):
Expand Down Expand Up @@ -153,9 +158,13 @@ def __init__(self, request_model, response_model, *a, **kw):
def _response_from_object(self, obj):
url = obj['url']
request = self._request_model(url=url,
meta=obj['meta'])
meta=obj['meta'],
method=obj['request_method'],
headers=obj['request_headers'],
cookies=obj['request_cookies'])
return self._response_model(url=url,
status_code=obj['status_code'],
headers=obj['response_headers'],
body=b64decode(obj['body']) if obj['body'] is not None else None,
request=request)

Expand All @@ -164,7 +173,8 @@ def _request_from_object(self, obj):
method=obj['method'],
headers=obj['headers'],
cookies=obj['cookies'],
meta=obj['meta'])
meta=obj['meta'],
body=obj['body'])

def decode(self, message):
message = _convert_from_saved_type(super(Decoder, self).decode(message))
Expand Down Expand Up @@ -194,8 +204,4 @@ def decode(self, message):

def decode_request(self, message):
obj = _convert_from_saved_type(super(Decoder, self).decode(message))
return self._request_model(url=obj['url'],
method=obj['method'],
headers=obj['headers'],
cookies=obj['cookies'],
meta=obj['meta'])
return self._request_from_object(obj)
14 changes: 9 additions & 5 deletions frontera/contrib/backends/remote/codecs/msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ def serialize(obj):
else:
logger.warning('unable to serialize object: {}'.format(obj))
return None
return [request.url, request.method, request.headers, request.cookies, serialize(request.meta)]
return [request.url, request.method, request.headers, request.cookies, serialize(request.meta), request.body]


def _prepare_response_message(response, send_body):
return [response.url, response.status_code, response.meta, response.headers, response.body if send_body else None]

return [response.url, response.status_code, response.meta, response.headers, response.body if send_body else None
, response.request.method, response.request.headers, response.request.cookies]

class Encoder(BaseEncoder):
def __init__(self, request_model, *a, **kw):
Expand Down Expand Up @@ -81,14 +81,18 @@ def _response_from_object(self, obj):
body=obj[4],
headers=obj[3],
request=self._request_model(url=url,
meta=obj[2]))
meta=obj[2],
method=obj[5],
headers=obj[6],
cookies=obj[7]))

def _request_from_object(self, obj):
return self._request_model(url=to_native_str(obj[0]),
method=obj[1],
headers=obj[2],
cookies=obj[3],
meta=obj[4])
meta=obj[4],
body=obj[5])

def decode(self, buffer):
obj = unpackb(buffer, encoding='utf-8')
Expand Down
5 changes: 3 additions & 2 deletions frontera/contrib/backends/sqlalchemy/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def _create_page(self, obj):
db_page.headers = obj.headers
db_page.method = to_native_str(obj.method)
db_page.cookies = obj.cookies
db_page.body=obj.body
elif isinstance(obj, Response):
db_page.headers = obj.request.headers
db_page.method = to_native_str(obj.request.method)
Expand Down Expand Up @@ -177,7 +178,7 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
for item in self._order_by(self.session.query(self.queue_model).filter_by(partition_id=partition_id)).\
limit(max_n_requests):
method = item.method or b'GET'
r = Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies)
r = Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies, body=item.body)
r.meta[b'fingerprint'] = to_bytes(item.fingerprint)
r.meta[b'score'] = item.score
results.append(r)
Expand All @@ -203,7 +204,7 @@ def schedule(self, batch):
host_crc32 = get_crc32(hostname)
q = self.queue_model(fingerprint=to_native_str(fprint), score=score, url=request.url, meta=request.meta,
headers=request.headers, cookies=request.cookies, method=to_native_str(request.method),
partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6)
partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6, body=request.body)
to_save.append(q)
request.meta[b'state'] = States.QUEUED
self.session.bulk_save_objects(to_save)
Expand Down
2 changes: 2 additions & 0 deletions frontera/contrib/backends/sqlalchemy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class MetadataModel(DeclarativeBase):
error = Column(String(128))
meta = Column(PickleType())
headers = Column(PickleType())
body = Column(PickleType())
cookies = Column(PickleType())
method = Column(String(6))

Expand Down Expand Up @@ -76,6 +77,7 @@ class QueueModelMixin(object):
meta = Column(PickleType())
headers = Column(PickleType())
cookies = Column(PickleType())
body = Column(PickleType())
method = Column(String(6))
created_at = Column(BigInteger, index=True)
depth = Column(SmallInteger)
Expand Down
4 changes: 2 additions & 2 deletions frontera/contrib/backends/sqlalchemy/revisiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
limit(max_n_requests):
method = 'GET' if not item.method else item.method
results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers,
cookies=item.cookies))
cookies=item.cookies, body=item.body))
self.session.delete(item)
self.session.commit()
except Exception as exc:
Expand All @@ -92,7 +92,7 @@ def schedule(self, batch):
q = self.queue_model(fingerprint=fprint, score=score, url=request.url, meta=request.meta,
headers=request.headers, cookies=request.cookies, method=request.method,
partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6,
crawl_at=schedule_at)
crawl_at=schedule_at, body=request.body)
to_save.append(q)
request.meta[b'state'] = States.QUEUED
self.session.bulk_save_objects(to_save)
Expand Down
15 changes: 14 additions & 1 deletion tests/contrib/backends/hbase/test_hbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from tests import mock
import pytest

r1 = Request('https://www.example.com', meta={b'fingerprint': b'10',
data = {'id': 'xxx',
'name': 'yyy'}

r1 = Request('https://www.example.com', method='post', body=data, meta={b'fingerprint': b'10',
b'domain': {b'name': b'www.example.com', b'fingerprint': b'81'}})
r2 = Request('http://example.com/some/page/', meta={b'fingerprint': b'11',
b'domain': {b'name': b'example.com', b'fingerprint': b'82'}})
Expand Down Expand Up @@ -52,6 +55,16 @@ def test_queue(self):
assert set([r.url for r in queue.get_next_requests(10, 1, min_requests=3, min_hosts=1,
max_requests_per_host=10)]) == set([r1.url, r2.url])


def test_queue_with_post_request(self):
connection = Connection(host='hbase-docker', port=9090)
queue = HBaseQueue(connection, 1, b'queue', drop=True, use_snappy=False)
batch = [('10', 0.5, r1, True)]
queue.schedule(batch)
requests=queue.get_next_requests(10, 0, min_requests=3, min_hosts=1,max_requests_per_host=10)
self.assertEqual(b'POST', requests[0].method)
self.assertEqual(data, requests[0].body)

@pytest.mark.xfail
def test_queue_with_delay(self):
connection = Connection(host='hbase-docker', port=9090)
Expand Down
21 changes: 20 additions & 1 deletion tests/contrib/backends/memory/test_backend_memory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import absolute_import
from unittest import TestCase, main

from frontera.contrib.backends.memory import MemoryQueue
from tests.test_overused_buffer import DFSOverusedBackendTest
from tests import backends

from frontera.core.models import Request

class TestFIFO(backends.FIFOBackendTest):
backend_class = 'frontera.contrib.backends.memory.FIFO'
Expand Down Expand Up @@ -29,3 +32,19 @@ class TestBFS(backends.BFSBackendTest):

class TestRANDOM(backends.RANDOMBackendTest):
backend_class = 'frontera.contrib.backends.memory.RANDOM'


class TestMemoryQueue(TestCase):
def test_scheduling_past_1part_post(self):
subject = MemoryQueue(1)
data={'id':'xxx',
'name':'yyy'}
batch = [
("1", 1, Request(url='https://www.knuthellan.com/', body=data, method='POST'), True),
]
subject.schedule(batch)
requests = subject.get_next_requests(5, 0)
for request in requests:
self.assertTrue(request.method == b'POST')
self.assertTrue(request.body == data)

26 changes: 25 additions & 1 deletion tests/contrib/backends/redis_backend/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ def __init__(self, fingerprint, crawl_at, url, domain=None):
self.headers = {}
self.cookies = None
self.status_code = 200
self.body=''


class PostRequest(Request):
def __init__(self, fingerprint, crawl_at, url, body, domain=None):
super(PostRequest, self).__init__(fingerprint, crawl_at, url, domain)
self.method = 'POST'
self.body=body


def get_pool():
Expand Down Expand Up @@ -314,7 +322,23 @@ def test_get_next_requests_few_items_few_hosts(self):
self.assertTrue('https://www.knuthellan.com/' in urls)
self.assertEqual(0, subject.count())


def test_scheduling_past_1part_post(self):
subject = self.setup_subject(1)
data={'id':'xxx',
'name':'yyy'}
batch = [
("1", 1, PostRequest("1", int(time()) - 10, 'https://www.knuthellan.com/', body=data, domain='knuthellan.com'), True),
("2", 0.1, PostRequest("2", int(time()) - 10, 'https://www.khellan.com/', body=data, domain='khellan.com'), True),
("3", 0.5, PostRequest("3", int(time()) - 10, 'https://www.hellan.me/', body=data, domain='hellan.me'), True),
]
subject.schedule(batch)
self.assertEqual(3, subject.count())
requests = subject.get_next_requests(5, 0, min_hosts=1, min_requests=1, max_requests_per_host=5)
self.assertEqual(3, len(requests))
for request in requests:
self.assertTrue(request.method == b'POST')
self.assertTrue(request.body == data)
self.assertEqual(0, subject.count())

class RedisStateTest(TestCase):
def test_update_cache(self):
Expand Down
21 changes: 18 additions & 3 deletions tests/test_message_bus_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from frontera.settings import Settings
from frontera.core.models import Request, Response

data = {'id': 'xxx',
'name': 'yyy'}

r1 = Request('http://www.example.com/', meta={b'domain': {b'fingerprint': b'1'}})
r1 = Request('http://www.example.com/',method='post',body=data, meta={b'domain': {b'fingerprint': b'1'}})
r2 = Request('http://www.scrapy.org/', meta={b'domain': {b'fingerprint': b'2'}})
r3 = Request('http://www.test.com/some/page', meta={b'domain': {b'fingerprint': b'3'}})

Expand All @@ -18,6 +20,8 @@ def mbb_setup(self, settings=None):
settings = settings or Settings()
settings.MESSAGE_BUS = 'tests.mocks.message_bus.FakeMessageBus'
settings.STORE_CONTENT = True
#test json codecs
# settings.MESSAGE_BUS_CODEC='frontera.contrib.backends.remote.codecs.json'
manager.settings = settings
manager.request_model = Request
manager.response_model = Response
Expand All @@ -43,27 +47,34 @@ def test_add_seeds(self):
mbb.add_seeds([r1, r2, r3])
seeds = [mbb._decoder.decode(m)[1][0] for m in mbb.spider_log_producer.messages]
self.assertEqual(set([seed.url for seed in seeds]), set([r1.url, r2.url, r3.url]))
seed=seeds[0]
self.assertEqual(b'POST',seed.method)
self.assertEqual(data,seed.body)

def test_page_crawled(self):
mbb = self.mbb_setup()
resp = Response(r1.url, body='body', request=r1)
resp = Response(r1.url, body=b'body', request=r1)
mbb.page_crawled(resp)
page = mbb._decoder.decode(mbb.spider_log_producer.messages[0])[1]
self.assertEqual((page.request.url, page.body), (resp.request.url, 'body'))
self.assertEqual((page.request.url, page.body), (resp.request.url, b'body'))

def test_links_extracted(self):
mbb = self.mbb_setup()
mbb.links_extracted(r1, [r2, r3])
requests = [mbb._decoder.decode(m)[1] for m in mbb.spider_log_producer.messages]
links = [mbb._decoder.decode(m)[2][0] for m in mbb.spider_log_producer.messages]
self.assertEqual(set([r.url for r in requests]), set([r1.url]))
self.assertEqual(b'POST', requests[0].method)
self.assertEqual(data, requests[0].body)
self.assertEqual(set([link.url for link in links]), set([r2.url, r3.url]))

def test_request_error(self):
mbb = self.mbb_setup()
mbb.request_error(r1, 'error')
_, error_request, error_message = mbb._decoder.decode(mbb.spider_log_producer.messages[0])
self.assertEqual((error_request.url, error_message), (r1.url, 'error'))
self.assertEqual(b'POST', error_request.method)
self.assertEqual(data, error_request.body)

def test_get_next_requests(self):
mbb = self.mbb_setup()
Expand All @@ -80,3 +91,7 @@ def test_get_next_requests(self):
mbb.consumer.put_messages(encoded_requests)
requests = set(mbb.get_next_requests(10, overused_keys=['www.example.com'], key_type='domain'))
self.assertEqual(set([r.url for r in requests]), set([r2.url, r3.url]))
#test get post request
requests = mbb.get_next_requests(10, overused_keys=[], key_type='domain')
self.assertEqual(b'POST', requests[0].method)
self.assertEqual(data, requests[0].body)