From 572e8655656e49b2a09d54d5b80966ee2c0f9486 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Sat, 3 Feb 2024 14:37:01 -0800 Subject: [PATCH 01/29] Adding inputs for new "kill" mode of the on-error parameter --- cwltool/argparser.py | 7 ++++--- cwltool/context.py | 1 + cwltool/job.py | 7 +++++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/cwltool/argparser.py b/cwltool/argparser.py index 7b3125d94..9eea2ff5d 100644 --- a/cwltool/argparser.py +++ b/cwltool/argparser.py @@ -596,10 +596,11 @@ def arg_parser() -> argparse.ArgumentParser: parser.add_argument( "--on-error", help="Desired workflow behavior when a step fails. One of 'stop' (do " - "not submit any more steps) or 'continue' (may submit other steps that " - "are not downstream from the error). Default is 'stop'.", + "not submit any more steps), 'continue' (may submit other steps that " + "are not downstream from the error), or kill (same as stop, but also " + "terminates running jobs in the active step(s)). Default is 'stop'.", default="stop", - choices=("stop", "continue"), + choices=("stop", "continue", "kill"), ) checkgroup = parser.add_mutually_exclusive_group() diff --git a/cwltool/context.py b/cwltool/context.py index 237a90968..13bf562df 100644 --- a/cwltool/context.py +++ b/cwltool/context.py @@ -189,6 +189,7 @@ def __init__(self, kwargs: Optional[dict[str, Any]] = None) -> None: self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None self.validate_only: bool = False self.validate_stdout: Optional["SupportsWrite[str]"] = None + self.kill_switch = threading.Event() super().__init__(kwargs) if self.tmp_outdir_prefix == "": self.tmp_outdir_prefix = self.tmpdir_prefix diff --git a/cwltool/job.py b/cwltool/job.py index b360be25f..d1abaac8f 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -327,6 +327,7 @@ def stderr_stdout_log_path( env=env, cwd=self.outdir, make_job_dir=lambda: runtimeContext.create_outdir(), + kill_switch=runtimeContext.kill_switch, job_script_contents=job_script_contents, timelimit=self.timelimit, name=self.name, @@ -492,8 +493,8 @@ def prepare_environment( # Set on ourselves self.environment = env - def process_monitor(self, sproc: "subprocess.Popen[str]") -> None: - """Watch a process, logging its max memory usage.""" + def process_monitor(self, sproc: "subprocess.Popen[str]", kill_switch: threading.Event) -> None: + """Watch a process, logging its max memory usage or terminating it if kill_switch is activated.""" monitor = psutil.Process(sproc.pid) # Value must be list rather than integer to utilise pass-by-reference in python memory_usage: MutableSequence[Optional[int]] = [None] @@ -835,6 +836,7 @@ def docker_monitor( cleanup_cidfile: bool, docker_exe: str, process: "subprocess.Popen[str]", + kill_switch: threading.Event, ) -> None: """Record memory usage of the running Docker container.""" # Todo: consider switching to `docker create` / `docker start` @@ -911,6 +913,7 @@ def _job_popen( env: Mapping[str, str], cwd: str, make_job_dir: Callable[[], str], + kill_switch: threading.Event, job_script_contents: Optional[str] = None, timelimit: Optional[int] = None, name: Optional[str] = None, From d7b7712663d8ccbd7453c7fa352603a8a9875369 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Sat, 3 Feb 2024 14:46:45 -0800 Subject: [PATCH 02/29] Adding kill switch trigger. If a job's processStatus != "success" and runtimeContext.on_error = "kill", then the switch is activated. WorkflowKillSwitch is raised so it can be handled at the workflow and executor levels --- cwltool/errors.py | 11 +++++++++++ cwltool/job.py | 11 +++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/cwltool/errors.py b/cwltool/errors.py index 045b9b383..260873b10 100644 --- a/cwltool/errors.py +++ b/cwltool/errors.py @@ -18,3 +18,14 @@ class UnsupportedRequirement(WorkflowException): class ArgumentException(Exception): """Mismatched command line arguments provided.""" + + +class WorkflowKillSwitch(Exception): + """When processStatus != "success" and on-error=kill, raise this exception.""" + + def __init__(self, job_id, rcode): + self.job_id = job_id + self.rcode = rcode + + def __str__(self): + return f'[job {self.job_id}] activated kill switch with return code {self.rcode}' diff --git a/cwltool/job.py b/cwltool/job.py index d1abaac8f..b7488fc67 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -30,7 +30,7 @@ from .builder import Builder from .context import RuntimeContext from .cuda import cuda_check -from .errors import UnsupportedRequirement, WorkflowException +from .errors import UnsupportedRequirement, WorkflowException, WorkflowKillSwitch from .loghandler import _logger from .pathmapper import MapperEnt, PathMapper from .process import stage_files @@ -348,7 +348,9 @@ def stderr_stdout_log_path( processStatus = "permanentFail" if processStatus != "success": - if rcode < 0: + if runtimeContext.kill_switch.is_set(): + return + elif rcode < 0: _logger.warning( "[job %s] was terminated by signal: %s", self.name, @@ -356,6 +358,9 @@ def stderr_stdout_log_path( ) else: _logger.warning("[job %s] exited with status: %d", self.name, rcode) + if runtimeContext.on_error == "kill": + runtimeContext.kill_switch.set() + raise WorkflowKillSwitch(self.name, rcode) if "listing" in self.generatefiles: if self.generatemapper: @@ -386,6 +391,8 @@ def stderr_stdout_log_path( except WorkflowException as err: _logger.error("[job %s] Job error:\n%s", self.name, str(err)) processStatus = "permanentFail" + except WorkflowKillSwitch: + raise except Exception: _logger.exception("Exception while running job") processStatus = "permanentFail" From 788a89d315c28cea6facf01d3ef8217ea0b77ed1 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Sat, 3 Feb 2024 14:56:00 -0800 Subject: [PATCH 03/29] Actively running jobs respond to the kill switch by checking the switch's status in the monitor function. The monitor function, up to this point, has been for gathering memory usage statistics via a timer thread. A second timer thread now monitors the kill switch. --- cwltool/job.py | 59 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index b7488fc67..d000bf2d0 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -507,6 +507,7 @@ def process_monitor(self, sproc: "subprocess.Popen[str]", kill_switch: threading memory_usage: MutableSequence[Optional[int]] = [None] mem_tm: "Optional[Timer]" = None + ks_tm: "Optional[Timer]" = None def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None: nonlocal mem_tm @@ -528,10 +529,27 @@ def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None: if mem_tm is not None: mem_tm.cancel() + def monitor_kill_switch() -> None: + nonlocal ks_tm + if kill_switch.is_set(): + _logger.error("[job %s] terminating by kill switch", self.name) + if sproc.stdin: sproc.stdin.close() + sproc.terminate() + else: + ks_tm = Timer(interval=1, function=monitor_kill_switch) + ks_tm.daemon = True + ks_tm.start() + + ks_tm = Timer(interval=1, function=monitor_kill_switch) + ks_tm.daemon = True + ks_tm.start() + mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,)) mem_tm.daemon = True mem_tm.start() + sproc.wait() + ks_tm.cancel() mem_tm.cancel() if memory_usage[0] is not None: _logger.info( @@ -845,13 +863,40 @@ def docker_monitor( process: "subprocess.Popen[str]", kill_switch: threading.Event, ) -> None: - """Record memory usage of the running Docker container.""" + """Record memory usage of the running Docker container. Terminate if kill_switch is activated.""" + + ks_tm: "Optional[Timer]" = None + cid: Optional[str] = None + + def monitor_kill_switch() -> None: + nonlocal ks_tm + if kill_switch.is_set(): + _logger.error("[job %s] terminating by kill switch", self.name) + if process.stdin: + process.stdin.close() + if cid is not None: + kill_proc = subprocess.Popen( # nosec + [docker_exe, "kill", cid], shell=False # nosec + ) + try: + kill_proc.wait(timeout=10) + except subprocess.TimeoutExpired: + kill_proc.kill() + process.terminate() # Always terminate, even if we tried with the cidfile + else: + ks_tm = Timer(interval=1, function=monitor_kill_switch) + ks_tm.daemon = True + ks_tm.start() + + ks_tm = Timer(interval=1, function=monitor_kill_switch) + ks_tm.daemon = True + ks_tm.start() + # Todo: consider switching to `docker create` / `docker start` # instead of `docker run` as `docker create` outputs the container ID # to stdout, but the container is frozen, thus allowing us to start the # monitoring process without dealing with the cidfile or too-fast # container execution - cid: Optional[str] = None while cid is None: time.sleep(1) # This is needed to avoid a race condition where the job @@ -859,6 +904,7 @@ def docker_monitor( if process.returncode is None: process.poll() if process.returncode is not None: + ks_tm.cancel() if cleanup_cidfile: try: os.remove(cidfile) @@ -890,6 +936,9 @@ def docker_monitor( except OSError as exc: _logger.warning("Ignored error with %s stats: %s", docker_exe, exc) return + finally: + ks_tm.cancel() + max_mem_percent: float = 0.0 mem_percent: float = 0.0 with open(stats_file_name) as stats: @@ -924,7 +973,7 @@ def _job_popen( job_script_contents: Optional[str] = None, timelimit: Optional[int] = None, name: Optional[str] = None, - monitor_function: Optional[Callable[["subprocess.Popen[str]"], None]] = None, + monitor_function: Optional[Callable[["subprocess.Popen[str]", "threading.Event"], None]] = None, default_stdout: Optional[Union[IO[bytes], TextIO]] = None, default_stderr: Optional[Union[IO[bytes], TextIO]] = None, ) -> int: @@ -979,7 +1028,7 @@ def terminate(): # type: () -> None tm.daemon = True tm.start() if monitor_function: - monitor_function(sproc) + monitor_function(sproc, kill_switch) rcode = sproc.wait() if tm is not None: @@ -1055,7 +1104,7 @@ def terminate(): # type: () -> None tm.daemon = True tm.start() if monitor_function: - monitor_function(sproc) + monitor_function(sproc, kill_switch) rcode = sproc.wait() From de73938e2be0c3907f06a1fc6b64ce0e8ccdf6d8 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Sat, 3 Feb 2024 15:34:47 -0800 Subject: [PATCH 04/29] When the WorkflowKillSwitch exception reaches the TaskQueue, try to prevent pending tasks from starting by simply draining the queue. This is a very loose policy, but since kill switch response is handled at the job level, any tasks that start after the kill switch is activated will take care of themselves and self terminate --- cwltool/task_queue.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cwltool/task_queue.py b/cwltool/task_queue.py index 59b1609e9..fbd441e01 100644 --- a/cwltool/task_queue.py +++ b/cwltool/task_queue.py @@ -8,6 +8,7 @@ from typing import Callable, Optional from .loghandler import _logger +from .errors import WorkflowKillSwitch class TaskQueue: @@ -55,6 +56,9 @@ def _task_queue_func(self) -> None: return try: task() + except WorkflowKillSwitch: + self.drain() + break except BaseException as e: # noqa: B036 _logger.exception("Unhandled exception running task", exc_info=e) self.error = e From a9a2ab68ccbab317649fddee39ac39c21e82c90d Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Sat, 3 Feb 2024 16:01:38 -0800 Subject: [PATCH 05/29] Propagation of the WorkflowKillSwitch exception stops once it reaches an executor. The workflow_eval_lock release had to be moved to the finally block in MultithreadedJobExecutor.run_jobs(). Otherwise, TaskQueue threads running MultithreadedJobExecutor._runner() will never join() because _runner() waits indefinitely for the workflow_eval_lock in its own finally block. --- cwltool/executors.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/cwltool/executors.py b/cwltool/executors.py index e25426c9d..cccee5dd0 100644 --- a/cwltool/executors.py +++ b/cwltool/executors.py @@ -20,7 +20,7 @@ from .context import RuntimeContext, getdefault from .cuda import cuda_version_and_device_count from .cwlprov.provenance_profile import ProvenanceProfile -from .errors import WorkflowException +from .errors import WorkflowException, WorkflowKillSwitch from .job import JobBase from .loghandler import _logger from .mutation import MutationManager @@ -251,6 +251,11 @@ def run_jobs( WorkflowException, ): # pylint: disable=try-except-raise raise + except WorkflowKillSwitch as err: + _logger.error( + f"Workflow kill switch activated by [job {err.job_id}] " + f"because on-error={runtime_context.on_error}" + ) except Exception as err: logger.exception("Got workflow error") raise WorkflowException(str(err)) from err @@ -323,6 +328,11 @@ def _runner( except WorkflowException as err: _logger.exception(f"Got workflow error: {err}") self.exceptions.append(err) + except WorkflowKillSwitch as err: + _logger.error( + f"Workflow kill switch activated by [job {err.job_id}] " + f"because on-error={runtime_context.on_error}" + ) except Exception as err: # pylint: disable=broad-except _logger.exception(f"Got workflow error: {err}") self.exceptions.append(WorkflowException(str(err))) @@ -457,9 +467,8 @@ def run_jobs( while self.taskqueue.in_flight > 0: self.wait_for_next_completion(runtime_context) self.run_job(None, runtime_context) - - runtime_context.workflow_eval_lock.release() finally: + runtime_context.workflow_eval_lock.release() self.taskqueue.drain() self.taskqueue.join() From 0a00582ab043d0891c03266ce084bc1f041e163c Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Wed, 17 Apr 2024 09:48:25 +0200 Subject: [PATCH 06/29] Don't create runtime_context.kill_switch by default So that the runtime_context object can still be pickled. Other cleanups --- cwltool/context.py | 4 ++-- cwltool/errors.py | 8 +++++--- cwltool/executors.py | 3 ++- cwltool/job.py | 13 ++++++++++--- cwltool/task_queue.py | 2 +- 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/cwltool/context.py b/cwltool/context.py index 13bf562df..a5bdcd99b 100644 --- a/cwltool/context.py +++ b/cwltool/context.py @@ -172,7 +172,7 @@ def __init__(self, kwargs: Optional[dict[str, Any]] = None) -> None: self.select_resources: Optional[select_resources_callable] = None self.eval_timeout: float = 60 self.postScatterEval: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]] = None - self.on_error: Union[Literal["stop"], Literal["continue"]] = "stop" + self.on_error: Union[Literal["stop"], Literal["continue"], Literal["kill"]] = "stop" self.strict_memory_limit: bool = False self.strict_cpu_limit: bool = False self.cidfile_dir: Optional[str] = None @@ -189,7 +189,7 @@ def __init__(self, kwargs: Optional[dict[str, Any]] = None) -> None: self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None self.validate_only: bool = False self.validate_stdout: Optional["SupportsWrite[str]"] = None - self.kill_switch = threading.Event() + self.kill_switch: Optional[threading.Event] = None super().__init__(kwargs) if self.tmp_outdir_prefix == "": self.tmp_outdir_prefix = self.tmpdir_prefix diff --git a/cwltool/errors.py b/cwltool/errors.py index 260873b10..1eb92d21c 100644 --- a/cwltool/errors.py +++ b/cwltool/errors.py @@ -23,9 +23,11 @@ class ArgumentException(Exception): class WorkflowKillSwitch(Exception): """When processStatus != "success" and on-error=kill, raise this exception.""" - def __init__(self, job_id, rcode): + def __init__(self, job_id: str, rcode: int) -> None: + """Record the job identifier and the error code.""" self.job_id = job_id self.rcode = rcode - def __str__(self): - return f'[job {self.job_id}] activated kill switch with return code {self.rcode}' + def __str__(self) -> str: + """Represent this exception as a string.""" + return f"[job {self.job_id}] activated kill switch with return code {self.rcode}" diff --git a/cwltool/executors.py b/cwltool/executors.py index cccee5dd0..61e42ca1f 100644 --- a/cwltool/executors.py +++ b/cwltool/executors.py @@ -468,7 +468,8 @@ def run_jobs( self.wait_for_next_completion(runtime_context) self.run_job(None, runtime_context) finally: - runtime_context.workflow_eval_lock.release() + if (lock := runtime_context.workflow_eval_lock) is not None: + lock.release() self.taskqueue.drain() self.taskqueue.join() diff --git a/cwltool/job.py b/cwltool/job.py index d000bf2d0..18d8b3285 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -217,7 +217,9 @@ def _execute( runtime: list[str], env: MutableMapping[str, str], runtimeContext: RuntimeContext, - monitor_function: Optional[Callable[["subprocess.Popen[str]"], None]] = None, + monitor_function: Optional[ + Callable[["subprocess.Popen[str]", threading.Event], None] + ] = None, ) -> None: """Execute the tool, either directly or via script. @@ -319,6 +321,10 @@ def stderr_stdout_log_path( builder: Optional[Builder] = getattr(self, "builder", None) if builder is not None: job_script_contents = builder.build_job_script(commands) + if runtimeContext.kill_switch is None: + runtimeContext.kill_switch = kill_switch = threading.Event() + else: + kill_switch = runtimeContext.kill_switch rcode = _job_popen( commands, stdin_path=stdin_path, @@ -327,7 +333,7 @@ def stderr_stdout_log_path( env=env, cwd=self.outdir, make_job_dir=lambda: runtimeContext.create_outdir(), - kill_switch=runtimeContext.kill_switch, + kill_switch=kill_switch, job_script_contents=job_script_contents, timelimit=self.timelimit, name=self.name, @@ -533,7 +539,8 @@ def monitor_kill_switch() -> None: nonlocal ks_tm if kill_switch.is_set(): _logger.error("[job %s] terminating by kill switch", self.name) - if sproc.stdin: sproc.stdin.close() + if sproc.stdin: + sproc.stdin.close() sproc.terminate() else: ks_tm = Timer(interval=1, function=monitor_kill_switch) diff --git a/cwltool/task_queue.py b/cwltool/task_queue.py index fbd441e01..51c4dfbb3 100644 --- a/cwltool/task_queue.py +++ b/cwltool/task_queue.py @@ -7,8 +7,8 @@ import threading from typing import Callable, Optional -from .loghandler import _logger from .errors import WorkflowKillSwitch +from .loghandler import _logger class TaskQueue: From beaa3f44a00f8613d2dfd3d2714969d94ced2277 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Tue, 30 Jul 2024 18:38:30 -0700 Subject: [PATCH 07/29] This commit brings kill switch initialization and monitoring to the TaskQueue. This helps to better synchronize the kill switch event and avoid adding/executing tasks after the switch has been set. This approach is tighter than my previous draft, but a race condition still exists where a task might be started after the kill switch has been set and announced. If this happens then the leaked job's monitor function will kill it and the subprocess' lifespan will be a maximum of the monitor's timer interval (currently 1 second). So when this rare event happens, the console output will be potentially confusing since it will show a new job starting after the kill switch has been announced. --- cwltool/executors.py | 2 +- cwltool/task_queue.py | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/cwltool/executors.py b/cwltool/executors.py index 61e42ca1f..08c4593be 100644 --- a/cwltool/executors.py +++ b/cwltool/executors.py @@ -439,7 +439,7 @@ def run_jobs( logger: logging.Logger, runtime_context: RuntimeContext, ) -> None: - self.taskqueue: TaskQueue = TaskQueue(threading.Lock(), int(math.ceil(self.max_cores))) + self.taskqueue: TaskQueue = TaskQueue(threading.Lock(), int(math.ceil(self.max_cores)), runtime_context) try: jobiter = process.job(job_order_object, self.output_callback, runtime_context) diff --git a/cwltool/task_queue.py b/cwltool/task_queue.py index 51c4dfbb3..b10b68940 100644 --- a/cwltool/task_queue.py +++ b/cwltool/task_queue.py @@ -8,6 +8,7 @@ from typing import Callable, Optional from .errors import WorkflowKillSwitch +from .context import RuntimeContext from .loghandler import _logger @@ -34,7 +35,7 @@ class TaskQueue: in_flight: int = 0 """The number of tasks in the queue.""" - def __init__(self, lock: threading.Lock, thread_count: int): + def __init__(self, lock: threading.Lock, thread_count: int, runtime_context: RuntimeContext): """Create a new task queue using the specified lock and number of threads.""" self.thread_count = thread_count self.task_queue: queue.Queue[Optional[Callable[[], None]]] = queue.Queue( @@ -44,6 +45,11 @@ def __init__(self, lock: threading.Lock, thread_count: int): self.lock = lock self.error: Optional[BaseException] = None + if runtime_context.kill_switch is None: + self.kill_switch = runtime_context.kill_switch = threading.Event() + else: + self.kill_switch = runtime_context.kill_switch + for _r in range(0, self.thread_count): t = threading.Thread(target=self._task_queue_func) self.task_queue_threads.append(t) @@ -52,11 +58,12 @@ def __init__(self, lock: threading.Lock, thread_count: int): def _task_queue_func(self) -> None: while True: task = self.task_queue.get() - if task is None: + if task is None or self.kill_switch.is_set(): return try: task() except WorkflowKillSwitch: + self.kill_switch.set() self.drain() break except BaseException as e: # noqa: B036 @@ -96,7 +103,10 @@ def add( try: if unlock is not None: unlock.release() - if check_done is not None and check_done.is_set(): + if ( + (check_done is not None and check_done.is_set()) + or self.kill_switch.is_set() + ): with self.lock: self.in_flight -= 1 return From b52333df71d7c8adc903ae921d0a1a5d52230479 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Tue, 30 Jul 2024 19:11:16 -0700 Subject: [PATCH 08/29] JobBase._execute() previously skipped some important post-job actions when exiting due to kill switch. Those actions have been placed under a `finally` block so that they are executed by both the "switching" job and the "responding" jobs. However, some of these post actions added a lot of redundant and unhelpful terminal output when handling jobs killed DUE TO the kill switch. The verbose output obscured the error's cause which isn't helpful. Two new process statuses have been added in order to better handle the event: - indeterminant: a default value for processStatus. - killed: the job was killed due to the kill switch being set. This approach also means that partial outputs aren't collected from jobs that have been killed. --- cwltool/job.py | 106 +++++++++++++++++++++------------------- cwltool/workflow.py | 13 ++--- cwltool/workflow_job.py | 29 +++++------ 3 files changed, 77 insertions(+), 71 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index 18d8b3285..a8f012612 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -284,6 +284,7 @@ def _execute( "{}".format(runtimeContext) ) outputs: CWLObjectType = {} + processStatus = "indeterminate" try: stdin_path = None if self.stdin is not None: @@ -355,6 +356,7 @@ def stderr_stdout_log_path( if processStatus != "success": if runtimeContext.kill_switch.is_set(): + processStatus = "killed" return elif rcode < 0: _logger.warning( @@ -398,62 +400,64 @@ def stderr_stdout_log_path( _logger.error("[job %s] Job error:\n%s", self.name, str(err)) processStatus = "permanentFail" except WorkflowKillSwitch: + processStatus = "permanentFail" raise except Exception: _logger.exception("Exception while running job") processStatus = "permanentFail" - if ( - runtimeContext.research_obj is not None - and self.prov_obj is not None - and runtimeContext.process_run_id is not None - ): - # creating entities for the outputs produced by each step (in the provenance document) - self.prov_obj.record_process_end( - str(self.name), - runtimeContext.process_run_id, - outputs, - datetime.datetime.now(), - ) - if processStatus != "success": - _logger.warning("[job %s] completed %s", self.name, processStatus) - else: - _logger.info("[job %s] completed %s", self.name, processStatus) - - if _logger.isEnabledFor(logging.DEBUG): - _logger.debug("[job %s] outputs %s", self.name, json_dumps(outputs, indent=4)) - - if self.generatemapper is not None and runtimeContext.secret_store is not None: - # Delete any runtime-generated files containing secrets. - for _, p in self.generatemapper.items(): - if p.type == "CreateFile": - if runtimeContext.secret_store.has_secret(p.resolved): - host_outdir = self.outdir - container_outdir = self.builder.outdir - host_outdir_tgt = p.target - if p.target.startswith(container_outdir + "/"): - host_outdir_tgt = os.path.join( - host_outdir, p.target[len(container_outdir) + 1 :] - ) - os.remove(host_outdir_tgt) - - if runtimeContext.workflow_eval_lock is None: - raise WorkflowException("runtimeContext.workflow_eval_lock must not be None") - - if self.output_callback: - with runtimeContext.workflow_eval_lock: - self.output_callback(outputs, processStatus) - - if runtimeContext.rm_tmpdir and self.stagedir is not None and os.path.exists(self.stagedir): - _logger.debug( - "[job %s] Removing input staging directory %s", - self.name, - self.stagedir, - ) - shutil.rmtree(self.stagedir, True) + finally: + if ( + runtimeContext.research_obj is not None + and self.prov_obj is not None + and runtimeContext.process_run_id is not None + ): + # creating entities for the outputs produced by each step (in the provenance document) + self.prov_obj.record_process_end( + str(self.name), + runtimeContext.process_run_id, + outputs, + datetime.datetime.now(), + ) + if processStatus != "success": + _logger.warning("[job %s] completed %s", self.name, processStatus) + else: + _logger.info("[job %s] completed %s", self.name, processStatus) + + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("[job %s] outputs %s", self.name, json_dumps(outputs, indent=4)) + + if self.generatemapper is not None and runtimeContext.secret_store is not None: + # Delete any runtime-generated files containing secrets. + for _, p in self.generatemapper.items(): + if p.type == "CreateFile": + if runtimeContext.secret_store.has_secret(p.resolved): + host_outdir = self.outdir + container_outdir = self.builder.outdir + host_outdir_tgt = p.target + if p.target.startswith(container_outdir + "/"): + host_outdir_tgt = os.path.join( + host_outdir, p.target[len(container_outdir) + 1 :] + ) + os.remove(host_outdir_tgt) + + if runtimeContext.workflow_eval_lock is None: + raise WorkflowException("runtimeContext.workflow_eval_lock must not be None") + + if self.output_callback: + with runtimeContext.workflow_eval_lock: + self.output_callback(outputs, processStatus) + + if runtimeContext.rm_tmpdir and self.stagedir is not None and os.path.exists(self.stagedir): + _logger.debug( + "[job %s] Removing input staging directory %s", + self.name, + self.stagedir, + ) + shutil.rmtree(self.stagedir, True) - if runtimeContext.rm_tmpdir: - _logger.debug("[job %s] Removing temporary directory %s", self.name, self.tmpdir) - shutil.rmtree(self.tmpdir, True) + if runtimeContext.rm_tmpdir: + _logger.debug("[job %s] Removing temporary directory %s", self.name, self.tmpdir) + shutil.rmtree(self.tmpdir, True) @abstractmethod def _required_env(self) -> dict[str, str]: diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 3bf32251f..853faa31a 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -401,12 +401,13 @@ def receive_output( processStatus: str, ) -> None: output = {} - for i in self.tool["outputs"]: - field = shortname(i["id"]) - if field in jobout: - output[i["id"]] = jobout[field] - else: - processStatus = "permanentFail" + if processStatus != "killed": + for i in self.tool["outputs"]: + field = shortname(i["id"]) + if field in jobout: + output[i["id"]] = jobout[field] + else: + processStatus = "permanentFail" output_callback(output, processStatus) def job( diff --git a/cwltool/workflow_job.py b/cwltool/workflow_job.py index 6cd0b2e7c..4ac0ea7d1 100644 --- a/cwltool/workflow_job.py +++ b/cwltool/workflow_job.py @@ -548,24 +548,25 @@ def receive_output( jobout: CWLObjectType, processStatus: str, ) -> None: - for i in outputparms: - if "id" in i: - iid = cast(str, i["id"]) - if iid in jobout: - self.state[iid] = WorkflowStateItem(i, jobout[iid], processStatus) - else: - _logger.error("[%s] Output is missing expected field %s", step.name, iid) - processStatus = "permanentFail" if _logger.isEnabledFor(logging.DEBUG): _logger.debug("[%s] produced output %s", step.name, json_dumps(jobout, indent=4)) + if processStatus != "killed": + for i in outputparms: + if "id" in i: + iid = cast(str, i["id"]) + if iid in jobout: + self.state[iid] = WorkflowStateItem(i, jobout[iid], processStatus) + else: + _logger.error("[%s] Output is missing expected field %s", step.name, iid) + processStatus = "permanentFail" - if processStatus not in ("success", "skipped"): - if self.processStatus != "permanentFail": - self.processStatus = processStatus + if processStatus not in ("success", "skipped"): + if self.processStatus != "permanentFail": + self.processStatus = processStatus - _logger.warning("[%s] completed %s", step.name, processStatus) - else: - _logger.info("[%s] completed %s", step.name, processStatus) + _logger.warning("[%s] completed %s", step.name, processStatus) + else: + _logger.info("[%s] completed %s", step.name, processStatus) step.completed = True # Release the iterable related to this step to From a1ab910f41bfdefee1825b224ec3a0d68474d927 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Tue, 6 Aug 2024 19:11:51 -0700 Subject: [PATCH 09/29] Adding a unit test. Two outcomes are measured: 1) Once a job has been terminated, all other parallel jobs should also terminate. In this test, the runtime of the workflow indicates whether the kill switch has been handled correctly. If the kill switch is successful then the workflow's runtime should be significantly shorter than sleep_time. 2) Outputs produced by a successful step should still be collected. In this case, the completed step is make_array. To be frank, this test could be simplified by using a ToolTimeLimit requirement rather than process_roulette.cwl --- tests/process_roulette.cwl | 35 ++++++++++++++ tests/test_parallel.py | 23 +++++++++- tests/wf/on-error_kill.cwl | 93 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 tests/process_roulette.cwl create mode 100644 tests/wf/on-error_kill.cwl diff --git a/tests/process_roulette.cwl b/tests/process_roulette.cwl new file mode 100644 index 000000000..e1b02ff31 --- /dev/null +++ b/tests/process_roulette.cwl @@ -0,0 +1,35 @@ +#!/usr/bin/env cwl-runner + +cwlVersion: v1.2 +class: CommandLineTool + + +doc: | + This tool selects a random process whose associated command matches + search_str, terminates it, and reports the PID of the terminated process. + The search_str supports regex. Example search_strs: + - "sleep" + - "sleep 33" + - "sleep [0-9]+" + + +baseCommand: [ 'bash', '-c' ] +arguments: + - | + sleep $(inputs.delay) + pid=\$(ps -ef | grep '$(inputs.search_str)' | grep -v grep | awk '{print $2}' | shuf | head -n 1) + echo "$pid" | tee >(xargs kill -SIGTERM) +inputs: + search_str: + type: string + delay: + type: int? + default: 3 +stdout: "pid.txt" +outputs: + pid: + type: string + outputBinding: + glob: pid.txt + loadContents: true + outputEval: $(self[0].contents) \ No newline at end of file diff --git a/tests/test_parallel.py b/tests/test_parallel.py index 8c86d41fc..0cfb22784 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -1,9 +1,10 @@ import json +import time from pathlib import Path from cwltool.context import RuntimeContext from cwltool.executors import MultithreadedJobExecutor -from cwltool.factory import Factory +from cwltool.factory import Factory, WorkflowStatus from .util import get_data, needs_docker @@ -29,3 +30,23 @@ def test_scattered_workflow() -> None: echo = factory.make(get_data(test_file)) with open(get_data(job_file)) as job: assert echo(**json.load(job)) == {"out": ["foo one three", "foo two four"]} + + +def test_on_error_kill() -> None: + test_file = "tests/wf/on-error_kill.cwl" + runtime_context = RuntimeContext() + runtime_context.on_error = "kill" + factory = Factory(MultithreadedJobExecutor(), None, runtime_context) + ks_test = factory.make(get_data(test_file)) + + # arbitrary test values + sleep_time = 33 # a "sufficiently large" timeout + n_sleepers = 5 + + try: + start_time = time.time() + ks_test(sleep_time=sleep_time) + except WorkflowStatus as e: + assert e.out == {"instructed_sleep_times": [sleep_time] * n_sleepers} + assert time.time() - start_time < sleep_time + print("sharty barty") diff --git a/tests/wf/on-error_kill.cwl b/tests/wf/on-error_kill.cwl new file mode 100644 index 000000000..0908614b7 --- /dev/null +++ b/tests/wf/on-error_kill.cwl @@ -0,0 +1,93 @@ +#!/usr/bin/env cwl-runner + +cwlVersion: v1.2 +class: Workflow +requirements: + ScatterFeatureRequirement: {} + InlineJavascriptRequirement: {} + StepInputExpressionRequirement: {} + + +doc: | + This workflow tests the optional argument --on-error kill. + MultithreadedJobExecutor() or --parallel should be used. + A successful run should: + 1) Finish in (much) less than sleep_time seconds. + 2) Return outputs produced by successful steps. + + +inputs: + sleep_time: { type: int, default: 33 } + n_sleepers: { type: int?, default: 5 } + + +steps: + make_array: + doc: | + This step produces an array of sleep_time values to be used + as inputs for the scatter_step. The array also serves as the + workflow output which should be collected despite the + kill switch triggered in the kill step below. + in: { sleep_time: sleep_time, n_sleepers: n_sleepers } + out: [ times ] + run: + class: ExpressionTool + inputs: + sleep_time: { type: int } + n_sleepers: { type: int } + outputs: { times: { type: "int[]" } } + expression: | + ${ return {"times": Array(inputs.n_sleepers).fill(inputs.sleep_time)} } + + scatter_step: + doc: | + This step starts several parallel jobs that each sleep for + sleep_time seconds. + in: + time: make_array/times + scatter: time + out: [ ] + run: + class: CommandLineTool + baseCommand: sleep + inputs: + time: { type: int, inputBinding: { position: 1 } } + outputs: { } + + kill: + doc: | + This step waits a few seconds and selects a random scatter_step job to kill. + When `--on-error kill` is used, the runner should respond by terminating all + remaining jobs and exiting. This means the workflow's overall runtime should be + much less than max(sleep_time). The input force_upstream_order ensures that + this step runs after make_array, and therefore roughly parallel to scatter_step. + in: + force_upstream_order: make_array/times + sleep_time: sleep_time + search_str: + valueFrom: $("sleep " + inputs.sleep_time) + out: [ pid ] + run: ../process_roulette.cwl + + dangling_step: + doc: | + This step should never run. It confirms that additional jobs aren't + submitted and allowed to run to completion after the kill switch has + been set. The input force_downstream_order ensures that this step runs + after the kill step. + in: + force_downstream_order: kill/pid + time: sleep_time + out: [ ] + run: + class: CommandLineTool + baseCommand: sleep + inputs: + time: { type: int, inputBinding: { position: 1 } } + outputs: { } + + +outputs: + instructed_sleep_times: + type: int[] + outputSource: make_array/times From 5924cfe87589a4d59e6f50e64e223e4850ae5a89 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Tue, 6 Aug 2024 19:18:15 -0700 Subject: [PATCH 10/29] Formatting corrections provided by `make cleanup` which are relevant to this issue. Other changes were offered by the tool, but they are outside the scope of this issue. --- cwltool/task_queue.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cwltool/task_queue.py b/cwltool/task_queue.py index b10b68940..cbb0fa78a 100644 --- a/cwltool/task_queue.py +++ b/cwltool/task_queue.py @@ -7,8 +7,8 @@ import threading from typing import Callable, Optional -from .errors import WorkflowKillSwitch from .context import RuntimeContext +from .errors import WorkflowKillSwitch from .loghandler import _logger @@ -103,10 +103,7 @@ def add( try: if unlock is not None: unlock.release() - if ( - (check_done is not None and check_done.is_set()) - or self.kill_switch.is_set() - ): + if (check_done is not None and check_done.is_set()) or self.kill_switch.is_set(): with self.lock: self.in_flight -= 1 return From 1bee55f6652a846c03a4bb932a5bd254ec532c23 Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Wed, 7 Aug 2024 12:54:07 +0200 Subject: [PATCH 11/29] cleanups --- cwltool/job.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index a8f012612..ac5507a7e 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -447,7 +447,11 @@ def stderr_stdout_log_path( with runtimeContext.workflow_eval_lock: self.output_callback(outputs, processStatus) - if runtimeContext.rm_tmpdir and self.stagedir is not None and os.path.exists(self.stagedir): + if ( + runtimeContext.rm_tmpdir + and self.stagedir is not None + and os.path.exists(self.stagedir) + ): _logger.debug( "[job %s] Removing input staging directory %s", self.name, @@ -875,7 +879,6 @@ def docker_monitor( kill_switch: threading.Event, ) -> None: """Record memory usage of the running Docker container. Terminate if kill_switch is activated.""" - ks_tm: "Optional[Timer]" = None cid: Optional[str] = None From 7eb2c0dc4e0d6a364053c604118b0092b6439bcf Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Wed, 7 Aug 2024 13:05:35 +0200 Subject: [PATCH 12/29] DO NOT MERGE, let all tests run --- .github/workflows/ci-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-tests.yml b/.github/workflows/ci-tests.yml index e5d9a7e83..95d039522 100644 --- a/.github/workflows/ci-tests.yml +++ b/.github/workflows/ci-tests.yml @@ -9,7 +9,7 @@ on: concurrency: group: build-${{ github.event.pull_request.number || github.ref }} - cancel-in-progress: true + #cancel-in-progress: true env: TOX_SKIP_MISSING_INTERPRETERS: False From 03cde296ac4352fed27b1c774eda80603e5d805b Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Mon, 21 Oct 2024 17:14:13 -0700 Subject: [PATCH 13/29] Removing irrelevant debug line from test body --- tests/test_parallel.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_parallel.py b/tests/test_parallel.py index 0cfb22784..1930a6d4c 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -49,4 +49,3 @@ def test_on_error_kill() -> None: except WorkflowStatus as e: assert e.out == {"instructed_sleep_times": [sleep_time] * n_sleepers} assert time.time() - start_time < sleep_time - print("sharty barty") From 2b9d76a7b293bf68641e7edb49fcba2c4c4d771e Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Mon, 21 Oct 2024 18:01:10 -0700 Subject: [PATCH 14/29] Removing CPU resource constraints for the test. My intention is to have MultithreadedJobExecutor ignore allocated resources when deciding whether to run the next parallel job. The steps in this workflow aren't resource intensive, and delaying their execution on this basis will cause the test to fail. --- tests/wf/on-error_kill.cwl | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/wf/on-error_kill.cwl b/tests/wf/on-error_kill.cwl index 0908614b7..0e54f48e5 100644 --- a/tests/wf/on-error_kill.cwl +++ b/tests/wf/on-error_kill.cwl @@ -6,6 +6,7 @@ requirements: ScatterFeatureRequirement: {} InlineJavascriptRequirement: {} StepInputExpressionRequirement: {} + ResourceRequirement: {coresMin: 0, ramMin: 0} doc: | From 36f12cf85242ff8b9b76773079e2d7e1d99ce9dd Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Tue, 22 Oct 2024 22:03:38 -0700 Subject: [PATCH 15/29] Using a different approach to remove the "one job per core" resource constraint. The current ResourceRequirement implementation doesn't allow {coresMin: 0}. However, this can still be achieved with a custom RuntimeContext.select_resources() --- tests/test_parallel.py | 12 ++++++++++++ tests/wf/on-error_kill.cwl | 1 - 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/test_parallel.py b/tests/test_parallel.py index 1930a6d4c..1c554f350 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -1,5 +1,6 @@ import json import time +import math from pathlib import Path from cwltool.context import RuntimeContext @@ -34,8 +35,19 @@ def test_scattered_workflow() -> None: def test_on_error_kill() -> None: test_file = "tests/wf/on-error_kill.cwl" + def selectResources(request, _): + # Remove the "one job per core" resource constraint so that + # parallel jobs aren't withheld on machines with few cores + return { + "cores": 0, + "ram": math.ceil(request["ramMin"]), # default + "tmpdirSize": math.ceil(request["tmpdirMin"]), # default + "outdirSize": math.ceil(request["outdirMin"]), # default + } + runtime_context = RuntimeContext() runtime_context.on_error = "kill" + runtime_context.select_resources = selectResources factory = Factory(MultithreadedJobExecutor(), None, runtime_context) ks_test = factory.make(get_data(test_file)) diff --git a/tests/wf/on-error_kill.cwl b/tests/wf/on-error_kill.cwl index 0e54f48e5..0908614b7 100644 --- a/tests/wf/on-error_kill.cwl +++ b/tests/wf/on-error_kill.cwl @@ -6,7 +6,6 @@ requirements: ScatterFeatureRequirement: {} InlineJavascriptRequirement: {} StepInputExpressionRequirement: {} - ResourceRequirement: {coresMin: 0, ramMin: 0} doc: | From 791fe7bbc7d70ff4429f98788c8c8748396f44f8 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Wed, 23 Oct 2024 09:41:51 -0700 Subject: [PATCH 16/29] Adding suggested changes to the --on-error help string per @mr-c. Formatting compliance updates in test_parallel.py --- cwltool/argparser.py | 2 +- tests/test_parallel.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cwltool/argparser.py b/cwltool/argparser.py index 9eea2ff5d..2638b04f9 100644 --- a/cwltool/argparser.py +++ b/cwltool/argparser.py @@ -597,7 +597,7 @@ def arg_parser() -> argparse.ArgumentParser: "--on-error", help="Desired workflow behavior when a step fails. One of 'stop' (do " "not submit any more steps), 'continue' (may submit other steps that " - "are not downstream from the error), or kill (same as stop, but also " + "are not downstream from the error), or 'kill' (same as 'stop', but also " "terminates running jobs in the active step(s)). Default is 'stop'.", default="stop", choices=("stop", "continue", "kill"), diff --git a/tests/test_parallel.py b/tests/test_parallel.py index 1c554f350..350b2f0ea 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -1,6 +1,6 @@ import json -import time import math +import time from pathlib import Path from cwltool.context import RuntimeContext @@ -35,12 +35,13 @@ def test_scattered_workflow() -> None: def test_on_error_kill() -> None: test_file = "tests/wf/on-error_kill.cwl" + def selectResources(request, _): # Remove the "one job per core" resource constraint so that # parallel jobs aren't withheld on machines with few cores return { "cores": 0, - "ram": math.ceil(request["ramMin"]), # default + "ram": math.ceil(request["ramMin"]), # default "tmpdirSize": math.ceil(request["tmpdirMin"]), # default "outdirSize": math.ceil(request["outdirMin"]), # default } From 93948fbd0185fa7e8b03aa73e92ca897e3dc2165 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Wed, 23 Oct 2024 16:36:23 -0700 Subject: [PATCH 17/29] Missed running `make mypy` before pushing last commit. Opting to disable type checking on selectResources() because it's just an implementation detail for the test --- tests/test_parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_parallel.py b/tests/test_parallel.py index 350b2f0ea..37f32ad2d 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -36,7 +36,7 @@ def test_scattered_workflow() -> None: def test_on_error_kill() -> None: test_file = "tests/wf/on-error_kill.cwl" - def selectResources(request, _): + def selectResources(request, _): # type: ignore # Remove the "one job per core" resource constraint so that # parallel jobs aren't withheld on machines with few cores return { From d938bf8b8139ce0ae37af04af89c65cede85723c Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Thu, 24 Oct 2024 11:04:41 +0200 Subject: [PATCH 18/29] don't cancel jobs; typecheck the test just in case --- .github/workflows/ci-tests.yml | 2 +- tests/test_parallel.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-tests.yml b/.github/workflows/ci-tests.yml index 95d039522..deca97bbd 100644 --- a/.github/workflows/ci-tests.yml +++ b/.github/workflows/ci-tests.yml @@ -9,7 +9,7 @@ on: concurrency: group: build-${{ github.event.pull_request.number || github.ref }} - #cancel-in-progress: true + cancel-in-progress: false env: TOX_SKIP_MISSING_INTERPRETERS: False diff --git a/tests/test_parallel.py b/tests/test_parallel.py index 37f32ad2d..95b9189e1 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -2,6 +2,7 @@ import math import time from pathlib import Path +from typing import Union from cwltool.context import RuntimeContext from cwltool.executors import MultithreadedJobExecutor @@ -36,7 +37,9 @@ def test_scattered_workflow() -> None: def test_on_error_kill() -> None: test_file = "tests/wf/on-error_kill.cwl" - def selectResources(request, _): # type: ignore + def selectResources( + request: dict[str, Union[int, float]], _: RuntimeContext + ) -> dict[str, Union[int, float]]: # Remove the "one job per core" resource constraint so that # parallel jobs aren't withheld on machines with few cores return { From 5d92d2d4f3dbe8e1da4e86d7ea77b68c3b4b15d5 Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Thu, 24 Oct 2024 11:13:17 +0200 Subject: [PATCH 19/29] test_on_error_kill: allow for some longer runtime due to busy systems --- tests/test_parallel.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_parallel.py b/tests/test_parallel.py index 95b9189e1..45e7ddfb2 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -63,5 +63,6 @@ def selectResources( start_time = time.time() ks_test(sleep_time=sleep_time) except WorkflowStatus as e: + end_time = time.time() assert e.out == {"instructed_sleep_times": [sleep_time] * n_sleepers} - assert time.time() - start_time < sleep_time + assert end_time - start_time < (sleep_time + 4) From fe53fb82f6dcc1a344f285fcaaec26f1d4f33007 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Mon, 11 Nov 2024 19:14:34 -0800 Subject: [PATCH 20/29] Fixing ReceiveScatterOutput to allow outputs to be collected from successful steps when 1) these steps are upstream from a scattered subworkflow, 2) the workflow kill switch is activated by one of the scatter jobs, and 3) on_error==kill --- cwltool/workflow_job.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/cwltool/workflow_job.py b/cwltool/workflow_job.py index 4ac0ea7d1..387d50e0a 100644 --- a/cwltool/workflow_job.py +++ b/cwltool/workflow_job.py @@ -89,7 +89,7 @@ def completed(self) -> int: """The number of completed internal jobs.""" return len(self._completed) - def receive_scatter_output(self, index: int, jobout: CWLObjectType, processStatus: str) -> None: + def receive_scatter_output(self, index: int, runtimeContext: RuntimeContext, jobout: CWLObjectType, processStatus: str) -> None: """Record the results of a scatter operation.""" for key, val in jobout.items(): self.dest[key][index] = val @@ -102,6 +102,8 @@ def receive_scatter_output(self, index: int, jobout: CWLObjectType, processStatu if processStatus != "success": if self.processStatus != "permanentFail": self.processStatus = processStatus + if runtimeContext.on_error == "kill": + self.output_callback(self.dest, self.processStatus) if index not in self._completed: self._completed.add(index) @@ -156,7 +158,7 @@ def parallel_steps( except WorkflowException as exc: _logger.error("Cannot make scatter job: %s", str(exc)) _logger.debug("", exc_info=True) - rc.receive_scatter_output(index, {}, "permanentFail") + rc.receive_scatter_output(index, runtimeContext, {}, "permanentFail") if not made_progress and rc.completed < rc.total: yield None @@ -185,7 +187,7 @@ def nested_crossproduct_scatter( if len(scatter_keys) == 1: if runtimeContext.postScatterEval is not None: sjob = runtimeContext.postScatterEval(sjob) - curriedcallback = functools.partial(rc.receive_scatter_output, index) + curriedcallback = functools.partial(rc.receive_scatter_output, index, runtimeContext) if sjob is not None: steps.append(process.job(sjob, curriedcallback, runtimeContext)) else: @@ -197,7 +199,7 @@ def nested_crossproduct_scatter( process, sjob, scatter_keys[1:], - functools.partial(rc.receive_scatter_output, index), + functools.partial(rc.receive_scatter_output, index, runtimeContext), runtimeContext, ) ) @@ -257,7 +259,7 @@ def _flat_crossproduct_scatter( if len(scatter_keys) == 1: if runtimeContext.postScatterEval is not None: sjob = runtimeContext.postScatterEval(sjob) - curriedcallback = functools.partial(callback.receive_scatter_output, put) + curriedcallback = functools.partial(callback.receive_scatter_output, put, runtimeContext) if sjob is not None: steps.append(process.job(sjob, curriedcallback, runtimeContext)) else: @@ -307,7 +309,7 @@ def dotproduct_scatter( if runtimeContext.postScatterEval is not None: sjobo = runtimeContext.postScatterEval(sjobo) - curriedcallback = functools.partial(rc.receive_scatter_output, index) + curriedcallback = functools.partial(rc.receive_scatter_output, index, runtimeContext) if sjobo is not None: steps.append(process.job(sjobo, curriedcallback, runtimeContext)) else: From b639dfae89456dc3674a6ccd3d44af1516faa4d0 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Mon, 11 Nov 2024 19:26:22 -0800 Subject: [PATCH 21/29] Simplifying value checks for RuntimeContext.on_error in parallel_steps() and WorkflowJob.job(). There isn't a need to use getdefault() when querying the value because a default is already set when RuntimeContext is constructed. The checked condition additionally applies to on_error==kill, so the logic can be simplified to on_error!=continue. --- cwltool/workflow_job.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/cwltool/workflow_job.py b/cwltool/workflow_job.py index 387d50e0a..df74d8f2e 100644 --- a/cwltool/workflow_job.py +++ b/cwltool/workflow_job.py @@ -135,7 +135,7 @@ def parallel_steps( while rc.completed < rc.total: made_progress = False for index, step in enumerate(steps): - if getdefault(runtimeContext.on_error, "stop") == "stop" and rc.processStatus not in ( + if runtimeContext.on_error != "continue" and rc.processStatus not in ( "success", "skipped", ): @@ -144,9 +144,7 @@ def parallel_steps( continue try: for j in step: - if getdefault( - runtimeContext.on_error, "stop" - ) == "stop" and rc.processStatus not in ("success", "skipped"): + if runtimeContext.on_error != "continue" and rc.processStatus not in ("success", "skipped"): break if j is not None: made_progress = True @@ -808,7 +806,7 @@ def job( for step in self.steps: if ( - getdefault(runtimeContext.on_error, "stop") == "stop" + runtimeContext.on_error != "continue" and self.processStatus != "success" ): break @@ -825,7 +823,7 @@ def job( try: for newjob in step.iterable: if ( - getdefault(runtimeContext.on_error, "stop") == "stop" + runtimeContext.on_error != "continue" and self.processStatus != "success" ): break From 8e0b6f2652c8439efb2fa039058be094cf392fd6 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Mon, 11 Nov 2024 19:31:52 -0800 Subject: [PATCH 22/29] Adding __repr__ to WorkflowStep, WorkflowJob, and WorkflowJobStep classes. This can be VERY helpful while debugging, particularly when unraveling callback chains. --- cwltool/workflow.py | 4 ++++ cwltool/workflow_job.py | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 853faa31a..d4132f3d0 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -1,3 +1,4 @@ +import os import copy import datetime import functools @@ -452,3 +453,6 @@ def job( def visit(self, op: Callable[[CommentedMap], None]) -> None: self.embedded_tool.visit(op) + + def __repr__(self): + return f"<{self.__class__.__name__} [{os.path.basename(self.id)}]>" diff --git a/cwltool/workflow_job.py b/cwltool/workflow_job.py index df74d8f2e..6eeed125d 100644 --- a/cwltool/workflow_job.py +++ b/cwltool/workflow_job.py @@ -66,6 +66,9 @@ def job( yield from self.step.job(joborder, output_callback, runtimeContext) + def __repr__(self): + return f"<{self.__class__.__name__} [{self.name}]>" + class ReceiveScatterOutput: """Produced by the scatter generators.""" @@ -851,6 +854,8 @@ def job( # depends which one comes first. All steps are completed # or all outputs have been produced. + def __repr__(self): + return f"<{self.__class__.__name__} [{self.name}]>" class WorkflowJobLoopStep: """Generated for each step in Workflow.steps() containing a `loop` directive.""" From 32b22d84c425eeff96ac71f620aafc10891a04e4 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Mon, 11 Nov 2024 19:52:51 -0800 Subject: [PATCH 23/29] It is critical for WorkflowJob.processStatus to update in WorkflowJob.receive_output(). Otherwise, MultithreadedJobExecutor.run_jobs() will likely stop iterating over the topmost WorkflowJob.job() before WorkflowJob.do_output_callback() is called to deliver the final workflow outputs. --- cwltool/workflow_job.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cwltool/workflow_job.py b/cwltool/workflow_job.py index 6eeed125d..5d7ffc8e7 100644 --- a/cwltool/workflow_job.py +++ b/cwltool/workflow_job.py @@ -563,13 +563,13 @@ def receive_output( _logger.error("[%s] Output is missing expected field %s", step.name, iid) processStatus = "permanentFail" - if processStatus not in ("success", "skipped"): - if self.processStatus != "permanentFail": - self.processStatus = processStatus + if processStatus not in ("success", "skipped"): + if self.processStatus != "permanentFail": + self.processStatus = processStatus - _logger.warning("[%s] completed %s", step.name, processStatus) - else: - _logger.info("[%s] completed %s", step.name, processStatus) + _logger.warning("[%s] completed %s", step.name, processStatus) + else: + _logger.info("[%s] completed %s", step.name, processStatus) step.completed = True # Release the iterable related to this step to From 6485cff95b303f88c817308b8cdc34859646d5f8 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Mon, 11 Nov 2024 20:18:19 -0800 Subject: [PATCH 24/29] Simplifying the approach in on-error_kill.cwl. It now activates the kill switch via a ToolTimeLimit requirement. It also uses a much longer timeout which will hopefully be sufficient for the CI server when it is congested. --- tests/test_parallel.py | 15 ++++--- tests/wf/on-error_kill.cwl | 81 +++++++++++++++++--------------------- 2 files changed, 47 insertions(+), 49 deletions(-) diff --git a/tests/test_parallel.py b/tests/test_parallel.py index 45e7ddfb2..17d27c9eb 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -56,13 +56,18 @@ def selectResources( ks_test = factory.make(get_data(test_file)) # arbitrary test values - sleep_time = 33 # a "sufficiently large" timeout - n_sleepers = 5 + sleep_time = 3333 # a "sufficiently large" timeout + n_sleepers = 4 + start_time = 0 try: start_time = time.time() - ks_test(sleep_time=sleep_time) + ks_test( + sleep_time=sleep_time, + n_sleepers=n_sleepers, + ) except WorkflowStatus as e: end_time = time.time() - assert e.out == {"instructed_sleep_times": [sleep_time] * n_sleepers} - assert end_time - start_time < (sleep_time + 4) + output = e.out["roulette_mask"] + assert len(output) == n_sleepers and sum(output) == 1 + assert end_time - start_time < sleep_time \ No newline at end of file diff --git a/tests/wf/on-error_kill.cwl b/tests/wf/on-error_kill.cwl index 0908614b7..82846a9de 100644 --- a/tests/wf/on-error_kill.cwl +++ b/tests/wf/on-error_kill.cwl @@ -17,77 +17,70 @@ doc: | inputs: - sleep_time: { type: int, default: 33 } - n_sleepers: { type: int?, default: 5 } + sleep_time: {type: int, default: 3333} + n_sleepers: {type: int, default: 4} steps: - make_array: + roulette: doc: | - This step produces an array of sleep_time values to be used - as inputs for the scatter_step. The array also serves as the - workflow output which should be collected despite the - kill switch triggered in the kill step below. - in: { sleep_time: sleep_time, n_sleepers: n_sleepers } - out: [ times ] + This step produces a boolean array with exactly one true value + whose index is assigned at random. + in: {n_sleepers: n_sleepers} + out: [mask] run: class: ExpressionTool - inputs: - sleep_time: { type: int } - n_sleepers: { type: int } - outputs: { times: { type: "int[]" } } + inputs: {n_sleepers: {type: int}} + outputs: {mask: {type: "boolean[]"}} expression: | - ${ return {"times": Array(inputs.n_sleepers).fill(inputs.sleep_time)} } + ${ + var mask = Array(inputs.n_sleepers).fill(false); + var spin = Math.floor(Math.random() * inputs.n_sleepers); + mask[spin] = true; + return {"mask": mask} + } scatter_step: doc: | This step starts several parallel jobs that each sleep for - sleep_time seconds. + sleep_time seconds. The job whose k_mask value is true will + self-terminate early, thereby activating the kill switch. in: - time: make_array/times - scatter: time - out: [ ] + time: sleep_time + k_mask: roulette/mask + scatter: k_mask + out: [placeholder] run: class: CommandLineTool + requirements: + ToolTimeLimit: + timelimit: '${return inputs.k_mask ? 5 : inputs.time + 5}' # 5 is an arbitrary value baseCommand: sleep inputs: - time: { type: int, inputBinding: { position: 1 } } - outputs: { } - - kill: - doc: | - This step waits a few seconds and selects a random scatter_step job to kill. - When `--on-error kill` is used, the runner should respond by terminating all - remaining jobs and exiting. This means the workflow's overall runtime should be - much less than max(sleep_time). The input force_upstream_order ensures that - this step runs after make_array, and therefore roughly parallel to scatter_step. - in: - force_upstream_order: make_array/times - sleep_time: sleep_time - search_str: - valueFrom: $("sleep " + inputs.sleep_time) - out: [ pid ] - run: ../process_roulette.cwl + time: {type: int, inputBinding: {position: 1}} + k_mask: {type: boolean} + outputs: + placeholder: {type: string, outputBinding: {outputEval: $("foo")}} dangling_step: doc: | This step should never run. It confirms that additional jobs aren't submitted and allowed to run to completion after the kill switch has - been set. The input force_downstream_order ensures that this step runs - after the kill step. + been set. The input force_downstream_order ensures that this step + doesn't run before scatter_step completes. in: - force_downstream_order: kill/pid + force_downstream_order: scatter_step/placeholder time: sleep_time - out: [ ] + out: [] run: class: CommandLineTool baseCommand: sleep inputs: - time: { type: int, inputBinding: { position: 1 } } - outputs: { } + time: {type: int, inputBinding: {position: 1}} + outputs: {} outputs: - instructed_sleep_times: - type: int[] - outputSource: make_array/times + roulette_mask: + type: boolean[] + outputSource: roulette/mask From beb6af72ae9ee3f8495302b738c0f20e16b7d148 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Mon, 11 Nov 2024 21:03:05 -0800 Subject: [PATCH 25/29] Reformatting and type hint updates --- cwltool/executors.py | 4 +++- cwltool/workflow.py | 4 ++-- cwltool/workflow_job.py | 22 ++++++++++++++-------- tests/test_parallel.py | 8 ++++---- 4 files changed, 23 insertions(+), 15 deletions(-) diff --git a/cwltool/executors.py b/cwltool/executors.py index 08c4593be..33db1b8a1 100644 --- a/cwltool/executors.py +++ b/cwltool/executors.py @@ -439,7 +439,9 @@ def run_jobs( logger: logging.Logger, runtime_context: RuntimeContext, ) -> None: - self.taskqueue: TaskQueue = TaskQueue(threading.Lock(), int(math.ceil(self.max_cores)), runtime_context) + self.taskqueue: TaskQueue = TaskQueue( + threading.Lock(), int(math.ceil(self.max_cores)), runtime_context + ) try: jobiter = process.job(job_order_object, self.output_callback, runtime_context) diff --git a/cwltool/workflow.py b/cwltool/workflow.py index d4132f3d0..40d94a8ed 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -1,8 +1,8 @@ -import os import copy import datetime import functools import logging +import os import random from collections.abc import Mapping, MutableMapping, MutableSequence from typing import Callable, Optional, cast @@ -454,5 +454,5 @@ def job( def visit(self, op: Callable[[CommentedMap], None]) -> None: self.embedded_tool.visit(op) - def __repr__(self): + def __repr__(self) -> str: return f"<{self.__class__.__name__} [{os.path.basename(self.id)}]>" diff --git a/cwltool/workflow_job.py b/cwltool/workflow_job.py index 5d7ffc8e7..d09d5731c 100644 --- a/cwltool/workflow_job.py +++ b/cwltool/workflow_job.py @@ -66,7 +66,7 @@ def job( yield from self.step.job(joborder, output_callback, runtimeContext) - def __repr__(self): + def __repr__(self) -> str: return f"<{self.__class__.__name__} [{self.name}]>" @@ -92,7 +92,9 @@ def completed(self) -> int: """The number of completed internal jobs.""" return len(self._completed) - def receive_scatter_output(self, index: int, runtimeContext: RuntimeContext, jobout: CWLObjectType, processStatus: str) -> None: + def receive_scatter_output( + self, index: int, runtimeContext: RuntimeContext, jobout: CWLObjectType, processStatus: str + ) -> None: """Record the results of a scatter operation.""" for key, val in jobout.items(): self.dest[key][index] = val @@ -147,7 +149,10 @@ def parallel_steps( continue try: for j in step: - if runtimeContext.on_error != "continue" and rc.processStatus not in ("success", "skipped"): + if runtimeContext.on_error != "continue" and rc.processStatus not in ( + "success", + "skipped", + ): break if j is not None: made_progress = True @@ -260,7 +265,9 @@ def _flat_crossproduct_scatter( if len(scatter_keys) == 1: if runtimeContext.postScatterEval is not None: sjob = runtimeContext.postScatterEval(sjob) - curriedcallback = functools.partial(callback.receive_scatter_output, put, runtimeContext) + curriedcallback = functools.partial( + callback.receive_scatter_output, put, runtimeContext + ) if sjob is not None: steps.append(process.job(sjob, curriedcallback, runtimeContext)) else: @@ -808,10 +815,7 @@ def job( self.made_progress = False for step in self.steps: - if ( - runtimeContext.on_error != "continue" - and self.processStatus != "success" - ): + if runtimeContext.on_error != "continue" and self.processStatus != "success": break if not step.submitted: @@ -855,8 +859,10 @@ def job( # or all outputs have been produced. def __repr__(self): + def __repr__(self) -> str: return f"<{self.__class__.__name__} [{self.name}]>" + class WorkflowJobLoopStep: """Generated for each step in Workflow.steps() containing a `loop` directive.""" diff --git a/tests/test_parallel.py b/tests/test_parallel.py index 17d27c9eb..93f735adf 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -2,7 +2,7 @@ import math import time from pathlib import Path -from typing import Union +from typing import Union, cast from cwltool.context import RuntimeContext from cwltool.executors import MultithreadedJobExecutor @@ -58,7 +58,7 @@ def selectResources( # arbitrary test values sleep_time = 3333 # a "sufficiently large" timeout n_sleepers = 4 - start_time = 0 + start_time = 0.0 try: start_time = time.time() @@ -68,6 +68,6 @@ def selectResources( ) except WorkflowStatus as e: end_time = time.time() - output = e.out["roulette_mask"] + output = cast(dict[str, list[bool]], e.out)["roulette_mask"] assert len(output) == n_sleepers and sum(output) == 1 - assert end_time - start_time < sleep_time \ No newline at end of file + assert end_time - start_time < sleep_time From 50f279374d51b80be197630017651729ffeaa1c6 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Mon, 11 Nov 2024 22:24:20 -0800 Subject: [PATCH 26/29] Formatting mistake fix --- cwltool/workflow_job.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cwltool/workflow_job.py b/cwltool/workflow_job.py index d09d5731c..a3d7c9b2a 100644 --- a/cwltool/workflow_job.py +++ b/cwltool/workflow_job.py @@ -858,7 +858,6 @@ def job( # depends which one comes first. All steps are completed # or all outputs have been produced. - def __repr__(self): def __repr__(self) -> str: return f"<{self.__class__.__name__} [{self.name}]>" From 20fd703d9bddbea32b606515bc0703a4c25cf736 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Tue, 12 Nov 2024 13:26:15 -0800 Subject: [PATCH 27/29] Adding instance ID to repr strings, improving readability of WorkflowStep's repr string, and adding docstrings. Also adding docstring to parallel_steps() because pydocstyle yelled about it. At first it also yelled about object_from_state() and now it doesn't, so... I guess we'll see what the CI run says because I'm not familiar enough with this function to write a docstring for it. --- cwltool/workflow.py | 9 ++++++++- cwltool/workflow_job.py | 7 +++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/cwltool/workflow.py b/cwltool/workflow.py index 40d94a8ed..8898a489b 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -455,4 +455,11 @@ def visit(self, op: Callable[[CommentedMap], None]) -> None: self.embedded_tool.visit(op) def __repr__(self) -> str: - return f"<{self.__class__.__name__} [{os.path.basename(self.id)}]>" + """Return a non-expression string representation of the object instance.""" + if "#" in self.id: + wf_file, step_id = self.id.rsplit("#", 1) + step_name = "#".join([os.path.basename(wf_file), step_id]) + else: + step_name = self.id + + return f"<{self.__class__.__name__} [{step_name}] at {hex(id(self))}>" diff --git a/cwltool/workflow_job.py b/cwltool/workflow_job.py index a3d7c9b2a..70df00277 100644 --- a/cwltool/workflow_job.py +++ b/cwltool/workflow_job.py @@ -67,7 +67,8 @@ def job( yield from self.step.job(joborder, output_callback, runtimeContext) def __repr__(self) -> str: - return f"<{self.__class__.__name__} [{self.name}]>" + """Return a non-expression string representation of the object instance.""" + return f"<{self.__class__.__name__} [{self.name}] at {hex(id(self))}>" class ReceiveScatterOutput: @@ -137,6 +138,7 @@ def parallel_steps( rc: ReceiveScatterOutput, runtimeContext: RuntimeContext, ) -> JobsGeneratorType: + """Yield scatter jobs (or None if there's no work to do) until all scatter jobs complete.""" while rc.completed < rc.total: made_progress = False for index, step in enumerate(steps): @@ -859,7 +861,8 @@ def job( # or all outputs have been produced. def __repr__(self) -> str: - return f"<{self.__class__.__name__} [{self.name}]>" + """Return a non-expression string representation of the object instance.""" + return f"<{self.__class__.__name__} [{self.name}] at {hex(id(self))}>" class WorkflowJobLoopStep: From e8f9284f146d2800da9c16ffc17f36db3474b501 Mon Sep 17 00:00:00 2001 From: Alex Tate <0xalextate@gmail.com> Date: Tue, 12 Nov 2024 13:28:15 -0800 Subject: [PATCH 28/29] Changing TaskQueue constructor to accept the kill_switch threading.Event as an argument rather than an entire RuntimeContext, per @mr-c --- cwltool/executors.py | 7 ++++++- cwltool/task_queue.py | 9 ++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cwltool/executors.py b/cwltool/executors.py index 33db1b8a1..2a0f69a9c 100644 --- a/cwltool/executors.py +++ b/cwltool/executors.py @@ -102,6 +102,7 @@ def check_for_abstract_op(tool: CWLObjectType) -> None: runtime_context.mutation_manager = MutationManager() runtime_context.toplevel = True runtime_context.workflow_eval_lock = threading.Condition(threading.RLock()) + runtime_context.kill_switch = threading.Event() job_reqs: Optional[list[CWLObjectType]] = None if "https://w3id.org/cwl/cwl#requirements" in job_order_object: @@ -439,9 +440,13 @@ def run_jobs( logger: logging.Logger, runtime_context: RuntimeContext, ) -> None: + if runtime_context.kill_switch is None: + runtime_context.kill_switch = threading.Event() + self.taskqueue: TaskQueue = TaskQueue( - threading.Lock(), int(math.ceil(self.max_cores)), runtime_context + threading.Lock(), int(math.ceil(self.max_cores)), runtime_context.kill_switch ) + try: jobiter = process.job(job_order_object, self.output_callback, runtime_context) diff --git a/cwltool/task_queue.py b/cwltool/task_queue.py index cbb0fa78a..7606cf369 100644 --- a/cwltool/task_queue.py +++ b/cwltool/task_queue.py @@ -7,7 +7,6 @@ import threading from typing import Callable, Optional -from .context import RuntimeContext from .errors import WorkflowKillSwitch from .loghandler import _logger @@ -35,7 +34,7 @@ class TaskQueue: in_flight: int = 0 """The number of tasks in the queue.""" - def __init__(self, lock: threading.Lock, thread_count: int, runtime_context: RuntimeContext): + def __init__(self, lock: threading.Lock, thread_count: int, kill_switch: threading.Event): """Create a new task queue using the specified lock and number of threads.""" self.thread_count = thread_count self.task_queue: queue.Queue[Optional[Callable[[], None]]] = queue.Queue( @@ -44,11 +43,7 @@ def __init__(self, lock: threading.Lock, thread_count: int, runtime_context: Run self.task_queue_threads = [] self.lock = lock self.error: Optional[BaseException] = None - - if runtime_context.kill_switch is None: - self.kill_switch = runtime_context.kill_switch = threading.Event() - else: - self.kill_switch = runtime_context.kill_switch + self.kill_switch = kill_switch for _r in range(0, self.thread_count): t = threading.Thread(target=self._task_queue_func) From f17f678c44d7f8f0811bc8e626f88b420923d411 Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Wed, 13 Nov 2024 18:56:46 +0100 Subject: [PATCH 29/29] codecov: don't fail CI if there is an error --- .github/workflows/ci-tests.yml | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/.github/workflows/ci-tests.yml b/.github/workflows/ci-tests.yml index deca97bbd..9fe83a8ce 100644 --- a/.github/workflows/ci-tests.yml +++ b/.github/workflows/ci-tests.yml @@ -9,7 +9,7 @@ on: concurrency: group: build-${{ github.event.pull_request.number || github.ref }} - cancel-in-progress: false + cancel-in-progress: true env: TOX_SKIP_MISSING_INTERPRETERS: False @@ -82,8 +82,6 @@ jobs: - name: Upload coverage to Codecov if: ${{ matrix.step == 'unit' }} uses: codecov/codecov-action@v4 - with: - fail_ci_if_error: true env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} @@ -218,8 +216,6 @@ jobs: **/cwltool_conf*.xml - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 - with: - fail_ci_if_error: true env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} release_test: @@ -303,7 +299,5 @@ jobs: run: tox - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 - with: - fail_ci_if_error: true env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}