Skip to content

Commit

Permalink
Add support for running Dockstore page URLs and TRS IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
adamnovak committed Sep 24, 2024
1 parent 027e89d commit 413d2bc
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
from toil.batchSystems.registry import DEFAULT_BATCH_SYSTEM
from toil.common import Toil, addOptions
from toil.cwl import check_cwltool_version
from toil.lib.integration import resolve_workflow
from toil.lib.misc import call_command
from toil.provisioners.clusterScaler import JobTooBigError

Expand Down Expand Up @@ -3738,6 +3739,11 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
if options.restart:
outobj = toil.restart()
else:
# Before showing the options to any cwltool stuff that wants to
# load the workflow, transform options.cwltool, where our
# argument for what to run is, to handle Dockstore workflows.
options.cwltool = resolve_workflow(options.cwltool)

loading_context.hints = [
{
"class": "ResourceRequirement",
Expand Down
144 changes: 144 additions & 0 deletions src/toil/lib/integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import logging
import os
import sys
from typing import Dict, List, Optional

from urllib.parse import urlparse, unquote, quote
import requests

from toil.lib.retry import retry

logger = logging.getLogger(__name__)

def is_dockstore_workflow(workflow: str) -> bool:
"""
Returns True if a workflow string smells Dockstore-y.
Detects Dockstore page URLs and strings that could be Dockstore TRS IDs.
"""

return workflow.startswith("https://dockstore.org/workflows/") or workflow.startswith("#workflow/")

@retry(errors=[requests.exceptions.ConnectionError])
def get_workflow_root_from_dockstore(workflow: str) -> str:
"""
Given a Dockstore URL or TRS identifier, get the root WDL or CWL URL for the workflow.
Accepts inputs like:
- https://dockstore.org/workflows/github.com/dockstore-testing/md5sum-checker:master?tab=info
- #workflow/github.com/dockstore-testing/md5sum-checker
Assumes the input is actually one of the supported formats. See is_dockstore_workflow().
TODO: Needs to handle multi-workflow files if Dockstore can.
"""

if workflow.startswith("#workflow/"):
# Looks like a Dockstore TRS ID already.
# TODO: Does Dockstore guartantee we can recognize its TRS IDs like this?
logger.debug("Workflow %s is a TRS specifier already", workflow)
trs_spec = workflow
else:
# We need to get the right TRS ID from the Docstore URL
parsed = urlparse(workflow)
# TODO: We assume the Docksotre page URL structure and the TRS IDs are basically the same.
page_path = unquote(parsed.path)
if not page_path.startswith("/workflows/"):
raise RuntimeError("Cannot parse Dockstore URL " + workflow)
trs_spec = "#workflow/" + page_path[len("/workflows/"):]
logger.debug("Translated %s to TRS: %s", workflow, trs_spec)

# Parse the TRS ID
parts = trs_spec.split(':', 1)
trs_workflow_id = parts[0]
if len(parts) > 1:
# The ID has the version we want after a colon
trs_version = parts[1]
else:
# We don't know the version we want, we will have to pick one somehow.
trs_version = None

logger.debug("TRS %s parses to workflow %s and version %s", trs_spec, trs_workflow_id, trs_version)

# Fetch the main TRS document.
# See e.g. https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fdockstore-testing%2Fmd5sum-checker
trs_workflow_url = f"https://dockstore.org/api/ga4gh/trs/v2/tools/{quote(trs_workflow_id, safe='')}"
trs_workflow_document = requests.get(trs_workflow_url).json()

# Each version can have a different language so make a map of them.
# Also lets us check for a default version's existence.
version_to_language: Dict[str, str] = {}

for version_info in trs_workflow_document.get("versions", []):
# TODO: Why is there an array of descriptor types?
version_to_language[version_info["name"]] = version_info["descriptor_type"][0]
logger.debug("Workflow version %s is written in %s", version_info["name"], version_info["descriptor_type"][0])

for default_version in ['main', 'master']:
if trs_version is None and default_version in version_to_language:
# Fill in a version if the user didn't provide one.
trs_version = default_version
logger.debug("Defaulting to workflow version %s", default_version)
break

if trs_version is None and len(version_to_language) == 1:
# If there's just one version use that.
trs_version = next(iter(version_to_language.keys()))
logger.debug("Defaulting to only available workflow version %s", trs_version)

if trs_version is None:
raise RuntimeError(f"Workflow {workflow} does not specify a version; must be one of {list(version_to_language.keys())}")

if trs_version not in version_to_language:
raise RuntimeError(f"Workflow version {trs_version} from {workflow} does not exist; must be one of {list(version_to_language.keys())}")

# Language can be "CWL" or "WDL".
# TODO: We're probably already in a runner that expects one or the other.
language = version_to_language[trs_version]

# TODO: There's a {workflow}/versions/{version}/{language}/files endpoint
# that can say which file is the PRIMARY_DESCRIPTOR, but it can't give us a
# repo-root-relative path for that file, so we can't construct the
# {workflow}/versions/{version}/PLAIN_{language}/descriptor//path/from/root/to/file.ext
# URL that behaves properly with .. imports without special handling to
# actually put ".." in the URL.
#
# So we fetch {workflow}/versions/{version}/{language}/descriptor and
# follow its "url" to somewhere we hope the correct directory tree is.

trs_version_url = f"{trs_workflow_url}/versions/{quote(trs_version, safe='')}"
trs_descriptor_url = f"{trs_version_url}/{language}/descriptor"
logger.debug("Workflow descriptor URL: %s", trs_descriptor_url)
trs_descriptor_document = requests.get(trs_descriptor_url).json()

return trs_descriptor_document["url"]

def resolve_workflow(workflow: str) -> str:
"""
Find the real workflow URL or filename from a command line argument.
Transform a workflow URL or path that might actually be a Dockstore page
URL or TRS specifier to an actual URL or path to a workflow document.
"""

if is_dockstore_workflow(workflow):
# Ask Dockstore where to find Dockstore-y things
resolved = get_workflow_root_from_dockstore(workflow)
logger.info("Dockstore resolved workflow %s to %s", workflow, resolved)
return resolved
else:
# Pass other things through.
return workflow











19 changes: 19 additions & 0 deletions src/toil/test/cwl/cwlTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
needs_cwl,
needs_docker,
needs_docker_cuda,
needs_singularity_or_docker,
needs_env_var,
needs_fetchable_appliance,
needs_gridengine,
Expand Down Expand Up @@ -431,6 +432,24 @@ def test_run_colon_output(self) -> None:
self._expected_colon_output(self.outDir),
out_name="result",
)

@pytest.mark.integrative
@needs_singularity_or_docker
def test_run_dockstore_trs(self) -> None:
from toil.cwl import cwltoil

stdout = StringIO()
main_args = [
"--outdir",
self.outDir,
"#workflow/github.com/dockstore-testing/md5sum-checker",
"https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/refs/heads/master/md5sum/md5sum-input-cwl.json"
]
cwltoil.main(main_args, stdout=stdout)
out = json.loads(stdout.getvalue())
with open(out.get("output_file", {}).get("location")[len("file://") :]) as f:
computed_hash = f.read().strip()
self.assertEqual(computed_hash, "00579a00e3e7fa0674428ac7049423e2")

def test_glob_dir_bypass_file_store(self) -> None:
self.maxDiff = 1000
Expand Down
58 changes: 58 additions & 0 deletions src/toil/test/lib/test_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright (C) 2015-2024 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import pytest

from toil.lib.integration import get_workflow_root_from_dockstore
from toil.test import ToilTest, needs_online

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

@pytest.mark.integrative
@needs_online
class DockstoreLookupTest(ToilTest):
"""
Make sure we can look up workflows on Dockstore.
"""

def test_lookup_from_page_url(self) -> None:
PAGE_URL = "https://dockstore.org/workflows/github.com/dockstore/bcc2020-training/HelloWorld:master?tab=info"
# If we go in through the website we get an extra refs/heads/ on the branch name.
WORKFLOW_URL = "https://raw.githubusercontent.com/dockstore/bcc2020-training/master/wdl-training/exercise1/HelloWorld.wdl"
looked_up = get_workflow_root_from_dockstore(PAGE_URL)
self.assertEqual(looked_up, WORKFLOW_URL)

def test_lookup_from_trs(self) -> None:
TRS_ID = "#workflow/github.com/dockstore-testing/md5sum-checker"
# Despite "-checker" in the ID, this actually refers to the base md5sum
# workflow that just happens to have a checker *available*, not to the
# checker workflow itself.
WORKFLOW_URL = "https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/master/md5sum/md5sum-workflow.cwl"
looked_up = get_workflow_root_from_dockstore(TRS_ID)
self.assertEqual(looked_up, WORKFLOW_URL)

def test_lookup_from_trs_with_version(self) -> None:
TRS_ID = "#workflow/github.com/dockstore-testing/md5sum-checker:workflowWithHTTPImport"
WORKFLOW_URL = "https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/workflowWithHTTPImport/md5sum/md5sum-workflow.cwl"
looked_up = get_workflow_root_from_dockstore(TRS_ID)
self.assertEqual(looked_up, WORKFLOW_URL)

def test_lookup_from_trs_nonexistent_version(self) -> None:
TRS_ID = "#workflow/github.com/dockstore-testing/md5sum-checker:notARealVersion"
with self.assertRaises(RuntimeError):
looked_up = get_workflow_root_from_dockstore(TRS_ID)


17 changes: 17 additions & 0 deletions src/toil/test/wdl/wdltoil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,23 @@ def test_miniwdl_self_test_by_reference(self) -> None:
"""
self.test_miniwdl_self_test(extra_args=["--referenceInputs=True"])

@pytest.mark.integrative
@needs_singularity_or_docker
def test_dockstore_trs(self, extra_args: Optional[List[str]] = None) -> None:
wdl_file = "#workflow/github.com/dockstore/bcc2020-training/HelloWorld:master"
# Needs an input but doesn't provide a good one.
json_input = json.dumps({"hello_world.hello.myName": "https://raw.githubusercontent.com/dockstore/bcc2020-training/refs/heads/master/wdl-training/exercise1/name.txt"})

result_json = subprocess.check_output(
self.base_command + [wdl_file, json_input, '--logDebug', '-o', self.output_dir, '--outputDialect',
'miniwdl'] + (extra_args or []))
result = json.loads(result_json)

with open(result.get("outputs", {}).get("hello_world.helloFile")) as f:
result_text = f.read().strip()

self.assertEqual(result_text, "Hello World!\nMy name is potato.")

@slow
@needs_docker_cuda
def test_giraffe_deepvariant(self):
Expand Down
3 changes: 2 additions & 1 deletion src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
InvalidImportExportUrlException, LocatorException)
from toil.lib.accelerators import get_individual_local_accelerators
from toil.lib.conversions import convert_units, human2bytes
from toil.lib.integration import resolve_workflow
from toil.lib.io import mkdtemp
from toil.lib.memoize import memoize
from toil.lib.misc import get_user_name
Expand Down Expand Up @@ -3527,7 +3528,7 @@ def main() -> None:
output_bindings = toil.restart()
else:
# Load the WDL document
document: WDL.Tree.Document = WDL.load(options.wdl_uri, read_source=toil_read_source)
document: WDL.Tree.Document = WDL.load(resolve_workflow(options.wdl_uri), read_source=toil_read_source)

# See if we're going to run a workflow or a task
target: Union[WDL.Tree.Workflow, WDL.Tree.Task]
Expand Down

0 comments on commit 413d2bc

Please sign in to comment.