Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark partition as busy when a new batch is sent to it #281

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
29 changes: 20 additions & 9 deletions frontera/contrib/messagebus/zeromq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import six

from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseStreamConsumer, \
BaseSpiderFeedStream, BaseScoringLogStream
BaseSpiderFeedStream, BaseScoringLogStream, BaseStreamProducer
from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner
from frontera.contrib.messagebus.zeromq.socket_config import SocketConfig
from six.moves import range
Expand Down Expand Up @@ -61,7 +61,7 @@ def get_offset(self, partition_id):
return self.counter


class Producer(object):
class Producer(BaseStreamProducer):
def __init__(self, context, location, identity):
self.identity = identity
self.sender = context.zeromq.socket(zmq.PUB)
Expand Down Expand Up @@ -176,22 +176,32 @@ def __init__(self, messagebus):
self.consumer_hwm = messagebus.spider_feed_rcvhwm
self.producer_hwm = messagebus.spider_feed_sndhwm
self.hostname_partitioning = messagebus.hostname_partitioning
self.max_next_requests = messagebus.max_next_requests
self._producer = None

def consumer(self, partition_id):
return Consumer(self.context, self.out_location, partition_id, b'sf', seq_warnings=True, hwm=self.consumer_hwm)

def producer(self):
return SpiderFeedProducer(self.context, self.in_location, self.partitions,
self.producer_hwm, self.hostname_partitioning)
if not self._producer:
self._producer = SpiderFeedProducer(
self.context, self.in_location, self.partitions,
self.producer_hwm, self.hostname_partitioning)
return self._producer

def available_partitions(self):
return self.ready_partitions
if not self._producer:
return []

def mark_ready(self, partition_id):
self.ready_partitions.add(partition_id)
partitions = []
for partition_id, last_offset in self.partitions_offset.items():
lag = self._producer.get_offset(partition_id) - last_offset
if lag < self.max_next_requests:
partitions.append(partition_id)
return partitions

def mark_busy(self, partition_id):
self.ready_partitions.discard(partition_id)
def set_spider_offset(self, partition_id, offset):
self.partitions_offset[partition_id] = offset
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this variable isn't defined

Copy link
Contributor Author

@isra17 isra17 Jul 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha good catch, fixed



class Context(object):
Expand All @@ -210,6 +220,7 @@ def __init__(self, settings):
self.spider_feed_sndhwm = int(settings.get('MAX_NEXT_REQUESTS') * len(self.spider_feed_partitions) * 1.2)
self.spider_feed_rcvhwm = int(settings.get('MAX_NEXT_REQUESTS') * 2.0)
self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
self.max_next_requests = int(settings.get('MAX_NEXT_REQUESTS'))
if self.socket_config.is_ipv6:
self.context.zeromq.setsockopt(zmq.IPV6, True)

Expand Down
15 changes: 5 additions & 10 deletions frontera/core/messagebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,13 @@ def available_partitions(self):
"""
raise NotImplementedError

def mark_ready(self, partition_id):
def set_spider_offset(self, partition_id, offset):
"""
Marks partition as ready/available for receiving new batches.
:param partition_id: int
:return: nothing
"""
pass

def mark_busy(self, partition_id):
"""
Marks partition as busy, so that spider assigned to this partition is busy processing previous batches.
Set the message processed offset for a given partition. Used to
calculate the lag between the message sent and message processed
to prevent overflowing the queue of an unresponsive partition.
:param partition_id: int
:param offset: int
:return: nothing
"""
pass
Expand Down
13 changes: 1 addition & 12 deletions frontera/worker/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,7 @@ def consume_incoming(self, *args, **kwargs):
continue
if type == 'offset':
_, partition_id, offset = msg
producer_offset = self.spider_feed_producer.get_offset(partition_id)
if producer_offset is None:
continue
else:
lag = producer_offset - offset
if lag < 0:
# non-sense in general, happens when SW is restarted and not synced yet with Spiders.
continue
if lag < self.max_next_requests or offset == 0:
self.spider_feed.mark_ready(partition_id)
else:
self.spider_feed.mark_busy(partition_id)
self.spider_feed.set_spider_offset(partition_id, offset)
continue
logger.debug('Unknown message type %s', type)
except Exception as exc:
Expand Down
29 changes: 17 additions & 12 deletions tests/mocks/message_bus.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseStreamConsumer, \
BaseScoringLogStream, BaseSpiderFeedStream
BaseScoringLogStream, BaseSpiderFeedStream, BaseStreamProducer


class Consumer(BaseStreamConsumer):
Expand Down Expand Up @@ -27,7 +27,7 @@ def get_offset(self, partition_id):
return self.offset


class Producer(object):
class Producer(BaseStreamProducer):

def __init__(self):
self.messages = []
Expand Down Expand Up @@ -70,23 +70,28 @@ def consumer(self, partition_id, type):
class SpiderFeedStream(BaseSpiderFeedStream):

def __init__(self, messagebus):
self.ready_partitions = set(messagebus.spider_feed_partitions)
self._producer = Producer()
self.max_next_requests = messagebus.max_next_requests
self.partitions_offset = {}
for partition_id in messagebus.spider_feed_partitions:
self.partitions_offset[partition_id] = 0

def producer(self):
return Producer()
return self._producer

def consumer(self, partition_id):
return Consumer()

def available_partitions(self):
return self.ready_partitions

def mark_ready(self, partition_id):
self.ready_partitions.add(partition_id)

def mark_busy(self, partition_id):
self.ready_partitions.discard(partition_id)

partitions = []
for partition_id, last_offset in self.partitions_offset.items():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, if partition doesn't exist (yet) in this dict - it will not be returned as available, which is wrong.
What if producer offsets will be less (because of DB worker restart) than consumer offsets?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line 86 does create the keys for each partition. In the worst case, a new partition will first send an offset message and create a key in the partitions_offset. As for the negative offset I don't think anything will break? From my understanding, when a DBWorker restart, a spider will have a big invalid offset, but it should be marked as ready and on its next message this will fix it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok!

lag = self._producer.get_offset(partition_id) - last_offset
if lag < self.max_next_requests or last_offset == 0:
partitions.append(partition_id)
return partitions

def set_spider_offset(self, partition_id, offset):
self.partitions_offset[partition_id] = offset

class FakeMessageBus(BaseMessageBus):

Expand Down
25 changes: 25 additions & 0 deletions tests/test_worker_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,28 @@ def test_offset(self):
dbw._backend.queue.put_requests([r1, r2, r3])
assert dbw.new_batch() == 3
assert 3 in dbw._backend.partitions

def test_partition_available(self):
dbw = self.dbw_setup(True)
msg1 = dbw._encoder.encode_offset(0, 128)
msg2 = dbw._encoder.encode_offset(1, 0)
dbw.spider_log_consumer.put_messages([msg1, msg2])
dbw.spider_feed_producer.offset = 128
dbw.consume_incoming()

assert 0 in dbw.spider_feed.available_partitions()
assert 1 in dbw.spider_feed.available_partitions()

msg3 = dbw._encoder.encode_offset(1, 1)
dbw.spider_log_consumer.put_messages([msg3])
dbw.consume_incoming()
assert 1 not in dbw.spider_feed.available_partitions()

msg3 = dbw._encoder.encode_offset(1, 1)
dbw.spider_log_consumer.put_messages([msg3])
dbw.consume_incoming()
assert 1 not in dbw.spider_feed.available_partitions()

dbw.spider_feed_producer.offset = 256
assert 0 not in dbw.spider_feed.available_partitions()
assert 1 not in dbw.spider_feed.available_partitions()