Skip to content

Commit

Permalink
Support for generating instances with WfFormat 1.5 (closes #43)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelfsilva committed Sep 9, 2024
1 parent 76b1acb commit 8b1fdc9
Show file tree
Hide file tree
Showing 16 changed files with 351 additions and 286 deletions.
13 changes: 6 additions & 7 deletions wfcommons/common/file.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020-2023 The WfCommons Team.
# Copyright (c) 2020-2024 The WfCommons Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -25,8 +25,8 @@ class FileLink(NoValue):
class File:
"""Representation of a file.
:param name: The name of the file.
:type name: str
:param file_id: The id of the file.
:type file_id: str
:param size: File size in bytes.
:type size: int
:param link: Type of file link.
Expand All @@ -35,11 +35,11 @@ class File:
:type logger: Optional[Logger]
"""

def __init__(self, name: str, size: int, link: FileLink, logger: Optional[Logger] = None) -> None:
def __init__(self, file_id: str, size: int, link: FileLink, logger: Optional[Logger] = None) -> None:
"""A file used by tasks."""
self.logger: Logger = logger if logger else logging.getLogger(__name__)

self.name: str = name
self.file_id: str = file_id
self.size: int = size
self.link: FileLink = link

Expand All @@ -50,7 +50,6 @@ def as_dict(self) -> Dict[str, Union[str, int, FileLink]]:
:rtype: Dict[str, Union[str, int, FileLink]]
"""
return {
'link': self.link.value,
'name': self.name,
'id': self.file_id,
'sizeInBytes': self.size
}
17 changes: 9 additions & 8 deletions wfcommons/common/machine.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020-2023 The WfCommons Team.
# Copyright (c) 2020-2024 The WfCommons Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -35,8 +35,9 @@ class Machine:
.. code-block:: python
cpu = {
'count': 48,
'speed': 1200
'coreCount': 48,
'speedInMHz': 1200,
'vendor': 'Vendor Name'
}
:type cpu: Dict[str, Union[int, str]]
Expand Down Expand Up @@ -73,9 +74,9 @@ def __init__(self,
self.release: str = release
self.hashcode = hashcode

self.cpu_cores: int = cpu['count']
self.cpu_speed: int = cpu['speed'] if 'speed' in cpu else 0
self.cpu_flops: int = cpu['count'] * cpu['speed'] * 10 ^ 6 if 'speed' in cpu else 0
self.cpu_cores: int = cpu['coreCount']
self.cpu_speed: int = cpu['speedInMHz'] if 'speedInMHz' in cpu else 0
self.cpu_flops: int = cpu['coreCount'] * cpu['speedInMHz'] * 10 ^ 6 if 'speedInMHz' in cpu else 0
self.cpu_vendor: str = cpu['vendor'] if 'vendor' in cpu else None

self.logger.debug(f"created machine: {self.name} with {self.cpu_cores} cores and {self.cpu_flops} FLOPS.")
Expand All @@ -96,9 +97,9 @@ def as_dict(self) -> Dict[str, Union[int, str]]:
if self.release:
machine['release'] = self.release
if self.cpu_cores:
machine['cpu'] = {'count': self.cpu_cores}
machine['cpu'] = {'coreCount': self.cpu_cores}
if self.cpu_speed:
machine['cpu']['speed'] = self.cpu_speed
machine['cpu']['speedInMHz'] = self.cpu_speed
if self.cpu_vendor:
machine['cpu']['vendor'] = self.cpu_vendor
return machine
93 changes: 51 additions & 42 deletions wfcommons/common/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,20 @@ class Task:
:param name: The name of the task.
:type name: str
:param task_type: The type of the task.
:type task_type: TaskType
:param task_id: Task unique ID (e.g., ID0000001).
:type task_id: str
:param runtime: Task runtime in seconds.
:type runtime: float
:param input_files: List of input files used by the task.
:type input_files: Optional[List[File]]
:param output_files: List of output files used by the task.
:type output_files: Optional[List[File]]
:param cores: Number of cores required by the task.
:type cores: float
:param task_id: Task unique ID (e.g., ID0000001).
:type task_id: Optional[str]
:param category: Task category (can be used, for example, to define tasks that use the same program).
:type category: Optional[str]
:param machine: Machine on which is the task has been executed.
:type machine: Optional[Machine]
:param machines: Machines on which is the task has been executed.
:type machines: Optional[List[Machine]]
:param program: Program name.
:type program: Optional[str]
:param args: List of task arguments.
Expand All @@ -61,20 +63,21 @@ class Task:
:type avg_power: Optional[float]
:param priority: Task priority.
:type priority: Optional[int]
:param files: List of input/output files used by the task.
:type files: Optional[List[File]]
:param task_type: The type of the task.
:type task_type: TaskType
:param logger: The logger where to log information/warning or errors.
:type logger: Optional[Logger]
"""

def __init__(self,
name: str,
task_type: TaskType,
task_id: str,
runtime: float,
input_files: Optional[List[File]] = None,
output_files: Optional[List[File]] = None,
cores: float = 1.0,
task_id: Optional[str] = None,
category: Optional[str] = None,
machine: Optional[Machine] = None,
machines: Optional[List[Machine]] = None,
program: Optional[str] = None,
args: Optional[List[str]] = None,
avg_cpu: Optional[float] = None,
Expand All @@ -84,19 +87,18 @@ def __init__(self,
energy: Optional[int] = None,
avg_power: Optional[float] = None,
priority: Optional[int] = None,
files: Optional[List[File]] = None,
logger: Optional[Logger] = None,
executedAt: Optional[str] = None,
task_type: Optional[TaskType] = None,
launch_dir: Optional[str] = None,
start_time: Optional[str] = None,
logger: Optional[Logger] = None,
) -> None:
"""A task in a workflow."""
self.logger: Logger = logging.getLogger(
__name__) if logger is None else logger
self.name: str = name
self.type: TaskType = task_type
self.task_id: str = task_id
self.runtime: float = runtime
self.cores: Optional[float] = cores
self.task_id: Optional[str] = task_id
self.category: Optional[str] = category
self.program: Optional[str] = program
self.args: List[str] = args if args else []
Expand All @@ -106,40 +108,45 @@ def __init__(self,
self.memory: Optional[int] = memory
self.energy: Optional[int] = energy
self.avg_power: Optional[float] = avg_power
self.files: List[File] = files if files else []
self.machine: Machine = machine
self.input_files: List[File] = input_files if input_files else []
self.output_files: List[File] = output_files if output_files else []
self.machines: Optional[List[Machine]] = machines
self.priority: Optional[int] = priority
self.type: Optional[TaskType] = task_type
self.launch_dir: Optional[str] = launch_dir
self.start_time: Optional[str] = str(datetime.now().astimezone().isoformat()) if not start_time else start_time
self.start_time: Optional[str] = str(datetime.now().astimezone().isoformat()) if not executedAt else executedAt
self.logger.debug(
f"created {self.type} task {self.name}: runtime => {self.runtime} seconds.")
f"created task {self.task_id}: runtime => {self.runtime} seconds.")

def as_dict(self) -> Dict:
"""A JSON representation of the task.
def specification_as_dict(self) -> Dict:
"""A JSON representation of the task specification.
:return: A JSON object representation of the task.
:rtype: Dict
"""
task_files = []
for f in self.files:
task_files.append(f.as_dict())

task_obj = {
'name': self.name,
'type': self.type.value,
'command': {},
'id': self.task_id,
'parents': [],
'children': [],
'files': task_files,
'input_files': [f.file_id for f in self.input_files],
'output_files': [f.file_id for f in self.output_files]
}
return task_obj

def execution_as_dict(self) -> Dict:
"""A JSON representation of the task execution.
:return: A JSON object representation of the task.
:rtype: Dict
"""
task_obj = {
'id': self.task_id,
'runtimeInSeconds': self.runtime,
'command': {}
}
if self.runtime is not None:
task_obj['runtimeInSeconds'] = self.runtime
if self.cores is not None:
task_obj['cores'] = self.cores
if self.task_id is not None:
task_obj['id'] = self.task_id
if self.category is not None:
task_obj['category'] = self.category
task_obj['coreCount'] = self.cores
if self.avg_cpu is not None:
task_obj['avgCPU'] = self.avg_cpu
if self.bytes_read is not None:
Expand All @@ -149,19 +156,21 @@ def as_dict(self) -> Dict:
if self.memory is not None:
task_obj['memoryInBytes'] = self.memory
if self.energy is not None:
task_obj['energy'] = self.energy
task_obj['energyInKWh'] = self.energy
if self.avg_power is not None:
task_obj['avgPower'] = self.avg_power
task_obj['avgPowerInKWh'] = self.avg_power
if self.priority is not None:
task_obj['priority'] = self.priority
if self.program is not None:
task_obj['command']['program'] = self.program
if self.args is not None:
task_obj['command']['arguments'] = self.args
if self.machine is not None:
task_obj['machine'] = self.machine.name
if self.machines is not None:
task_obj['machines'] = [m.name for m in self.machines]
if self.start_time:
task_obj['executedAt'] = self.start_time
if self.category is not None:
task_obj['category'] = self.category
if self.launch_dir:
task_obj['launchDir'] = self.launch_dir
if self.start_time:
task_obj['startedAt'] = self.start_time
return task_obj
Loading

0 comments on commit 8b1fdc9

Please sign in to comment.