diff --git a/experiment/measurer/measure_manager.py b/experiment/measurer/measure_manager.py index c9d0e8c25..c9b08041f 100644 --- a/experiment/measurer/measure_manager.py +++ b/experiment/measurer/measure_manager.py @@ -937,10 +937,7 @@ def start_workers(self, request_queue, response_queue, pool): 'experiment': self.experiment, } google_cloud_worker = measure_worker.GoogleCloudMeasureWorker(config) - # Only creating one worker for debugging purposes - pool.apply_async(google_cloud_worker.measure_worker_loop) - return # Since each worker is going to be in an infinite loop, we dont need # result return. Workers' life scope will end automatically when # there are no more snapshots left to measure. diff --git a/experiment/measurer/measure_worker.py b/experiment/measurer/measure_worker.py index 8f9cbaf32..d44037ada 100644 --- a/experiment/measurer/measure_worker.py +++ b/experiment/measurer/measure_worker.py @@ -80,16 +80,19 @@ def measure_worker_loop(self): # 'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', # 'cycle'] request = self.get_task_from_request_queue() - logger.info( - 'Measurer worker: Got request %s %s %d %d from request queue', - request.fuzzer, request.benchmark, request.trial_id, - request.cycle) - measured_snapshot = measure_manager.measure_snapshot_coverage( - request.fuzzer, request.benchmark, request.trial_id, - request.cycle, self.region_coverage) - result, retry = self.process_measured_snapshot_result( - measured_snapshot, request) - self.put_result_in_response_queue(result, retry) + if request: + logger.info( + 'Measurer worker: Got request %s %s %d %d from request queue', # pylint: disable=line-too-long + request.fuzzer, + request.benchmark, + request.trial_id, + request.cycle) + measured_snapshot = measure_manager.measure_snapshot_coverage( + request.fuzzer, request.benchmark, request.trial_id, + request.cycle, self.region_coverage) + result, retry = self.process_measured_snapshot_result( + measured_snapshot, request) + self.put_result_in_response_queue(result, retry) time.sleep(MEASUREMENT_TIMEOUT) @@ -162,32 +165,34 @@ def _create_request_queue_subscription(self): return None def get_task_from_request_queue( - self) -> measurer_datatypes.SnapshotMeasureRequest: - while True: - try: - response = self.subscriber_client.pull(request={ - 'subscription': self.subscription_path, - 'max_messages': 1 - }) + self) -> Optional[measurer_datatypes.SnapshotMeasureRequest]: + try: + response = self.subscriber_client.pull(request={ + 'subscription': self.subscription_path, + 'max_messages': 1 + }) + except google.api_core.exceptions.GoogleAPICallError as error: + logger.error('Error when calling pubsub API: %s', error) + return None + + if not response.received_messages: + return None - if response.received_messages: - message = response.received_messages[0] - ack_ids = [message.ack_id] - - # Acknowledge the received message to remove it from the - # queue. - self.subscriber_client.acknowledge(request={ - 'subscription': self.subscription_path, - 'ack_ids': ack_ids - }) - - # Needs to deserialize data from bytes to - # SnapshotMeasureRequest - serialized_data = json.loads(message.message.data) - return measurer_datatypes.from_dict_to_snapshot_measure_request( # pylint: disable=line-too-long - serialized_data) - except google.api_core.exceptions.GoogleAPICallError as error: - logger.error('Error when calling pubsub API: %s', error) + message = response.received_messages[0] + ack_ids = [message.ack_id] + + # Acknowledge the received message to remove it from the + # queue. + self.subscriber_client.acknowledge(request={ + 'subscription': self.subscription_path, + 'ack_ids': ack_ids + }) + + # Needs to deserialize data from bytes to + # SnapshotMeasureRequest + serialized_data = json.loads(message.message.data) + return measurer_datatypes.from_dict_to_snapshot_measure_request( # pylint: disable=line-too-long + serialized_data) def process_measured_snapshot_result(self, measured_snapshot, request): if measured_snapshot: diff --git a/experiment/measurer/test_measure_manager.py b/experiment/measurer/test_measure_manager.py index 45164e769..995acda35 100644 --- a/experiment/measurer/test_measure_manager.py +++ b/experiment/measurer/test_measure_manager.py @@ -15,6 +15,7 @@ import os import shutil from unittest import mock +import multiprocessing import queue import pytest @@ -662,9 +663,7 @@ def test_gcloud_measure_manager_start_workers(mock_gcloud_measure_worker, gcloud_measure_manager): """Tests that the start workers method is calling the measure worker loop method, a number of times equal to the number of measurers CPUs.""" - # Changing this to 1 temporarily to debug gcloud worker - cpus_available = 1 - # cpus_available = multiprocessing.cpu_count() + cpus_available = multiprocessing.cpu_count() gcloud_measure_manager.measurers_cpus = cpus_available with mock.patch('multiprocessing.pool.Pool.apply_async') as pool: gcloud_measure_manager.start_workers('request-queue-topic',