From 454f5113b84849e5b18de0845ea9ab210d24d778 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Mon, 24 Jun 2024 15:24:25 -0400 Subject: [PATCH] Add toil debug-job tutorial (#4976) * Hook up container file dumping for expression-evaluation jobs * Keep leader logs and user streams even when a job fails * Detect and explain gRPC FUSE problems on Mac This should fix https://github.com/DataBiosphere/toil/issues/4542 * Note how Docker Desktop for Mac needs to be configured in the docs * Have a debugging workflow which fails * Add a debugging writeup for a hanging workflow where debug-job plausibly helps * Use the right or nearly-right syntax highlighting --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- docs/gettingStarted/install.rst | 26 +- docs/running/debugging.rst | 235 +++++++++++++++++- .../docs/scripts/tutorial_debugging.patch | 12 + .../docs/scripts/tutorial_debugging_hangs.wdl | 126 ++++++++++ .../docs/scripts/tutorial_debugging_works.wdl | 129 ++++++++++ src/toil/utils/toilDebugJob.py | 1 + src/toil/wdl/wdltoil.py | 138 +++++++--- src/toil/worker.py | 52 ++-- 8 files changed, 651 insertions(+), 68 deletions(-) create mode 100644 src/toil/test/docs/scripts/tutorial_debugging.patch create mode 100644 src/toil/test/docs/scripts/tutorial_debugging_hangs.wdl create mode 100644 src/toil/test/docs/scripts/tutorial_debugging_works.wdl diff --git a/docs/gettingStarted/install.rst b/docs/gettingStarted/install.rst index 9f0c7a20a8..890b93d12d 100644 --- a/docs/gettingStarted/install.rst +++ b/docs/gettingStarted/install.rst @@ -6,21 +6,37 @@ Installation ============ -This document describes how to prepare for and install Toil. Note that Toil requires that the user run all commands -inside of a Python `virtualenv`_. Instructions for installing and creating a Python virtual environment are provided -below. +This document describes how to prepare for and install Toil. -.. _virtualenv: https://virtualenv.pypa.io/en/stable/ +.. _dependencyPrep: + +Installing System-Level Dependencies +------------------------------------ + +Toil by itself only needs Python 3.8 or newer. + +However, to run CWL and WDL workflows, you need a container engine for running containers. Toil is able to use either `Singularity`_ or `Docker`_. So make sure to install one of those first and configure your system so that `your user has permission to run containers`_. + +.. admonition:: Docker Desktop for Mac + + If using the proprietary `Docker Desktop for Mac`_, make sure to set your "file sharing implementation" in the ``General`` section of the settings to ``VirtIO``. The default ``gRPC FUSE`` implementation sometimes makes containers see recently created files as empty, which breaks Toil's ability to run containers properly. + +.. _Singularity: https://wiki.debian.org/singularity +.. _Docker: https://docs.docker.com/engine/install/ +.. _your user has permission to run containers: https://askubuntu.com/a/1389518 +.. _Docker Desktop for Mac: https://docs.docker.com/desktop/install/mac-install/ .. _venvPrep: Preparing Your Python Runtime Environment ----------------------------------------- -It is recommended to install Toil into a virtual environment. This is useful +It is recommended to install Toil into a Python `virtual environment`_. This is useful for automatically deploying Python workflows, and is the only supported way to install Toil for Toil development. +.. 
_virtual environment: https://virtualenv.pypa.io/en/stable/ + If not already present, please install the latest Python ``virtualenv`` using pip_:: $ pip install --user virtualenv diff --git a/docs/running/debugging.rst b/docs/running/debugging.rst index 76441bbe12..53c43200aa 100644 --- a/docs/running/debugging.rst +++ b/docs/running/debugging.rst @@ -111,7 +111,240 @@ You will end up with a directory tree that looks, accorfing to ``tree``, somethi 15 directories, 4 files You can see where Toil downloaded the input files for the job to the worker's temporary directory, and how they would be mounted into the container. - + +.. _shellInContainer: + +Interactively Investigating Running Jobs +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Say you have a **broken WDL workflow** that can't complete. Whenever you run ``tutorial_debugging_hangs.wdl``, it hangs: + +.. literalinclude:: ../../src/toil/test/docs/scripts/tutorial_debugging_hangs.wdl + :language: python + +You can try to run it like this, using Docker containers. Pretend this was actually a run on a large cluster: + +.. code-block:: console + + $ toil-wdl-runner --jobStore ./store tutorial_debugging_hangs.wdl --container docker + +If you run this, it will hang at the ``TutorialDebugging.CompressFiles.command`` step: + +.. code-block:: none + + [2024-06-18T12:12:49-0400] [MainThread] [I] [toil.leader] Issued job 'WDLTaskJob' TutorialDebugging.CompressFiles.command kind-WDLTaskJob/instance-y0ga_907 v1 with job batch system ID: 16 and disk: 2.0 Gi, memory: 2.0 Gi, cores: 1, accelerators: [], preemptible: False + + Workflow Progress 94%|██████████▎| 15/16 (0 failures) [00:36<00:02, 0.42 jobs/s] + +Say you want to find out why it is stuck. First, you need to kill the workflow. Open a new shell in the same directory and run: + +.. code-block:: console + + # toil kill ./store + +You can also hit ``Control+C`` in its terminal window and wait for it to stop. + +Then, you need to use ``toil debug-job`` to run the stuck job on your local machine: + +.. code-block:: console + + $ toil debug-job ./store TutorialDebugging.CompressFiles.command + +This produces some more informative logging messages, showing that the Docker container is managing to start up, but that it stays running indefinitely, with a repeating message: + +.. code-block:: none + + [2024-06-18T12:18:00-0400] [MainThread] [N] [MiniWDLContainers] docker task running :: service: "lhui2bdzmzmg", task: "sg371eb2yk", node: "zyu9drdp6a", message: "started" + [2024-06-18T12:18:01-0400] [MainThread] [D] [MiniWDLContainers] docker task status :: Timestamp: "2024-06-18T16:17:58.545272049Z", State: "running", Message: "started", ContainerStatus: {"ContainerID": "b7210b346637210b49e7b6353dd24108bc3632bbf2ce7479829d450df6ee453a", "PID": 36510, "ExitCode": 0}, PortStatus: {} + [2024-06-18T12:18:03-0400] [MainThread] [D] [MiniWDLContainers] docker task status :: Timestamp: "2024-06-18T16:17:58.545272049Z", State: "running", Message: "started", ContainerStatus: {"ContainerID": "b7210b346637210b49e7b6353dd24108bc3632bbf2ce7479829d450df6ee453a", "PID": 36510, "ExitCode": 0}, PortStatus: {} + [2024-06-18T12:18:04-0400] [MainThread] [D] [MiniWDLContainers] docker task status :: Timestamp: "2024-06-18T16:17:58.545272049Z", State: "running", Message: "started", ContainerStatus: {"ContainerID": "b7210b346637210b49e7b6353dd24108bc3632bbf2ce7479829d450df6ee453a", "PID": 36510, "ExitCode": 0}, PortStatus: {} + ... 
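+
+The container stays in the ``running`` state through every one of these repeated status checks, which tells us that the command inside it has started but is never finishing. Each status line also includes the full container ID in its ``ContainerStatus`` field. If the ``toil debug-job`` output has already scrolled by, you can also look the running container up with ``docker ps``; this is just an illustrative invocation, and the container ID, image, and uptime on your machine will differ:
+
+.. code-block:: console
+
+    $ docker ps --format '{{.ID}}  {{.Image}}  {{.Status}}'
+    b7210b346637  python:3.11  Up 5 minutes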
+ +This also gives you the Docker container ID of the running container, ``b7210b346637210b49e7b6353dd24108bc3632bbf2ce7479829d450df6ee453a``. You can use that to get a shell inside the running container: + +.. code-block:: console + + $ docker exec -ti b7210b346637210b49e7b6353dd24108bc3632bbf2ce7479829d450df6ee453a bash + root@b7210b346637:/mnt/miniwdl_task_container/work# + +Your shell is already in the working directory of the task, so we can inspect the files there to get an idea of how far the task has gotten. Has it managed to create ``script.py``? Has the script managed to create ``compressed.zip``? Let's check: + +.. code-block:: console + + # ls -lah + total 6.1M + drwxrwxr-x 6 root root 192 Jun 18 16:17 . + drwxr-xr-x 3 root root 4.0K Jun 18 16:17 .. + drwxr-xr-x 3 root root 96 Jun 18 16:17 .toil_wdl_runtime + drwxrwxr-x 8 root root 256 Jun 18 16:17 _miniwdl_inputs + -rw-r--r-- 1 root root 6.0M Jun 18 16:23 compressed.zip + -rw-r--r-- 1 root root 1.3K Jun 18 16:17 script.py + +So we can see that the script exists, and the zip file also exists. So maybe the script is still running? We can check with ``ps``, but we need the ``-x`` option to include processes not under the current shell. We can also include the ``-u`` option to get statistics: + +.. code-block:: console + + # ps -xu + USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND + root 1 0.0 0.0 2316 808 ? Ss 16:17 0:00 /bin/sh -c /bin/ + root 7 0.0 0.0 4208 3056 ? S 16:17 0:00 /bin/bash ../com + root 8 0.1 0.0 4208 1924 ? S 16:17 0:00 /bin/bash ../com + root 20 95.0 0.4 41096 36428 ? R 16:17 7:09 python script.py + root 645 0.0 0.0 4472 3492 pts/0 Ss 16:21 0:00 bash + root 1379 0.0 0.0 2636 764 ? S 16:25 0:00 sleep 1 + root 1380 0.0 0.0 8584 3912 pts/0 R+ 16:25 0:00 ps -xu + +Here we can see that ``python`` is indeed running, and it is using 95% of a CPU core. So we can surmise that Python is probably stuck spinning around in **an infinite loop**. Let's look at our files again: + +.. code-block:: console + + # ls -lah + total 8.1M + drwxrwxr-x 6 root root 192 Jun 18 16:17 . + drwxr-xr-x 3 root root 4.0K Jun 18 16:17 .. + drwxr-xr-x 3 root root 96 Jun 18 16:17 .toil_wdl_runtime + drwxrwxr-x 8 root root 256 Jun 18 16:17 _miniwdl_inputs + -rw-r--r-- 1 root root 7.6M Jun 18 2024 compressed.zip + -rw-r--r-- 1 root root 1.3K Jun 18 16:17 script.py + +Note that, while we've been investigating, our ``compressed.zip`` file has grown from ``6.0M`` to ``7.6M``. So we now know that, not only is the Python script stuck in a loop, it is also **writing to the ZIP file** inside that loop. + +Let's inspect the inputs: + +.. code-block:: console + + # ls -lah _miniwdl_inputs/* + _miniwdl_inputs/0: + total 4.0K + drwxrwxr-x 3 root root 96 Jun 18 16:17 . + drwxrwxr-x 8 root root 256 Jun 18 16:17 .. + -rw-r--r-- 1 root root 65 Jun 18 16:15 stdout.txt + + _miniwdl_inputs/1: + total 4.0K + drwxrwxr-x 3 root root 96 Jun 18 16:17 . + drwxrwxr-x 8 root root 256 Jun 18 16:17 .. + -rw-r--r-- 1 root root 65 Jun 18 16:15 stdout.txt + + _miniwdl_inputs/2: + total 4.0K + drwxrwxr-x 3 root root 96 Jun 18 16:17 . + drwxrwxr-x 8 root root 256 Jun 18 16:17 .. + -rw-r--r-- 1 root root 65 Jun 18 16:15 stdout.txt + + _miniwdl_inputs/3: + total 4.0K + drwxrwxr-x 3 root root 96 Jun 18 16:17 . + drwxrwxr-x 8 root root 256 Jun 18 16:17 .. + -rw-r--r-- 1 root root 384 Jun 18 16:15 stdout.txt + + _miniwdl_inputs/4: + total 4.0K + drwxrwxr-x 3 root root 96 Jun 18 16:17 . + drwxrwxr-x 8 root root 256 Jun 18 16:17 .. 
+ -rw-r--r-- 1 root root 387 Jun 18 16:15 stdout.txt + + _miniwdl_inputs/5: + total 4.0K + drwxrwxr-x 3 root root 96 Jun 18 16:17 . + drwxrwxr-x 8 root root 256 Jun 18 16:17 .. + -rw-r--r-- 1 root root 378 Jun 18 16:15 stdout.txt + +There are the files that are meant to be being compressed into that ZIP file. But, hang on, there are only six of these files, and none of them is over 400 bytes in size. How did we get a multi-megabyte ZIP file? The script must be putting **more data than we expected** into the ZIP file it is writing. + +Taking what we know, we can now inspect the Python script again and see if we can find **a way in which it could get stuck in an infinite loop, writing much more data to the ZIP than is actually in the input files**. We can also inspect it for WDL variable substitutions (there aren't any). Let's look at it with line numbers using the ``nl`` tool, numbering even blank lines with ``-b a``: + +.. code-block:: console + + # nl -b a script.py + 1 import sys + 2 from zipfile import ZipFile + 3 import os + 4 + 5 # Interpret command line arguments + 6 to_compress = list(reversed(sys.argv[1:])) + 7 + 8 with ZipFile("compressed.zip", "w") as z: + 9 while to_compress != []: + 10 # Grab the file to add off the end of the list + 11 input_filename = to_compress[-1] + 12 # Now we need to write this to the zip file. + 13 # What internal filename should we use? + 14 basename = os.path.basename(input_filename) + 15 disambiguation_number = 0 + 16 while True: + 17 target_filename = str(disambiguation_number) + basename + 18 try: + 19 z.getinfo(target_filename) + 20 except KeyError: + 21 # Filename is free + 22 break + 23 # Otherwise try another name + 24 disambiguation_number += 1 + 25 # Now we can actually make the compressed file + 26 with z.open(target_filename, 'w') as out_stream: + 27 with open(input_filename) as in_stream: + 28 for line in in_stream: + 29 # Prefix each line of text with the original input file + 30 # it came from. + 31 # Also remember to encode the text as the zip file + 32 # stream is in binary mode. + 33 out_stream.write(f"{basename}: {line}".encode("utf-8")) + +We have three loops here: ``while to_compress != []`` on line 9, ``while True`` on line 16, and ``for line in in_stream`` on line 28. + +The ``while True`` loop is immediately suspicious, but none of the code inside it writes to the ZIP file, so we know we can't be stuck in there. + +The ``for line in in_stream`` loop contains the only call that writes data to the ZIP, so we must be spending time inside it, but it is constrained to loop over a single file at a time, so it can't be the *infinite* loop we're looking for. + +So then we must be infinitely looping at ``while to_compress != []``, and indeed we can see that ``to_compress`` **is never modified**, so it can never become ``[]``. + +So now we have a theory as to what the problem is, and we can ``exit`` out of our shell in the container, and stop ``toil debug-job`` with ``Control+C``. Then we can make the following change to our workflow, adding code to the script to actually pop the handled files off the end of the list: + +.. literalinclude:: ../../src/toil/test/docs/scripts/tutorial_debugging.patch + :language: diff + +If we apply that change and produce a new file, ``tutorial_debugging_works.wdl``, we can clean up from the old failed run and run a new one: + +.. 
code-block:: console + + $ toil clean ./store + $ toil-wdl-runner --jobStore ./store tutorial_debugging_works.wdl --container docker + +This will produce a successful log, ending with something like: + +.. code-block:: none + + [2024-06-18T12:42:20-0400] [MainThread] [I] [toil.leader] Finished toil run successfully. + + Workflow Progress 100%|███████████| 17/17 (0 failures) [00:24<00:00, 0.72 jobs/s] + {"TutorialDebugging.compressed": "/Users/anovak/workspace/toil/src/toil/test/docs/scripts/wdl-out-u7fkgqbe/f5e16468-0cf6-4776-a5c1-d93d993c4db2/compressed.zip"} + [2024-06-18T12:42:20-0400] [MainThread] [I] [toil.common] Successfully deleted the job store: FileJobStore(/Users/anovak/workspace/toil/src/toil/test/docs/scripts/store) + +Note the line to standard output giving us the path on disk where the ``TutorialDebugging.compressed`` output from the workflow is. If you look at that ZIP file, you can see it contains the expected files, such as ``3stdout.txt``, which should contain this suitably prefixed dismayed whale: + +.. code-block:: none + + stdout.txt: ________ + stdout.txt: < Uh-oh! > + stdout.txt: -------- + stdout.txt: \ + stdout.txt: \ + stdout.txt: \ + stdout.txt: ## . + stdout.txt: ## ## ## == + stdout.txt: ## ## ## ## === + stdout.txt: /""""""""""""""""___/ === + stdout.txt: ~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ / ===- ~~~ + stdout.txt: \______ o __/ + stdout.txt: \ \ __/ + stdout.txt: \____\______/ + +When we're done inspecting the output, and satisfied that the workflow now works, we might want to clean up all the auto-generated WDL output directories from the successful and failed run(s): + +.. code-block:: console + + $ rm -Rf wdl-out-* Introspecting the Job Store --------------------------- diff --git a/src/toil/test/docs/scripts/tutorial_debugging.patch b/src/toil/test/docs/scripts/tutorial_debugging.patch new file mode 100644 index 0000000000..84f065f9a4 --- /dev/null +++ b/src/toil/test/docs/scripts/tutorial_debugging.patch @@ -0,0 +1,12 @@ +--- tutorial_debugging_works.wdl 2024-06-18 12:03:32 ++++ tutorial_debugging_hangs.wdl 2024-06-18 12:03:53 +@@ -112,9 +112,6 @@ + # Also remember to encode the text as the zip file + # stream is in binary mode. + out_stream.write(f"{basename}: {line}".encode("utf-8")) +- # Even though we got distracted by zip file manipulation, remember +- # to pop off the file we just did. 
+- to_compress.pop() + EOF + python script.py ~{sep(" ", files)} + >>> diff --git a/src/toil/test/docs/scripts/tutorial_debugging_hangs.wdl b/src/toil/test/docs/scripts/tutorial_debugging_hangs.wdl new file mode 100644 index 0000000000..8d16e85361 --- /dev/null +++ b/src/toil/test/docs/scripts/tutorial_debugging_hangs.wdl @@ -0,0 +1,126 @@ +version 1.1 + +workflow TutorialDebugging { + + input { + Array[String] messages = ["Uh-oh!", "Oh dear", "Oops"] + } + + scatter(message in messages) { + + call WhaleSay { + input: + message = message + } + + call CountLines { + input: + to_count = WhaleSay.result + } + } + + Array[File] to_compress = flatten([CountLines.result, WhaleSay.result]) + + call CompressFiles { + input: + files = to_compress + } + + output { + File compressed = CompressFiles.result + } + +} + +# Draw ASCII art +task WhaleSay { + input { + String message + } + + command <<< + cowsay "~{message}" + >>> + + output { + File result = stdout() + } + + runtime { + container: "docker/whalesay" + } +} + +# Count the lines in a file +task CountLines { + input { + File to_count + } + + command <<< + wc -l ~{to_count} + >>> + + output { + File result = stdout() + } + + runtime { + container: ["ubuntu:latest", "https://gcr.io/standard-images/ubuntu:latest"] + } +} + +# Compress files into a ZIP +task CompressFiles { + input { + Array[File] files + } + + command <<< + set -e + cat >script.py <<'EOF' + import sys + from zipfile import ZipFile + import os + + # Interpret command line arguments + to_compress = list(reversed(sys.argv[1:])) + + with ZipFile("compressed.zip", "w") as z: + while to_compress != []: + # Grab the file to add off the end of the list + input_filename = to_compress[-1] + # Now we need to write this to the zip file. + # What internal filename should we use? + basename = os.path.basename(input_filename) + disambiguation_number = 0 + while True: + target_filename = str(disambiguation_number) + basename + try: + z.getinfo(target_filename) + except KeyError: + # Filename is free + break + # Otherwise try another name + disambiguation_number += 1 + # Now we can actually make the compressed file + with z.open(target_filename, 'w') as out_stream: + with open(input_filename) as in_stream: + for line in in_stream: + # Prefix each line of text with the original input file + # it came from. + # Also remember to encode the text as the zip file + # stream is in binary mode. 
+ out_stream.write(f"{basename}: {line}".encode("utf-8")) + EOF + python script.py ~{sep(" ", files)} + >>> + + output { + File result = "compressed.zip" + } + + runtime { + container: "python:3.11" + } +} diff --git a/src/toil/test/docs/scripts/tutorial_debugging_works.wdl b/src/toil/test/docs/scripts/tutorial_debugging_works.wdl new file mode 100644 index 0000000000..c2831fd1f8 --- /dev/null +++ b/src/toil/test/docs/scripts/tutorial_debugging_works.wdl @@ -0,0 +1,129 @@ +version 1.1 + +workflow TutorialDebugging { + + input { + Array[String] messages = ["Uh-oh!", "Oh dear", "Oops"] + } + + scatter(message in messages) { + + call WhaleSay { + input: + message = message + } + + call CountLines { + input: + to_count = WhaleSay.result + } + } + + Array[File] to_compress = flatten([CountLines.result, WhaleSay.result]) + + call CompressFiles { + input: + files = to_compress + } + + output { + File compressed = CompressFiles.result + } + +} + +# Draw ASCII art +task WhaleSay { + input { + String message + } + + command <<< + cowsay "~{message}" + >>> + + output { + File result = stdout() + } + + runtime { + container: "docker/whalesay" + } +} + +# Count the lines in a file +task CountLines { + input { + File to_count + } + + command <<< + wc -l ~{to_count} + >>> + + output { + File result = stdout() + } + + runtime { + container: ["ubuntu:latest", "https://gcr.io/standard-images/ubuntu:latest"] + } +} + +# Compress files into a ZIP +task CompressFiles { + input { + Array[File] files + } + + command <<< + set -e + cat >script.py <<'EOF' + import sys + from zipfile import ZipFile + import os + + # Interpret command line arguments + to_compress = list(reversed(sys.argv[1:])) + + with ZipFile("compressed.zip", "w") as z: + while to_compress != []: + # Grab the file to add off the end of the list + input_filename = to_compress[-1] + # Now we need to write this to the zip file. + # What internal filename should we use? + basename = os.path.basename(input_filename) + disambiguation_number = 0 + while True: + target_filename = str(disambiguation_number) + basename + try: + z.getinfo(target_filename) + except KeyError: + # Filename is free + break + # Otherwise try another name + disambiguation_number += 1 + # Now we can actually make the compressed file + with z.open(target_filename, 'w') as out_stream: + with open(input_filename) as in_stream: + for line in in_stream: + # Prefix each line of text with the original input file + # it came from. + # Also remember to encode the text as the zip file + # stream is in binary mode. + out_stream.write(f"{basename}: {line}".encode("utf-8")) + # Even though we got distracted by zip file manipulation, remember + # to pop off the file we just did. 
+ to_compress.pop() + EOF + python script.py ~{sep(" ", files)} + >>> + + output { + File result = "compressed.zip" + } + + runtime { + container: "python:3.11" + } +} diff --git a/src/toil/utils/toilDebugJob.py b/src/toil/utils/toilDebugJob.py index fce91af41e..c4ac2f139c 100644 --- a/src/toil/utils/toilDebugJob.py +++ b/src/toil/utils/toilDebugJob.py @@ -58,6 +58,7 @@ def main() -> None: config = jobStore.config # But override its options config.setOptions(options) + config.cleanWorkDir = "never" # Find the job diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 144d66a94b..53d10c6ebf 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -18,6 +18,7 @@ import json import logging import os +import platform import re import shlex import shutil @@ -105,7 +106,7 @@ def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] ) as e: # Don't expose tracebacks to the user for exceptions that may be expected log("Could not " + task + " because:") - + # These are the errors that MiniWDL's parser can raise and its reporter # can report (plus some extras). See # https://github.com/chanzuckerberg/miniwdl/blob/a780b1bf2db61f18de37616068968b2bb4c2d21c/WDL/CLI.py#L91-L97. @@ -162,7 +163,7 @@ def remove_common_leading_whitespace(expression: WDL.Expr.String, tolerate_blank to reduce the common whitespace prefix. :param debug: If True, the function will show its work by logging at debug - level. + level. """ # The expression has a "parts" list consisting of interleaved string @@ -250,7 +251,7 @@ def remove_common_leading_whitespace(expression: WDL.Expr.String, tolerate_blank if common_whitespace_prefix is None: common_whitespace_prefix = "" - + if debug: logger.debug("Common Prefix: '%s'", common_whitespace_prefix) @@ -266,7 +267,7 @@ def first_mismatch(prefix: str, value: str) -> int: if c1 != c2: return n return min(len(prefix), len(value)) - + # Trim up to the first mismatch vs. the common prefix if the line starts with a string literal. stripped_lines = [ ( @@ -316,7 +317,7 @@ def first_mismatch(prefix: str, value: str) -> int: if debug: logger.debug("New Parts Merged: %s", new_parts_merged) - + modified = WDL.Expr.String(expression.pos, new_parts_merged, expression.command) # Fake the type checking of the modified expression. # TODO: Make MiniWDL expose a real way to do this? @@ -747,6 +748,13 @@ def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = self._execution_dir = execution_dir + def get_local_paths(self) -> List[str]: + """ + Get all the local paths of files devirtualized (or virtualized) through the stdlib. + """ + + return list(self._virtualized_to_devirtualized.values()) + def share_files(self, other: "ToilWDLStdLibBase") -> None: """ Share caches for devirtualizing and virtualizing files with another instance. @@ -913,7 +921,7 @@ def _virtualize_filename(self, filename: str) -> str: return result file_id = self._file_store.writeGlobalFile(abs_filename) - + file_dir = os.path.dirname(abs_filename) parent_id = self._parent_dir_to_ids.setdefault(file_dir, uuid.uuid4()) result = pack_toil_uri(file_id, parent_id, os.path.basename(abs_filename)) @@ -1859,6 +1867,9 @@ def add_injections(self, command_string: str, task_container: TaskContainer) -> Currently doesn't implement the MiniWDL plugin system, but does add resource usage monitoring to Docker containers. 
""" + + parts = [] + if isinstance(task_container, SwarmContainer): # We're running on Docker Swarm, so we need to monitor CPU usage # and so on from inside the container, since it won't be attributed @@ -1896,10 +1907,37 @@ def add_injections(self, command_string: str, task_container: TaskContainer) -> done } """) - parts = [script, f"_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &", command_string] - return "\n".join(parts) - else: - return command_string + parts.append(script) + parts.append(f"_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &") + + if isinstance(task_container, SwarmContainer) and platform.system() == "Darwin": + # With gRPC FUSE file sharing, files immediately downloaded before + # being mounted may appear as size 0 in the container due to a race + # condition. Check for this and produce an approperiate error. + + script = textwrap.dedent("""\ + function _toil_check_size () { + TARGET_FILE="${1}" + GOT_SIZE="$(stat -c %s "${TARGET_FILE}")" + EXPECTED_SIZE="${2}" + if [[ "${GOT_SIZE}" != "${EXPECTED_SIZE}" ]] ; then + echo >&2 "Toil Error:" + echo >&2 "File size visible in container for ${TARGET_FILE} is size ${GOT_SIZE} but should be size ${EXPECTED_SIZE}" + echo >&2 "Are you using gRPC FUSE file sharing in Docker Desktop?" + echo >&2 "It doesn't work: see ." + exit 1 + fi + } + """) + parts.append(script) + for host_path, job_path in task_container.input_path_map.items(): + expected_size = os.path.getsize(host_path) + if expected_size != 0: + parts.append(f"_toil_check_size \"{job_path}\" {expected_size}") + + parts.append(command_string) + + return "\n".join(parts) def handle_injection_messages(self, outputs_library: ToilWDLStdLibTaskOutputs) -> None: """ @@ -2901,7 +2939,11 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Get what to scatter over with monkeypatch_coerce(standard_library): - scatter_value = evaluate_named_expression(self._scatter, self._scatter.variable, None, self._scatter.expr, bindings, standard_library) + try: + scatter_value = evaluate_named_expression(self._scatter, self._scatter.variable, None, self._scatter.expr, bindings, standard_library) + finally: + # Report all files are downloaded now that all expressions are evaluated. + self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) if not isinstance(scatter_value, WDL.Value.Array): raise RuntimeError("The returned value from a scatter is not an Array type.") @@ -3036,7 +3078,11 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Get the expression value. Fake a name. with monkeypatch_coerce(standard_library): - expr_value = evaluate_named_expression(self._conditional, "", WDL.Type.Boolean(), self._conditional.expr, bindings, standard_library) + try: + expr_value = evaluate_named_expression(self._conditional, "", WDL.Type.Boolean(), self._conditional.expr, bindings, standard_library) + finally: + # Report all files are downloaded now that all expressions are evaluated. + self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) if expr_value.value: # Evaluated to true! 
@@ -3099,9 +3145,13 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: if self._workflow.inputs: with monkeypatch_coerce(standard_library): - for input_decl in self._workflow.inputs: - # Evaluate all the inputs that aren't pre-set - bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) + try: + for input_decl in self._workflow.inputs: + # Evaluate all the inputs that aren't pre-set + bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) + finally: + # Report all files are downloaded now that all expressions are evaluated. + self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) # Make jobs to run all the parts of the workflow sink = self.create_subgraph(self._workflow.body, [], bindings) @@ -3140,27 +3190,41 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: """ super().run(file_store) - if self._workflow.outputs is None: - # The output section is not declared - # So get all task outputs and return that - # First get all task output names - output_set = set() - for call in self._workflow.body: - if isinstance(call, WDL.Tree.Call): - for type_binding in call.effective_outputs: - output_set.add(type_binding.name) - # Collect all bindings that are task outputs - output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() - for binding in unwrap(self._bindings): - if binding.name in output_set: - # The bindings will already be namespaced with the task namespaces - output_bindings = output_bindings.bind(binding.name, binding.value) - else: - # Output section is declared and is nonempty, so evaluate normally - # Evaluate all the outputs in the normal, non-task-outputs library context - standard_library = ToilWDLStdLibBase(file_store, execution_dir=self._wdl_options.get("execution_dir")) - # Combine the bindings from the previous job - output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) + # Evaluate all output expressions in the normal, non-task-outputs library context + standard_library = ToilWDLStdLibBase(file_store, execution_dir=self._wdl_options.get("execution_dir")) + + try: + if self._workflow.outputs is None: + # The output section is not declared + # So get all task outputs and return that + # First get all task output names + output_set = set() + for call in self._workflow.body: + if isinstance(call, WDL.Tree.Call): + for type_binding in call.effective_outputs: + output_set.add(type_binding.name) + # Collect all bindings that are task outputs + output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() + for binding in unwrap(self._bindings): + if binding.name in output_set: + # The bindings will already be namespaced with the task namespaces + output_bindings = output_bindings.bind(binding.name, binding.value) + else: + # Output section is declared and is nonempty, so evaluate normally + + # Combine the bindings from the previous job + output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) + finally: + # We don't actually know when all our files are downloaded since + # anything we evaluate might devirtualize inside any expression. + # But we definitely know they're done being downloaded if we throw + # an error or if we finish, so hook in now and let the debugging + # logic stop the worker before any error does. 
+ # + # Make sure to feed in all the paths we devirtualized as if they + # were mounted into a container at their actual paths. + self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) + return self.postprocess(output_bindings) class WDLRootJob(WDLSectionJob): @@ -3347,7 +3411,7 @@ def devirtualize_output(filename: str) -> str: 'devirtualize' a file using the "toil" object instead of a filestore. Returns its local path. """ - # Make sure the output directory exists if we have output files + # Make sure the output directory exists `if we have output files # that might need to use it. os.makedirs(output_directory, exist_ok=True) return ToilWDLStdLibBase.devirtualize_to(filename, output_directory, toil, execution_dir, devirtualized_to_virtualized, virtualized_to_devirtualized) diff --git a/src/toil/worker.py b/src/toil/worker.py index 95a5999d60..fe4eaf5234 100644 --- a/src/toil/worker.py +++ b/src/toil/worker.py @@ -419,31 +419,33 @@ def blockFn() -> bool: # Create a fileStore object for the job fileStore = AbstractFileStore.createFileStore(job_store, jobDesc, local_worker_temp_dir, blockFn, caching=config.caching) - with job._executor(stats=statsDict if config.stats else None, - fileStore=fileStore): - with deferredFunctionManager.open() as defer: - with fileStore.open(job): - # Get the next block function to wait on committing this job - blockFn = fileStore.waitForCommit - - # Run the job, save new successors, and set up - # locally (but don't commit) successor - # relationships and job completion. - # Pass everything as name=value because Cactus - # likes to override _runner when it shouldn't and - # it needs some hope of finding the arguments it - # wants across multiple Toil versions. We also - # still pass a jobGraph argument to placate old - # versions of Cactus. - job._runner(jobGraph=None, jobStore=job_store, fileStore=fileStore, defer=defer) - - # When the executor for the job finishes it will - # kick off a commit with the link to the job body - # cut. - - # Accumulate messages from this job & any subsequent chained jobs - statsDict.workers.logs_to_leader += fileStore.logging_messages - statsDict.workers.logging_user_streams += fileStore.logging_user_streams + try: + with job._executor(stats=statsDict if config.stats else None, + fileStore=fileStore): + with deferredFunctionManager.open() as defer: + with fileStore.open(job): + # Get the next block function to wait on committing this job + blockFn = fileStore.waitForCommit + + # Run the job, save new successors, and set up + # locally (but don't commit) successor + # relationships and job completion. + # Pass everything as name=value because Cactus + # likes to override _runner when it shouldn't and + # it needs some hope of finding the arguments it + # wants across multiple Toil versions. We also + # still pass a jobGraph argument to placate old + # versions of Cactus. + job._runner(jobGraph=None, jobStore=job_store, fileStore=fileStore, defer=defer) + + # When the executor for the job finishes it will + # kick off a commit with the link to the job body + # cut. + finally: + # Accumulate messages from this job & any subsequent chained jobs. + # Keep the messages even if the job fails. + statsDict.workers.logs_to_leader += fileStore.logging_messages + statsDict.workers.logging_user_streams += fileStore.logging_user_streams logger.info("Completed body for %s", jobDesc)