Skip to content

Commit

Permalink
Reverting previous debug commit, and changing get_task method
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavogaldinoo committed Aug 19, 2024
1 parent 4b2ec86 commit 0c7987d
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 41 deletions.
3 changes: 0 additions & 3 deletions experiment/measurer/measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,10 +937,7 @@ def start_workers(self, request_queue, response_queue, pool):
'experiment': self.experiment,
}
google_cloud_worker = measure_worker.GoogleCloudMeasureWorker(config)
# Only creating one worker for debugging purposes
pool.apply_async(google_cloud_worker.measure_worker_loop)

return
# Since each worker is going to be in an infinite loop, we dont need
# result return. Workers' life scope will end automatically when
# there are no more snapshots left to measure.
Expand Down
75 changes: 40 additions & 35 deletions experiment/measurer/measure_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,19 @@ def measure_worker_loop(self):
# 'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id',
# 'cycle']
request = self.get_task_from_request_queue()
logger.info(
'Measurer worker: Got request %s %s %d %d from request queue',
request.fuzzer, request.benchmark, request.trial_id,
request.cycle)
measured_snapshot = measure_manager.measure_snapshot_coverage(
request.fuzzer, request.benchmark, request.trial_id,
request.cycle, self.region_coverage)
result, retry = self.process_measured_snapshot_result(
measured_snapshot, request)
self.put_result_in_response_queue(result, retry)
if request:
logger.info(
'Measurer worker: Got request %s %s %d %d from request queue', # pylint: disable=line-too-long
request.fuzzer,
request.benchmark,
request.trial_id,
request.cycle)
measured_snapshot = measure_manager.measure_snapshot_coverage(
request.fuzzer, request.benchmark, request.trial_id,
request.cycle, self.region_coverage)
result, retry = self.process_measured_snapshot_result(
measured_snapshot, request)
self.put_result_in_response_queue(result, retry)
time.sleep(MEASUREMENT_TIMEOUT)


Expand Down Expand Up @@ -162,32 +165,34 @@ def _create_request_queue_subscription(self):
return None

def get_task_from_request_queue(
self) -> measurer_datatypes.SnapshotMeasureRequest:
while True:
try:
response = self.subscriber_client.pull(request={
'subscription': self.subscription_path,
'max_messages': 1
})
self) -> Optional[measurer_datatypes.SnapshotMeasureRequest]:
try:
response = self.subscriber_client.pull(request={
'subscription': self.subscription_path,
'max_messages': 1
})
except google.api_core.exceptions.GoogleAPICallError as error:
logger.error('Error when calling pubsub API: %s', error)
return None

if not response.received_messages:
return None

if response.received_messages:
message = response.received_messages[0]
ack_ids = [message.ack_id]

# Acknowledge the received message to remove it from the
# queue.
self.subscriber_client.acknowledge(request={
'subscription': self.subscription_path,
'ack_ids': ack_ids
})

# Needs to deserialize data from bytes to
# SnapshotMeasureRequest
serialized_data = json.loads(message.message.data)
return measurer_datatypes.from_dict_to_snapshot_measure_request( # pylint: disable=line-too-long
serialized_data)
except google.api_core.exceptions.GoogleAPICallError as error:
logger.error('Error when calling pubsub API: %s', error)
message = response.received_messages[0]
ack_ids = [message.ack_id]

# Acknowledge the received message to remove it from the
# queue.
self.subscriber_client.acknowledge(request={
'subscription': self.subscription_path,
'ack_ids': ack_ids
})

# Needs to deserialize data from bytes to
# SnapshotMeasureRequest
serialized_data = json.loads(message.message.data)
return measurer_datatypes.from_dict_to_snapshot_measure_request( # pylint: disable=line-too-long
serialized_data)

def process_measured_snapshot_result(self, measured_snapshot, request):
if measured_snapshot:
Expand Down
5 changes: 2 additions & 3 deletions experiment/measurer/test_measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import os
import shutil
from unittest import mock
import multiprocessing
import queue
import pytest

Expand Down Expand Up @@ -662,9 +663,7 @@ def test_gcloud_measure_manager_start_workers(mock_gcloud_measure_worker,
gcloud_measure_manager):
"""Tests that the start workers method is calling the measure worker loop
method, a number of times equal to the number of measurers CPUs."""
# Changing this to 1 temporarily to debug gcloud worker
cpus_available = 1
# cpus_available = multiprocessing.cpu_count()
cpus_available = multiprocessing.cpu_count()
gcloud_measure_manager.measurers_cpus = cpus_available
with mock.patch('multiprocessing.pool.Pool.apply_async') as pool:
gcloud_measure_manager.start_workers('request-queue-topic',
Expand Down

0 comments on commit 0c7987d

Please sign in to comment.