diff --git a/wfcommons/common/workflow.py b/wfcommons/common/workflow.py index 36c2ebe..3031498 100644 --- a/wfcommons/common/workflow.py +++ b/wfcommons/common/workflow.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -# Copyright (c) 2020-2023 The WfCommons Team. +# Copyright (c) 2020-2024 The WfCommons Team. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,7 +15,7 @@ from datetime import datetime from typing import Optional, List -from ..common.task import Task +from ..common.task import Task, TaskType from ..version import __version__ from ..wfchef.utils import create_graph @@ -172,6 +172,27 @@ def write_dot(self, dot_file_path: Optional[pathlib.Path] = None) -> None: dot_file_path = pathlib.Path(f"{self.name.lower()}.dot") nx.nx_agraph.write_dot(self, dot_file_path) + def read_dot(self, dot_file_path: Optional[pathlib.Path] = None) -> None: + """ + Read a dot file of the workflow instance. + + :param dot_file_path: DOT input file name. + :type dot_file_path: Optional[pathlib.Path] + """ + if not dot_file_path: + raise FileNotFoundError(f"Not able to find the dot file: {dot_file_path}.") + graph = nx.drawing.nx_pydot.read_dot(dot_file_path) + + tasks_map = {} + for node in graph.nodes(data=True): + task_name = f"{node[1]['label']}_{node[0]}" + task = Task(name=task_name, task_type=TaskType.COMPUTE, runtime=0, task_id=node[0]) + self.add_task(task) + tasks_map[node[0]] = task_name + + for edge in graph.edges: + self.add_dependency(tasks_map[edge[0]], tasks_map[edge[1]]) + def to_nx_digraph(self) -> nx.DiGraph: with tempfile.NamedTemporaryFile() as temp: self.write_json(pathlib.Path(temp.name))