diff --git a/frontera/contrib/backends/remote/codecs/json.py b/frontera/contrib/backends/remote/codecs/json.py index 135c44d83..14d1c8236 100644 --- a/frontera/contrib/backends/remote/codecs/json.py +++ b/frontera/contrib/backends/remote/codecs/json.py @@ -60,7 +60,8 @@ def _prepare_request_message(request): 'method': request.method, 'headers': request.headers, 'cookies': request.cookies, - 'meta': request.meta} + 'meta': request.meta, + 'body':request.body} def _prepare_links_message(links): @@ -71,7 +72,11 @@ def _prepare_response_message(response, send_body): return {'url': response.url, 'status_code': response.status_code, 'meta': response.meta, - 'body': b64encode(response.body) if send_body else None} + 'response_headers':response.headers, + 'body': b64encode(response.body) if send_body else None, + 'request_method':response.request.method, + 'request_headers':response.request.headers, + 'request_cookies':response.request.cookies} class CrawlFrontierJSONEncoder(json.JSONEncoder): @@ -153,9 +158,13 @@ def __init__(self, request_model, response_model, *a, **kw): def _response_from_object(self, obj): url = obj['url'] request = self._request_model(url=url, - meta=obj['meta']) + meta=obj['meta'], + method=obj['request_method'], + headers=obj['request_headers'], + cookies=obj['request_cookies']) return self._response_model(url=url, status_code=obj['status_code'], + headers=obj['response_headers'], body=b64decode(obj['body']) if obj['body'] is not None else None, request=request) @@ -164,7 +173,8 @@ def _request_from_object(self, obj): method=obj['method'], headers=obj['headers'], cookies=obj['cookies'], - meta=obj['meta']) + meta=obj['meta'], + body=obj['body']) def decode(self, message): message = _convert_from_saved_type(super(Decoder, self).decode(message)) @@ -194,8 +204,4 @@ def decode(self, message): def decode_request(self, message): obj = _convert_from_saved_type(super(Decoder, self).decode(message)) - return self._request_model(url=obj['url'], - method=obj['method'], - headers=obj['headers'], - cookies=obj['cookies'], - meta=obj['meta']) + return self._request_from_object(obj) diff --git a/frontera/contrib/backends/remote/codecs/msgpack.py b/frontera/contrib/backends/remote/codecs/msgpack.py index 5a155ec7b..421297262 100644 --- a/frontera/contrib/backends/remote/codecs/msgpack.py +++ b/frontera/contrib/backends/remote/codecs/msgpack.py @@ -32,12 +32,12 @@ def serialize(obj): else: logger.warning('unable to serialize object: {}'.format(obj)) return None - return [request.url, request.method, request.headers, request.cookies, serialize(request.meta)] + return [request.url, request.method, request.headers, request.cookies, serialize(request.meta), request.body] def _prepare_response_message(response, send_body): - return [response.url, response.status_code, response.meta, response.headers, response.body if send_body else None] - + return [response.url, response.status_code, response.meta, response.headers, response.body if send_body else None + , response.request.method, response.request.headers, response.request.cookies] class Encoder(BaseEncoder): def __init__(self, request_model, *a, **kw): @@ -81,14 +81,18 @@ def _response_from_object(self, obj): body=obj[4], headers=obj[3], request=self._request_model(url=url, - meta=obj[2])) + meta=obj[2], + method=obj[5], + headers=obj[6], + cookies=obj[7])) def _request_from_object(self, obj): return self._request_model(url=to_native_str(obj[0]), method=obj[1], headers=obj[2], cookies=obj[3], - meta=obj[4]) + meta=obj[4], + body=obj[5]) def decode(self, buffer): obj = unpackb(buffer, encoding='utf-8') diff --git a/frontera/contrib/backends/sqlalchemy/components.py b/frontera/contrib/backends/sqlalchemy/components.py index 8661ac576..b6338b0f3 100644 --- a/frontera/contrib/backends/sqlalchemy/components.py +++ b/frontera/contrib/backends/sqlalchemy/components.py @@ -95,6 +95,7 @@ def _create_page(self, obj): db_page.headers = obj.headers db_page.method = to_native_str(obj.method) db_page.cookies = obj.cookies + db_page.body=obj.body elif isinstance(obj, Response): db_page.headers = obj.request.headers db_page.method = to_native_str(obj.request.method) @@ -177,7 +178,7 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs): for item in self._order_by(self.session.query(self.queue_model).filter_by(partition_id=partition_id)).\ limit(max_n_requests): method = item.method or b'GET' - r = Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies) + r = Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies, body=item.body) r.meta[b'fingerprint'] = to_bytes(item.fingerprint) r.meta[b'score'] = item.score results.append(r) @@ -203,7 +204,7 @@ def schedule(self, batch): host_crc32 = get_crc32(hostname) q = self.queue_model(fingerprint=to_native_str(fprint), score=score, url=request.url, meta=request.meta, headers=request.headers, cookies=request.cookies, method=to_native_str(request.method), - partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6) + partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6, body=request.body) to_save.append(q) request.meta[b'state'] = States.QUEUED self.session.bulk_save_objects(to_save) diff --git a/frontera/contrib/backends/sqlalchemy/models.py b/frontera/contrib/backends/sqlalchemy/models.py index 8211d21c6..54b99c7e3 100644 --- a/frontera/contrib/backends/sqlalchemy/models.py +++ b/frontera/contrib/backends/sqlalchemy/models.py @@ -26,6 +26,7 @@ class MetadataModel(DeclarativeBase): error = Column(String(128)) meta = Column(PickleType()) headers = Column(PickleType()) + body = Column(PickleType()) cookies = Column(PickleType()) method = Column(String(6)) @@ -76,6 +77,7 @@ class QueueModelMixin(object): meta = Column(PickleType()) headers = Column(PickleType()) cookies = Column(PickleType()) + body = Column(PickleType()) method = Column(String(6)) created_at = Column(BigInteger, index=True) depth = Column(SmallInteger) diff --git a/frontera/contrib/backends/sqlalchemy/revisiting.py b/frontera/contrib/backends/sqlalchemy/revisiting.py index b2b574715..6d77a2774 100644 --- a/frontera/contrib/backends/sqlalchemy/revisiting.py +++ b/frontera/contrib/backends/sqlalchemy/revisiting.py @@ -67,7 +67,7 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs): limit(max_n_requests): method = 'GET' if not item.method else item.method results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers, - cookies=item.cookies)) + cookies=item.cookies, body=item.body)) self.session.delete(item) self.session.commit() except Exception as exc: @@ -92,7 +92,7 @@ def schedule(self, batch): q = self.queue_model(fingerprint=fprint, score=score, url=request.url, meta=request.meta, headers=request.headers, cookies=request.cookies, method=request.method, partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6, - crawl_at=schedule_at) + crawl_at=schedule_at, body=request.body) to_save.append(q) request.meta[b'state'] = States.QUEUED self.session.bulk_save_objects(to_save) diff --git a/tests/contrib/backends/hbase/test_hbase.py b/tests/contrib/backends/hbase/test_hbase.py index c79881a30..75ec85ff1 100644 --- a/tests/contrib/backends/hbase/test_hbase.py +++ b/tests/contrib/backends/hbase/test_hbase.py @@ -10,7 +10,10 @@ from tests import mock import pytest -r1 = Request('https://www.example.com', meta={b'fingerprint': b'10', +data = {'id': 'xxx', + 'name': 'yyy'} + +r1 = Request('https://www.example.com', method='post', body=data, meta={b'fingerprint': b'10', b'domain': {b'name': b'www.example.com', b'fingerprint': b'81'}}) r2 = Request('http://example.com/some/page/', meta={b'fingerprint': b'11', b'domain': {b'name': b'example.com', b'fingerprint': b'82'}}) @@ -52,6 +55,16 @@ def test_queue(self): assert set([r.url for r in queue.get_next_requests(10, 1, min_requests=3, min_hosts=1, max_requests_per_host=10)]) == set([r1.url, r2.url]) + + def test_queue_with_post_request(self): + connection = Connection(host='hbase-docker', port=9090) + queue = HBaseQueue(connection, 1, b'queue', drop=True, use_snappy=False) + batch = [('10', 0.5, r1, True)] + queue.schedule(batch) + requests=queue.get_next_requests(10, 0, min_requests=3, min_hosts=1,max_requests_per_host=10) + self.assertEqual(b'POST', requests[0].method) + self.assertEqual(data, requests[0].body) + @pytest.mark.xfail def test_queue_with_delay(self): connection = Connection(host='hbase-docker', port=9090) diff --git a/tests/contrib/backends/memory/test_backend_memory.py b/tests/contrib/backends/memory/test_backend_memory.py index 4b1c6cf79..8b92611c3 100644 --- a/tests/contrib/backends/memory/test_backend_memory.py +++ b/tests/contrib/backends/memory/test_backend_memory.py @@ -1,7 +1,10 @@ from __future__ import absolute_import +from unittest import TestCase, main + +from frontera.contrib.backends.memory import MemoryQueue from tests.test_overused_buffer import DFSOverusedBackendTest from tests import backends - +from frontera.core.models import Request class TestFIFO(backends.FIFOBackendTest): backend_class = 'frontera.contrib.backends.memory.FIFO' @@ -29,3 +32,19 @@ class TestBFS(backends.BFSBackendTest): class TestRANDOM(backends.RANDOMBackendTest): backend_class = 'frontera.contrib.backends.memory.RANDOM' + + +class TestMemoryQueue(TestCase): + def test_scheduling_past_1part_post(self): + subject = MemoryQueue(1) + data={'id':'xxx', + 'name':'yyy'} + batch = [ + ("1", 1, Request(url='https://www.knuthellan.com/', body=data, method='POST'), True), + ] + subject.schedule(batch) + requests = subject.get_next_requests(5, 0) + for request in requests: + self.assertTrue(request.method == b'POST') + self.assertTrue(request.body == data) + diff --git a/tests/contrib/backends/redis_backend/test_redis.py b/tests/contrib/backends/redis_backend/test_redis.py index adbc6ceb4..278d9774e 100644 --- a/tests/contrib/backends/redis_backend/test_redis.py +++ b/tests/contrib/backends/redis_backend/test_redis.py @@ -27,6 +27,14 @@ def __init__(self, fingerprint, crawl_at, url, domain=None): self.headers = {} self.cookies = None self.status_code = 200 + self.body='' + + +class PostRequest(Request): + def __init__(self, fingerprint, crawl_at, url, body, domain=None): + super(PostRequest, self).__init__(fingerprint, crawl_at, url, domain) + self.method = 'POST' + self.body=body def get_pool(): @@ -314,7 +322,23 @@ def test_get_next_requests_few_items_few_hosts(self): self.assertTrue('https://www.knuthellan.com/' in urls) self.assertEqual(0, subject.count()) - + def test_scheduling_past_1part_post(self): + subject = self.setup_subject(1) + data={'id':'xxx', + 'name':'yyy'} + batch = [ + ("1", 1, PostRequest("1", int(time()) - 10, 'https://www.knuthellan.com/', body=data, domain='knuthellan.com'), True), + ("2", 0.1, PostRequest("2", int(time()) - 10, 'https://www.khellan.com/', body=data, domain='khellan.com'), True), + ("3", 0.5, PostRequest("3", int(time()) - 10, 'https://www.hellan.me/', body=data, domain='hellan.me'), True), + ] + subject.schedule(batch) + self.assertEqual(3, subject.count()) + requests = subject.get_next_requests(5, 0, min_hosts=1, min_requests=1, max_requests_per_host=5) + self.assertEqual(3, len(requests)) + for request in requests: + self.assertTrue(request.method == b'POST') + self.assertTrue(request.body == data) + self.assertEqual(0, subject.count()) class RedisStateTest(TestCase): def test_update_cache(self): diff --git a/tests/test_message_bus_backend.py b/tests/test_message_bus_backend.py index 68278d133..0b4647f69 100644 --- a/tests/test_message_bus_backend.py +++ b/tests/test_message_bus_backend.py @@ -5,8 +5,10 @@ from frontera.settings import Settings from frontera.core.models import Request, Response +data = {'id': 'xxx', + 'name': 'yyy'} -r1 = Request('http://www.example.com/', meta={b'domain': {b'fingerprint': b'1'}}) +r1 = Request('http://www.example.com/',method='post',body=data, meta={b'domain': {b'fingerprint': b'1'}}) r2 = Request('http://www.scrapy.org/', meta={b'domain': {b'fingerprint': b'2'}}) r3 = Request('http://www.test.com/some/page', meta={b'domain': {b'fingerprint': b'3'}}) @@ -18,6 +20,8 @@ def mbb_setup(self, settings=None): settings = settings or Settings() settings.MESSAGE_BUS = 'tests.mocks.message_bus.FakeMessageBus' settings.STORE_CONTENT = True + #test json codecs + # settings.MESSAGE_BUS_CODEC='frontera.contrib.backends.remote.codecs.json' manager.settings = settings manager.request_model = Request manager.response_model = Response @@ -43,13 +47,16 @@ def test_add_seeds(self): mbb.add_seeds([r1, r2, r3]) seeds = [mbb._decoder.decode(m)[1][0] for m in mbb.spider_log_producer.messages] self.assertEqual(set([seed.url for seed in seeds]), set([r1.url, r2.url, r3.url])) + seed=seeds[0] + self.assertEqual(b'POST',seed.method) + self.assertEqual(data,seed.body) def test_page_crawled(self): mbb = self.mbb_setup() - resp = Response(r1.url, body='body', request=r1) + resp = Response(r1.url, body=b'body', request=r1) mbb.page_crawled(resp) page = mbb._decoder.decode(mbb.spider_log_producer.messages[0])[1] - self.assertEqual((page.request.url, page.body), (resp.request.url, 'body')) + self.assertEqual((page.request.url, page.body), (resp.request.url, b'body')) def test_links_extracted(self): mbb = self.mbb_setup() @@ -57,6 +64,8 @@ def test_links_extracted(self): requests = [mbb._decoder.decode(m)[1] for m in mbb.spider_log_producer.messages] links = [mbb._decoder.decode(m)[2][0] for m in mbb.spider_log_producer.messages] self.assertEqual(set([r.url for r in requests]), set([r1.url])) + self.assertEqual(b'POST', requests[0].method) + self.assertEqual(data, requests[0].body) self.assertEqual(set([link.url for link in links]), set([r2.url, r3.url])) def test_request_error(self): @@ -64,6 +73,8 @@ def test_request_error(self): mbb.request_error(r1, 'error') _, error_request, error_message = mbb._decoder.decode(mbb.spider_log_producer.messages[0]) self.assertEqual((error_request.url, error_message), (r1.url, 'error')) + self.assertEqual(b'POST', error_request.method) + self.assertEqual(data, error_request.body) def test_get_next_requests(self): mbb = self.mbb_setup() @@ -80,3 +91,7 @@ def test_get_next_requests(self): mbb.consumer.put_messages(encoded_requests) requests = set(mbb.get_next_requests(10, overused_keys=['www.example.com'], key_type='domain')) self.assertEqual(set([r.url for r in requests]), set([r2.url, r3.url])) + #test get post request + requests = mbb.get_next_requests(10, overused_keys=[], key_type='domain') + self.assertEqual(b'POST', requests[0].method) + self.assertEqual(data, requests[0].body)