Skip to content

Commit

Permalink
Fixes input/output keys
Browse files Browse the repository at this point in the history
  • Loading branch information
tainagdcoleman committed Sep 13, 2024
2 parents 42f3ec0 + 3e761e7 commit 022fcec
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 20 deletions.
9 changes: 9 additions & 0 deletions wfcommons/common/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,18 @@ def write_json(self, json_file_path: Optional[pathlib.Path] = None) -> None:
machines_list.append(machine.name)
workflow_machines.append(machine.as_dict())

# add files to the workflow json object (input and output)
for file in task.input_files:
files.append(file.as_dict())
for file in task.output_files:
files.append(file.as_dict())

if workflow_machines:
workflow_json["workflow"]["execution"]["machines"] = workflow_machines

if files and len(files) > 0:
workflow_json["workflow"]["specification"]["files"] = files

# write to file
if not json_file_path:
json_file_path = pathlib.Path(f"{self.name.lower()}.json")
Expand Down
2 changes: 1 addition & 1 deletion wfcommons/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

import warnings
warnings.filterwarnings('ignore')

Expand All @@ -25,6 +24,7 @@
from typing import Any, Dict, Optional, List, Tuple



class NoValue(Enum):
def __repr__(self):
return '<%s.%s>' % (self.__class__.__name__, self.name)
Expand Down
2 changes: 1 addition & 1 deletion wfcommons/wfchef/chef.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def uninstall_recipe(module_name:str,
Uninstalls a recipe installed in the system.
"""

dst = pathlib.Path(this_dir.joinpath(f"recipes/{savedir.stem}")).resolve()
dst = f"wfcommons.wfchef.recipe.{savedir.stem}"
try:
subprocess.run(["pip", "uninstall", "-y", dst])
traceback.print_exc()
Expand Down
21 changes: 13 additions & 8 deletions wfcommons/wfgen/abstract_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ def _generate_task_files(self, task: Task) -> List[File]:

# generate output files
output_files_list = self._generate_files(task.task_id, task_recipe['output'], FileLink.OUTPUT)
task.files = self.tasks_files[task.task_id]
task.output_files = self.tasks_files[task.task_id]

# obtain input files from parents
input_files = []

if task.name in self.tasks_parents.keys():
for parent_task_name in self.tasks_parents[task.name]:
output_files = self._generate_task_files(self.tasks_map[parent_task_name])
Expand All @@ -215,14 +216,16 @@ def _generate_task_files(self, task: Task) -> List[File]:
input_files.extend(output_files)

for input_file in input_files:
if input_file.name not in self.tasks_files_names[task.task_id]:
self.tasks_files[task.task_id].append(File(name=input_file.name,
link=FileLink.INPUT,
size=input_file.size))
self.tasks_files_names[task.task_id].append(input_file.name)
if input_file not in self.tasks_files_names[task.task_id]:
self.tasks_files[task.task_id].append(File(name=input_file,
link=FileLink.INPUT,
size=input_file.size))
self.tasks_files_names[task.task_id].append(input_file)

# generate additional input files
self._generate_files(task.task_id, task_recipe['input'], FileLink.INPUT)
task.input_files = [ifile for ifile in self.tasks_files[task.task_id] if ifile.link == FileLink.INPUT]


return output_files_list

Expand All @@ -244,9 +247,9 @@ def _generate_files(self, task_id: str, recipe: Dict[str, Any], link: FileLink)
extension_list: List[str] = []
for f in self.tasks_files[task_id]:
if f.link == link:
extension_list.append(path.splitext(f.file_id)[1] if '.' in f.file_id else f.file_id)
files_list.append(f)
extension_list.append(path.splitext(f.name)[1] if '.' in f.name else f.name)


for extension in recipe:
if extension not in extension_list:
file = self._generate_file(extension, recipe, link)
Expand Down Expand Up @@ -274,10 +277,12 @@ def _generate_file(self, extension: str, recipe: Dict[str, Any], link: FileLink)
else self.output_file_size_factor) * generate_rvs(recipe[extension]['distribution'],
recipe[extension]['min'],
recipe[extension]['max']))

return File(file_id=str(uuid.uuid4()) + extension,
link=link,
size=size)


def _get_files_by_task_and_link(self, task_id: str, link: FileLink) -> List[File]:
"""
Get the list of files for a task ID and link type.
Expand Down
25 changes: 15 additions & 10 deletions wfcommons/wfinstances/instance_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import math
import numpy
import scipy.stats
import warnings

from logging import Logger
from matplotlib import pyplot
Expand Down Expand Up @@ -105,26 +106,29 @@ def build_summary(self,
for task in self.tasks_summary[task_name]:
runtime_list.append(task.runtime)


# For each input_file and output_file, append the file size to the dictionary
for infile in task.input_files:
extension: str = path.splitext(infile.file_id)[1] if '.' in infile.file_id else infile.file_id

if extension[1:].isnumeric():
extension = path.splitext(infile.file_id.replace(extension, ''))[1]

# Check if the file is definitely an input
assert infile.link == FileLink.INPUT, f"{infile.file_id} is not set as input"
_append_file_to_dict(extension, inputs_dict, infile.size)


for outfile in task.output_files:
extension: str = path.splitext(outfile.file_id)[1] if '.' in outfile.file_id else outfile.file_id
# print(f"file {outfile.file_id} extension: {extension}")
if extension[1:].isnumeric():
extension = path.splitext(outfile.file_id.replace(extension, ''))[1]

# Check if the file is definitely an output
assert outfile.link == FileLink.OUTPUT, f"{outfile.file_id} is not set as output"
_append_file_to_dict(extension, outputs_dict, outfile.size)


# Find the best fit distribution for each file type
_best_fit_distribution_for_file(inputs_dict, include_raw_data)
_best_fit_distribution_for_file(outputs_dict, include_raw_data)
Expand Down Expand Up @@ -197,7 +201,6 @@ def _append_file_to_dict(extension: str, dict_obj: Dict[str, Any], file_size: in
dict_obj[extension] = {'data': [], 'distribution': None}
dict_obj[extension]['data'].append(file_size)


def _best_fit_distribution_for_file(dict_obj, include_raw_data) -> None:
"""
Find the best fit distribution for a file.
Expand All @@ -207,14 +210,16 @@ def _best_fit_distribution_for_file(dict_obj, include_raw_data) -> None:
:param include_raw_data:
:type include_raw_data: bool
"""
for ext in dict_obj:
dict_obj[ext]['min'] = min(dict_obj[ext]['data'])
dict_obj[ext]['max'] = max(dict_obj[ext]['data'])
if dict_obj[ext]['min'] != dict_obj[ext]['max']:
dict_obj[ext]['distribution'] = _json_format_distribution_fit(
best_fit_distribution(dict_obj[ext]['data']))
if not include_raw_data:
del dict_obj[ext]['data']
with warnings.catch_warnings():
warnings.simplefilter("ignore")
for ext in dict_obj:
dict_obj[ext]['min'] = min(dict_obj[ext]['data'])
dict_obj[ext]['max'] = max(dict_obj[ext]['data'])
if dict_obj[ext]['min'] != dict_obj[ext]['max']:
dict_obj[ext]['distribution'] = _json_format_distribution_fit(
best_fit_distribution(dict_obj[ext]['data']))
if not include_raw_data:
del dict_obj[ext]['data']


def _json_format_distribution_fit(dist_tuple: Tuple) -> Dict[str, Any]:
Expand Down

0 comments on commit 022fcec

Please sign in to comment.