From 0d0d093bb043e208a3d4294e5c3535a52acaf7a9 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Fri, 14 Mar 2025 15:37:50 +0100 Subject: [PATCH] feat: excluding nodes of failed jobs from further submissions --- snakemake_executor_plugin_slurm/__init__.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 5688069..322fa9c 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -140,6 +140,7 @@ def __post_init__(self): self.warn_on_jobcontext() self.run_uuid = str(uuid.uuid4()) self.logger.info(f"SLURM run ID: {self.run_uuid}") + self._failed_nodes = [] self._fallback_account_arg = None self._fallback_partition = None self._preemption_warning = False # no preemption warning has been issued @@ -297,6 +298,15 @@ def run_job(self, job: JobExecutorInterface): self.check_slurm_extra(job) call += f" {job.resources.slurm_extra}" + # if the workflow encountered any failed jobs, due to node failures, + # we now exclude these nodes from the job submission + if self._failed_nodes: + call += f" --exclude={','.join(self._failed_nodes)}" + self.logger.debug( + f"Excluding the following nodes from job submission: " + f"{','.join(self._failed_nodes)}" + ) + exec_job = self.format_job_exec(job) # ensure that workdir is set correctly @@ -518,6 +528,11 @@ async def check_active_jobs( j, msg=msg, aux_logs=[j.aux["slurm_logfile"]._str] ) active_jobs_seen_by_sacct.remove(j.external_jobid) + if status == "NODE_FAIL": + # get the node from the job which failed + # and add it to the list of failed nodes + node = j.aux["slurm_logfile"].parent.parent.name + self._failed_nodes.append(node) else: # still running? yield j