Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate input annotations to primary.cwlprov files #1678

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion cwltool/provenance_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def copy_job_order(
return customised_job



class ProvenanceProfile:
"""
Provenance profile.
Expand Down Expand Up @@ -247,6 +248,18 @@ def evaluate(
self.prospective_prov(job)
customised_job = copy_job_order(job, job_order_object)
self.used_artefacts(customised_job, self.workflow_run_uri)
# if CWLPROV['prov'].uri in job_order_object: # maybe move this to another place
# metadata = job_order_object[CWLPROV['prov'].uri] # change uri to CWLPROV['prov'].uri
# for item in metadata:
# # make a new entity with id
# # give it type additionalType value
# # add nested annotations
# # how much of this can we reuse from _add_nested_annotations?
# # how do we identify the correct file to write to? self.workflow_run_uri?
# #
# pass

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First step in propagating metadata under cwlprov:prov to provenance as well.



def record_process_start(
self, process: Process, job: JobsType, process_run_id: Optional[str] = None
Expand Down Expand Up @@ -296,6 +309,27 @@ def record_process_end(
self.generate_output_prov(outputs, process_run_id, process_name)
self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)

def _add_nested_annotations(
    self, annotation_key: str, annotation_value: object, e: ProvEntity
) -> ProvEntity:
    """
    Propagate one input data annotation to the provenance entity ``e``.

    The shape of ``annotation_value`` decides how it is recorded:

    * a scalar is attached to ``e`` as a string-valued attribute;
    * a sequence is flattened, every item being recorded under the same key;
    * a mapping becomes a new entity in ``self.document``; ``e`` refers to it
      by identifier and the mapping's items are recorded on it recursively.

    :param annotation_key: attribute name; a key that starts with
        ``https://schema.org/`` is rewritten to the matching ``SCHEMA`` URI.
    :param annotation_value: scalar, sequence or mapping to record.
    :param e: entity that receives the annotation.
    :return: the entity ``e`` that was passed in.
    """
    # Express schema.org keys through the SCHEMA namespace (per the original
    # note this is the http:// form -- TODO confirm against SCHEMA's
    # definition). Only a leading prefix is rewritten: a key that merely
    # contains the URL somewhere else is left untouched.
    https_schema_prefix = "https://schema.org/"
    if annotation_key.startswith(https_schema_prefix):
        annotation_key = SCHEMA[annotation_key[len(https_schema_prefix) :]].uri

    if isinstance(annotation_value, MutableSequence):
        # Every item of a list is recorded under the same key.
        for item_value in annotation_value:
            self._add_nested_annotations(annotation_key, item_value, e)
    elif isinstance(annotation_value, MutableMapping):
        # A nested object gets an entity of its own, identified by a fresh
        # UUID URN, which the parent entity points to.
        nested_entity = self.document.entity(uuid.uuid4().urn)
        e.add_attributes({annotation_key: nested_entity.identifier})
        for nested_key, nested_value in annotation_value.items():
            self._add_nested_annotations(nested_key, nested_value, nested_entity)
    else:
        e.add_attributes({annotation_key: str(annotation_value)})
    return e

def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
if value["class"] != "File":
raise ValueError("Must have class:File: %s" % value)
Expand Down Expand Up @@ -350,6 +384,21 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
self.document.specializationOf(file_entity, entity)

# Identify all schema annotations
schema_annotations = dict([(v, value[v]) for v in value.keys() if 'schema.org' in v])
Fixed Show fixed Hide fixed

# Transfer SCHEMA annotations to provenance
for s in schema_annotations:
if "additionalType" in s:
additional_type = schema_annotations[s].split(sep='/')[-1] # find better method?
RenskeW marked this conversation as resolved.
Show resolved Hide resolved
file_entity.add_attributes( {PROV_TYPE: SCHEMA[additional_type]})
else:
file_entity = self._add_nested_annotations(s, schema_annotations[s], file_entity)

# Transfer format annotations to provenance:
if "format" in value:
file_entity.add_attributes({SCHEMA["encodingFormat"]: value["format"]})

# Check for secondaries
for sec in cast(
MutableSequence[CWLObjectType], value.get("secondaryFiles", [])
Expand Down Expand Up @@ -395,6 +444,7 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
(PROV_TYPE, RO["Folder"]),
],
)

# ORE description of ro:Folder, saved separately
coll_b = dir_bundle.entity(
dir_id,
Expand Down Expand Up @@ -454,7 +504,18 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:

coll.add_attributes(coll_attribs)
coll_b.add_attributes(coll_b_attribs)


# Identify all schema annotations
schema_annotations = dict([(v, value[v]) for v in value.keys() if 'schema.org' in v])
Fixed Show fixed Hide fixed

# Transfer SCHEMA annotations to provenance
for s in schema_annotations:
if "additionalType" in s:
additional_type = schema_annotations[s].split(sep='/')[-1] # find better method?
coll.add_attributes( {PROV_TYPE: SCHEMA[additional_type]})
elif "hasPart" not in s:
coll = self._add_nested_annotations(s, schema_annotations[s], coll)

# Also Save ORE Folder as annotation metadata
ore_doc = ProvDocument()
ore_doc.add_namespace(ORE)
Expand Down