Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No listing workflow #1861

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
8 changes: 8 additions & 0 deletions cwltool.code-workspace
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"folders": [
{
"path": "."
}
],
"settings": {}
}
18 changes: 18 additions & 0 deletions cwltool/argparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,24 @@ def arg_parser() -> argparse.ArgumentParser:
type=str,
)

# TO DO: Not yet implemented
provgroup.add_argument(
"--no-data", # Maybe change to no-input and no-intermediate to ignore those kind of files?...
default=False,
action="store_true",
help="Disables the storage of input and output data files",
dest="no_data",
)

# TO DO: Not yet implemented
provgroup.add_argument(
"--no-input", # Maybe change to no-input and no-intermediate to ignore those kind of files?...
default=False,
action="store_true",
help="Disables the storage of input data files",
dest="no_input",
)

printgroup = parser.add_mutually_exclusive_group()
printgroup.add_argument(
"--print-rdf",
Expand Down
4 changes: 4 additions & 0 deletions cwltool/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ def addsf(
datum = cast(CWLObjectType, datum)
ll = schema.get("loadListing") or self.loadListing
if ll and ll != "no_listing":
# Debug show
for k in datum:
_logger.debug("Datum: %s: %s" % (k, datum[k]))
_logger.debug("----------------------------------------")
get_listing(
self.fs_access,
datum,
Expand Down
37 changes: 35 additions & 2 deletions cwltool/cwlprov/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
import re
import uuid
from getpass import getuser
from typing import IO, Any, Callable, Dict, List, Optional, Tuple, Union
from typing import IO, Any, Dict, List, Optional, Tuple, Union

from typing_extensions import TypedDict

from cwltool.cwlprov.provenance_constants import Hasher

from ..loghandler import _logger


def _whoami() -> Tuple[str, str]:
"""Return the current operating system account as (username, fullname)."""
Expand Down Expand Up @@ -135,7 +139,7 @@ def _valid_orcid(orcid: Optional[str]) -> str:
def checksum_copy(
src_file: IO[Any],
dst_file: Optional[IO[Any]] = None,
hasher: Optional[Callable[[], "hashlib._Hash"]] = None,
hasher=Hasher, # type: Callable[[], hashlib._Hash]
buffersize: int = 1024 * 1024,
) -> str:
"""Compute checksums while copying a file."""
Expand All @@ -158,6 +162,35 @@ def checksum_copy(
pass
if os.path.exists(temp_location):
os.rename(temp_location, dst_file.name) # type: ignore

return content_processor(contents, src_file, dst_file, checksum, buffersize)


def checksum_only(
src_file: IO[Any],
dst_file: Optional[IO[Any]] = None,
hasher=Hasher, # type: Callable[[], hashlib._Hash]
buffersize: int = 1024 * 1024,
) -> str:
"""Calculate the checksum only, does not copy the data files."""
if dst_file is not None:
_logger.error("Destination file should be None but it is %s", dst_file)
"""Compute checksums while copying a file."""
# TODO: Use hashlib.new(Hasher_str) instead?
checksum = hasher()
contents = src_file.read(buffersize)
# TODO Could be a function for both checksum_only and checksum_copy?
return content_processor(contents, src_file, dst_file, checksum, buffersize)


def content_processor(
contents: Any,
src_file: IO[Any],
dst_file: Optional[IO[Any]],
checksum: "hashlib._Hash",
buffersize: int,
) -> str:
"""Calculate the checksum based on the content."""
while contents != b"":
if dst_file is not None:
dst_file.write(contents)
Expand Down
9 changes: 6 additions & 3 deletions cwltool/cwlprov/provenance_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from ..loghandler import _logger
from ..process import Process, shortname
from ..stdfsaccess import StdFsAccess
from ..utils import CWLObjectType, JobsType, get_listing, posix_path, versionstring
from ..utils import CWLObjectType, JobsType, posix_path, versionstring
from ..workflow_job import WorkflowJob
from .provenance_constants import (
ACCOUNT_UUID,
Expand Down Expand Up @@ -243,6 +243,7 @@ def evaluate(
# record provenance of workflow executions
self.prospective_prov(job)
customised_job = copy_job_order(job, job_order_object)
# Note to self: Listing goes ok here
self.used_artefacts(customised_job, self.workflow_run_uri)

def record_process_start(
Expand Down Expand Up @@ -287,6 +288,7 @@ def record_process_end(
process_run_id: str,
outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
when: datetime.datetime,
# load_listing: None,
) -> None:
self.generate_output_prov(outputs, process_run_id, process_name)
self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)
Expand Down Expand Up @@ -408,8 +410,8 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
# a later call to this method will sort that
is_empty = True

if "listing" not in value:
get_listing(self.fsaccess, value)
# if "listing" not in value:
# get_listing(self.fsaccess, value)
for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])):
is_empty = False
# Declare child-artifacts
Expand Down Expand Up @@ -604,6 +606,7 @@ def used_artefacts(
job_order: Union[CWLObjectType, List[CWLObjectType]],
process_run_id: str,
name: Optional[str] = None,
load_listing=None,
) -> None:
"""Add used() for each data artefact."""
if isinstance(job_order, list):
Expand Down
51 changes: 42 additions & 9 deletions cwltool/cwlprov/ro.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@
posix_path,
versionstring,
)
from . import Aggregate, Annotation, AuthoredBy, _valid_orcid, _whoami, checksum_copy
from . import (
Aggregate,
Annotation,
AuthoredBy,
_valid_orcid,
_whoami,
checksum_copy,
checksum_only,
)
from .provenance_constants import (
ACCOUNT_UUID,
CWLPROV_VERSION,
Expand Down Expand Up @@ -66,6 +74,8 @@ def __init__(
temp_prefix_ro: str = "tmp",
orcid: str = "",
full_name: str = "",
no_data: bool = False,
no_input: bool = False,
) -> None:
"""Initialize the ResearchObject."""
self.temp_prefix = temp_prefix_ro
Expand All @@ -88,6 +98,8 @@ def __init__(
self.cwltool_version = f"cwltool {versionstring().split()[-1]}"
self.has_manifest = False
self.relativised_input_object: CWLObjectType = {}
self.no_data = no_data
self.no_input = no_input

self._initialize()
_logger.debug("[provenance] Temporary research object: %s", self.folder)
Expand Down Expand Up @@ -180,13 +192,22 @@ def add_tagfile(self, path: str, timestamp: Optional[datetime.datetime] = None)
# Below probably OK for now as metadata files
# are not too large..?

checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)
if self.no_input:
_logger.debug("NO INPUT DATA TO BE CAPTURED!!!")

checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1)
tag_file.seek(0)
checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256)
tag_file.seek(0)
checksums[SHA512] = checksum_only(tag_file, hasher=hashlib.sha512)
else:
checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)

tag_file.seek(0)
checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)
tag_file.seek(0)
checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)

tag_file.seek(0)
checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)
tag_file.seek(0)
checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)

rel_path = posix_path(os.path.relpath(path, self.folder))
self.tagfiles.add(rel_path)
Expand Down Expand Up @@ -469,10 +490,14 @@ def add_data_file(
content_type: Optional[str] = None,
) -> str:
"""Copy inputs to data/ folder."""
# TODO Skip if no-input or no-data is used...?
self.self_check()
tmp_dir, tmp_prefix = os.path.split(self.temp_prefix)
with tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp:
checksum = checksum_copy(from_fp, tmp)
if self.no_data:
checksum = checksum_only(from_fp)
else:
checksum = checksum_copy(from_fp, tmp)

# Calculate hash-based file path
folder = os.path.join(self.folder, DATA, checksum[0:2])
Expand All @@ -493,7 +518,12 @@ def add_data_file(
_logger.warning("[provenance] Unknown hash method %s for bagit manifest", Hasher)
# Inefficient, bagit support need to checksum again
self._add_to_bagit(rel_path)
_logger.debug("[provenance] Added data file %s", path)
if "dir" in self.relativised_input_object:
_logger.debug(
"[provenance] Directory :%s", self.relativised_input_object["dir"]["basename"]
)
else:
_logger.debug("[provenance] Added data file %s", path)
if timestamp is not None:
createdOn, createdBy = self._self_made(timestamp)
self._file_provenance[rel_path] = cast(
Expand Down Expand Up @@ -557,7 +587,10 @@ def _add_to_bagit(self, rel_path: str, **checksums: str) -> None:
checksums = dict(checksums)
with open(lpath, "rb") as file_path:
# FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)
if self.data_option:
checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1)
else:
checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)

self.add_to_manifest(rel_path, checksums)

Expand Down
4 changes: 3 additions & 1 deletion cwltool/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ def check_for_abstract_op(tool: CWLObjectType) -> None:
):
process_run_id: Optional[str] = None
name = "primary"
process.parent_wf.generate_output_prov(self.final_output[0], process_run_id, name)
process.parent_wf.generate_output_prov(
self.final_output[0], process_run_id, name
) # Note to self... # , "generate_output_prov")
process.parent_wf.document.wasEndedBy(
process.parent_wf.workflow_run_uri,
None,
Expand Down
6 changes: 5 additions & 1 deletion cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ def _execute(
and isinstance(job_order, (list, dict))
):
runtimeContext.prov_obj.used_artefacts(
job_order, runtimeContext.process_run_id, str(self.name)
job_order,
runtimeContext.process_run_id,
str(self.name),
load_listing=self.builder.loadListing,
)
else:
_logger.warning(
Expand Down Expand Up @@ -411,6 +414,7 @@ def stderr_stdout_log_path(
runtimeContext.process_run_id,
outputs,
datetime.datetime.now(),
# builder.loadListing # TODO FIX THIS
)
if processStatus != "success":
_logger.warning("[job %s] completed %s", self.name, processStatus)
Expand Down
25 changes: 22 additions & 3 deletions cwltool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import signal
import subprocess # nosec
import sys
import tempfile
import time
import urllib
import warnings
Expand Down Expand Up @@ -693,6 +694,8 @@ def setup_provenance(
temp_prefix_ro=args.tmpdir_prefix,
orcid=args.orcid,
full_name=args.cwl_full_name,
no_data=args.no_data,
no_input=args.no_input,
)
runtimeContext.research_obj = ro
log_file_io = open_log_file_for_activity(ro, ro.engine_uuid)
Expand Down Expand Up @@ -1138,12 +1141,28 @@ def main(
print(f"{args.workflow} is valid CWL.", file=stdout)
return 0

if args.print_rdf:
if args.print_rdf or args.provenance:
output = stdout
if args.provenance:
# Write workflow to temp directory
temp_workflow_dir = tempfile.TemporaryDirectory()
os.makedirs(temp_workflow_dir.name, exist_ok=True)
workflow_provenance = temp_workflow_dir.name + "/workflow.ttl"
# Sets up a turtle file for the workflow information
# (not yet in the provenance folder as it does
# not exist and creating it will give issues).
output = open(workflow_provenance, "w")
_logger.info("Writing workflow rdf to %s", workflow_provenance)
print(
printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer),
file=stdout,
file=output,
)
return 0
# close the output
if args.provenance:
output.close()
# Only print_rdf exits this way
if args.print_rdf:
return 0

if args.print_dot:
printdot(tool, loadingContext.loader.ctx, stdout)
Expand Down
Loading