Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 80 additions & 8 deletions gcm/monitoring/slurm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,39 @@ def sdiag_structured(self) -> Sdiag:
sdiag_output = json.loads(
subprocess.check_output(["sdiag", "--all", "--json"], text=True)
)

return Sdiag(
server_thread_count=sdiag_output["statistics"]["server_thread_count"],
agent_queue_size=sdiag_output["statistics"]["agent_queue_size"],
agent_count=sdiag_output["statistics"]["agent_count"],
agent_thread_count=sdiag_output["statistics"]["agent_thread_count"],
dbd_agent_queue_size=sdiag_output["statistics"]["dbd_agent_queue_size"],
stats = sdiag_output["statistics"]

result = Sdiag(
server_thread_count=stats.get("server_thread_count"),
agent_queue_size=stats.get("agent_queue_size"),
agent_count=stats.get("agent_count"),
agent_thread_count=stats.get("agent_thread_count"),
dbd_agent_queue_size=stats.get("dbd_agent_queue_size"),
schedule_cycle_max=stats.get("schedule_cycle_max"),
schedule_cycle_mean=stats.get("schedule_cycle_mean"),
schedule_cycle_sum=stats.get("schedule_cycle_sum"),
schedule_cycle_total=stats.get("schedule_cycle_total"),
schedule_cycle_per_minute=stats.get("schedule_cycle_per_minute"),
schedule_queue_length=stats.get("schedule_queue_length"),
sdiag_jobs_submitted=stats.get("jobs_submitted"),
sdiag_jobs_started=stats.get("jobs_started"),
sdiag_jobs_completed=stats.get("jobs_completed"),
sdiag_jobs_canceled=stats.get("jobs_canceled"),
sdiag_jobs_failed=stats.get("jobs_failed"),
sdiag_jobs_pending=stats.get("jobs_pending"),
sdiag_jobs_running=stats.get("jobs_running"),
bf_backfilled_jobs=stats.get("bf_backfilled_jobs"),
bf_cycle_mean=stats.get("bf_cycle_mean"),
bf_cycle_sum=stats.get("bf_cycle_sum"),
bf_cycle_max=stats.get("bf_cycle_max"),
bf_queue_len=stats.get("bf_queue_len"),
)

# Reset sdiag counters after collection
self._reset_sdiag_counters()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we resetting the sdiag counters on each collection?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that these counters are cumulative if we don't call sdiag reset. This change adds a reset call every time after we collect sdiag stats, which makes the data more meaningful on a per-interval basis.

@yonglimeta maybe add a cli flag to control this, seems fine resetting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because these sdiag counters are cumulative; if they are not reset, they will keep increasing. Resetting allows us to collect true time-series sdiag data.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay to start with this.

I hope these race conditions are either covered somehow or turn out not to be a big issue.
1/ Data collected --> sdiag reset
|
--> In parallel, new data arrives between the collection and the reset, and gets reset too (i.e. it is lost).
2/ sdiag also resets itself every midnight. There are multiple race-condition scenarios here too.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seconding the concern about potential data loss between each collection and reset. Also, it seems that Slurm natively resets the counters at 12 a.m. server time.


return result

sdiag_output = subprocess.check_output(["sdiag", "--all"], text=True)
metric_names = {
"Server thread count:": "server_thread_count",
Expand All @@ -203,7 +227,7 @@ def sdiag_structured(self) -> Sdiag:
"Agent thread count:": "agent_thread_count",
"DBD Agent queue size:": "dbd_agent_queue_size",
}
data = {
data: dict[str, Optional[int]] = {
"server_thread_count": 0,
"agent_queue_size": 0,
"agent_count": 0,
Expand All @@ -215,8 +239,56 @@ def sdiag_structured(self) -> Sdiag:
lines = re.search(rf".*{sdiag_name}.*", sdiag_output)
assert lines is not None, f"Sdiag metric {sdiag_name} not found: {lines}"
data[name] = int(lines.group().strip(f"{sdiag_name}"))

optional_metric_names = {
"Schedule cycle max:": "schedule_cycle_max",
"Schedule cycle mean:": "schedule_cycle_mean",
"Schedule cycle sum:": "schedule_cycle_sum",
"Schedule cycle total:": "schedule_cycle_total",
"Schedule cycle per minute:": "schedule_cycle_per_minute",
"Schedule queue length:": "schedule_queue_length",
"Jobs submitted:": "sdiag_jobs_submitted",
"Jobs started:": "sdiag_jobs_started",
"Jobs completed:": "sdiag_jobs_completed",
"Jobs canceled:": "sdiag_jobs_canceled",
"Jobs failed:": "sdiag_jobs_failed",
"Jobs pending:": "sdiag_jobs_pending",
"Jobs running:": "sdiag_jobs_running",
"Total backfilled jobs \\(since last slurm start\\):": "bf_backfilled_jobs",
"Backfill cycle mean:": "bf_cycle_mean",
"Backfill cycle sum:": "bf_cycle_sum",
"Backfill cycle max:": "bf_cycle_max",
"Backfill queue length:": "bf_queue_len",
}

for sdiag_name, name in optional_metric_names.items():
match = re.search(rf"{sdiag_name}\s*(\d+)", sdiag_output)
if match:
data[name] = int(match.group(1))
else:
data[name] = None

# Reset sdiag counters after collection
self._reset_sdiag_counters()

return Sdiag(**data)

def _reset_sdiag_counters(self) -> None:
    """Reset sdiag counters after collection by running ``sdiag --reset``.

    This requires appropriate permissions (typically root or SlurmUser).
    The reset is best-effort: any failure is logged as a warning and never
    raised, so statistics that were already collected are not lost because
    the reset could not be performed.
    """
    try:
        subprocess.run(
            ["sdiag", "--reset"],
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        # sdiag ran but exited non-zero (e.g. the caller is not allowed to
        # reset the counters). stderr is guarded in case it was not captured.
        logger.warning(
            f"Failed to reset sdiag counters: {(e.stderr or '').strip()}"
        )
    except OSError as e:
        # sdiag could not be launched at all (binary missing, or exec denied).
        # Treat it like any other failed reset instead of aborting collection.
        logger.warning(f"Failed to reset sdiag counters: {e}")

def sinfo_structured(self) -> Sinfo:
fieldnames = [f.name for f in fields(SinfoRow)]

Expand Down
24 changes: 24 additions & 0 deletions gcm/schemas/slurm/sdiag.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,27 @@ class Sdiag:
agent_count: Optional[int]
agent_thread_count: Optional[int]
dbd_agent_queue_size: Optional[int]

# Schedule cycle statistics
schedule_cycle_max: Optional[int] = None
schedule_cycle_mean: Optional[int] = None
schedule_cycle_sum: Optional[int] = None
schedule_cycle_total: Optional[int] = None
schedule_cycle_per_minute: Optional[int] = None
schedule_queue_length: Optional[int] = None

# Job statistics (prefixed with sdiag_ to avoid collision with SLURMLog)
sdiag_jobs_submitted: Optional[int] = None
sdiag_jobs_started: Optional[int] = None
sdiag_jobs_completed: Optional[int] = None
sdiag_jobs_canceled: Optional[int] = None
sdiag_jobs_failed: Optional[int] = None
sdiag_jobs_pending: Optional[int] = None
sdiag_jobs_running: Optional[int] = None

# Backfill statistics
bf_backfilled_jobs: Optional[int] = None
bf_cycle_mean: Optional[int] = None
bf_cycle_sum: Optional[int] = None
bf_cycle_max: Optional[int] = None
bf_queue_len: Optional[int] = None
Loading
Loading