Skip to content

Commit

Permalink
Merge pull request PolusAI#215 from jfennick/doc_update2
Browse files Browse the repository at this point in the history
Doc update2
  • Loading branch information
jfennick authored Apr 23, 2024
2 parents 498f2a2 + 2b12d21 commit f8cfd9d
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ label: Speculatively executes an arbitrary CommandLineTool (upto max_times) by f
doc: |-
Speculatively executes an arbitrary CommandLineTool (upto max_times) by file watching / polling --cachedir. This is primarily intended for parsing logfiles before the associated CWL process has finished.

baseCommand: cwl_watcher
baseCommand: cwl_subinterpreter

inputs:
cachedir_path:
Expand Down Expand Up @@ -78,7 +78,7 @@ inputs:
prefix: --root_workflow_yml_path

homedir:
label: The full absolute path to the uers home directory.
label: The full absolute path to the users home directory.
doc: |
The full absolute path to the root users home directory.
type: string
Expand Down
8 changes: 4 additions & 4 deletions docs/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ To use static dispatch, first you need to create a YAML file which aggregates th

```yaml
wic:
default_backend: implementation1
backends:
default_implementation: implementation1
implementations:
implementation1:
steps:
- implementation1.wic:
Expand All @@ -54,7 +54,7 @@ wic:
steps:
(2, static_dispatch.wic):
wic:
backend: implementation2
implementation: implementation2
```
The most common use case of static dispatch is to swap out 'identical' subworkflows. However, this constraint is intentionally not enforced and it is completely up to the user. In fact, you may want to swap out two implementations that use different algorithms to achieve the same high-level goal.
Expand All @@ -73,7 +73,7 @@ Note however that while insert_steps_automatically_*.cwl files can come from pre

For realtime monitoring, we want to asynchronously run an auxiliary workflow while the main workflow is still running. We implement this by invoking a subinterpreter.

The `cwl_watcher` subinterpreter will repeatedly run an auxiliary workflow which is completely independent from the main workflow, upto some fixed number of iterations. (A fixed number of iterations is used because the main interpreter and subinterpreter are completely independent; there is no way of passing status information between the interpreters.)
The `cwl_subinterpreter` subinterpreter will repeatedly run an auxiliary workflow which is completely independent from the main workflow, upto some fixed number of iterations. (A fixed number of iterations is used because the main interpreter and subinterpreter are completely independent; there is no way of passing status information between the interpreters.)

NOTE: This should be considered an experimental feature; the CWL standard does not consider realtime monitoring (and/or other implementation-specific details).

Expand Down
4 changes: 2 additions & 2 deletions docs/dev/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ wic.compiler
------------------------------------
.. automodule:: wic.compiler

wic.cwl_watcher
wic.cwl_subinterpreter
------------------------------------
.. automodule:: wic.cwl_watcher
.. automodule:: wic.cwl_subinterpreter

wic.inference
------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ all = ["workflow-inference-compiler[test,doc,plots,cyto,runners-src,mypy-types]"

[project.scripts]
wic = "wic.main:main"
cwl_watcher = "wic.cwl_watcher:main"
cwl_subinterpreter = "wic.cwl_subinterpreter:main"
cwltool_filterlog = "wic.run_local:cwltool_main"
# Need an extra entrypoint to avoid mixing up WIC cli flags with cwltool cli flags
cwltool_filterlog_pf = "wic.run_local:cwltool_main_pf"
Expand Down
54 changes: 27 additions & 27 deletions src/wic/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ def read_ast_from_disk(homedir: str,
sys.exit(1)

wic = {'wic': yaml_tree.get('wic', {})}
if 'backends' in wic['wic']:
# Recursively expand each backend, but do NOT choose a specific backend.
if 'implementations' in wic['wic']:
# Recursively expand each implementation, but do NOT choose a specific implementation.
# Require back_name to be .wic? For now, yes.
backends_trees = []
for back_name, back in wic['wic']['backends'].items():
implementations_trees = []
for back_name, back in wic['wic']['implementations'].items():
plugin_ns = wic['wic'].get('namespace', 'global')
stepid = StepId(back_name, plugin_ns)
backends_tree = read_ast_from_disk(homedir, YamlTree(stepid, back), yml_paths, tools, validator,
ignore_validation_errors)
backends_trees.append(backends_tree)
yaml_tree['wic']['backends'] = dict(backends_trees)
implementations_tree = read_ast_from_disk(homedir, YamlTree(stepid, back), yml_paths, tools, validator,
ignore_validation_errors)
implementations_trees.append(implementations_tree)
yaml_tree['wic']['implementations'] = dict(implementations_trees)
return YamlTree(step_id, yaml_tree)

steps: List[Yaml] = yaml_tree['steps']
Expand Down Expand Up @@ -149,14 +149,14 @@ def merge_yml_trees(yaml_tree_tuple: YamlTree,
yaml_tree['wic'] = wic['wic']
wic_steps = wic['wic'].get('steps', {})

if 'backends' in wic['wic']:
# Recursively expand each backend, but do NOT choose a specific backend.
if 'implementations' in wic['wic']:
# Recursively expand each implementation, but do NOT choose a specific implementation.
# Require back_name to be .wic? For now, yes.
backends_trees = []
for stepid, back in wic['wic']['backends'].items():
backends_tree = merge_yml_trees(YamlTree(stepid, back), wic_parent, tools)
backends_trees.append(backends_tree)
yaml_tree['wic']['backends'] = dict(backends_trees)
implementations_trees = []
for stepid, back in wic['wic']['implementations'].items():
implementations_tree = merge_yml_trees(YamlTree(stepid, back), wic_parent, tools)
implementations_trees.append(implementations_tree)
yaml_tree['wic']['implementations'] = dict(implementations_trees)
return YamlTree(step_id, yaml_tree)

steps: List[Yaml] = yaml_tree['steps']
Expand Down Expand Up @@ -214,12 +214,12 @@ def tree_to_forest(yaml_tree_tuple: YamlTree, tools: Tools) -> YamlForest:
(step_id, yaml_tree) = yaml_tree_tuple

wic = {'wic': yaml_tree.get('wic', {})}
if 'backends' in wic['wic']:
backends_forest_list = []
for stepid, back in wic['wic']['backends'].items():
backend_forest = (stepid, tree_to_forest(YamlTree(stepid, back), tools))
backends_forest_list.append(backend_forest)
return YamlForest(YamlTree(step_id, yaml_tree), backends_forest_list)
if 'implementations' in wic['wic']:
implementations_forest_list = []
for stepid, back in wic['wic']['implementations'].items():
implementation_forest = (stepid, tree_to_forest(YamlTree(stepid, back), tools))
implementations_forest_list.append(implementation_forest)
return YamlForest(YamlTree(step_id, yaml_tree), implementations_forest_list)

steps: List[Yaml] = yaml_tree['steps']
wic_steps = wic['wic'].get('steps', {})
Expand Down Expand Up @@ -261,12 +261,12 @@ def python_script_generate_cwl(yaml_tree_tuple: YamlTree,

wic = {'wic': yaml_tree.get('wic', {})}

if 'backends' in wic['wic']:
backends_trees = []
for stepid, back in wic['wic']['backends'].items():
backends_tree = python_script_generate_cwl(YamlTree(stepid, back), root_yml_dir_abs, tools)
backends_trees.append(backends_tree)
yaml_tree['wic']['backends'] = dict(backends_trees)
if 'implementations' in wic['wic']:
implementations_trees = []
for stepid, back in wic['wic']['implementations'].items():
implementations_tree = python_script_generate_cwl(YamlTree(stepid, back), root_yml_dir_abs, tools)
implementations_trees.append(implementations_tree)
yaml_tree['wic']['implementations'] = dict(implementations_trees)
return YamlTree(step_id, yaml_tree)

steps: List[Yaml] = yaml_tree['steps']
Expand Down
6 changes: 3 additions & 3 deletions src/wic/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
NodeData, RoseTree, Tool, Tools, WorkflowInputsFile,
Yaml, YamlTree, StepId)

# NOTE: This must be initialized in main.py and/or cwl_watcher.py
# NOTE: This must be initialized in main.py and/or cwl_subinterpreter.py
inference_rules: Dict[str, str] = {}


Expand Down Expand Up @@ -157,7 +157,7 @@ def compile_workflow_once(yaml_tree_ast: YamlTree,

yaml_stem = Path(yaml_path).stem

(back_name_, yaml_tree) = utils.extract_backend(yaml_tree, wic['wic'], Path(yaml_path))
(back_name_, yaml_tree) = utils.extract_implementation(yaml_tree, wic['wic'], Path(yaml_path))
steps: List[Yaml] = yaml_tree['steps']

steps_keys = utils.get_steps_keys(steps)
Expand Down Expand Up @@ -362,7 +362,7 @@ def compile_workflow_once(yaml_tree_ast: YamlTree,
if 'in' not in steps[i][step_key]:
steps[i][step_key]['in'] = {}

if 'cwl_watcher' == step_key:
if 'cwl_subinterpreter' == step_key:
in_dict_in = steps[i][step_key]['in'] # NOTE: Mutates in_dict_in
io.write_absolute_yaml_tags(args, in_dict_in, namespaces, step_name_i, explicit_edge_calls_copy)

Expand Down
12 changes: 6 additions & 6 deletions src/wic/cwl_watcher.py → src/wic/cwl_subinterpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ def absolute_paths(config: Json, cachedir_path: Path) -> Json:
"""Recursively searches for paths in config and makes them absolute by prepending cachedir_path.
Args:
config (Json): The contents of the YAML cwl_watcher config: tag.
config (Json): The contents of the YAML cwl_subinterpreter config: tag.
cachedir_path (Path): The --cachedir directory of the main workflow.
Returns:
Json: The contents of the YAML cwl_watcher config: tag, with all paths prepended with cachedir_path.
Json: The contents of the YAML cwl_subinterpreter config: tag, with all paths prepended with cachedir_path.
"""
new_json: Json = {}
for key, val in config.items():
Expand All @@ -46,7 +46,7 @@ def absolute_paths(config: Json, cachedir_path: Path) -> Json:
new_val = str(cachedir_path / val) # type: ignore
changed_files = file_watcher_glob(cachedir_path, val, {})
# We require unique filenames, so there should only be one file.
# (except for files that get created within the cwl_watcher workflow itself)
# (except for files that get created within the cwl_subinterpreter workflow itself)
changed_files_lst = list(changed_files.items())
if len(changed_files_lst) == 0:
print(f'Warning! Changed files should be length one! {val}\n{changed_files_lst}')
Expand All @@ -73,7 +73,7 @@ def rerun_cwltool(homedir: str, _directory_realtime: Path, cachedir_path: Path,
workflow (which by design will likely be running concurrently with this code).
cachedir_path (Path): The --cachedir directory of the main workflow.
cwl_tool (str): The CWL CommandLineTool or YAML filename (without extension).
args_vals (Json): The contents of the YAML cwl_watcher config: tag.
args_vals (Json): The contents of the YAML cwl_subinterpreter config: tag.
tools_cwl (Tools): The CWL CommandLineTool definitions found using get_tools_cwl()
yml_paths (Dict[str, Dict[str, Path]]): The yml workflow definitions found using get_yml_paths()
validator (Draft202012Validator): Used to validate the yml files against the autogenerated schema.
Expand Down Expand Up @@ -239,7 +239,7 @@ def cli_watcher() -> argparse.Namespace:

def main() -> None:
"""See docs/userguide.md#real-time-analysis--speculative-execution"""
print('cwl_watcher sys.argv', sys.argv)
print('cwl_subinterpreter sys.argv', sys.argv)
args = cli_watcher()
logging_filters()

Expand All @@ -249,7 +249,7 @@ def main() -> None:
max_times = int(args.max_times)
root_workflow_yml_path = Path(args.root_workflow_yml_path)

# Create an empty 'logfile' so that cwl_watcher.cwl succeeds.
# Create an empty 'logfile' so that cwl_subinterpreter.cwl succeeds.
# TODO: Maybe capture cwl_tool stdout/stderr and redirect to this logfile.
logfile = Path(f'{cwl_tool}_only.log')
logfile.touch()
Expand Down
2 changes: 1 addition & 1 deletion src/wic/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .wic_types import (GraphReps, InternalOutputs, Namespaces, StepId, Tool, Tools,
WorkflowInputs, Yaml)

# NOTE: This must be initialized in main.py and/or cwl_watcher.py
# NOTE: This must be initialized in main.py and/or cwl_subinterpreter.py
renaming_conventions: List[Tuple[str, str]] = []


Expand Down
26 changes: 13 additions & 13 deletions src/wic/inlineing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

def get_inlineable_subworkflows(yaml_tree_tuple: YamlTree,
tools: Tools,
backend: bool = False,
implementation: bool = False,
namespaces_init: Namespaces = []) -> List[Namespaces]:
"""Traverses a yml AST and finds all subworkflows which can be inlined into their parent workflow.
Args:
yaml_tree_tuple (YamlTree): A tuple of name and yml AST
tools (Tools): The CWL CommandLineTool definitions found using get_tools_cwl()
backend (bool): True if the immediate parent workflow is a backend.
implementation (bool): True if the immediate parent workflow is a implementation.
namespaces_init (Namespaces): The initial subworkflow to start the traversal ([] == root)
Returns:
Expand All @@ -37,10 +37,10 @@ def get_inlineable_subworkflows(yaml_tree_tuple: YamlTree,
# Check for top-level yml dsl args
wic = {'wic': yaml_tree.get('wic', {})}

if 'backends' in wic['wic']:
if 'implementations' in wic['wic']:
# Use yaml_name (instead of back_name) and do not append to namespace_init.
sub_namespaces_list = []
for stepid, back in wic['wic']['backends'].items():
for stepid, back in wic['wic']['implementations'].items():
sub_namespaces = get_inlineable_subworkflows(YamlTree(stepid, back), tools, True, namespaces_init)
sub_namespaces_list.append(sub_namespaces)
return utils.flatten(sub_namespaces_list)
Expand All @@ -52,7 +52,7 @@ def get_inlineable_subworkflows(yaml_tree_tuple: YamlTree,

# All subworkflows are inlineable, except scattered subworkflows.
inlineable = wic['wic'].get('inlineable', True)
namespaces = [namespaces_init] if inlineable and namespaces_init != [] and not backend else []
namespaces = [namespaces_init] if inlineable and namespaces_init != [] and not implementation else []

for i, step_key in enumerate(steps_keys):
yaml_stem = Path(yaml_name).stem
Expand Down Expand Up @@ -84,20 +84,20 @@ def inline_subworkflow(yaml_tree_tuple: YamlTree, namespaces: Namespaces) -> Tup
yaml_name = step_id.stem

wic = {'wic': yaml_tree.get('wic', {})}
if 'backends' in wic['wic']:
if 'implementations' in wic['wic']:
if len(namespaces) == 1: # and namespaces[0] == yaml_name ?
(back_name_, yaml_tree) = utils.extract_backend(yaml_tree, wic['wic'], Path(''))
(back_name_, yaml_tree) = utils.extract_implementation(yaml_tree, wic['wic'], Path(''))
yaml_tree = {'steps': yaml_tree['steps']} # Remove wic tag
len_substeps = len(yaml_tree['steps'])
return YamlTree(StepId(back_name_, step_id.plugin_ns), yaml_tree), 0 # len_substeps # TODO: check step_id

# Pass namespaces through unmodified
backends_trees = []
for stepid, back in wic['wic']['backends'].items():
backend_tree, len_substeps = inline_subworkflow(YamlTree(stepid, back), namespaces)
backends_trees.append(backend_tree)
yaml_tree['wic']['backends'] = dict(backends_trees)
return YamlTree(step_id, yaml_tree), 0 # choose len_substeps from which backend?
implementations_trees = []
for stepid, back in wic['wic']['implementations'].items():
implementation_tree, len_substeps = inline_subworkflow(YamlTree(stepid, back), namespaces)
implementations_trees.append(implementation_tree)
yaml_tree['wic']['implementations'] = dict(implementations_trees)
return YamlTree(step_id, yaml_tree), 0 # choose len_substeps from which implementation?

steps: List[Yaml] = yaml_tree['steps']
steps_keys = utils.get_steps_keys(steps)
Expand Down
4 changes: 2 additions & 2 deletions src/wic/input_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ def get_absolute_paths(sub_config: Json) -> Json:

def write_absolute_yaml_tags(args: argparse.Namespace, in_dict_in: Yaml, namespaces: Namespaces,
step_name_i: str, explicit_edge_calls_copy: ExplicitEdgeCalls) -> None:
"""cwl_watcher requires all paths to be absolute.
"""cwl_subinterpreter requires all paths to be absolute.
Args:
args (argparse.Namespace): The command line arguments
in_dict_in (Yaml): The in: subtag of a cwl_watcher: tag. (Mutates in_dict_in)
in_dict_in (Yaml): The in: subtag of a cwl_subinterpreter: tag. (Mutates in_dict_in)
namespaces (Namespaces): Specifies the path in the yml AST to the current subworkflow
step_name_i (str): The name of the current workflow step
explicit_edge_calls_copy (ExplicitEdgeCalls): Stores the (path, value) of the explicit edge call sites
Expand Down
24 changes: 12 additions & 12 deletions src/wic/schemas/wic_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ def cwl_schema(name: str, cwl: Json, id_prefix: str) -> Json:
{**config_schemas[name], **metadata}]}
continue

if key == 'config' and name == 'cwl_watcher':
if key == 'config' and name == 'cwl_subinterpreter':
# This may cause problems with hypothesis, but since the
# cwl_watcher config tag takes an arbitrary Json-encoded string
# cwl_subinterpreter config tag takes an arbitrary Json-encoded string
# as input, we cannot restrict this particular sub-schema.
empty_schema: Dict = {}
inputs_props[key] = empty_schema
Expand Down Expand Up @@ -348,20 +348,20 @@ def wic_tag_schema(hypothesis: bool = False) -> Json:
# additionalProperties = False still works with patternProperties FYI
steps['patternProperties'] = {pat_int_str: choices}

# backends = default_schema()
backends: Dict[Any, Any] = {}
backends['type'] = 'object'
backends['additionalProperties'] = True
# TODO: Restrict the backend properties and make default_backend an enum
# implementations = default_schema()
implementations: Dict[Any, Any] = {}
implementations['type'] = 'object'
implementations['additionalProperties'] = True
# TODO: Restrict the implementation properties and make default_implementation an enum

str_nonempty = {'type': 'string', 'minLength': 1}

namespace: Dict[Any, Any] = str_nonempty
# namespace['enum'] = ...
# TODO: Restrict the namespace properties to only those in search_paths_wic

backend = str_nonempty
default_backend = str_nonempty
implementation = str_nonempty
default_implementation = str_nonempty
inlineable = {'type': 'boolean'}

schema = default_schema(url=True)
Expand All @@ -375,13 +375,13 @@ def wic_tag_schema(hypothesis: bool = False) -> Json:
version = {'type': 'string', 'pattern': pat_semver}
driver = {'type': 'string', 'enum': ['slurm', 'argo']}

schema_props = {'graphviz': graphviz_schema, 'steps': steps, 'backend': backend,
'default_backend': default_backend,
schema_props = {'graphviz': graphviz_schema, 'steps': steps, 'implementation': implementation,
'default_implementation': default_implementation,
'version': str_nonempty, 'driver': driver,
'namespace': namespace, 'inlineable': inlineable}
if not hypothesis:
# {'additionalProperties': True} can cause problems with hypothesis.
schema_props['backends'] = backends
schema_props['implementations'] = implementations
schema['properties'] = schema_props
return schema

Expand Down
Loading

0 comments on commit f8cfd9d

Please sign in to comment.