From 25b3ddb12870cd5498275cd6f0226c04f27c7e66 Mon Sep 17 00:00:00 2001
From: "Michael R. Crusoe"
Date: Mon, 18 Dec 2023 12:58:39 +0100
Subject: [PATCH] store label & doc fields as prospective provenance

TODO: fix intent list
add/amend tests
---
Reviewer notes (not part of the commit message):
- The attribute-dict alias is spelled Dict[Union[str, Identifier], Any].
  The alias is evaluated at import time, and "str | Identifier" between two
  classes raises TypeError on Python < 3.10 (PEP 604). The file already uses
  typing.Union elsewhere, so no new import is required.
- prospective_prov() now takes the Process as a second argument so that the
  "doc" and "label" fields of process.tool / step.tool can be recorded as
  schema:description / schema:name on the wfdesc plan entities.

 cwltool/cwlprov/provenance_profile.py | 52 +++++++++++++++------------
 1 file changed, 30 insertions(+), 22 deletions(-)

diff --git a/cwltool/cwlprov/provenance_profile.py b/cwltool/cwlprov/provenance_profile.py
index c8ceee232..15f1193f3 100644
--- a/cwltool/cwlprov/provenance_profile.py
+++ b/cwltool/cwlprov/provenance_profile.py
@@ -51,9 +51,14 @@
 )
 from .writablebagfile import create_job, write_bag_file  # change this later
 
+# from schema_salad.utils import convert_to_dict
+
+
 if TYPE_CHECKING:
     from .ro import ResearchObject
 
+_attributes_type = Dict[Union[str, Identifier], Any]
+
 
 def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType) -> CWLObjectType:
     """Create copy of job object for provenance."""
@@ -235,13 +240,13 @@ def evaluate(
         """Evaluate the nature of job."""
         if not hasattr(process, "steps"):
             # record provenance of independent commandline tool executions
-            self.prospective_prov(job)
+            self.prospective_prov(job, process)
             customised_job = copy_job_order(job, job_order_object)
             self.used_artefacts(customised_job, self.workflow_run_uri)
             create_job(research_obj, customised_job)
         elif hasattr(job, "workflow"):
             # record provenance of workflow executions
-            self.prospective_prov(job)
+            self.prospective_prov(job, process)
             customised_job = copy_job_order(job, job_order_object)
             self.used_artefacts(customised_job, self.workflow_run_uri)
             # if CWLPROV['prov'].uri in job_order_object:  # maybe move this to another place
@@ -734,35 +739,38 @@ def generate_output_prov(
                     entity, process_run_id, timestamp, None, {"prov:role": role}
                 )
 
-    def prospective_prov(self, job: JobsType) -> None:
+    def prospective_prov(self, job: JobsType, process: Process) -> None:
         """Create prospective prov recording as wfdesc prov:Plan."""
+        prov_items: _attributes_type = {
+            PROV_TYPE: WFDESC["Workflow"] if isinstance(job, WorkflowJob) else WFDESC["Process"],
+            "prov:type": PROV["Plan"],
+            "prov:label": "Prospective provenance",
+        }
+        if "doc" in process.tool:
+            prov_items["schema:description"] = process.tool["doc"]
+        if "label" in process.tool:
+            prov_items["schema:name"] = process.tool["label"]
+        # # TypeError: unhashable type: 'list'
+        # if "intent" in process.tool:
+        #     prov_items["schema:featureList"] = convert_to_dict(process.tool["intent"])
+        self.document.entity("wf:main", prov_items)
         if not isinstance(job, WorkflowJob):
-            # direct command line tool execution
-            self.document.entity(
-                "wf:main",
-                {
-                    PROV_TYPE: WFDESC["Process"],
-                    "prov:type": PROV["Plan"],
-                    "prov:label": "Prospective provenance",
-                },
-            )
             return
 
-        self.document.entity(
-            "wf:main",
-            {
-                PROV_TYPE: WFDESC["Workflow"],
-                "prov:type": PROV["Plan"],
-                "prov:label": "Prospective provenance",
-            },
-        )
-
         for step in job.steps:
             stepnametemp = "wf:main/" + str(step.name)[5:]
             stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
+            provstep_items: _attributes_type = {
+                PROV_TYPE: WFDESC["Process"],
+                "prov:type": PROV["Plan"],
+            }
+            if "doc" in step.tool:
+                provstep_items["schema:description"] = step.tool["doc"]
+            if "label" in step.tool:
+                provstep_items["schema:name"] = step.tool["label"]
             provstep = self.document.entity(
                 stepname,
-                {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]},
+                provstep_items,
             )
             self.document.entity(
                 "wf:main",