From 33a78d1489b1b7a8a2200bbfba371a07f2f0c829 Mon Sep 17 00:00:00 2001 From: "Yash Gupta (FAIR)" Date: Wed, 4 Feb 2026 12:51:42 -0800 Subject: [PATCH] Add sprio collector for job priority logging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Adds a new `sprio` collector to log Slurm job priority factors to GCM, enabling FAIR Passport to display priority breakdowns for users across clusters. ## What's New | Component | Description | |-----------|-------------| | CLI command | `gcm sprio --sink stdout --once` | | Schema | `SprioRow` dataclass with format code metadata | | Output | 14 priority fields (JOBID, PRIORITY, FAIRSHARE, etc.) | | Collection | Every 2 minutes via `collection_unixtime` snapshots | | Destination | Scuba table `fair_sprio` via OTEL sink | ## Key Design Decisions - **Schema-driven format**: Header and format spec are auto-generated from `SprioRow` field definitions, guaranteeing they always match - **Custom headers**: SLURM's sprio outputs incorrect headers for some codes (`%A` → "AGE" instead of "ASSOC"), so we define our own - **Consistent ordering**: Jobs sorted by partition and priority descending (`--sort=r,-y`) - **Type-safe fields**: Numeric fields (JOBID, PRIORITY, SITE, AGE, FAIRSHARE, JOBSIZE, PARTITION_PRIO, NICE) are parsed as float; identifiers remain strings - **No TRES field**: Removed TRES since it's rarely used and complicates parsing ## Related - Pattern: follows `sacctmgr_user` (D77916380) - Helm config: D92180317 Reviewed By: luccabb Differential Revision: D92178369 --- gcm/monitoring/cli/gcm.py | 2 + gcm/monitoring/cli/sprio.py | 187 ++++++++++++++++++ gcm/monitoring/slurm/client.py | 16 ++ gcm/schemas/slurm/sprio.py | 64 ++++++ gcm/tests/data/sample-sprio-expected.json | 5 + gcm/tests/data/sample-sprio.txt | 4 + gcm/tests/test_sprio.py | 69 +++++++ .../docs/GCM_Monitoring/collectors/README.md | 1 + .../docs/GCM_Monitoring/collectors/sprio.md | 93 +++++++++ 9 files changed, 
441 insertions(+) create mode 100644 gcm/monitoring/cli/sprio.py create mode 100644 gcm/schemas/slurm/sprio.py create mode 100644 gcm/tests/data/sample-sprio-expected.json create mode 100644 gcm/tests/data/sample-sprio.txt create mode 100644 gcm/tests/test_sprio.py create mode 100644 website/docs/GCM_Monitoring/collectors/sprio.md diff --git a/gcm/monitoring/cli/gcm.py b/gcm/monitoring/cli/gcm.py index 726100f..a4e475d 100644 --- a/gcm/monitoring/cli/gcm.py +++ b/gcm/monitoring/cli/gcm.py @@ -20,6 +20,7 @@ scontrol_config, slurm_job_monitor, slurm_monitor, + sprio, storage, ) from gcm.monitoring.click import DaemonGroup, detach_option, toml_config_option @@ -44,6 +45,7 @@ def main(detach: bool) -> None: main.add_command(sacct_backfill.main, name="sacct_backfill") main.add_command(scontrol.main, name="scontrol") main.add_command(scontrol_config.main, name="scontrol_config") +main.add_command(sprio.main, name="sprio") main.add_command(storage.main, name="storage") if __name__ == "__main__": diff --git a/gcm/monitoring/cli/sprio.py b/gcm/monitoring/cli/sprio.py new file mode 100644 index 0000000..5e5500d --- /dev/null +++ b/gcm/monitoring/cli/sprio.py @@ -0,0 +1,187 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. 
+import logging +from dataclasses import dataclass, field +from typing import ( + Collection, + Generator, + Literal, + Mapping, + Optional, + Protocol, + runtime_checkable, +) + +import click +import clusterscope +from gcm.exporters import registry + +from gcm.monitoring.click import ( + chunk_size_option, + click_default_cmd, + cluster_option, + dry_run_option, + heterogeneous_cluster_v1_option, + interval_option, + log_folder_option, + log_level_option, + once_option, + retries_option, + sink_option, + sink_opts_option, + stdout_option, +) +from gcm.monitoring.clock import Clock, ClockImpl, unixtime_to_pacific_datetime +from gcm.monitoring.dataclass_utils import instantiate_dataclass +from gcm.monitoring.sink.protocol import DataType, SinkAdditionalParams, SinkImpl +from gcm.monitoring.sink.utils import Factory, HasRegistry +from gcm.monitoring.slurm.client import SlurmCliClient, SlurmClient +from gcm.monitoring.slurm.derived_cluster import get_derived_cluster +from gcm.monitoring.utils.monitor import run_data_collection_loop +from gcm.schemas.slurm.sprio import SprioPayload, SprioRow +from typeguard import typechecked + +LOGGER_NAME = "sprio" +logger = logging.getLogger(LOGGER_NAME) # default logger to be overridden in main() + + +def sprio_iterator( + slurm_client: SlurmClient, + cluster: str, + collection_date: str, + collection_unixtime: int, + heterogeneous_cluster_v1: bool, +) -> Generator[SprioPayload, None, None]: + get_stdout = iter(slurm_client.sprio()) + field_names = next(get_stdout).strip().split("|") + for sprio_line in get_stdout: + values = sprio_line.strip().split("|") + raw_data: dict[str, str] = dict(zip(field_names, values)) + sprio_row = instantiate_dataclass(SprioRow, raw_data, logger=logger) + derived_cluster = get_derived_cluster( + data=raw_data, + heterogeneous_cluster_v1=heterogeneous_cluster_v1, + cluster=cluster, + ) + yield SprioPayload( + ds=collection_date, + collection_unixtime=collection_unixtime, + cluster=cluster, + 
derived_cluster=derived_cluster, + sprio=sprio_row, + ) + + +def collect_sprio( + clock: Clock, + cluster: str, + slurm_client: SlurmClient, + heterogeneous_cluster_v1: bool, +) -> Generator[SprioPayload, None, None]: + + log_time = clock.unixtime() + collection_date = unixtime_to_pacific_datetime(log_time).strftime("%Y-%m-%d") + + records = sprio_iterator( + slurm_client, + cluster, + collection_date, + collection_unixtime=log_time, + heterogeneous_cluster_v1=heterogeneous_cluster_v1, + ) + return records + + +@runtime_checkable +class CliObject(HasRegistry[SinkImpl], Protocol): + @property + def clock(self) -> Clock: ... + + def cluster(self) -> str: ... + @property + def slurm_client(self) -> SlurmClient: ... + + +@dataclass +class CliObjectImpl: + registry: Mapping[str, Factory[SinkImpl]] = field(default_factory=lambda: registry) + clock: Clock = field(default_factory=ClockImpl) + slurm_client: SlurmClient = field(default_factory=SlurmCliClient) + + def cluster(self) -> str: + return clusterscope.cluster() + + +# construct at module-scope because printing sink documentation relies on the object +_default_obj: CliObject = CliObjectImpl() + + +@click_default_cmd(context_settings={"obj": _default_obj}) +@cluster_option +@sink_option +@sink_opts_option +@log_level_option +@log_folder_option +@stdout_option +@heterogeneous_cluster_v1_option +@interval_option(default=120) +@once_option +@retries_option +@dry_run_option +@chunk_size_option +@click.pass_obj +@typechecked +def main( + obj: CliObject, + cluster: Optional[str], + sink: str, + sink_opts: Collection[str], + log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + log_folder: str, + stdout: bool, + heterogeneous_cluster_v1: bool, + interval: int, + once: bool, + retries: int, + dry_run: bool, + chunk_size: int, +) -> None: + """ + Collects slurm job priority information (sprio) and sends to sink. 
+ """ + + def collect_sprio_callable( + cluster: str, interval: int, logger: logging.Logger + ) -> Generator[SprioPayload, None, None]: + return collect_sprio( + clock=obj.clock, + cluster=cluster, + slurm_client=obj.slurm_client, + heterogeneous_cluster_v1=heterogeneous_cluster_v1, + ) + + run_data_collection_loop( + logger_name=LOGGER_NAME, + log_folder=log_folder, + stdout=stdout, + log_level=log_level, + cluster=obj.cluster() if cluster is None else cluster, + clock=obj.clock, + once=once, + interval=interval, + data_collection_tasks=[ + ( + collect_sprio_callable, + SinkAdditionalParams( + data_type=DataType.LOG, + heterogeneous_cluster_v1=heterogeneous_cluster_v1, + ), + ), + ], + sink=sink, + sink_opts=sink_opts, + retries=retries, + chunk_size=chunk_size, + dry_run=dry_run, + registry=obj.registry, + ) diff --git a/gcm/monitoring/slurm/client.py b/gcm/monitoring/slurm/client.py index ce42cc6..7ae5ba4 100644 --- a/gcm/monitoring/slurm/client.py +++ b/gcm/monitoring/slurm/client.py @@ -33,6 +33,7 @@ from gcm.schemas.slurm.sinfo import Sinfo from gcm.schemas.slurm.sinfo_node import SinfoNode from gcm.schemas.slurm.sinfo_row import SinfoRow +from gcm.schemas.slurm.sprio import SPRIO_FORMAT_SPEC, SPRIO_HEADER from gcm.schemas.slurm.squeue import JOB_DATA_SLURM_FIELDS, JobData if TYPE_CHECKING: @@ -122,6 +123,14 @@ def scontrol_config(self) -> Iterable[str]: def count_runaway_jobs(self) -> int: """Return the count of runaway jobs""" + def sprio(self) -> Iterable[str]: + """Get lines of sprio output showing job priority factors. + Each line should be pipe separated. + The first line defines the fieldnames. The rest are the rows. + Lines should not have a trailing newline. + If an error occurs during execution, RuntimeError should be raised. 
+ """ + class SlurmCliClient(SlurmClient): def __init__( @@ -318,3 +327,10 @@ def count_runaway_jobs(self) -> int: for st in lines: return int(st) raise Exception(f"Could not count sacctmgr show runaway lines: {lines}") + + def sprio(self) -> Iterable[str]: + # Sort by partition (r) and priority descending (-y) for consistent ordering + yield SPRIO_HEADER + yield from _gen_lines( + self.__popen(["sprio", "-h", "--sort=r,-y", "-o", SPRIO_FORMAT_SPEC]) + ) diff --git a/gcm/schemas/slurm/sprio.py b/gcm/schemas/slurm/sprio.py new file mode 100644 index 0000000..9b83384 --- /dev/null +++ b/gcm/schemas/slurm/sprio.py @@ -0,0 +1,64 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +from dataclasses import dataclass, field, fields + +from typing import Callable, cast, TypeVar + +from gcm.monitoring.coerce import maybe_float +from gcm.schemas.slurm.derived_cluster import DerivedCluster + + +T = TypeVar("T") + + +def sprio_parsed_field(format_code: str, parser: Callable[[str], T] = str) -> T: + """Field with sprio format code and parser metadata. + + Combines format_code (for generating sprio command) with parser + (for instantiate_dataclass compatibility like parsed_field). + """ + return cast(T, field(default=None, metadata={"format_code": format_code, "parser": parser})) + + +@dataclass +class SprioRow: + """sprio output schema. Maps field names to SLURM format codes. + + Note: sprio only supports -o (format codes), not -O (field names). + SLURM outputs incorrect headers for some codes: + %A → "AGE" (should be ASSOC) + %P → "PARTITION" (should be PARTITION_PRIO) + We use custom headers to avoid this confusion. + + Uses sprio_parsed_field which is compatible with instantiate_dataclass(). 
+ """ + + JOBID: float | None = sprio_parsed_field("%i", parser=maybe_float) + PARTITION: str | None = sprio_parsed_field("%r") + USER: str | None = sprio_parsed_field("%u") + ACCOUNT: str | None = sprio_parsed_field("%o") + PRIORITY: float | None = sprio_parsed_field("%Y", parser=maybe_float) + SITE: float | None = sprio_parsed_field("%S", parser=maybe_float) + AGE: float | None = sprio_parsed_field("%a", parser=maybe_float) + ASSOC: float | None = sprio_parsed_field("%A", parser=maybe_float) + FAIRSHARE: float | None = sprio_parsed_field("%F", parser=maybe_float) + JOBSIZE: float | None = sprio_parsed_field("%J", parser=maybe_float) + PARTITION_PRIO: float | None = sprio_parsed_field("%P", parser=maybe_float) + QOSNAME: str | None = sprio_parsed_field("%n") + QOS: str | None = sprio_parsed_field("%Q") + NICE: float | None = sprio_parsed_field("%N", parser=maybe_float) + + +# Auto-generate header and format spec from dataclass fields. +# We define our own headers instead of using sprio's default output, +# which avoids breakage when Slurm upgrades change header text. 
+SPRIO_HEADER = "|".join(f.name for f in fields(SprioRow)) +SPRIO_FORMAT_SPEC = "|".join(f.metadata["format_code"] for f in fields(SprioRow)) + + +@dataclass(kw_only=True) +class SprioPayload(DerivedCluster): + ds: str + collection_unixtime: int + cluster: str + sprio: SprioRow diff --git a/gcm/tests/data/sample-sprio-expected.json b/gcm/tests/data/sample-sprio-expected.json new file mode 100644 index 0000000..4d541c9 --- /dev/null +++ b/gcm/tests/data/sample-sprio-expected.json @@ -0,0 +1,5 @@ +[ + {"JOBID": 85957.0, "PARTITION": "learn", "USER": "generate", "ACCOUNT": "general_", "PRIORITY": 1000629.0, "SITE": 0.0, "AGE": 0.0, "ASSOC": 0.0, "FAIRSHARE": 530.0, "JOBSIZE": 0.0, "PARTITION_PRIO": 1000000.0, "QOSNAME": "lowest", "QOS": "100", "NICE": 0.0}, + {"JOBID": 85968.0, "PARTITION": "learn", "USER": "generate", "ACCOUNT": "general_", "PRIORITY": 1000629.0, "SITE": 0.0, "AGE": 0.0, "ASSOC": 0.0, "FAIRSHARE": 530.0, "JOBSIZE": 0.0, "PARTITION_PRIO": 1000000.0, "QOSNAME": "lowest", "QOS": "100", "NICE": 0.0}, + {"JOBID": 85981.0, "PARTITION": "learn", "USER": "generate", "ACCOUNT": "general_", "PRIORITY": 1000629.0, "SITE": 0.0, "AGE": 0.0, "ASSOC": 0.0, "FAIRSHARE": 530.0, "JOBSIZE": 0.0, "PARTITION_PRIO": 1000000.0, "QOSNAME": "lowest", "QOS": "100", "NICE": 0.0} +] diff --git a/gcm/tests/data/sample-sprio.txt b/gcm/tests/data/sample-sprio.txt new file mode 100644 index 0000000..5422fb3 --- /dev/null +++ b/gcm/tests/data/sample-sprio.txt @@ -0,0 +1,4 @@ +JOBID|PARTITION|USER|ACCOUNT|PRIORITY|SITE|AGE|ASSOC|FAIRSHARE|JOBSIZE|PARTITION_PRIO|QOSNAME|QOS|NICE +85957|learn|generate|general_|1000629|0|0|0|530|0|1000000|lowest|100|0 +85968|learn|generate|general_|1000629|0|0|0|530|0|1000000|lowest|100|0 +85981|learn|generate|general_|1000629|0|0|0|530|0|1000000|lowest|100|0 diff --git a/gcm/tests/test_sprio.py b/gcm/tests/test_sprio.py new file mode 100644 index 0000000..d12ba61 --- /dev/null +++ b/gcm/tests/test_sprio.py @@ -0,0 +1,69 @@ +# Copyright (c) Meta 
Platforms, Inc. and affiliates. +# All rights reserved. +import json +from importlib import resources +from importlib.resources import as_file, files +from typing import Generator +from unittest.mock import Mock + +import pytest +from gcm.monitoring.cli.sprio import collect_sprio +from gcm.monitoring.clock import unixtime_to_pacific_datetime +from gcm.monitoring.slurm.client import SlurmCliClient +from gcm.schemas.log import Log +from gcm.schemas.slurm.sprio import SprioPayload, SprioRow +from gcm.tests import data +from gcm.tests.fakes import FakeClock + + +TEST_CLUSTER = "test_cluster" +TEST_DS = "test_ds" + + +class FakeSlurmClient(SlurmCliClient): + + def sprio(self) -> Generator[str, None, None]: + with resources.open_text(data, "sample-sprio.txt") as f: + for line in f: + yield line.rstrip("\n") + + +@pytest.fixture(scope="module") +def dataset_contents() -> list[dict[str, str | float]]: + dataset = "sample-sprio-expected.json" + with as_file(files(data).joinpath(dataset)) as path: + return json.load(path.open()) + + +def test_collect_sprio(dataset_contents: list[dict[str, str | float]]) -> None: + sink_impl = Mock() + + data_result = collect_sprio( + clock=FakeClock(), + cluster=TEST_CLUSTER, + slurm_client=FakeSlurmClient(), + heterogeneous_cluster_v1=False, + ) + log = Log( + ts=FakeClock().unixtime(), + message=data_result, + ) + sink_impl.write(data=log) + + def sprio_iterator() -> Generator[SprioPayload, None, None]: + for sprio_data in dataset_contents: + sprio_row = SprioRow(**sprio_data) + yield SprioPayload( + ds=unixtime_to_pacific_datetime(FakeClock().unixtime()).strftime( + "%Y-%m-%d" + ), + collection_unixtime=FakeClock().unixtime(), + cluster=TEST_CLUSTER, + derived_cluster=TEST_CLUSTER, + sprio=sprio_row, + ) + + expected = Log(ts=FakeClock().unixtime(), message=sprio_iterator()) + actual = sink_impl.write.call_args.kwargs + assert actual["data"].ts == expected.ts + assert list(actual["data"].message) == list(expected.message) diff --git 
a/website/docs/GCM_Monitoring/collectors/README.md b/website/docs/GCM_Monitoring/collectors/README.md index cd185fa..1ce3645 100644 --- a/website/docs/GCM_Monitoring/collectors/README.md +++ b/website/docs/GCM_Monitoring/collectors/README.md @@ -16,6 +16,7 @@ This directory contains documentation for all GCM monitoring collectors. Collect - **[scontrol_config](scontrol_config.md)** - Collects cluster-wide configuration - **[slurm_job_monitor](slurm_job_monitor.md)** - Real-time node and job monitoring - **[slurm_monitor](slurm_monitor.md)** - Comprehensive cluster-wide metrics aggregation +- **[sprio](sprio.md)** - Collects job priority factors for pending jobs ## Common Concepts diff --git a/website/docs/GCM_Monitoring/collectors/sprio.md b/website/docs/GCM_Monitoring/collectors/sprio.md new file mode 100644 index 0000000..1e08196 --- /dev/null +++ b/website/docs/GCM_Monitoring/collectors/sprio.md @@ -0,0 +1,93 @@ +# sprio + +## Overview +Collects SLURM job priority information using `sprio` and publishes it at regular intervals. This enables FAIR Passport and other tools to display job priority factors for users across clusters. + +**Data Type**: `DataType.LOG`, **Schema**: `SprioPayload` + +## Execution Scope + +Single node in the cluster. 
+ +## Output Schema + +### SprioPayload +Published with `DataType.LOG`: + +```python +{ + "ds": str, # Collection date (YYYY-MM-DD in Pacific time) + "collection_unixtime": int, # Unix timestamp of collection (for snapshot identification) + "cluster": str, # Cluster identifier + "derived_cluster": str, # Sub-cluster (same as cluster if not `--heterogeneous-cluster-v1`) + "sprio": { # Dictionary of job priority attributes + "JOBID": float, # Job ID + "PARTITION": str, # Partition name + "USER": str, # Username + "ACCOUNT": str, # Account name + "PRIORITY": float, # Total priority score + "SITE": float, # Site priority factor + "AGE": float, # Age priority factor + "ASSOC": float, # Association priority factor + "FAIRSHARE": float, # Fairshare priority factor + "JOBSIZE": float, # Job size priority factor + "PARTITION_PRIO": float, # Partition priority factor + "QOSNAME": str, # QoS name + "QOS": str, # QoS priority factor + "NICE": float, # Nice value adjustment + } +} +``` + +**Important Notes:** +1. Each pending job creates a separate record +2. Numeric priority factors are floats; identifiers are strings + +### Data Collection Commands +The collector executes: + +```bash +sprio -h --sort=r,-y -o "%i|%r|%u|%o|%Y|%S|%a|%A|%F|%J|%P|%n|%Q|%N" +``` + +The custom format string avoids duplicate column names that appear in `sprio -l` output. Jobs are sorted by partition (ascending) and priority (descending). + +> **Implementation Note**: This format string is auto-generated from the [`SprioRow`](../../../../gcm/schemas/slurm/sprio.py) dataclass. Each field's `format_code` metadata defines the corresponding sprio format specifier. To add or modify fields, update `SprioRow` - the format string regenerates automatically. 
+ +## Command-Line Options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `--cluster` | String | Auto-detected | Cluster name for metadata enrichment | +| `--sink` | String | **Required** | Sink destination, see [Exporters](../exporters) | +| `--sink-opts` | Multiple | - | Sink-specific options | +| `--log-level` | Choice | INFO | DEBUG, INFO, WARNING, ERROR, CRITICAL | +| `--log-folder` | String | `/var/log/fb-monitoring` | Log directory | +| `--stdout` | Flag | False | Display metrics to stdout in addition to logs | +| `--heterogeneous-cluster-v1` | Flag | False | Enable per-partition metrics for heterogeneous clusters | +| `--interval` | Integer | 120 | Seconds between collection cycles (2 minutes) | +| `--once` | Flag | False | Run once and exit (no continuous monitoring) | +| `--retries` | Integer | Shared default | Retry attempts on sink failures | +| `--dry-run` | Flag | False | Print to stdout instead of publishing to sink | +| `--chunk-size` | Integer | Shared default | The maximum size in bytes of each chunk when writing data to sink. | + +## Usage Examples + +### Basic Continuous Collection +```bash +gcm sprio --sink otel --sink-opts "log_resource_attributes={'attr_1': 'value1'}" +``` + +### One-Time Snapshot +```bash +gcm sprio --once --sink stdout +``` + +### Debug Mode with Local File Output +```bash +gcm sprio \ + --once \ + --log-level DEBUG \ + --stdout \ + --sink file --sink-opts filepath=/tmp/sprio_data.jsonl +```