Skip to content

Commit

Permalink
Merge pull request MolSSI#759 from MolSSI/waiting_reason
Browse files Browse the repository at this point in the history
Implement getting reason a record is waiting
  • Loading branch information
bennybp authored Sep 29, 2023
2 parents a262ad4 + a4a9a31 commit 23c4103
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 0 deletions.
6 changes: 6 additions & 0 deletions qcfractal/qcfractal/components/record_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ def bulk_delete_records_v1(body_data: RecordDeleteBody):
)


@api_v1.route("/records/<int:record_id>/waiting_reason", methods=["GET"])
@wrap_route("READ")
def get_record_waiting_reason_v1(record_id: int):
    """
    Report why a record is still waiting

    Thin HTTP wrapper: the record ID taken from the URL is handed to the
    records socket, and whatever it returns is sent back to the caller.
    """

    records_socket = storage_socket.records
    return records_socket.get_waiting_reason(record_id)


#################################################################
# Routes for individual record types
# These can also be accessed through /records
Expand Down
58 changes: 58 additions & 0 deletions qcfractal/qcfractal/components/record_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
)

from qcfractal.components.auth.db_models import UserIDMapSubquery, GroupIDMapSubquery
from qcfractal.components.managers.db_models import ComputeManagerORM
from qcfractal.components.services.db_models import ServiceQueueORM, ServiceDependencyORM
from qcfractal.components.tasks.db_models import TaskQueueORM
from qcfractal.db_socket.helpers import (
Expand All @@ -27,6 +28,7 @@
)
from qcportal.compression import CompressionEnum, compress, decompress
from qcportal.exceptions import UserReportableError, MissingDataError
from qcportal.managers.models import ManagerStatusEnum
from qcportal.metadata_models import DeleteMetadata, UpdateMetadata
from qcportal.record_models import PriorityEnum, RecordStatusEnum, OutputTypeEnum
from qcportal.utils import chunk_iterable
Expand Down Expand Up @@ -1832,3 +1834,59 @@ def revert_generic(self, record_id: Sequence[int], revert_status: RecordStatusEn
return self.undelete(record_id)

raise RuntimeError(f"Unknown status to revert: ", revert_status)

def get_waiting_reason(self, record_id: int, *, session: Optional[Session] = None) -> Dict[str, Any]:
    """
    Determines why a record/task is waiting

    This function takes the record ID, not the task ID.

    The record's task (tag and required programs) is compared against every
    active compute manager to work out whether any of them could pick it up.

    Parameters
    ----------
    record_id
        ID of the record (not the task) to examine
    session
        An existing session to use. It is passed through to
        ``self.root_socket.optional_session``.

    Returns
    -------
    :
        A dictionary with a "reason" key holding the overall explanation. When
        active managers were examined, a "details" key is also present, mapping
        each manager name to why that manager does (or does not) qualify.
    """

    # Record status plus the tag/programs of its task. The outer join keeps the
    # record row even when no task row exists (tag/programs are then None).
    rec_stmt = (
        select(BaseRecordORM.status, BaseRecordORM.is_service, TaskQueueORM.tag, TaskQueueORM.required_programs)
        .join(TaskQueueORM, TaskQueueORM.record_id == BaseRecordORM.id, isouter=True)
        .where(BaseRecordORM.id == record_id)
    )

    # All active managers
    manager_stmt = select(ComputeManagerORM.name, ComputeManagerORM.tags, ComputeManagerORM.programs).where(
        ComputeManagerORM.status == ManagerStatusEnum.active
    )

    with self.root_socket.optional_session(session, True) as session:
        rec = session.execute(rec_stmt).one_or_none()

        if rec is None:
            return {"reason": "Record does not exist"}

        rec_status, rec_isservice, rec_tag, rec_programs = rec

        if rec_isservice is True:
            return {"reason": "Record is a service"}

        if rec_status != RecordStatusEnum.waiting:
            return {"reason": "Record is not waiting"}

        if rec_tag is None or rec_programs is None:
            return {"reason": "Missing task? This is a developer error"}

        # Record is waiting. Fetch every active manager; the rows are fully
        # loaded here, so the comparison below does not need the session.
        managers = session.execute(manager_stmt).all()

    if not managers:
        return {"reason": "No active managers"}

    required_programs = set(rec_programs)

    details: Dict[str, str] = {}
    has_capable_manager = False

    for m_name, m_tags, m_programs in managers:
        missing_programs = required_programs - m_programs.keys()

        if missing_programs:
            # Sort the names so the message is identical from run to run
            # (iteration order of a set of strings is not stable). The
            # brace/quote layout of the message is kept as it was.
            listed = ", ".join(repr(p) for p in sorted(missing_programs))
            details[m_name] = "Manager missing programs: {" + listed + "}"
        elif rec_tag not in m_tags and "*" not in m_tags:
            # "*" in a manager's tags is treated as matching any tag
            details[m_name] = f'Manager does not handle tag "{rec_tag}"'
        else:
            # Programs and tag both match, so the manager could take the
            # task - it simply has not claimed it yet
            details[m_name] = "Manager is busy"
            has_capable_manager = True

    if has_capable_manager:
        reason = "Waiting for a free manager"
    else:
        reason = "No manager matches programs & tags"

    return {"details": details, "reason": reason}
193 changes: 193 additions & 0 deletions qcfractal/qcfractal/components/test_record_client_waiting_reason.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
"""
Tests retrieving the reason a record is waiting (through the client)
"""
from __future__ import annotations

import re
from typing import TYPE_CHECKING

from qcarchivetesting.testing_classes import QCATestingSnowflake
from qcfractal.components.optimization.testing_helpers import load_test_data as load_opt_test_data
from qcfractal.components.singlepoint.testing_helpers import (
load_test_data as load_sp_test_data,
)
from qcfractal.components.torsiondrive.testing_helpers import load_test_data as load_td_test_data
from qcportal.managers import ManagerName
from qcportal.record_models import PriorityEnum

if TYPE_CHECKING:
pass


def test_record_client_waiting_reason(snowflake: QCATestingSnowflake):
    """
    Check the reported waiting reason as more and more managers are activated

    Three waiting records (two singlepoints and an optimization) are added.
    Managers with different programs/tags are then activated step by step, and
    the overall reason plus the per-manager details are verified at each step.
    """

    socket = snowflake.get_storage_socket()
    client = snowflake.client()

    def activate(uuid, programs, tags):
        # Activate a manager offering the given programs and serving the given tags
        name = ManagerName(cluster="test_cluster", hostname="a_host1", uuid=uuid)
        socket.managers.activate(
            name_data=name,
            manager_version="v2.0",
            username="bill",
            programs={prog: ["unknown"] for prog in programs},
            tags=tags,
        )
        return name

    def detail_matches(result, manager, pattern):
        # Search the detail message reported for one manager
        return re.search(pattern, result["details"][manager.fullname])

    sp_spec_a, sp_mol_a, _ = load_sp_test_data("sp_psi4_water_energy")
    opt_spec, opt_mol, _ = load_opt_test_data("opt_psi4_benzene")
    sp_spec_b, sp_mol_b, _ = load_sp_test_data("sp_rdkit_benzene_energy")

    _, added = socket.records.singlepoint.add([sp_mol_a], sp_spec_a, "tag1", PriorityEnum.low, None, None, True)
    id_1 = added[0]
    _, added = socket.records.optimization.add([opt_mol], opt_spec, "tag2", PriorityEnum.normal, None, None, True)
    id_2 = added[0]
    _, added = socket.records.singlepoint.add([sp_mol_b], sp_spec_b, "tag3", PriorityEnum.high, None, None, True)
    id_3 = added[0]

    # Nothing has been activated yet
    for rec_id in (id_1, id_2, id_3):
        assert client.get_waiting_reason(rec_id)["reason"] == "No active managers"

    # Activate some managers
    mname1 = activate("1234-5678-1234-5678", ["qcengine", "other_prog"], ["tag1"])
    mname2 = activate("1234-5678-1234-7890", ["qcengine", "geometric"], ["tag2"])
    mname3 = activate("1234-5678-1234-8888", ["qcengine", "psi4"], ["tag999"])
    mname4 = activate("1234-5678-1234-0123", ["qcengine", "psi4", "geometric"], ["tag999"])

    reason = client.get_waiting_reason(id_1)
    assert reason["reason"] == "No manager matches programs & tags"
    for manager, pattern in [
        (mname1, r"missing programs.*psi4"),
        (mname2, r"missing programs.*psi4"),
        (mname3, r"does not handle tag.*tag1"),
        (mname4, r"does not handle tag.*tag1"),
    ]:
        assert detail_matches(reason, manager, pattern)

    reason = client.get_waiting_reason(id_2)
    assert reason["reason"] == "No manager matches programs & tags"
    for manager, pattern in [
        (mname1, r"missing programs.*psi4"),
        (mname1, r"missing programs.*geometric"),
        (mname2, r"missing programs.*psi4"),
        (mname3, r"missing programs.*geometric"),
        (mname4, r"does not handle tag.*tag2"),
    ]:
        assert detail_matches(reason, manager, pattern)

    # Add a working manager
    mname5 = activate("1234-5678-1234-2222", ["qcengine", "psi4", "geometric"], ["tag1", "tag2"])

    for rec_id in (id_1, id_2):
        reason = client.get_waiting_reason(rec_id)
        assert reason["reason"] == "Waiting for a free manager"
        assert reason["details"][mname5.fullname] == "Manager is busy"

    # third test record requires rdkit, which we haven't given yet
    reason = client.get_waiting_reason(id_3)
    assert reason["reason"] == "No manager matches programs & tags"
    for manager in (mname1, mname2, mname3, mname4, mname5):
        assert detail_matches(reason, manager, r"missing programs.*rdkit")

    # Also try through the record
    record = client.get_records(id_3)
    assert record.get_waiting_reason() == reason

    # Add a working manager with * tag
    mname6 = activate("1234-5678-1234-0011", ["qcengine", "psi4", "geometric", "rdkit"], ["tag1", "*"])

    reason = client.get_waiting_reason(id_3)
    assert reason["reason"] == "Waiting for a free manager"
    assert detail_matches(reason, mname6, r"Manager is busy")


def test_record_client_waiting_reason_2(snowflake: QCATestingSnowflake):
    """
    Check the reasons reported for records that are not simply waiting on managers

    Covers a record that can be picked up, a record that is no longer waiting,
    an ID that does not exist, and a service record.
    """

    socket = snowflake.get_storage_socket()
    client = snowflake.client()

    sp_spec, sp_mol, _ = load_sp_test_data("sp_psi4_water_energy")
    td_spec, td_mol, _ = load_td_test_data("td_H2O2_mopac_pm6")

    _, sp_ids = socket.records.singlepoint.add([sp_mol], sp_spec, "tag1", PriorityEnum.low, None, None, True)

    _, td_ids = client.add_torsiondrives(
        [td_mol],
        "torsiondrive",
        keywords=td_spec.keywords,
        optimization_specification=td_spec.optimization_specification,
    )

    id_1 = sp_ids[0]
    id_2 = td_ids[0]

    # Add a working manager with * tag
    manager_programs = {
        "qcengine": ["unknown"],
        "psi4": ["unknown"],
        "geometric": ["unknown"],
        "rdkit": ["unknown"],
    }
    mname6 = ManagerName(cluster="test_cluster", hostname="a_host1", uuid="1234-5678-1234-0011")
    socket.managers.activate(
        name_data=mname6,
        manager_version="v2.0",
        username="bill",
        programs=manager_programs,
        tags=["tag1", "*"],
    )

    # Should be able to be picked up
    reason = client.get_waiting_reason(id_1)
    assert reason["reason"] == "Waiting for a free manager"
    assert re.search(r"Manager is busy", reason["details"][mname6.fullname])

    # Reason: not actually waiting
    socket.records.cancel([id_1])
    assert client.get_waiting_reason(id_1)["reason"] == "Record is not waiting"

    # Reason: does not exist (the sum of the two IDs is used as an unused ID)
    missing_id = id_1 + id_2
    assert client.get_waiting_reason(missing_id)["reason"] == "Record does not exist"

    # Reason: is a service
    assert client.get_waiting_reason(id_2)["reason"] == "Record is a service"
20 changes: 20 additions & 0 deletions qcportal/qcportal/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,26 @@ def add_comment(self, record_ids: Union[int, Sequence[int]], comment: str) -> Up
body_data = RecordModifyBody(record_ids=record_ids, comment=comment)
return self.make_request("patch", "api/v1/records", UpdateMetadata, body=body_data)

def get_waiting_reason(self, record_id: int) -> Dict[str, Any]:
    """
    Find out why a record is sitting in the waiting status

    The result always has a 'reason' key with the overall explanation. Where
    it applies, a 'details' key is included as well, giving for each active
    compute manager the reason that manager is not picking up the record's task.

    Parameters
    ----------
    record_id
        ID of the record to check

    Returns
    -------
    :
        A dictionary describing why compute managers are not picking up the record
    """

    endpoint = f"api/v1/records/{record_id}/waiting_reason"
    return self.make_request("get", endpoint, Dict[str, Any])

##############################################################
# Singlepoint calculations
##############################################################
Expand Down
3 changes: 3 additions & 0 deletions qcportal/qcportal/record_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ def service(self) -> Optional[RecordService]:
self._fetch_service()
return self.service_

def get_waiting_reason(self) -> Dict[str, Any]:
    """
    Get the reason this record is in the waiting status

    Queries the server's waiting_reason endpoint for this record's ID through
    the client attached to the record.

    Returns
    -------
    :
        A dictionary as returned by the server. It contains a 'reason' key and,
        where applicable, a 'details' key with per-manager information.

    Raises
    ------
    RuntimeError
        If this record has no client attached, so the server cannot be queried
    """

    client = self._client

    # Fail with a clear message instead of an AttributeError on None
    if client is None:
        raise RuntimeError("Record is not connected to a client; cannot get the waiting reason")

    return client.make_request("get", f"api/v1/records/{self.id}/waiting_reason", Dict[str, Any])

@property
def comments(self) -> Optional[List[RecordComment]]:
if self.comments_ is None:
Expand Down

0 comments on commit 23c4103

Please sign in to comment.