Skip to content

Commit

Permalink
add bfabric_slurm_queue_status.py
Browse files Browse the repository at this point in the history
  • Loading branch information
leoschwarz committed Jul 22, 2024
1 parent 818532b commit be1d78c
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 0 deletions.
73 changes: 73 additions & 0 deletions bfabric/scripts/bfabric_slurm_queue_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from __future__ import annotations

import argparse
import io
import json
import shlex
import subprocess
import sys

import polars as pl
from loguru import logger

from bfabric import Bfabric
from bfabric.entities import Workunit


def get_slurm_jobs(partition: str, ssh_host: str | None) -> pl.DataFrame:
    """Return the active slurm jobs in *partition* as a polars DataFrame.

    If ``ssh_host`` is given, ``squeue`` is executed remotely over SSH in a
    login shell; otherwise it is run locally.
    """
    target_command = ["squeue", "-p", partition, "--format=%i\t%j\t%N"]
    if ssh_host is None:
        command = target_command
    else:
        # Quote each argument individually, then quote the joined command once
        # more so it survives the remote login shell.
        quoted = " ".join(shlex.quote(arg) for arg in target_command)
        command = ["ssh", ssh_host, "bash -l -c " + shlex.quote(quoted)]

    logger.info(f"Running command: {' '.join(command)}")
    result = subprocess.run(command, stdout=subprocess.PIPE, text=True, check=True)
    jobs = pl.read_csv(io.StringIO(result.stdout), separator="\t").rename(
        {"JOBID": "job_id", "NAME": "name", "NODELIST": "node_list"}
    )
    # Job names of the form "WU<digits>" encode the workunit id; any other
    # name yields a null workunit_id.
    extracted_id = pl.col("name").str.extract(r"WU(\d+)")
    return jobs.with_columns(workunit_id=pl.when(extracted_id.is_not_null()).then(extracted_id.cast(int)))


def get_workunit_status(client: Bfabric, workunit_ids: list[int]) -> dict[int, str]:
    """Return the status of each requested workunit, by consulting the bfabric API.

    :param client: the Bfabric API client used for the lookup.
    :param workunit_ids: ids of the workunits appearing in the slurm queue.
    :return: mapping of workunit id to its bfabric status; a workunit that no
        longer exists in bfabric (e.g. it was deleted) but is still in the
        slurm queue is reported as "ZOMBIE".
    """
    workunits = Workunit.find_all(ids=workunit_ids, client=client)
    # Use `wu_id` rather than `id` to avoid shadowing the builtin.
    return {
        wu_id: workunits[wu_id].data_dict["status"] if wu_id in workunits else "ZOMBIE"
        for wu_id in workunit_ids
    }


def find_zombie_jobs(client: Bfabric, partition: str, ssh_host: str | None) -> pl.DataFrame:
    """Check the slurm jobs in the specified partition and return the zombies.

    :param client: the Bfabric API client used to resolve workunit statuses.
    :param partition: slurm partition to inspect.
    :param ssh_host: optional host to run ``squeue`` on via SSH.
    :return: DataFrame with columns ``workunit_id`` and ``status`` containing
        only the zombie jobs (empty when there are none).
    """
    slurm_jobs = get_slurm_jobs(partition=partition, ssh_host=ssh_host)
    if slurm_jobs.is_empty():
        # Return an empty frame WITH the expected schema: callers index
        # result["workunit_id"], which would raise on a schema-less frame.
        return pl.DataFrame(schema={"workunit_id": pl.Int64, "status": pl.Utf8})
    workunit_status = get_workunit_status(
        client=client, workunit_ids=slurm_jobs["workunit_id"].drop_nulls().cast(int).to_list()
    )
    workunit_status_table = pl.from_dict(dict(workunit_id=workunit_status.keys(), status=workunit_status.values()))
    logger.info(slurm_jobs.join(workunit_status_table, on="workunit_id", how="left").sort("workunit_id"))
    logger.info(f"Active jobs: {workunit_status_table.height}")
    logger.info(f"Found {workunit_status_table.filter(pl.col('status') == 'ZOMBIE').height} zombie jobs.")
    return workunit_status_table.filter(pl.col("status") == "ZOMBIE")


def main() -> None:
    """Check the slurm jobs in the given partition and report any zombies.

    Prints the zombie workunit ids as a JSON list to stdout and exits with
    status code 1 when at least one zombie was found.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--partition", type=str, default="prx")
    parser.add_argument("--ssh", type=str, default=None, help="SSH into the given node to obtain list.")
    args = parser.parse_args()

    client = Bfabric.from_config(verbose=True)
    zombie_jobs = find_zombie_jobs(client, partition=args.partition, ssh_host=args.ssh)
    print(json.dumps(zombie_jobs["workunit_id"].to_list()))
    # A non-zero exit code lets monitoring pick up that zombies were found.
    if zombie_jobs.is_empty():
        return
    sys.exit(1)


if __name__ == "__main__":
    main()
48 changes: 48 additions & 0 deletions bfabric/tests/unit/scripts/test_bfabric_slurm_queue_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import polars as pl
import polars.testing
import pytest

from bfabric.entities import Workunit
from bfabric.scripts.bfabric_slurm_queue_status import get_slurm_jobs, get_workunit_status


@pytest.fixture
def command_output():
    """Simulated `squeue` stdout: two workunit jobs and one unrelated job."""
    return (
        "JOBID\tNAME\tNODELIST\n"
        "1234\tWU5000\tserver-01\n"
        "5678\tWU5001\tserver-02\n"
        "9999\tsomething else\tserver-01\n"
    )


def test_get_slurm_jobs_when_local(mocker, command_output):
    """The squeue output is parsed and workunit ids are extracted from job names."""
    mocker.patch("subprocess.run", return_value=mocker.Mock(stdout=command_output))

    parsed = get_slurm_jobs("mypartition", None)

    # Jobs named "WU<digits>" get a workunit_id; other jobs get null.
    rows = [
        [1234, "WU5000", "server-01", 5000],
        [5678, "WU5001", "server-02", 5001],
        [9999, "something else", "server-01", None],
    ]
    expected = pl.DataFrame(rows, schema=["job_id", "name", "node_list", "workunit_id"], orient="row")
    pl.testing.assert_frame_equal(parsed, expected)


def test_get_workunit_status(mocker):
    """Workunits missing from the API response are reported as ZOMBIE."""
    mock_client = mocker.Mock(name="mock_client")
    mock_find_all = mocker.patch.object(Workunit, "find_all")
    workunit_ids = [5000, 5001]
    # Only 5001 exists in bfabric; 5000 must be classified as a zombie.
    # (The entity's own id matches its key — previously it wrongly said 5000.)
    mock_find_all.return_value = {
        5001: Workunit({"id": 5001, "status": "RUNNING"}),
    }
    status = get_workunit_status(mock_client, workunit_ids)
    assert status == {5000: "ZOMBIE", 5001: "RUNNING"}
    mock_find_all.assert_called_once_with(ids=workunit_ids, client=mock_client)


if __name__ == "__main__":
    pytest.main()
1 change: 1 addition & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Versioning currently follows `X.Y.Z` where

- Add `Workunit`, `Parameter`, and `Resource` entities.
- Add concept of has_many and has_one relationships to entities.
- `bfabric_slurm_queue_status.py` to quickly check slurm queue status.

## \[1.13.3\] - 2024-07-18

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Repository = "https://github.com/fgcz/bfabricPy"
#bfabric_save_resource="bfabric.scripts.bfabric_save_resource:main"
"bfabric_save_workunit_attribute.py"="bfabric.scripts.bfabric_save_workunit_attribute:main"
"bfabric_save_workflowstep.py"="bfabric.scripts.bfabric_save_workflowstep:main"
"bfabric_slurm_queue_status.py"="bfabric.scripts.bfabric_slurm_queue_status:main"

[tool.black]
line-length = 120
Expand Down

0 comments on commit be1d78c

Please sign in to comment.