Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gcm/monitoring/cli/gcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
scontrol_config,
slurm_job_monitor,
slurm_monitor,
sprio,
storage,
)
from gcm.monitoring.click import DaemonGroup, detach_option, toml_config_option
Expand All @@ -44,6 +45,7 @@ def main(detach: bool) -> None:
main.add_command(sacct_backfill.main, name="sacct_backfill")
main.add_command(scontrol.main, name="scontrol")
main.add_command(scontrol_config.main, name="scontrol_config")
main.add_command(sprio.main, name="sprio")
main.add_command(storage.main, name="storage")

if __name__ == "__main__":
Expand Down
187 changes: 187 additions & 0 deletions gcm/monitoring/cli/sprio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
import logging
from dataclasses import dataclass, field
from typing import (
Collection,
Generator,
Literal,
Mapping,
Optional,
Protocol,
runtime_checkable,
)

import click
import clusterscope
from gcm.exporters import registry

from gcm.monitoring.click import (
chunk_size_option,
click_default_cmd,
cluster_option,
dry_run_option,
heterogeneous_cluster_v1_option,
interval_option,
log_folder_option,
log_level_option,
once_option,
retries_option,
sink_option,
sink_opts_option,
stdout_option,
)
from gcm.monitoring.clock import Clock, ClockImpl, unixtime_to_pacific_datetime
from gcm.monitoring.dataclass_utils import instantiate_dataclass
from gcm.monitoring.sink.protocol import DataType, SinkAdditionalParams, SinkImpl
from gcm.monitoring.sink.utils import Factory, HasRegistry
from gcm.monitoring.slurm.client import SlurmCliClient, SlurmClient
from gcm.monitoring.slurm.derived_cluster import get_derived_cluster
from gcm.monitoring.utils.monitor import run_data_collection_loop
from gcm.schemas.slurm.sprio import SprioPayload, SprioRow
from typeguard import typechecked

LOGGER_NAME = "sprio"
logger = logging.getLogger(LOGGER_NAME) # default logger to be overridden in main()


def sprio_iterator(
    slurm_client: SlurmClient,
    cluster: str,
    collection_date: str,
    collection_unixtime: int,
    heterogeneous_cluster_v1: bool,
) -> Generator[SprioPayload, None, None]:
    """Parse pipe-separated sprio output into SprioPayload records.

    Args:
        slurm_client: Client whose ``sprio()`` yields a header line followed
            by pipe-separated data rows (no trailing newlines).
        cluster: Cluster name stamped on every payload.
        collection_date: Partition date string (``YYYY-MM-DD``).
        collection_unixtime: Unix timestamp of the collection.
        heterogeneous_cluster_v1: Passed through to derived-cluster resolution.

    Yields:
        One SprioPayload per data row.
    """
    lines = iter(slurm_client.sprio())
    # PEP 479: a bare next() raising StopIteration inside a generator is
    # converted into an opaque RuntimeError. Treat a stream with no header
    # line (e.g. no sprio output at all) as "no data" instead of crashing.
    header = next(lines, None)
    if header is None:
        return
    field_names = header.strip().split("|")
    for sprio_line in lines:
        values = sprio_line.strip().split("|")
        # zip silently truncates on a column-count mismatch; missing columns
        # simply stay unset on the row.
        raw_data: dict[str, str] = dict(zip(field_names, values))
        sprio_row = instantiate_dataclass(SprioRow, raw_data, logger=logger)
        derived_cluster = get_derived_cluster(
            data=raw_data,
            heterogeneous_cluster_v1=heterogeneous_cluster_v1,
            cluster=cluster,
        )
        yield SprioPayload(
            ds=collection_date,
            collection_unixtime=collection_unixtime,
            cluster=cluster,
            derived_cluster=derived_cluster,
            sprio=sprio_row,
        )


def collect_sprio(
    clock: Clock,
    cluster: str,
    slurm_client: SlurmClient,
    heterogeneous_cluster_v1: bool,
) -> Generator[SprioPayload, None, None]:
    """Snapshot the current time and stream sprio payloads stamped with it.

    The timestamp and Pacific-timezone date partition are captured once,
    so every payload in the returned generator shares them.
    """
    now = clock.unixtime()
    ds = unixtime_to_pacific_datetime(now).strftime("%Y-%m-%d")
    return sprio_iterator(
        slurm_client,
        cluster,
        ds,
        collection_unixtime=now,
        heterogeneous_cluster_v1=heterogeneous_cluster_v1,
    )


@runtime_checkable
class CliObject(HasRegistry[SinkImpl], Protocol):
    """Structural type for the click context object the sprio CLI consumes.

    Supplies the sink-factory registry (via HasRegistry), a clock, a Slurm
    client, and a cluster-name lookup.
    """

    @property
    def clock(self) -> Clock: ...

    # Plain method (not a property) — mirrors CliObjectImpl.cluster, which
    # performs a lookup on each call.
    def cluster(self) -> str: ...
    @property
    def slurm_client(self) -> SlurmClient: ...


@dataclass
class CliObjectImpl:
    """Production CliObject: exporter registry, real clock, Slurm CLI client."""

    # The lambda returns the module-level exporter `registry` object itself,
    # so all instances share it rather than copying it.
    registry: Mapping[str, Factory[SinkImpl]] = field(default_factory=lambda: registry)
    clock: Clock = field(default_factory=ClockImpl)
    slurm_client: SlurmClient = field(default_factory=SlurmCliClient)

    def cluster(self) -> str:
        """Resolve the current cluster name via clusterscope."""
        return clusterscope.cluster()


# construct at module-scope because printing sink documentation relies on the object
# (it is handed to click via context_settings below, before main() executes)
_default_obj: CliObject = CliObjectImpl()


@click_default_cmd(context_settings={"obj": _default_obj})
@cluster_option
@sink_option
@sink_opts_option
@log_level_option
@log_folder_option
@stdout_option
@heterogeneous_cluster_v1_option
@interval_option(default=120)
@once_option
@retries_option
@dry_run_option
@chunk_size_option
@click.pass_obj
@typechecked
def main(
    obj: CliObject,
    cluster: Optional[str],
    sink: str,
    sink_opts: Collection[str],
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
    log_folder: str,
    stdout: bool,
    heterogeneous_cluster_v1: bool,
    interval: int,
    once: bool,
    retries: int,
    dry_run: bool,
    chunk_size: int,
) -> None:
    """
    Collects slurm job priority information (sprio) and sends to sink.
    """
    # Fall back to auto-detection when no cluster was given on the CLI.
    effective_cluster = obj.cluster() if cluster is None else cluster

    def _sprio_task(
        cluster: str, interval: int, logger: logging.Logger
    ) -> Generator[SprioPayload, None, None]:
        # interval/logger are presumably part of the task-callable signature
        # run_data_collection_loop expects; this collector does not use them.
        return collect_sprio(
            clock=obj.clock,
            cluster=cluster,
            slurm_client=obj.slurm_client,
            heterogeneous_cluster_v1=heterogeneous_cluster_v1,
        )

    run_data_collection_loop(
        logger_name=LOGGER_NAME,
        log_level=log_level,
        log_folder=log_folder,
        stdout=stdout,
        cluster=effective_cluster,
        clock=obj.clock,
        interval=interval,
        once=once,
        data_collection_tasks=[
            (
                _sprio_task,
                SinkAdditionalParams(
                    data_type=DataType.LOG,
                    heterogeneous_cluster_v1=heterogeneous_cluster_v1,
                ),
            ),
        ],
        sink=sink,
        sink_opts=sink_opts,
        retries=retries,
        chunk_size=chunk_size,
        dry_run=dry_run,
        registry=obj.registry,
    )
16 changes: 16 additions & 0 deletions gcm/monitoring/slurm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from gcm.schemas.slurm.sinfo import Sinfo
from gcm.schemas.slurm.sinfo_node import SinfoNode
from gcm.schemas.slurm.sinfo_row import SinfoRow
from gcm.schemas.slurm.sprio import SPRIO_FORMAT_SPEC, SPRIO_HEADER
from gcm.schemas.slurm.squeue import JOB_DATA_SLURM_FIELDS, JobData

if TYPE_CHECKING:
Expand Down Expand Up @@ -122,6 +123,14 @@ def scontrol_config(self) -> Iterable[str]:
def count_runaway_jobs(self) -> int:
"""Return the count of runaway jobs"""

def sprio(self) -> Iterable[str]:
    """Get lines of sprio output showing job priority factors.

    Each line should be pipe separated.
    The first line defines the fieldnames. The rest are the rows.
    Lines should not have a trailing newline.
    If an error occurs during execution, RuntimeError should be raised.
    """


class SlurmCliClient(SlurmClient):
def __init__(
Expand Down Expand Up @@ -318,3 +327,10 @@ def count_runaway_jobs(self) -> int:
for st in lines:
return int(st)
raise Exception(f"Could not count sacctmgr show runaway lines: {lines}")

def sprio(self) -> Iterable[str]:
    """Yield a synthetic header line, then pipe-separated sprio data rows.

    sprio is invoked with -h (suppress its native header) and with
    SPRIO_FORMAT_SPEC as the -o format, so yielding SPRIO_HEADER first keeps
    the header and the column order in sync by construction.
    """
    # Sort by partition (r) and priority descending (-y) for consistent ordering
    yield SPRIO_HEADER
    yield from _gen_lines(
        self.__popen(["sprio", "-h", "--sort=r,-y", "-o", SPRIO_FORMAT_SPEC])
    )
64 changes: 64 additions & 0 deletions gcm/schemas/slurm/sprio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
from dataclasses import dataclass, field, fields

from typing import Callable, cast, TypeVar

from gcm.monitoring.coerce import maybe_float
from gcm.schemas.slurm.derived_cluster import DerivedCluster


T = TypeVar("T")


def sprio_parsed_field(format_code: str, parser: Callable[[str], T] = str) -> T:
    """Declare a SprioRow field carrying sprio format-code and parser metadata.

    Combines ``format_code`` (used to build the sprio -o format string) with
    ``parser`` (consumed by instantiate_dataclass, like ``parsed_field``).
    The field defaults to None so unparsed columns simply stay unset.
    """
    meta = {"format_code": format_code, "parser": parser}
    descriptor = field(default=None, metadata=meta)
    # cast() lets the dataclass attribute be annotated with the parsed value
    # type even though the assigned object is a dataclasses Field.
    return cast(T, descriptor)


@dataclass
class SprioRow:
    """sprio output schema. Maps field names to SLURM format codes.

    Note: sprio only supports -o (format codes), not -O (field names).
    SLURM outputs incorrect headers for some codes:
    %A → "AGE" (should be ASSOC)
    %P → "PARTITION" (should be PARTITION_PRIO)
    We use custom headers to avoid this confusion.

    Uses sprio_parsed_field which is compatible with instantiate_dataclass().

    All fields default to None, so rows with missing columns still parse;
    numeric priority factors are coerced with maybe_float. Declaration order
    fixes the column order of SPRIO_HEADER and SPRIO_FORMAT_SPEC below.
    """

    # Job identity / grouping columns.
    JOBID: float | None = sprio_parsed_field("%i", parser=maybe_float)
    PARTITION: str | None = sprio_parsed_field("%r")
    USER: str | None = sprio_parsed_field("%u")
    ACCOUNT: str | None = sprio_parsed_field("%o")
    # Total priority followed by its individual weighted factors.
    PRIORITY: float | None = sprio_parsed_field("%Y", parser=maybe_float)
    SITE: float | None = sprio_parsed_field("%S", parser=maybe_float)
    AGE: float | None = sprio_parsed_field("%a", parser=maybe_float)
    ASSOC: float | None = sprio_parsed_field("%A", parser=maybe_float)
    FAIRSHARE: float | None = sprio_parsed_field("%F", parser=maybe_float)
    JOBSIZE: float | None = sprio_parsed_field("%J", parser=maybe_float)
    PARTITION_PRIO: float | None = sprio_parsed_field("%P", parser=maybe_float)
    # QOS name, its priority contribution, and the user-set nice adjustment.
    QOSNAME: str | None = sprio_parsed_field("%n")
    QOS: str | None = sprio_parsed_field("%Q")
    NICE: float | None = sprio_parsed_field("%N", parser=maybe_float)


# Auto-generate header and format spec from dataclass fields.
# We define our own headers instead of using sprio's default output,
# which avoids breakage when Slurm upgrades change header text.
# Both strings derive from the same fields() iteration, so the header and
# the -o format spec can never drift out of column order.
SPRIO_HEADER = "|".join(f.name for f in fields(SprioRow))
SPRIO_FORMAT_SPEC = "|".join(f.metadata["format_code"] for f in fields(SprioRow))


@dataclass(kw_only=True)
class SprioPayload(DerivedCluster):
    """One sprio row plus collection metadata, as emitted to the sink."""

    # Pacific-timezone date partition, formatted YYYY-MM-DD.
    ds: str
    # Unix timestamp captured once per collection pass.
    collection_unixtime: int
    cluster: str
    sprio: SprioRow
5 changes: 5 additions & 0 deletions gcm/tests/data/sample-sprio-expected.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[
{"JOBID": 85957.0, "PARTITION": "learn", "USER": "generate", "ACCOUNT": "general_", "PRIORITY": 1000629.0, "SITE": 0.0, "AGE": 0.0, "ASSOC": 0.0, "FAIRSHARE": 530.0, "JOBSIZE": 0.0, "PARTITION_PRIO": 1000000.0, "QOSNAME": "lowest", "QOS": "100", "NICE": 0.0},
{"JOBID": 85968.0, "PARTITION": "learn", "USER": "generate", "ACCOUNT": "general_", "PRIORITY": 1000629.0, "SITE": 0.0, "AGE": 0.0, "ASSOC": 0.0, "FAIRSHARE": 530.0, "JOBSIZE": 0.0, "PARTITION_PRIO": 1000000.0, "QOSNAME": "lowest", "QOS": "100", "NICE": 0.0},
{"JOBID": 85981.0, "PARTITION": "learn", "USER": "generate", "ACCOUNT": "general_", "PRIORITY": 1000629.0, "SITE": 0.0, "AGE": 0.0, "ASSOC": 0.0, "FAIRSHARE": 530.0, "JOBSIZE": 0.0, "PARTITION_PRIO": 1000000.0, "QOSNAME": "lowest", "QOS": "100", "NICE": 0.0}
]
4 changes: 4 additions & 0 deletions gcm/tests/data/sample-sprio.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
JOBID|PARTITION|USER|ACCOUNT|PRIORITY|SITE|AGE|ASSOC|FAIRSHARE|JOBSIZE|PARTITION_PRIO|QOSNAME|QOS|NICE
85957|learn|generate|general_|1000629|0|0|0|530|0|1000000|lowest|100|0
85968|learn|generate|general_|1000629|0|0|0|530|0|1000000|lowest|100|0
85981|learn|generate|general_|1000629|0|0|0|530|0|1000000|lowest|100|0
69 changes: 69 additions & 0 deletions gcm/tests/test_sprio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
import json
from importlib import resources
from importlib.resources import as_file, files
from typing import Generator
from unittest.mock import Mock

import pytest
from gcm.monitoring.cli.sprio import collect_sprio
from gcm.monitoring.clock import unixtime_to_pacific_datetime
from gcm.monitoring.slurm.client import SlurmCliClient
from gcm.schemas.log import Log
from gcm.schemas.slurm.sprio import SprioPayload, SprioRow
from gcm.tests import data
from gcm.tests.fakes import FakeClock


TEST_CLUSTER = "test_cluster"
TEST_DS = "test_ds"


class FakeSlurmClient(SlurmCliClient):
    """SlurmCliClient test double that replays canned sprio output."""

    def sprio(self) -> Generator[str, None, None]:
        """Yield the sample sprio lines (header first), without newlines."""
        # importlib.resources.open_text() is deprecated since Python 3.11 and
        # removed in 3.13; the files() API (already imported here) replaces it.
        with files(data).joinpath("sample-sprio.txt").open("r") as f:
            for line in f:
                yield line.rstrip("\n")


@pytest.fixture(scope="module")
def dataset_contents() -> list[dict[str, str | float]]:
    """Load the expected sprio payload rows from the packaged JSON fixture."""
    dataset = "sample-sprio-expected.json"
    with as_file(files(data).joinpath(dataset)) as path:
        # read_text() opens and closes the file deterministically; the
        # original `json.load(path.open())` leaked the open file handle.
        return json.loads(path.read_text())


def test_collect_sprio(dataset_contents: list[dict[str, str | float]]) -> None:
    """collect_sprio parses the canned sprio output into the expected payloads."""
    sink_impl = Mock()

    # Run the real collector against the fake client's sample output.
    data_result = collect_sprio(
        clock=FakeClock(),
        cluster=TEST_CLUSTER,
        slurm_client=FakeSlurmClient(),
        heterogeneous_cluster_v1=False,
    )
    log = Log(
        ts=FakeClock().unixtime(),
        message=data_result,
    )
    # Write through a mock sink so the call args can be inspected below.
    sink_impl.write(data=log)

    # NOTE(review): this local generator shadows the production
    # sprio_iterator name; it builds the *expected* payloads from the JSON
    # fixture. Fresh FakeClock() instances are constructed per call on the
    # assumption they always report the same fixed time — confirm in fakes.
    def sprio_iterator() -> Generator[SprioPayload, None, None]:
        for sprio_data in dataset_contents:
            sprio_row = SprioRow(**sprio_data)
            yield SprioPayload(
                ds=unixtime_to_pacific_datetime(FakeClock().unixtime()).strftime(
                    "%Y-%m-%d"
                ),
                collection_unixtime=FakeClock().unixtime(),
                cluster=TEST_CLUSTER,
                # With heterogeneous_cluster_v1=False the derived cluster is
                # expected to equal the plain cluster name.
                derived_cluster=TEST_CLUSTER,
                sprio=sprio_row,
            )

    expected = Log(ts=FakeClock().unixtime(), message=sprio_iterator())
    actual = sink_impl.write.call_args.kwargs
    assert actual["data"].ts == expected.ts
    # Materialize both lazy generators and compare payload-by-payload.
    assert list(actual["data"].message) == list(expected.message)
1 change: 1 addition & 0 deletions website/docs/GCM_Monitoring/collectors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This directory contains documentation for all GCM monitoring collectors. Collect
- **[scontrol_config](scontrol_config.md)** - Collects cluster-wide configuration
- **[slurm_job_monitor](slurm_job_monitor.md)** - Real-time node and job monitoring
- **[slurm_monitor](slurm_monitor.md)** - Comprehensive cluster-wide metrics aggregation
- **[sprio](sprio.md)** - Collects job priority factors for pending jobs

## Common Concepts

Expand Down
Loading