From 68c4ebd6279bfa1d9975477e5adbdf724b4f48ff Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Wed, 17 Apr 2024 09:48:25 +0200 Subject: [PATCH] Don't create runtime_context.kill_switch by default So that the runtime_context object can still be pickled. Other cleanups --- cwltool/context.py | 4 ++-- cwltool/errors.py | 8 +++++--- cwltool/executors.py | 3 ++- cwltool/job.py | 13 ++++++++++--- cwltool/task_queue.py | 2 +- 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/cwltool/context.py b/cwltool/context.py index 00a940e843..f75488e145 100644 --- a/cwltool/context.py +++ b/cwltool/context.py @@ -183,7 +183,7 @@ def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None: self.select_resources: Optional[select_resources_callable] = None self.eval_timeout: float = 60 self.postScatterEval: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]] = None - self.on_error: Union[Literal["stop"], Literal["continue"]] = "stop" + self.on_error: Union[Literal["stop"], Literal["continue"], Literal["kill"]] = "stop" self.strict_memory_limit: bool = False self.strict_cpu_limit: bool = False self.cidfile_dir: Optional[str] = None @@ -200,7 +200,7 @@ def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None: self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None self.validate_only: bool = False self.validate_stdout: Optional[Union[IO[bytes], TextIO, IO[str]]] = None - self.kill_switch = threading.Event() + self.kill_switch: Optional[threading.Event] = None super().__init__(kwargs) if self.tmp_outdir_prefix == "": self.tmp_outdir_prefix = self.tmpdir_prefix diff --git a/cwltool/errors.py b/cwltool/errors.py index 361853cdb5..ae55232b54 100644 --- a/cwltool/errors.py +++ b/cwltool/errors.py @@ -17,9 +17,11 @@ class GraphTargetMissingException(WorkflowException): class WorkflowKillSwitch(Exception): """When processStatus != "success" and on-error=kill, raise this exception.""" - def __init__(self, job_id, rcode): + def __init__(self, job_id: str, rcode: int) -> None: + """Record the job identifier and the error code.""" self.job_id = job_id self.rcode = rcode - def __str__(self): - return f'[job {self.job_id}] activated kill switch with return code {self.rcode}' + def __str__(self) -> str: + """Represent this exception as a string.""" + return f"[job {self.job_id}] activated kill switch with return code {self.rcode}" diff --git a/cwltool/executors.py b/cwltool/executors.py index 3f350ebb51..cc7362d056 100644 --- a/cwltool/executors.py +++ b/cwltool/executors.py @@ -477,7 +477,8 @@ def run_jobs( self.wait_for_next_completion(runtime_context) self.run_job(None, runtime_context) finally: - runtime_context.workflow_eval_lock.release() + if (lock := runtime_context.workflow_eval_lock) is not None: + lock.release() self.taskqueue.drain() self.taskqueue.join() diff --git a/cwltool/job.py b/cwltool/job.py index 3c27d2c417..eed7512c82 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -231,7 +231,9 @@ def _execute( runtime: List[str], env: MutableMapping[str, str], runtimeContext: RuntimeContext, - monitor_function: Optional[Callable[["subprocess.Popen[str]"], None]] = None, + monitor_function: Optional[ + Callable[["subprocess.Popen[str]", threading.Event], None] + ] = None, ) -> None: """Execute the tool, either directly or via script. @@ -333,6 +335,10 @@ def stderr_stdout_log_path( builder: Optional[Builder] = getattr(self, "builder", None) if builder is not None: job_script_contents = builder.build_job_script(commands) + if runtimeContext.kill_switch is None: + runtimeContext.kill_switch = kill_switch = threading.Event() + else: + kill_switch = runtimeContext.kill_switch rcode = _job_popen( commands, stdin_path=stdin_path, @@ -341,7 +347,7 @@ def stderr_stdout_log_path( env=env, cwd=self.outdir, make_job_dir=lambda: runtimeContext.create_outdir(), - kill_switch=runtimeContext.kill_switch, + kill_switch=kill_switch, job_script_contents=job_script_contents, timelimit=self.timelimit, name=self.name, @@ -547,7 +553,8 @@ def monitor_kill_switch() -> None: nonlocal ks_tm if kill_switch.is_set(): _logger.error("[job %s] terminating by kill switch", self.name) - if sproc.stdin: sproc.stdin.close() + if sproc.stdin: + sproc.stdin.close() sproc.terminate() else: ks_tm = Timer(interval=1, function=monitor_kill_switch) diff --git a/cwltool/task_queue.py b/cwltool/task_queue.py index fbd441e01e..51c4dfbb3e 100644 --- a/cwltool/task_queue.py +++ b/cwltool/task_queue.py @@ -7,8 +7,8 @@ import threading from typing import Callable, Optional -from .loghandler import _logger from .errors import WorkflowKillSwitch +from .loghandler import _logger class TaskQueue: