Skip to content

Commit

Permalink
Merge pull request #63 from LightForm-group/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
aplowman authored Dec 16, 2020
2 parents 4b008c7 + f8da1ab commit 4e32973
Show file tree
Hide file tree
Showing 26 changed files with 1,919 additions and 475 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,6 @@ dmypy.json
# VS Code
/.vscode
*.code-workspace

# Intellij IDEs
/.idea
24 changes: 23 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,32 @@
# Change Log

## [0.x.xx] - xxxx.xx.xx
## [0.2.12] - 2020.12.16

### Added

- Add `Workflow.figures` attribute for storing associated figure definitions.
- Add `Workflow.metadata` attribute for storing arbitrary metadata (will later be used for Zenodo archiving).
- Add various `Workflow` static methods to help with retrieving information in the viewer without loading the whole workflow via `hickle`.
- Add `get_task_schemas` to API to load the available task schemas without generating a workflow.
- Add `refresh` bool parameter to `Config.set_config`, to force a reload of the configuration.
- Support inputs as dependencies as well as outputs.
- Support "parameter modifying" tasks (a task which outputs a parameter that is also an input to that task).
- Add `iterate_run_options` to `Workflow`.
- Add new methods for finding dependent and dependency tasks/parameters, upstream/downstream parameter values associated with a given element.
- Add input option: `include_all_iterations`. If True, inputs from all iterations are passed to input map functions.

### Fixed

- Only save input/output map files if they exist!
- Fix bug in propagating groups correctly.
- Fix various code formatting issues.
- Fix failure to raise on invalid schemas.
- Fix bug when the same file is to be saved from multiple output maps.

### Changed

- Redo task sorting algorithm such that minimal ordering changes are made.
- Set `stats` bool to False by default.
- Bump hpcflow version to v0.1.12.

## [0.2.11] - 2020.09.29

Expand Down
1 change: 1 addition & 0 deletions matflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
append_schema_source,
prepend_schema_source,
validate,
get_task_schemas,
)
2 changes: 1 addition & 1 deletion matflow/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.2.11'
__version__ = '0.2.12'
57 changes: 41 additions & 16 deletions matflow/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

import copy
from pathlib import Path
from pprint import pprint

import pyperclip
from ruamel.yaml import YAML
from hpcflow import kill as hpcflow_kill
from hpcflow import cloud_connect as hpcflow_cloud_connect

Expand Down Expand Up @@ -47,6 +45,13 @@ def make_workflow(profile_path, directory=None, write_dirs=True):
profile_str = handle.read()

profile = {'file': profile_str, 'parsed': copy.deepcopy(workflow_dict)}

iterate_run_opts = {
**Config.get('default_sticky_iterate_run_options'),
**Config.get('default_iterate_run_options'),
}
workflow_dict.update({'iterate_run_options': iterate_run_opts})

workflow = Workflow(**workflow_dict, stage_directory=directory, profile=profile)
workflow.set_ids()

Expand Down Expand Up @@ -103,55 +108,57 @@ def load_workflow(directory, full_path=False):
return workflow


def prepare_task(task_idx, iteration_idx, directory, is_array=False):
    """Prepare a task (iteration) for execution by setting inputs and running
    input maps.

    Parameters
    ----------
    task_idx : int
        Index of the task within the workflow.
    iteration_idx : int
        Index of the iteration to prepare.
    directory : str or Path
        Directory from which the workflow is loaded.
    is_array : bool, optional
        Whether the task is to be run as a job array.  # NOTE(review): confirm semantics.

    """
    load_extensions()
    workflow = load_workflow(directory)
    workflow.prepare_task(task_idx, iteration_idx, is_array=is_array)


def prepare_task_element(task_idx, element_idx, directory, is_array=False):
    """Prepare a task element for execution by setting inputs and running input maps.

    Parameters
    ----------
    task_idx : int
        Index of the task within the workflow.
    element_idx : int
        Index of the element within the task.
    directory : str or Path
        Directory from which the workflow is loaded.
    is_array : bool, optional
        Whether the task is to be run as a job array.  # NOTE(review): confirm semantics.

    """
    load_extensions()
    workflow = load_workflow(directory)
    workflow.prepare_task_element(task_idx, element_idx, is_array=is_array)


def process_task(task_idx, iteration_idx, directory, is_array=False):
    """Process a completed task (iteration) by running the output map.

    Parameters
    ----------
    task_idx : int
        Index of the task within the workflow.
    iteration_idx : int
        Index of the completed iteration.
    directory : str or Path
        Directory from which the workflow is loaded.
    is_array : bool, optional
        Whether the task was run as a job array.  # NOTE(review): confirm semantics.

    """
    load_extensions()
    workflow = load_workflow(directory)
    workflow.process_task(task_idx, iteration_idx, is_array=is_array)


def process_task_element(task_idx, element_idx, directory, is_array=False):
    """Process a task element for execution by running output maps and saving
    outputs.

    Parameters
    ----------
    task_idx : int
        Index of the task within the workflow.
    element_idx : int
        Index of the element within the task.
    directory : str or Path
        Directory from which the workflow is loaded.
    is_array : bool, optional
        Whether the task was run as a job array.  # NOTE(review): confirm semantics.

    """
    load_extensions()
    workflow = load_workflow(directory)
    workflow.process_task_element(task_idx, element_idx, is_array=is_array)


def run_python_task(task_idx, element_idx, directory):
    """Run a (commandless) Python task.

    Parameters
    ----------
    task_idx : int
        Index of the task within the workflow.
    element_idx : int
        Index of the element within the task.
    directory : str or Path
        Directory from which the workflow is loaded.

    """
    load_extensions()
    workflow = load_workflow(directory)
    workflow.run_python_task(task_idx, element_idx)


def prepare_sources(task_idx, iteration_idx, directory):
    """Prepare source files.

    Parameters
    ----------
    task_idx : int
        Index of the task within the workflow.
    iteration_idx : int
        Index of the iteration whose sources are prepared.
    directory : str or Path
        Directory from which the workflow is loaded.

    """
    load_extensions()
    workflow = load_workflow(directory)
    workflow.prepare_sources(task_idx, iteration_idx)


def append_schema_source(schema_source_path):
    """Add a task schema source file to the end of the schema source list.

    Parameters
    ----------
    schema_source_path : str or Path
        Path to the task schema source file to register.

    """
    Config.append_schema_source(schema_source_path)


def prepend_schema_source(schema_source_path):
    """Add a task schema source file to the front of the schema source list.

    Parameters
    ----------
    schema_source_path : str or Path
        Path to the task schema source file to register.

    """
    Config.prepend_schema_source(schema_source_path)


Expand All @@ -167,3 +174,21 @@ def kill(directory):
def cloud_connect(provider):
    """Connect to a cloud provider via hpcflow, using the configured
    hpcflow config directory.

    Parameters
    ----------
    provider : str
        Name of the cloud provider to connect to.  # presumably matches hpcflow's provider names — verify

    """
    Config.set_config()
    hpcflow_cloud_connect(provider, config_dir=Config.get('hpcflow_config_dir'))


def write_element_directories(iteration_idx, directory):
    """Generate element directories for a given iteration.

    Parameters
    ----------
    iteration_idx : int
        Index of the iteration for which element directories are written.
    directory : str or Path
        Directory from which the workflow is loaded.

    """
    load_extensions()
    workflow = load_workflow(directory)
    # A workflow-level `iterate` specification takes precedence over the
    # plain `num_iterations` attribute when deciding how many iterations exist.
    if workflow.iterate:
        num_iters = workflow.iterate['num_iterations']
    else:
        num_iters = workflow.num_iterations
    # Only write directories for iterations that actually exist.
    if iteration_idx < num_iters:
        workflow.write_element_directories(iteration_idx)
        workflow.prepare_iteration(iteration_idx)


def get_task_schemas():
    """Load and return the available task schemas without generating a workflow."""
    Config.set_config()
    return Config.get('task_schemas')
29 changes: 18 additions & 11 deletions matflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
Module that exposes a command line interface for `matflow`.
"""

from pathlib import Path

import click

from matflow import __version__
Expand Down Expand Up @@ -38,11 +35,12 @@ def go(workflow_path, directory=None):

@cli.command()
@click.option('--task-idx', '-t', type=click.INT, required=True)
@click.option('--iteration-idx', '-i', type=click.INT, required=True)
@click.option('--directory', '-d', type=click.Path(exists=True))
@click.option('--array', is_flag=True)
def prepare_task(task_idx, iteration_idx, directory=None, array=False):
    """Prepare a task (iteration) for execution."""
    print('matflow.cli.prepare_task', flush=True)
    api.prepare_task(task_idx, iteration_idx, directory, is_array=array)


@cli.command()
Expand All @@ -57,11 +55,12 @@ def prepare_task_element(task_idx, element_idx, directory=None, array=False):

@cli.command()
@click.option('--task-idx', '-t', type=click.INT, required=True)
@click.option('--iteration-idx', '-i', type=click.INT, required=True)
@click.option('--directory', '-d', type=click.Path(exists=True))
@click.option('--array', is_flag=True)
def process_task(task_idx, iteration_idx, directory=None, array=False):
    """Process a completed task (iteration) by running the output map."""
    print('matflow.cli.process_task', flush=True)
    api.process_task(task_idx, iteration_idx, directory, is_array=array)


@cli.command()
Expand All @@ -85,10 +84,11 @@ def run_python_task(task_idx, element_idx, directory=None):

@cli.command()
@click.option('--task-idx', '-t', type=click.INT, required=True)
@click.option('--iteration-idx', '-i', type=click.INT, required=True)
@click.option('--directory', '-d', type=click.Path(exists=True))
def prepare_sources(task_idx, iteration_idx, directory=None):
    """Prepare source files for a task iteration."""
    print('matflow.cli.prepare_sources', flush=True)
    api.prepare_sources(task_idx, iteration_idx, directory)


@cli.command()
Expand All @@ -105,7 +105,7 @@ def prepend_schema_source(schema_source_path):

@cli.command()
def validate():
    """Load and validate task schemas against available extensions."""
    api.validate()


Expand All @@ -118,9 +118,16 @@ def cloud_connect(provider):
@cli.command()
@click.argument('directory', type=click.Path(exists=True))
def kill(directory):
    """Kill all pending and executing tasks."""
    api.kill(directory)


@cli.command()
@click.option('--iteration-idx', '-i', type=click.INT, required=True)
@click.option('--directory', '-d', type=click.Path(exists=True))
def write_element_directories(iteration_idx, directory=None):
    """Generate element directories for a given iteration."""
    api.write_element_directories(iteration_idx, directory)


# Script entry point: dispatch to the click command group.
if __name__ == '__main__':
    cli()
24 changes: 14 additions & 10 deletions matflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pathlib import Path
from warnings import warn

from hpcflow.config import Config as hpcflow_config
from ruamel.yaml import YAML, safe_load


Expand All @@ -19,9 +18,11 @@ class Config(object):
'default_run_options',
'default_preparation_run_options',
'default_processing_run_options',
'default_iterate_run_options',
'default_sticky_run_options',
'default_sticky_preparation_run_options',
'default_sticky_processing_run_options',
'default_sticky_iterate_run_options',
'parallel_modes',
'archive_locations',
]
Expand Down Expand Up @@ -107,21 +108,22 @@ def get_config_file(config_dir):
raise ConfigurationError(msg)

if 'software_sources' not in config_dat:
msg = (f'Missing `software_sources` from configuration file: {config_file}')
msg = f'Missing `software_sources` from configuration file: {config_file}'
raise ConfigurationError(msg)

return config_dat, config_file

@staticmethod
def set_config(config_dir=None, raise_on_set=False):
'Load configuration from a YAML file.'
def set_config(config_dir=None, raise_on_set=False, refresh=False):
"""Load configuration from a YAML file."""

config_dir = Config.resolve_config_dir(config_dir)

if Config._is_set:
if raise_on_set:
raise ConfigurationError('Configuration is already set.')
return
elif not refresh:
return

config_dat, _ = Config.get_config_file(config_dir)
schema_sources = [Path(i).expanduser() for i in config_dat['task_schema_sources']]
Expand Down Expand Up @@ -230,9 +232,11 @@ def set_config(config_dir=None, raise_on_set=False):
'default_run_options',
'default_preparation_run_options',
'default_processing_run_options',
'default_iterate_run_options',
'default_sticky_run_options',
'default_sticky_preparation_run_options',
'default_sticky_processing_run_options',
'default_sticky_iterate_run_options',
]:
Config.__conf[i] = config_dat.get(i, {})

Expand Down Expand Up @@ -275,7 +279,7 @@ def _get_software_safe(software_name):

@staticmethod
def _get_key_safe(key):
    """Return `key` as a 3-tuple with its software-name element (index 2)
    normalised via `Config._get_software_safe`."""
    return key[0], key[1], Config._get_software_safe(key[2])

@staticmethod
def _validate_extension_setter():
Expand All @@ -295,7 +299,7 @@ def set_input_map(key, input_file, func):
if key not in Config.__conf['input_maps']:
Config.__conf['input_maps'].update({key: {}})
if input_file in Config.__conf['input_maps'][key]:
msg = (f'Input file name "{input_file}" already exists in the input map.')
msg = f'Input file name "{input_file}" already exists in the input map.'
raise MatflowExtensionError(msg)
Config.__conf['input_maps'][key][input_file] = func

Expand All @@ -306,7 +310,7 @@ def set_output_map(key, output_name, func):
if key not in Config.__conf['output_maps']:
Config.__conf['output_maps'].update({key: {}})
if output_name in Config.__conf['output_maps'][key]:
msg = (f'Output name "{output_name}" already exists in the output map.')
msg = f'Output name "{output_name}" already exists in the output map.'
raise MatflowExtensionError(msg)
Config.__conf['output_maps'][key][output_name] = func

Expand All @@ -315,7 +319,7 @@ def set_func_map(key, func):
if Config._validate_extension_setter():
key = Config._get_key_safe(key)
if key in Config.__conf['func_maps']:
msg = (f'Function map "{key}" already exists in the function map.')
msg = f'Function map "{key}" already exists in the function map.'
raise MatflowExtensionError(msg)
Config.__conf['func_maps'][key] = func

Expand Down Expand Up @@ -360,7 +364,7 @@ def set_output_file_map(key, file_reference, file_name):
Config.__conf['output_file_maps'].update({key: {}})
file_ref_full = '__file__' + file_reference
if file_ref_full in Config.__conf['output_file_maps'][key]:
msg = (f'File name "{file_name}" already exists in the output files map.')
msg = f'File name "{file_name}" already exists in the output files map.'
raise MatflowExtensionError(msg)
Config.__conf['output_file_maps'][key].update({file_ref_full: file_name})

Expand Down
Loading

0 comments on commit 4e32973

Please sign in to comment.