-
Notifications
You must be signed in to change notification settings - Fork 37
refactor: first step to outsource query command #339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughAdds cluster-aware SLURM status querying and validation, a new job_status_query helper module, additional ExecutorSettings (status_command, status_attempts, efficiency reporting, log retention, qos/reservation), status-command fallback logic and validation, log cleanup, and optional efficiency-report generation during shutdown. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant User
participant Executor
participant QueryUtils as job_status_query
participant SLURM
rect rgba(230,245,255,0.6)
note over Executor: Initialization / validation
User->>Executor: instantiate Executor(settings)
Executor->>QueryUtils: is_query_tool_available(sacct), is_query_tool_available(squeue)
Executor->>QueryUtils: get_min_job_age()
Executor->>Executor: compute default status_command / _validate_status_command_settings()
end
rect rgba(240,255,230,0.6)
note over Executor: Status polling
Executor->>Executor: get_status_command()
alt using sacct
Executor->>SLURM: run sacct (command built by QueryUtils)
else using squeue
Executor->>SLURM: run squeue (command built by QueryUtils)
end
SLURM-->>Executor: job states
Executor->>Executor: retry up to status_attempts on transient failures
end
sequenceDiagram
autonumber
participant Executor
participant FS as Filesystem
note over Executor: Shutdown
Executor->>Executor: clean_old_logs()
alt efficiency_report enabled
Executor->>FS: write efficiency report to efficiency_report_path
else
Executor->>Executor: finalize shutdown
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (7)
snakemake_executor_plugin_slurm/job_status_query.py (5)
42-49
: Defensive: support minute units (if present) instead of silently assuming seconds.If MinJobAge is set like “5min”, we currently misinterpret it as 5 seconds.
Apply this diff:
- if unit in ("h", "hour", "hours"): + if unit in ("h", "hour", "hours"): return value * 3600 - elif unit in ("s", "sec", "secs", "second", "seconds", ""): + elif unit in ("m", "min", "mins", "minute", "minutes"): + return value * 60 + elif unit in ("s", "sec", "secs", "second", "seconds", ""): return value else: # Unknown unit, assume seconds return value
56-67
: Use shutil.which instead of spawning a subprocess.This avoids S603 concerns and is faster/portable.
Apply this diff:
+import shutil @@ -def is_query_tool_available(tool_name): +def is_query_tool_available(tool_name): """ - Check if the sacct command is available on the system. - Returns True if sacct is available, False otherwise. + Check if a given command is available on the system. + Returns True if available, False otherwise. """ - cmd = f"which {tool_name}" - cmd = shlex.split(cmd) - try: - subprocess.check_output(cmd, stderr=subprocess.PIPE) - return True - except subprocess.CalledProcessError: - return False + return shutil.which(tool_name) is not None
70-98
: Clarify function intent and docstring.Name says “recommend”, docstring says “available”. Keep it consistent: the function returns True when squeue is recommended (MinJobAge >= threshold). No code change needed; just update the docstring.
Apply this diff:
def should_recommend_squeue_status_command(min_threshold_seconds=120): """ - Determine if the status query with squeue should be recommended based on - the MinJobAge configuration (if very low, squeue might not work well) + Determine if squeue should be recommended based on MinJobAge. + If MinJobAge >= min_threshold_seconds, squeue is considered reliable. + Otherwise prefer sacct. @@ - Returns True if the option should be available, False otherwise. + Returns True if squeue is recommended, False otherwise. """
100-111
: Fix return type and docstring; build a safe, readable sacct command.The function returns a string, not a list; the docstring mentions unrelated args. Also avoid manual split-join; build tokens then shlex.join for readability.
Apply this diff:
-def query_job_status_sacct(jobuid) -> list: +def query_job_status_sacct(jobuid) -> str: """ - Query job status using sacct command + Build a sacct command to query job status for a given workflow run UUID. @@ - job_ids: List of SLURM job IDs - timeout: Timeout in seconds for subprocess call + jobuid: Workflow run UUID used as SLURM job name @@ - Dictionary mapping job ID to JobStatus object + Command string suitable for subprocess with shell=True """ @@ - # -X: only show main job, no substeps - query_command = f"""sacct -X --parsable2 \ - --clusters all \ - --noheader --format=JobIdRaw,State \ - --starttime {sacct_starttime} \ - --endtime now --name {jobuid}""" - - # for better redability in verbose output - query_command = " ".join(shlex.split(query_command)) - - return query_command + # -X: only show main job, no substeps + cmd = [ + "sacct", + "-X", + "--parsable2", + "--clusters", + "all", + "--noheader", + "--format=JobIdRaw,State", + "--starttime", + sacct_starttime, + "--endtime", + "now", + "--name", + str(jobuid), + ] + # readable one-liner for logs while keeping quoting correct + return shlex.join(cmd)Also applies to: 119-129
132-154
: Unify return type with sacct variant; return a string via shlex.join.Keeps consumers consistent and improves logging readability.
Apply this diff:
-def query_job_status_squeue(jobuid) -> list: +def query_job_status_squeue(jobuid) -> str: """ - Query job status using squeue command (newer SLURM functionality) + Build an squeue command to query job status (newer SLURM functionality). @@ - Returns: - Dictionary mapping job ID to JobStatus object + Returns: + Command string suitable for subprocess with shell=True @@ - query_command = [ + cmd = [ "squeue", "--format=%i|%T", "--states=all", "--noheader", "--name", - f"{jobuid}" + str(jobuid), ] - - return query_command + return shlex.join(cmd)snakemake_executor_plugin_slurm/__init__.py (2)
601-610
: De‑duplicate: build the sacct command via the new helper.Use query_job_status_sacct(self.run_uuid) to avoid drift between implementations.
Apply this diff (and import the helper):
-from .job_status_query import ( - get_min_job_age, - is_query_tool_available, - should_recommend_squeue_status_command, -) +from .job_status_query import ( + get_min_job_age, + is_query_tool_available, + should_recommend_squeue_status_command, + query_job_status_sacct, +) @@ - # -X: only show main job, no substeps - sacct_command = f"""sacct -X --parsable2 \ - --clusters all \ - --noheader --format=JobIdRaw,State \ - --starttime {sacct_starttime} \ - --endtime now --name {self.run_uuid}""" - - # for better redability in verbose output - sacct_command = " ".join(shlex.split(sacct_command)) + # Build via helper (one-liner string for logging) + sacct_command = query_job_status_sacct(self.run_uuid)
81-84
: Nit: extra space and wording in help message.“(MinJobAge is sufficient )” contains a stray space; also prefer “sufficient for reliable usage”.
Apply this diff:
- msg_part1 = "'squeue' recommended (MinJobAge is sufficient )" - msg_part2 = " for reliable usage" + msg_part1 = "'squeue' recommended (MinJobAge is sufficient" + msg_part2 = ") for reliable usage"
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
snakemake_executor_plugin_slurm/__init__.py
(8 hunks)snakemake_executor_plugin_slurm/job_status_query.py
(1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#0
File: :0-0
Timestamp: 2025-01-13T09:54:22.950Z
Learning: PR #173 (adding gres resource specification) depends on PR #28 in snakemake-executor-plugin-slurm-jobstep repository, as changes were required in the cpu-settings function of the jobstep-Executor module.
📚 Learning: 2025-03-10T15:20:51.829Z
Learnt from: johanneskoester
PR: snakemake/snakemake-executor-plugin-slurm#173
File: docs/further.md:96-96
Timestamp: 2025-03-10T15:20:51.829Z
Learning: PR #173 in snakemake-executor-plugin-slurm implements GPU job support by adding resources: `gres` for generic resource specifications (e.g., 'gpu:1'), `gpu`/`gpus` for specifying GPU counts, and `gpu_model`/`gpu_manufacturer` for specifying GPU types, allowing users to request GPU resources directly rather than having to use slurm_extra.
Applied to files:
snakemake_executor_plugin_slurm/job_status_query.py
📚 Learning: 2024-12-11T14:17:08.749Z
Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#178
File: snakemake_executor_plugin_slurm/__init__.py:0-0
Timestamp: 2024-12-11T14:17:08.749Z
Learning: In the `snakemake-executor-plugin-slurm` project, when handling exceptions in `snakemake_executor_plugin_slurm/__init__.py`, prefer concise error messages and avoid unnecessary verbosity or exception chaining when it's not necessary.
Applied to files:
snakemake_executor_plugin_slurm/__init__.py
🧬 Code graph analysis (1)
snakemake_executor_plugin_slurm/__init__.py (1)
snakemake_executor_plugin_slurm/job_status_query.py (3)
get_min_job_age
(7-53)is_query_tool_available
(56-67)should_recommend_squeue_status_command
(70-97)
🪛 Ruff (0.12.2)
snakemake_executor_plugin_slurm/job_status_query.py
16-16: subprocess
call: check for execution of untrusted input
(S603)
64-64: subprocess
call: check for execution of untrusted input
(S603)
65-65: Consider moving this statement to an else
block
(TRY300)
snakemake_executor_plugin_slurm/__init__.py
52-55: Avoid specifying long messages outside the exception class
(TRY003)
320-320: String contains ambiguous ×
(MULTIPLICATION SIGN). Did you mean x
(LATIN SMALL LETTER X)?
(RUF001)
329-329: String contains ambiguous ×
(MULTIPLICATION SIGN). Did you mean x
(LATIN SMALL LETTER X)?
(RUF001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: testing
🔇 Additional comments (2)
snakemake_executor_plugin_slurm/__init__.py (2)
45-60
: Default selection logic looks good.sacct preferred, squeue as fallback, and hard-fail if neither is present makes sense.
583-587
: status_command validated but not wired into status checksget_status_command exists and job_status_query implements both sacct and squeue, but active-job checks still appear to use sacct only — either wire get_status_command into check_active_jobs (use squeue when selected) or hide the status_command option behind a feature flag. To confirm, run:
rg -nP -C2 '\bcheck_active_jobs\b' snakemake_executor_plugin_slurm
rg -nP -C2 '\bquery_job_status_sacct\b|\bquery_job_status_squeue\b' snakemake_executor_plugin_slurm
status_command: Optional[str] = field( | ||
default_factory=_get_status_command_default, | ||
metadata={ | ||
"help": _get_status_command_help(), | ||
"env_var": False, | ||
"required": False, | ||
}, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not call _get_status_command_help() in metadata; use a static string.
The call evaluates at import time and triggers detections. Replace with the static help string above.
Apply this diff:
- status_command: Optional[str] = field(
- default_factory=_get_status_command_default,
- metadata={
- "help": _get_status_command_help(),
- "env_var": False,
- "required": False,
- },
- )
+ status_command: Optional[str] = field(
+ default_factory=_get_status_command_default,
+ metadata={
+ "help": _get_status_command_help(),
+ "env_var": False,
+ "required": False,
+ },
+ )
Note: after applying the previous change, _get_status_command_help() is pure and safe to call here.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In snakemake_executor_plugin_slurm/__init__.py around lines 204 to 211, the
metadata "help" value currently calls _get_status_command_help() at import time;
replace that call with the static help string used above (i.e., the literal help
text defined for status_command earlier in the file) so metadata contains a
plain string instead of invoking the function at import. Do not add any new
function calls; just paste the existing static help text into the metadata
"help" field and keep the rest of the field declaration unchanged.
def _validate_status_command_settings(self): | ||
"""Validate and provide feedback about status_command configuration.""" | ||
if hasattr(self.workflow.executor_settings, "status_command"): | ||
status_command = self.workflow.executor_settings.status_command | ||
if status_command: | ||
min_job_age = get_min_job_age() | ||
sacct_available = is_query_tool_available("sacct") | ||
|
||
# Calculate dynamic threshold: 3 times the initial status check interval | ||
# The plugin starts with 40 seconds and increases, so we use (3 * 10) + 40 = 70 seconds as minimum | ||
between_status_check_seconds = getattr( | ||
self.workflow.executor_settings, "seconds_between_status_checks", 70 | ||
) | ||
dynamic_check_threshold = 3 * between_status_check_seconds | ||
|
||
if not sacct_available and status_command == "sacct": | ||
self.logger.warning( | ||
"The 'sacct' command is not available on this system. " | ||
"Using 'squeue' instead for job status queries." | ||
) | ||
elif sacct_available and min_job_age is not None: | ||
if ( | ||
min_job_age < dynamic_check_threshold | ||
and status_command == "sacct" | ||
): | ||
self.logger.warning( | ||
f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). " | ||
f"This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable. " | ||
f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)" | ||
) | ||
elif ( | ||
min_job_age >= dynamic_check_threshold | ||
and status_command == "squeue" | ||
): | ||
self.logger.warning( | ||
f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). " | ||
f"The 'squeue' command should work reliably for status queries. " | ||
f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)" | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix miswired warnings and threshold calculation in _validate_status_command_settings.
- MinJobAge affects squeue visibility, not sacct. Current warnings invert this.
- Threshold should derive from init_seconds_before_status_checks (40 by default), not a non-existent seconds_between_status_checks.
- Replace the ambiguous × character in log messages.
Apply this diff:
- # Calculate dynamic threshold: 3 times the initial status check interval
- # The plugin starts with 40 seconds and increases, so we use (3 * 10) + 40 = 70 seconds as minimum
- between_status_check_seconds = getattr(
- self.workflow.executor_settings, "seconds_between_status_checks", 70
- )
- dynamic_check_threshold = 3 * between_status_check_seconds
+ # Threshold: 3x initial status check interval (default 40s)
+ initial_interval = getattr(
+ self.workflow.executor_settings,
+ "init_seconds_before_status_checks",
+ 40,
+ )
+ dynamic_check_threshold = 3 * initial_interval
@@
- elif sacct_available and min_job_age is not None:
- if (
- min_job_age < dynamic_check_threshold
- and status_command == "sacct"
- ):
- self.logger.warning(
- f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). "
- f"This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable. "
- f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)"
- )
- elif (
- min_job_age >= dynamic_check_threshold
- and status_command == "squeue"
- ):
- self.logger.warning(
- f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). "
- f"The 'squeue' command should work reliably for status queries. "
- f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)"
- )
+ elif sacct_available and min_job_age is not None:
+ if min_job_age < dynamic_check_threshold and status_command == "squeue":
+ self.logger.warning(
+ f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). "
+ "This may cause 'squeue' to miss recently finished jobs; consider using 'sacct'. "
+ f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)"
+ )
+ elif min_job_age >= dynamic_check_threshold and status_command == "sacct":
+ self.logger.warning(
+ f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). "
+ "The 'squeue' command should also work reliably for status queries. "
+ f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)"
+ )
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def _validate_status_command_settings(self): | |
"""Validate and provide feedback about status_command configuration.""" | |
if hasattr(self.workflow.executor_settings, "status_command"): | |
status_command = self.workflow.executor_settings.status_command | |
if status_command: | |
min_job_age = get_min_job_age() | |
sacct_available = is_query_tool_available("sacct") | |
# Calculate dynamic threshold: 3 times the initial status check interval | |
# The plugin starts with 40 seconds and increases, so we use (3 * 10) + 40 = 70 seconds as minimum | |
between_status_check_seconds = getattr( | |
self.workflow.executor_settings, "seconds_between_status_checks", 70 | |
) | |
dynamic_check_threshold = 3 * between_status_check_seconds | |
if not sacct_available and status_command == "sacct": | |
self.logger.warning( | |
"The 'sacct' command is not available on this system. " | |
"Using 'squeue' instead for job status queries." | |
) | |
elif sacct_available and min_job_age is not None: | |
if ( | |
min_job_age < dynamic_check_threshold | |
and status_command == "sacct" | |
): | |
self.logger.warning( | |
f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). " | |
f"This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable. " | |
f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)" | |
) | |
elif ( | |
min_job_age >= dynamic_check_threshold | |
and status_command == "squeue" | |
): | |
self.logger.warning( | |
f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). " | |
f"The 'squeue' command should work reliably for status queries. " | |
f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)" | |
) | |
def _validate_status_command_settings(self): | |
"""Validate and provide feedback about status_command configuration.""" | |
if hasattr(self.workflow.executor_settings, "status_command"): | |
status_command = self.workflow.executor_settings.status_command | |
if status_command: | |
min_job_age = get_min_job_age() | |
sacct_available = is_query_tool_available("sacct") | |
# Threshold: 3x initial status check interval (default 40s) | |
initial_interval = getattr( | |
self.workflow.executor_settings, | |
"init_seconds_before_status_checks", | |
40, | |
) | |
dynamic_check_threshold = 3 * initial_interval | |
if not sacct_available and status_command == "sacct": | |
self.logger.warning( | |
"The 'sacct' command is not available on this system. " | |
"Using 'squeue' instead for job status queries." | |
) | |
elif sacct_available and min_job_age is not None: | |
if min_job_age < dynamic_check_threshold and status_command == "squeue": | |
self.logger.warning( | |
f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). " | |
"This may cause 'squeue' to miss recently finished jobs; consider using 'sacct'. " | |
f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)" | |
) | |
elif min_job_age >= dynamic_check_threshold and status_command == "sacct": | |
self.logger.warning( | |
f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). " | |
"The 'squeue' command should also work reliably for status queries. " | |
f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)" | |
) |
🧰 Tools
🪛 Ruff (0.12.2)
320-320: String contains ambiguous ×
(MULTIPLICATION SIGN). Did you mean x
(LATIN SMALL LETTER X)?
(RUF001)
329-329: String contains ambiguous ×
(MULTIPLICATION SIGN). Did you mean x
(LATIN SMALL LETTER X)?
(RUF001)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)
snakemake_executor_plugin_slurm/job_status_query.py (1)
13-18
: Harden scontrol probe: add timeout and catch FileNotFoundError/TimeoutExpired.Prevents hangs and handles missing binaries gracefully.
Apply:
- output = subprocess.check_output(cmd, text=True, stderr=subprocess.PIPE) - except subprocess.CalledProcessError: + output = subprocess.check_output( + cmd, text=True, stderr=subprocess.PIPE, timeout=10 + ) + except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired): return Nonesnakemake_executor_plugin_slurm/__init__.py (2)
67-105
: Make help static to avoid external calls at import time.Prevents import-time scontrol/sacct probes and potential hangs.
Apply:
def _get_status_command_help(): - """Get help text with computed default.""" - default_cmd = _get_status_command_default() - sacct_available = is_query_tool_available("sacct") - squeue_recommended = should_recommend_squeue_status_command() - - base_help = "Command to query job status. Options: 'sacct', 'squeue'. " - - if default_cmd == "sacct": - if sacct_available and not squeue_recommended: - info = ( - "'sacct' detected and will be used " - "(MinJobAge may be too low for reliable 'squeue' usage)" - ) - else: - info = "'sacct' detected and will be used" - else: # default_cmd == "squeue" - if squeue_recommended: - # cumbersome, due to black and the need to stay below 80 chars - msg_part1 = "'squeue' recommended (MinJobAge is sufficient )" - msg_part2 = " for reliable usage" - info = msg_part1 + msg_part2 - elif not sacct_available: - info = ( - "'sacct' not available, falling back to 'squeue'. " - "WARNING: 'squeue' may not work reliably if MinJobAge is " - "too low" - ) - else: - info = ( - "'squeue' will be used. " - "WARNING: MinJobAge may be too low for reliable 'squeue' usage" - ) - - return ( - f"{base_help}Default: '{default_cmd}' ({info}). " - f"Set explicitly to override auto-detection." - ) + """Help: job status command.""" + return ( + "Command to query job status. Options: 'sacct' (preferred), 'squeue'. " + "Default is chosen dynamically: 'sacct' if available, otherwise 'squeue'. " + "Note: 'squeue' reliability depends on MinJobAge; see plugin docs." + )
302-354
: Fix MinJobAge logic and threshold source; replace ambiguous ×.MinJobAge gates squeue visibility, not sacct; derive threshold from init_seconds_before_status_checks.
Apply:
- # Calculate dynamic threshold: 3 times the initial - # status check interval. - # The plugin starts with 40 seconds and increases, - # so we use (3 * 10) + 40 = 70 seconds as minimum - between_status_check_seconds = getattr( - self.workflow.executor_settings, - "seconds_between_status_checks", - 70, - ) - dynamic_check_threshold = 3 * between_status_check_seconds + # Threshold: 3x initial status check interval (default 40s) + initial_interval = getattr( + self.workflow.executor_settings, + "init_seconds_before_status_checks", + 40, + ) + dynamic_check_threshold = 3 * initial_interval @@ - elif sacct_available and min_job_age is not None: - if ( - min_job_age < dynamic_check_threshold - and status_command == "sacct" - ): - self.logger.warning( - f"MinJobAge is {min_job_age} seconds " - f"(< {dynamic_check_threshold}s). " - f"This may cause 'sacct' to report inaccurate " - "job states and the status_command option may " - "be unreliable. " - "(Threshold is 3x status check interval: 3 × " - f"{between_status_check_seconds}s = " - f"{dynamic_check_threshold}s)" - ) - elif ( - min_job_age >= dynamic_check_threshold - and status_command == "squeue" - ): - self.logger.warning( - f"MinJobAge is {min_job_age} seconds (>= " - f"{dynamic_check_threshold}s). " - f"The 'squeue' command should work reliably for " - "status queries. " - "(Threshold is 3x status check interval: 3 × " - f"{between_status_check_seconds}s = " - f"{dynamic_check_threshold}s)" - ) + elif sacct_available and min_job_age is not None: + if min_job_age < dynamic_check_threshold and status_command == "squeue": + self.logger.warning( + f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). " + "This may cause 'squeue' to miss recently finished jobs; consider using 'sacct'. " + f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)" + ) + elif min_job_age >= dynamic_check_threshold and status_command == "sacct": + self.logger.warning( + f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). " + "The 'squeue' command should also work reliably for status queries. " + f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)" + )Run to confirm no stray uses of the removed attribute:
#!/bin/bash rg -nP -C2 '\bseconds_between_status_checks\b|\binit_seconds_before_status_checks\b'
🧹 Nitpick comments (7)
snakemake_executor_plugin_slurm/job_status_query.py (4)
56-67
: Use shutil.which instead of spawning 'which'.Faster, portable, and avoids S603 concerns; also fix docstring.
Apply:
def is_query_tool_available(tool_name): - """ - Check if the sacct command is available on the system. - Returns True if sacct is available, False otherwise. - """ - cmd = f"which {tool_name}" - cmd = shlex.split(cmd) - try: - subprocess.check_output(cmd, stderr=subprocess.PIPE) - return True - except subprocess.CalledProcessError: - return False + """ + Check if a given CLI tool is available on PATH. + """ + return shutil.which(tool_name) is not NoneAdd import:
+import shutil
97-127
: Fix return type/docstring of query_job_status_sacct (returns str, not list/dict).Align signature, doc, and behavior.
Apply:
-def query_job_status_sacct(jobuid) -> list: +def query_job_status_sacct(jobuid) -> str: """ - Query job status using sacct command + Build the sacct command used to query job status for a given workflow UUID. Args: - job_ids: List of SLURM job IDs - timeout: Timeout in seconds for subprocess call + jobuid: Workflow UUID used as job name filter. Returns: - Dictionary mapping job ID to JobStatus object + str: command string to pass to subprocess """
129-151
: Unify squeue builder to return a command string (consistent with sacct path).Ensures compatibility with job_stati(), which expects a string.
Apply:
-def query_job_status_squeue(jobuid) -> list: +def query_job_status_squeue(jobuid) -> str: """ - Query job status using squeue command (newer SLURM functionality) + Build the squeue command used to query job status for a given workflow UUID. @@ - Returns: - Dictionary mapping job ID to JobStatus object + Returns: + str: command string to pass to subprocess @@ - query_command = [ + query_command = [ "squeue", "--format=%i|%T", "--states=all", "--noheader", "--name", f"{jobuid}", ] - return query_command + return shlex.join(query_command)
123-124
: Typo.“redability” → “readability”.
Apply:
- # for better redability in verbose output + # for better readability in verbose outputsnakemake_executor_plugin_slurm/__init__.py (3)
355-363
: Return default when status_command is None.Avoids propagating None.
Apply:
def get_status_command(self): """Get the status command to use, with fallback logic.""" if hasattr(self.workflow.executor_settings, "status_command"): - return self.workflow.executor_settings.status_command + value = self.workflow.executor_settings.status_command + return value or _get_status_command_default() else: # Fallback: determine the best command based on # cluster configuration return _get_status_command_default()
386-395
: Guard against None for delete_logfiles_older_than.Make cleanup robust to explicit None.
Apply:
- if age_cutoff <= 0 or keep_all: + if age_cutoff is None or age_cutoff <= 0 or keep_all: return
377-384
: Don't let efficiency report failures abort shutdown.Wrap to log and continue.
Apply:
- if self.workflow.executor_settings.efficiency_report: - threshold = self.workflow.executor_settings.efficiency_threshold - report_path = self.workflow.executor_settings.efficiency_report_path - create_efficiency_report( - e_threshold=threshold, - run_uuid=self.run_uuid, - e_report_path=report_path, - logger=self.logger, - ) + if self.workflow.executor_settings.efficiency_report: + threshold = self.workflow.executor_settings.efficiency_threshold + report_path = self.workflow.executor_settings.efficiency_report_path + try: + create_efficiency_report( + e_threshold=threshold, + run_uuid=self.run_uuid, + e_report_path=report_path, + logger=self.logger, + ) + except Exception as e: # best-effort + self.logger.warning(f"Efficiency report failed: {e}")
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
pyproject.toml
(1 hunks)snakemake_executor_plugin_slurm/__init__.py
(19 hunks)snakemake_executor_plugin_slurm/efficiency_report.py
(1 hunks)snakemake_executor_plugin_slurm/job_status_query.py
(1 hunks)
✅ Files skipped from review due to trivial changes (2)
- pyproject.toml
- snakemake_executor_plugin_slurm/efficiency_report.py
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#0
File: :0-0
Timestamp: 2025-01-13T09:54:22.950Z
Learning: PR #173 (adding gres resource specification) depends on PR #28 in snakemake-executor-plugin-slurm-jobstep repository, as changes were required in the cpu-settings function of the jobstep-Executor module.
📚 Learning: 2024-12-11T14:17:08.749Z
Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#178
File: snakemake_executor_plugin_slurm/__init__.py:0-0
Timestamp: 2024-12-11T14:17:08.749Z
Learning: In the `snakemake-executor-plugin-slurm` project, when handling exceptions in `snakemake_executor_plugin_slurm/__init__.py`, prefer concise error messages and avoid unnecessary verbosity or exception chaining when it's not necessary.
Applied to files:
snakemake_executor_plugin_slurm/__init__.py
snakemake_executor_plugin_slurm/job_status_query.py
📚 Learning: 2025-03-10T15:20:51.829Z
Learnt from: johanneskoester
PR: snakemake/snakemake-executor-plugin-slurm#173
File: docs/further.md:96-96
Timestamp: 2025-03-10T15:20:51.829Z
Learning: PR #173 in snakemake-executor-plugin-slurm implements GPU job support by adding resources: `gres` for generic resource specifications (e.g., 'gpu:1'), `gpu`/`gpus` for specifying GPU counts, and `gpu_model`/`gpu_manufacturer` for specifying GPU types, allowing users to request GPU resources directly rather than having to use slurm_extra.
Applied to files:
snakemake_executor_plugin_slurm/job_status_query.py
🧬 Code graph analysis (1)
snakemake_executor_plugin_slurm/__init__.py (2)
snakemake_executor_plugin_slurm/job_status_query.py (3)
get_min_job_age
(7-53)is_query_tool_available
(56-67)should_recommend_squeue_status_command
(70-94)snakemake_executor_plugin_slurm/efficiency_report.py (1)
create_efficiency_report
(190-219)
🪛 Ruff (0.12.2)
snakemake_executor_plugin_slurm/__init__.py
56-60: Avoid specifying long messages outside the exception class
(TRY003)
337-337: String contains ambiguous ×
(MULTIPLICATION SIGN). Did you mean x
(LATIN SMALL LETTER X)?
(RUF001)
350-350: String contains ambiguous ×
(MULTIPLICATION SIGN). Did you mean x
(LATIN SMALL LETTER X)?
(RUF001)
566-566: Avoid specifying long messages outside the exception class
(TRY003)
snakemake_executor_plugin_slurm/job_status_query.py
16-16: subprocess
call: check for execution of untrusted input
(S603)
64-64: subprocess
call: check for execution of untrusted input
(S603)
65-65: Consider moving this statement to an else
block
(TRY300)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: testing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
snakemake_executor_plugin_slurm/__init__.py (1)
745-747
: Fix path handling for aux_logs (Path has no _str).Use str(path) to avoid AttributeError during error reporting.
- self.report_job_error( - j, msg=msg, aux_logs=[j.aux["slurm_logfile"]._str] - ) + self.report_job_error( + j, msg=msg, aux_logs=[str(j.aux["slurm_logfile"])] + )
♻️ Duplicate comments (4)
snakemake_executor_plugin_slurm/__init__.py (3)
66-104
: Make help text static; avoid external checks at import time.This prevents scontrol/sacct/squeue probes during import. Prior review asked for this.
-def _get_status_command_help(): - """Get help text with computed default.""" - default_cmd = _get_status_command_default() - sacct_available = is_query_tool_available("sacct") - squeue_recommended = should_recommend_squeue_status_command() - - base_help = "Command to query job status. Options: 'sacct', 'squeue'. " - - if default_cmd == "sacct": - if sacct_available and not squeue_recommended: - info = ( - "'sacct' detected and will be used " - "(MinJobAge may be too low for reliable 'squeue' usage)" - ) - else: - info = "'sacct' detected and will be used" - else: # default_cmd == "squeue" - if squeue_recommended: - # cumbersome, due to black and the need to stay below 80 chars - msg_part1 = "'squeue' recommended (MinJobAge is sufficient )" - msg_part2 = " for reliable usage" - info = msg_part1 + msg_part2 - elif not sacct_available: - info = ( - "'sacct' not available, falling back to 'squeue'. " - "WARNING: 'squeue' may not work reliably if MinJobAge is " - "too low" - ) - else: - info = ( - "'squeue' will be used. " - "WARNING: MinJobAge may be too low for reliable 'squeue' usage" - ) - - return ( - f"{base_help}Default: '{default_cmd}' ({info}). " - f"Set explicitly to override auto-detection." - ) +def _get_status_command_help(): + """Help: job status command.""" + return ( + "Command to query job status. Options: 'sacct' (preferred), 'squeue'. " + "Default is chosen dynamically at runtime: 'sacct' if available, otherwise 'squeue'. " + "Note: 'squeue' reliability depends on MinJobAge; see plugin docs." + )
210-217
: Avoid calling a function in dataclass metadata; inline the static help string.Even though the helper is now pure, inlining avoids any import-time call.
- status_command: Optional[str] = field( - default_factory=_get_status_command_default, - metadata={ - "help": _get_status_command_help(), - "env_var": False, - "required": False, - }, - ) + status_command: Optional[str] = field( + default_factory=_get_status_command_default, + metadata={ + "help": ( + "Command to query job status. Options: 'sacct' (preferred), 'squeue'. " + "Default is chosen dynamically at runtime: 'sacct' if available, otherwise 'squeue'. " + "Note: 'squeue' reliability depends on MinJobAge; see plugin docs." + ), + "env_var": False, + "required": False, + }, + )
301-353
: Fix threshold source and inverted warnings in _validate_status_command_settings.Use init_seconds_before_status_checks (default 40s) and warn about squeue when MinJobAge is low; replace × with x. This repeats earlier guidance.
- # Calculate dynamic threshold: 3 times the initial - # status check interval. - # The plugin starts with 40 seconds and increases, - # so we use (3 * 10) + 40 = 70 seconds as minimum - between_status_check_seconds = getattr( - self.workflow.executor_settings, - "seconds_between_status_checks", - 70, - ) - dynamic_check_threshold = 3 * between_status_check_seconds + # Threshold: 3x initial status check interval (default 40s) + initial_interval = getattr( + self.workflow.executor_settings, + "init_seconds_before_status_checks", + 40, + ) + dynamic_check_threshold = 3 * initial_interval @@ - if ( - min_job_age < dynamic_check_threshold - and status_command == "sacct" - ): - self.logger.warning( - f"MinJobAge is {min_job_age} seconds " - f"(< {dynamic_check_threshold}s). " - f"This may cause 'sacct' to report inaccurate " - "job states and the status_command option may " - "be unreliable. " - "(Threshold is 3x status check interval: 3 × " - f"{between_status_check_seconds}s = " - f"{dynamic_check_threshold}s)" - ) - elif ( - min_job_age >= dynamic_check_threshold - and status_command == "squeue" - ): - self.logger.warning( - f"MinJobAge is {min_job_age} seconds (>= " - f"{dynamic_check_threshold}s). " - f"The 'squeue' command should work reliably for " - "status queries. " - "(Threshold is 3x status check interval: 3 × " - f"{between_status_check_seconds}s = " - f"{dynamic_check_threshold}s)" - ) + if min_job_age < dynamic_check_threshold and status_command == "squeue": + self.logger.warning( + f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). " + "This may cause 'squeue' to miss recently finished jobs; consider using 'sacct'. " + f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)" + ) + elif min_job_age >= dynamic_check_threshold and status_command == "sacct": + self.logger.warning( + f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). " + "The 'squeue' command should also work reliably for status queries. " + f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)" + )snakemake_executor_plugin_slurm/job_status_query.py (1)
14-19
: Add timeout and catch FileNotFoundError/TimeoutExpired to avoid hangs.If scontrol is missing or stalls, get_min_job_age() can block startup. Add timeout and broaden exceptions. This mirrors a prior review; re-applying here.
Apply this diff:
- output = subprocess.check_output(cmd, text=True, stderr=subprocess.PIPE) - except subprocess.CalledProcessError: + output = subprocess.check_output( + cmd, text=True, stderr=subprocess.PIPE, timeout=10 + ) + except ( + subprocess.CalledProcessError, + FileNotFoundError, + subprocess.TimeoutExpired, + ): return None
🧹 Nitpick comments (7)
snakemake_executor_plugin_slurm/job_status_query.py (4)
1-5
: Import shutil (prep for safer tool detection).Follow-up to next comment: we’ll use shutil.which instead of invoking which.
import shlex import subprocess import re from datetime import datetime, timedelta +import shutil
57-68
: Use shutil.which; fix docstring; silence S603.Avoid spawning which; simpler, faster, and safer.
-def is_query_tool_available(tool_name): - """ - Check if the sacct command is available on the system. - Returns True if sacct is available, False otherwise. - """ - cmd = f"which {tool_name}" - cmd = shlex.split(cmd) - try: - subprocess.check_output(cmd, stderr=subprocess.PIPE) - return True - except subprocess.CalledProcessError: - return False +def is_query_tool_available(tool_name: str) -> bool: + """Return True if tool_name is on PATH, False otherwise.""" + return shutil.which(tool_name) is not None
98-128
: Fix return type and doc for query_job_status_sacct.Annotation and docstring claim a list/dict but the function returns a string command.
-def query_job_status_sacct(jobuid) -> list: - """ - Query job status using sacct command - - Args: - job_ids: List of SLURM job IDs - timeout: Timeout in seconds for subprocess call - - Returns: - Dictionary mapping job ID to JobStatus object - """ +def query_job_status_sacct(jobuid: str) -> str: + """ + Build the sacct command (as a shell string) to query status for the given + workflow run identifier (jobuid). The caller is responsible for execution. + """
130-151
: Align squeue helper: clarify return type/doc; add --clusters=all for parity.Keep returning argv list (to allow shell=False later) but document it; include clusters flag like sacct.
-def query_job_status_squeue(jobuid) -> list: - """ - Query job status using squeue command (newer SLURM functionality) - - Args: - job_ids: List of SLURM job IDs - timeout: Timeout in seconds for subprocess call - - Returns: - Dictionary mapping job ID to JobStatus object - """ +def query_job_status_squeue(jobuid: str) -> list[str]: + """ + Build the squeue command (argv list) to query status for the given run id. + The caller is responsible for execution. + """ @@ query_command = [ "squeue", "--format=%i|%T", "--states=all", "--noheader", + "--clusters=all", "--name", f"{jobuid}", ]snakemake_executor_plugin_slurm/__init__.py (3)
39-43
: Import the new query builders to avoid duplicate command construction.Use the helpers instead of re-assembling sacct strings locally; this matches the PR goal.
from .job_status_query import ( get_min_job_age, is_query_tool_available, should_recommend_squeue_status_command, + query_job_status_sacct, + query_job_status_squeue, )
619-626
: Initialize variables used after the loop (status_attempts could be 0).Prevents UnboundLocalError when status_attempts == 0.
- sacct_query_durations = [] + sacct_query_durations = [] + status_of_jobs = None + sacct_query_duration = None
629-646
: Use outsourced query builders and honor status_command.Respect the configured/default status command and remove duplicate sacct assembly.
- # We use this sacct syntax for argument 'starttime' to keep it - # compatible with slurm < 20.11 - sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}" - # previously we had - # f"--starttime now-2days --endtime now --name {self.run_uuid}" - # in line 218 - once v20.11 is definitively not in use any more, - # the more readable version ought to be re-adapted - - # -X: only show main job, no substeps - sacct_command = f"""sacct -X --parsable2 \ - --clusters all \ - --noheader --format=JobIdRaw,State \ - --starttime {sacct_starttime} \ - --endtime now --name {self.run_uuid}""" - - # for better redability in verbose output - sacct_command = " ".join(shlex.split(sacct_command)) + # Build status query command according to selected backend + selected = self.get_status_command() + if selected == "squeue": + argv = query_job_status_squeue(self.run_uuid) + status_query_cmd = " ".join(shlex.quote(a) for a in argv) + else: + status_query_cmd = query_job_status_sacct(self.run_uuid)And below (Line 651) replace sacct_command with status_query_cmd:
- (status_of_jobs, sacct_query_duration) = await self.job_stati( - sacct_command - ) + (status_of_jobs, sacct_query_duration) = await self.job_stati( + status_query_cmd + )
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
snakemake_executor_plugin_slurm/__init__.py
(18 hunks)snakemake_executor_plugin_slurm/job_status_query.py
(1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#0
File: :0-0
Timestamp: 2025-01-13T09:54:22.950Z
Learning: PR #173 (adding gres resource specification) depends on PR #28 in snakemake-executor-plugin-slurm-jobstep repository, as changes were required in the cpu-settings function of the jobstep-Executor module.
📚 Learning: 2024-12-11T14:17:08.749Z
Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#178
File: snakemake_executor_plugin_slurm/__init__.py:0-0
Timestamp: 2024-12-11T14:17:08.749Z
Learning: In the `snakemake-executor-plugin-slurm` project, when handling exceptions in `snakemake_executor_plugin_slurm/__init__.py`, prefer concise error messages and avoid unnecessary verbosity or exception chaining when it's not necessary.
Applied to files:
snakemake_executor_plugin_slurm/__init__.py
snakemake_executor_plugin_slurm/job_status_query.py
📚 Learning: 2025-03-10T15:20:51.829Z
Learnt from: johanneskoester
PR: snakemake/snakemake-executor-plugin-slurm#173
File: docs/further.md:96-96
Timestamp: 2025-03-10T15:20:51.829Z
Learning: PR #173 in snakemake-executor-plugin-slurm implements GPU job support by adding resources: `gres` for generic resource specifications (e.g., 'gpu:1'), `gpu`/`gpus` for specifying GPU counts, and `gpu_model`/`gpu_manufacturer` for specifying GPU types, allowing users to request GPU resources directly rather than having to use slurm_extra.
Applied to files:
snakemake_executor_plugin_slurm/job_status_query.py
🧬 Code graph analysis (1)
snakemake_executor_plugin_slurm/__init__.py (2)
snakemake_executor_plugin_slurm/job_status_query.py (3)
get_min_job_age
(7-54)is_query_tool_available
(57-68)should_recommend_squeue_status_command
(71-95)snakemake_executor_plugin_slurm/efficiency_report.py (1)
create_efficiency_report
(190-219)
🪛 Ruff (0.12.2)
snakemake_executor_plugin_slurm/__init__.py
55-59: Avoid specifying long messages outside the exception class
(TRY003)
336-336: String contains ambiguous ×
(MULTIPLICATION SIGN). Did you mean x
(LATIN SMALL LETTER X)?
(RUF001)
349-349: String contains ambiguous ×
(MULTIPLICATION SIGN). Did you mean x
(LATIN SMALL LETTER X)?
(RUF001)
565-565: Avoid specifying long messages outside the exception class
(TRY003)
snakemake_executor_plugin_slurm/job_status_query.py
17-17: subprocess
call: check for execution of untrusted input
(S603)
65-65: subprocess
call: check for execution of untrusted input
(S603)
66-66: Consider moving this statement to an else
block
(TRY300)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: testing
🔇 Additional comments (1)
snakemake_executor_plugin_slurm/__init__.py (1)
581-606
: Confirm squeue output format on your SLURM cluster
squeue isn’t available in this environment. Please run on your target cluster:squeue --format=%i\|%T --states=all --noheader --clusters=all | head -5and ensure each line is exactly
<id>|<state>
with no multi-cluster prefixes or extra fields.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
snakemake_executor_plugin_slurm/job_status_query.py (1)
14-21
: Good: timeout + broader exceptions in get_min_job_age.This resolves the earlier concern about hangs/missing scontrol. Looks solid.
🧹 Nitpick comments (4)
snakemake_executor_plugin_slurm/job_status_query.py (4)
23-56
: Parse minutes and avoid ambiguous units.Support “m/min/minute(s)” and fail closed on unknown units rather than silently treating them as seconds. Also simplifies conversion via a map.
- # Use regex to parse value and optional unit - # Pattern matches: number + optional whitespace + optional unit - match = re.match(r"^(\d+)\s*([a-zA-Z]*)", value_part) + # Parse number + optional unit + match = re.match(r"^(\d+)\s*([a-zA-Z]+)?", value_part) if not match: continue value_str = match.group(1) - unit = match.group(2).lower() if match.group(2) else "" + unit = (match.group(2) or "").lower() try: value = int(value_str) - - # Convert to seconds based on unit - if unit in ("h", "hour", "hours"): - return value * 3600 - elif unit in ("s", "sec", "secs", "second", "seconds", ""): - return value - else: - # Unknown unit, assume seconds - return value + unit_map = { + "": 1, "s": 1, "sec": 1, "secs": 1, "second": 1, "seconds": 1, + "m": 60, "min": 60, "mins": 60, "minute": 60, "minutes": 60, + "h": 3600, "hour": 3600, "hours": 3600, + } + factor = unit_map.get(unit) + if factor is None: + return None # unknown unit → safer to disable recommendation + return value * factor
59-70
: Use shutil.which instead of spawning ‘which’.Removes a subprocess, fixes Ruff S603, and is more portable. Also generalize the docstring.
+import shutil @@ -def is_query_tool_available(tool_name): - """ - Check if the sacct command is available on the system. - Returns True if sacct is available, False otherwise. - """ - cmd = f"which {tool_name}" - cmd = shlex.split(cmd) - try: - subprocess.check_output(cmd, stderr=subprocess.PIPE) - return True - except subprocess.CalledProcessError: - return False +def is_query_tool_available(tool_name: str) -> bool: + """ + Check if a CLI tool is available on PATH. + """ + return shutil.which(tool_name) is not None
73-97
: Clarify intent in docstring/name.Function returns whether squeue is viable, not a recommendation per se. Consider rewording docstring (or rename to is_squeue_viable) to avoid confusion.
- """ - Determine if the status query with squeue should be recommended based on - the MinJobAge configuration (if very low, squeue might not work well) + """ + Determine if using squeue for status queries is viable based on MinJobAge + (low values make completed jobs disappear quickly). @@ - Returns True if the option should be available, False otherwise. + Returns True if squeue is viable, False otherwise.
132-153
: Fix docstring/type-hint drift and clarify jobuid semantics (job-name == run_uuid).query_job_status_squeue returns an argv list; its docstring and signature are stale — jobuid is the run_uuid used as sbatch --job-name (submit_string.py:28; init.py:463–465). Update signature/docstring as shown.
-def query_job_status_squeue(jobuid) -> list: - """ - Query job status using squeue command (newer SLURM functionality) - - Args: - job_ids: List of SLURM job IDs - timeout: Timeout in seconds for subprocess call - - Returns: - Dictionary mapping job ID to JobStatus object - """ +def query_job_status_squeue(jobuid: str) -> List[str]: + """ + Build the squeue argv to query status by job name/UID. + Returns a list of arguments suitable for subprocess without shell=True. + """Also add the necessary typing import (or use list[str] to match the codebase).
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
snakemake_executor_plugin_slurm/job_status_query.py
(1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#0
File: :0-0
Timestamp: 2025-01-13T09:54:22.950Z
Learning: PR #173 (adding gres resource specification) depends on PR #28 in snakemake-executor-plugin-slurm-jobstep repository, as changes were required in the cpu-settings function of the jobstep-Executor module.
📚 Learning: 2024-12-11T14:17:08.749Z
Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#178
File: snakemake_executor_plugin_slurm/__init__.py:0-0
Timestamp: 2024-12-11T14:17:08.749Z
Learning: In the `snakemake-executor-plugin-slurm` project, when handling exceptions in `snakemake_executor_plugin_slurm/__init__.py`, prefer concise error messages and avoid unnecessary verbosity or exception chaining when it's not necessary.
Applied to files:
snakemake_executor_plugin_slurm/job_status_query.py
🪛 Ruff (0.12.2)
snakemake_executor_plugin_slurm/job_status_query.py
17-17: subprocess
call: check for execution of untrusted input
(S603)
67-67: subprocess
call: check for execution of untrusted input
(S603)
68-68: Consider moving this statement to an else
block
(TRY300)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: testing
🔇 Additional comments (1)
snakemake_executor_plugin_slurm/job_status_query.py (1)
100-131
: Return argv List[str] from query_job_status_sacct (fix docstring/type-hint/return mismatch)Change the function to return a List[str] argv suitable for subprocess.run(..., shell=False) and update the signature/docstring. Repo search found no callers outside the definition; verify external callers/tests before merging.
-from datetime import datetime, timedelta +from datetime import datetime, timedelta +from typing import List @@ -def query_job_status_sacct(jobuid) -> list: - """ - Query job status using sacct command - - Args: - job_ids: List of SLURM job IDs - timeout: Timeout in seconds for subprocess call - - Returns: - Dictionary mapping job ID to JobStatus object - """ +def query_job_status_sacct(jobuid: str) -> List[str]: + """ + Build the sacct argv to query job status for a given job name/UID. + Returns a list of arguments suitable for subprocess without shell=True. + """ @@ - # -X: only show main job, no substeps - query_command = f"""sacct -X --parsable2 \ - --clusters all \ - --noheader --format=JobIdRaw,State \ - --starttime {sacct_starttime} \ - --endtime now --name {jobuid}""" - - # for better redability in verbose output - query_command = " ".join(shlex.split(query_command)) - - return query_command + # -X: only show main job, no substeps + query_command = [ + "sacct", + "-X", + "--parsable2", + "--clusters", "all", + "--noheader", + "--format=JobIdRaw,State", + "--starttime", sacct_starttime, + "--endtime", "now", + "--name", str(jobuid), + ] + return query_command
This is an attempt to fulfil the request outlined in PR #336 .
Summary by CodeRabbit
New Features
Improvements
Chores