Skip to content

Commit

Permalink
Propagation of the WorkflowKillSwitch exception stops once it reaches an executor.
Browse files Browse the repository at this point in the history
The workflow_eval_lock release had to be moved to the finally block in MultithreadedJobExecutor.run_jobs(). Otherwise, TaskQueue threads running MultithreadedJobExecutor._runner() will never join(), because _runner() waits indefinitely for the workflow_eval_lock in its own finally block.
  • Loading branch information
AlexTate authored and mr-c committed Apr 20, 2024
1 parent c13e458 commit a518fff
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions cwltool/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from .context import RuntimeContext, getdefault
from .cuda import cuda_version_and_device_count
from .cwlprov.provenance_profile import ProvenanceProfile
from .errors import WorkflowException
from .errors import WorkflowException, WorkflowKillSwitch
from .job import JobBase
from .loghandler import _logger
from .mutation import MutationManager
Expand Down Expand Up @@ -260,6 +260,11 @@ def run_jobs(
WorkflowException,
): # pylint: disable=try-except-raise
raise
except WorkflowKillSwitch as err:
_logger.error(
f"Workflow kill switch activated by [job {err.job_id}] "
f"because on-error={runtime_context.on_error}"
)
except Exception as err:
logger.exception("Got workflow error")
raise WorkflowException(str(err)) from err
Expand Down Expand Up @@ -332,6 +337,11 @@ def _runner(
except WorkflowException as err:
_logger.exception(f"Got workflow error: {err}")
self.exceptions.append(err)
except WorkflowKillSwitch as err:
_logger.error(
f"Workflow kill switch activated by [job {err.job_id}] "
f"because on-error={runtime_context.on_error}"
)
except Exception as err: # pylint: disable=broad-except
_logger.exception(f"Got workflow error: {err}")
self.exceptions.append(WorkflowException(str(err)))
Expand Down Expand Up @@ -466,9 +476,8 @@ def run_jobs(
while self.taskqueue.in_flight > 0:
self.wait_for_next_completion(runtime_context)
self.run_job(None, runtime_context)

runtime_context.workflow_eval_lock.release()
finally:
runtime_context.workflow_eval_lock.release()
self.taskqueue.drain()
self.taskqueue.join()

Expand Down

0 comments on commit a518fff

Please sign in to comment.