diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 646242c..23a3577 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
+ python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12']
steps:
- name: Chechout code
uses: actions/checkout@v3.3.0
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e524113..841fee7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
# Changelog
+## 3.5.0 (2023-12-06)
+
+- add entry point `mara.commands` (for [mara-cli](https://github.com/mara/mara-cli) support)
+- add mara-pipelines click command group (#104)
+- add dynamic Task (#106)
+- add max_retries for parallel tasks (#105)
+- fix html_doc_items tuple of command ReadScriptOutput
+
## 3.4.0 (2023-05-01)
- upgrade to bootstrap 4.6 (from alpha 4) (#66)
diff --git a/README.md b/README.md
index 071c8ad..cf8673c 100644
--- a/README.md
+++ b/README.md
@@ -52,7 +52,7 @@ Here is a pipeline "demo" consisting of three nodes that depend on each other: t
```python
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
-from mara_pipelines.ui.cli import run_pipeline, run_interactively
+from mara_pipelines.cli import run_pipeline, run_interactively
pipeline = Pipeline(
id='demo',
@@ -115,7 +115,7 @@ CREATE TABLE data_integration_file_dependency (
This runs a pipeline with output to stdout:
```python
-from mara_pipelines.ui.cli import run_pipeline
+from mara_pipelines.cli import run_pipeline
run_pipeline(pipeline)
```
@@ -138,7 +138,7 @@ run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstr
And finally, there is some sort of menu based on [pythondialog](http://pythondialog.sourceforge.net/) that allows to navigate and run pipelines like this:
```python
-from mara_pipelines.ui.cli import run_interactively
+from mara_pipelines.cli import run_interactively
run_interactively()
```
diff --git a/docs/example.md b/docs/example.md
index 106827a..4fc6a6e 100644
--- a/docs/example.md
+++ b/docs/example.md
@@ -5,7 +5,7 @@ Here is a pipeline "demo" consisting of three nodes that depend on each other: t
```python
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
-from mara_pipelines.ui.cli import run_pipeline, run_interactively
+from mara_pipelines.cli import run_pipeline, run_interactively
pipeline = Pipeline(
id='demo',
@@ -68,7 +68,7 @@ CREATE TABLE data_integration_file_dependency (
This runs a pipeline with output to stdout:
```python
-from mara_pipelines.ui.cli import run_pipeline
+from mara_pipelines.cli import run_pipeline
run_pipeline(pipeline)
```
diff --git a/mara_pipelines/__init__.py b/mara_pipelines/__init__.py
index 142efa4..19131dd 100755
--- a/mara_pipelines/__init__.py
+++ b/mara_pipelines/__init__.py
@@ -1,5 +1,5 @@
"""Make the functionalities of this package auto-discoverable by mara-app"""
-__version__ = '3.4.0'
+__version__ = '3.5.0'
def MARA_CONFIG_MODULES():
@@ -30,8 +30,10 @@ def MARA_ACL_RESOURCES():
def MARA_CLICK_COMMANDS():
- from .ui import cli
- return [cli.run, cli.run_interactively, cli.reset_incremental_processing]
+ from . import cli
+ from .ui import cli as old_cli
+ return [cli.mara_pipelines,
+ old_cli._run, old_cli._run_interactively, old_cli._reset_incremental_processing]
def MARA_NAVIGATION_ENTRIES():
diff --git a/mara_pipelines/cli.py b/mara_pipelines/cli.py
new file mode 100644
index 0000000..d7b3f46
--- /dev/null
+++ b/mara_pipelines/cli.py
@@ -0,0 +1,153 @@
+"""Auto-migrate command line interface"""
+
+import click
+import sys
+from typing import Set
+
+from . import config, pipelines
+
+
+def run_pipeline(pipeline: pipelines.Pipeline, nodes: Set[pipelines.Node] = None,
+ with_upstreams: bool = False,
+ interactively_started: bool = False,
+ disable_colors: bool = False) -> bool:
+ """
+ Runs a pipeline or parts of it with output printed to stdout
+ Args:
+ pipeline: The pipeline to run
+ nodes: A list of pipeline children that should run
+ with_upstreams: When true and `nodes` are provided, then all upstreams of `nodes` in `pipeline` are also run
+ interactively_started: Whether or not this run was started interactively, passed on in RunStarted and
+ RunFinished events.
+ disable_colors: If true, don't use escape sequences to make the log colorful (default: colorful logging)
+ Return:
+ True when the pipeline run succeeded
+ """
+ from .logging import logger, pipeline_events
+ from . import execution
+
+ RESET_ALL = 'reset_all'
+ PATH_COLOR = 'path_color'
+ ERROR_COLOR = 'error_color'
+
+ # https://godoc.org/github.com/whitedevops/colors
+ colorful = {logger.Format.STANDARD: '\033[01m', # bold
+ logger.Format.ITALICS: '\033[02m', # dim
+ logger.Format.VERBATIM: '',
+ PATH_COLOR: '\033[36m', # cyan
+ ERROR_COLOR: '\033[91m', # light red
+ RESET_ALL: '\033[0m', # reset all
+ }
+ plain = {key: '' for key in colorful.keys()}
+
+ theme = plain if disable_colors else colorful
+
+ succeeded = False
+ for event in execution.run_pipeline(pipeline, nodes, with_upstreams, interactively_started=interactively_started):
+ if isinstance(event, pipeline_events.Output):
+ print(f'{theme[PATH_COLOR]}{" / ".join(event.node_path)}{":" if event.node_path else ""}{theme[RESET_ALL]} '
+ + theme[event.format] + (theme[ERROR_COLOR] if event.is_error else '')
+ + event.message + theme[RESET_ALL])
+ elif isinstance(event, pipeline_events.RunFinished):
+ if event.succeeded:
+ succeeded = True
+
+ return succeeded
+
+
+# -----------------------------------------------------------------------------
+
+
+@click.group()
+def mara_pipelines():
+ """Mara pipelines commands"""
+ pass
+
+
+@mara_pipelines.command()
+@click.option('--path', default='',
+              help='The id of the pipeline to run. Example: "pipeline-id"; "" (default) is the root pipeline.')
+@click.option('--nodes',
+ help='IDs of sub-nodes of the pipeline to run, separated by comma. When provided, then only these nodes are run. Example: "do-this,do-that".')
+@click.option('--with_upstreams', default=False, is_flag=True,
+ help='Also run all upstreams of --nodes within the pipeline.')
+@click.option('--disable-colors', default=False, is_flag=True,
+ help='Output logs without coloring them.')
+def run(path, nodes, with_upstreams, disable_colors: bool = False):
+ """Runs a pipeline or a sub-set of its nodes"""
+
+ # the pipeline to run
+ path = path.split(',')
+ pipeline, found = pipelines.find_node(path)
+ if not found:
+ print(f'Pipeline {path} not found', file=sys.stderr)
+ sys.exit(-1)
+ if not isinstance(pipeline, pipelines.Pipeline):
+ print(f'Node {path} is not a pipeline, but a {pipeline.__class__.__name__}', file=sys.stderr)
+ sys.exit(-1)
+
+ # a list of nodes to run selectively in the pipeline
+ _nodes = set()
+ for id in (nodes.split(',') if nodes else []):
+ node = pipeline.nodes.get(id)
+ if not node:
+ print(f'Node "{id}" not found in pipeline {path}', file=sys.stderr)
+ sys.exit(-1)
+ else:
+ _nodes.add(node)
+
+ if not run_pipeline(pipeline, _nodes, with_upstreams, interactively_started=False, disable_colors=disable_colors):
+ sys.exit(-1)
+
+
+@mara_pipelines.command()
+def run_interactively():
+ """Select and run data pipelines"""
+ from dialog import Dialog
+
+ d = Dialog(dialog="dialog", autowidgetsize=True) # see http://pythondialog.sourceforge.net/doc/widgets.html
+
+ def run_pipeline_and_notify(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None):
+ if not run_pipeline(pipeline, nodes, interactively_started=True):
+ sys.exit(-1)
+
+ def menu(node: pipelines.Node):
+ if isinstance(node, pipelines.Pipeline):
+
+ code, choice = d.menu(
+ text='Pipeline ' + '.'.join(node.path()) if node.parent else 'Root pipeline',
+ choices=[('▶ ', 'Run'), ('>> ', 'Run selected')]
+ + [(child.id, '→' if isinstance(child, pipelines.Pipeline) else 'Run')
+ for child in node.nodes.values()])
+ if code == d.CANCEL:
+ return
+
+ if choice == '▶ ':
+ run_pipeline_and_notify(node)
+ elif choice == '>> ':
+ code, node_ids = d.checklist('Select sub-nodes to run. If you want to run all, then select none.',
+ choices=[(node_id, '', False) for node_id in node.nodes.keys()])
+ if code == d.OK:
+ run_pipeline_and_notify(node, {node.nodes[id] for id in node_ids})
+ else:
+ menu(node.nodes[choice])
+ return
+ else:
+ run_pipeline_and_notify(pipeline=node.parent, nodes=[node])
+
+ menu(config.root_pipeline())
+
+
+@mara_pipelines.command()
+@click.option('--path', default='',
+              help='The parent ids of the node to reset. Example: "pipeline-id,sub-pipeline-id".')
+def reset_incremental_processing(path):
+ """Reset status of incremental processing for a node"""
+ from .incremental_processing import reset
+
+ path = path.split(',') if path else []
+ node, found = pipelines.find_node(path)
+ if not found:
+ print(f'Node {path} not found', file=sys.stderr)
+ sys.exit(-1)
+ reset.reset_incremental_processing(path)
diff --git a/mara_pipelines/parallel_tasks/files.py b/mara_pipelines/parallel_tasks/files.py
index 26e7e7c..c1f9b78 100644
--- a/mara_pipelines/parallel_tasks/files.py
+++ b/mara_pipelines/parallel_tasks/files.py
@@ -34,10 +34,12 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read
max_number_of_parallel_tasks: Optional[int] = None, file_dependencies: Optional[List[str]] = None, date_regex: Optional[str] = None,
partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False,
commands_before: Optional[List[pipelines.Command]] = None, commands_after: Optional[List[pipelines.Command]] = None,
- db_alias: Optional[str] = None, storage_alias: Optional[str] = None, timezone: Optional[str] = None) -> None:
+ db_alias: Optional[str] = None, storage_alias: Optional[str] = None, timezone: Optional[str] = None,
+ max_retries: Optional[int] = None) -> None:
pipelines.ParallelTask.__init__(self, id=id, description=description,
max_number_of_parallel_tasks=max_number_of_parallel_tasks,
- commands_before=commands_before, commands_after=commands_after)
+ commands_before=commands_before, commands_after=commands_after,
+ max_retries=max_retries)
self.file_pattern = file_pattern
self.read_mode = read_mode
self.date_regex = date_regex
@@ -150,12 +152,14 @@ def update_file_dependencies():
id='create_partitions',
description='Creates required target table partitions',
commands=[sql.ExecuteSQL(sql_statement='\n'.join(slice), echo_queries=False, db_alias=self.db_alias)
- for slice in more_itertools.sliced(sql_statements, 50)])
+ for slice in more_itertools.sliced(sql_statements, 50)],
+ max_retries=self.max_retries)
sub_pipeline.add(create_partitions_task)
for n, chunk in enumerate(more_itertools.chunked(files_per_day.items(), chunk_size)):
- task = pipelines.Task(id=str(n), description='Reads a portion of the files')
+ task = pipelines.Task(id=str(n), description='Reads a portion of the files',
+ max_retries=self.max_retries)
for (day, files) in chunk:
target_table = self.target_table + '_' + day.strftime("%Y%m%d")
for file in files:
@@ -166,7 +170,8 @@ def update_file_dependencies():
for n, chunk in enumerate(more_itertools.chunked(files, chunk_size)):
sub_pipeline.add(
pipelines.Task(id=str(n), description=f'Reads {len(chunk)} files',
- commands=sum([self.parallel_commands(x[0]) for x in chunk], [])))
+ commands=sum([self.parallel_commands(x[0]) for x in chunk], []),
+ max_retries=self.max_retries))
def parallel_commands(self, file_name: str) -> List[pipelines.Command]:
return [self.read_command(file_name)] + (
@@ -187,14 +192,16 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read
mapper_script_file_name: Optional[str] = None, make_unique: bool = False, db_alias: Optional[str] = None,
delimiter_char: Optional[str] = None, quote_char: Optional[str] = None, null_value_string: Optional[str] = None,
skip_header: Optional[bool] = None, csv_format: bool = False, storage_alias: Optional[str] = None,
- timezone: Optional[str] = None, max_number_of_parallel_tasks: Optional[int] = None) -> None:
+ timezone: Optional[str] = None, max_number_of_parallel_tasks: Optional[int] = None,
+ max_retries: Optional[int] = None) -> None:
_ParallelRead.__init__(self, id=id, description=description, file_pattern=file_pattern,
read_mode=read_mode, target_table=target_table, file_dependencies=file_dependencies,
date_regex=date_regex, partition_target_table_by_day_id=partition_target_table_by_day_id,
truncate_partitions=truncate_partitions,
commands_before=commands_before, commands_after=commands_after,
db_alias=db_alias, storage_alias=storage_alias, timezone=timezone,
- max_number_of_parallel_tasks=max_number_of_parallel_tasks)
+ max_number_of_parallel_tasks=max_number_of_parallel_tasks,
+ max_retries=max_retries)
self.compression = compression
self.mapper_script_file_name = mapper_script_file_name or ''
self.make_unique = make_unique
@@ -239,18 +246,19 @@ def html_doc_items(self) -> List[Tuple[str, str]]:
class ParallelReadSqlite(_ParallelRead):
def __init__(self, id: str, description: str, file_pattern: str, read_mode: ReadMode, sql_file_name: str,
- target_table: str, file_dependencies: List[str] = None, date_regex: str = None,
+ target_table: str, file_dependencies: Optional[List[str]] = None, date_regex: Optional[str] = None,
partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False,
- commands_before: List[pipelines.Command] = None, commands_after: List[pipelines.Command] = None,
- db_alias: str = None, timezone=None, max_number_of_parallel_tasks: int = None,
- storage_alias: str = None) -> None:
+ commands_before: Optional[List[pipelines.Command]] = None, commands_after: Optional[List[pipelines.Command]] = None,
+                 db_alias: Optional[str] = None, storage_alias: Optional[str] = None, timezone=None, max_number_of_parallel_tasks: Optional[int] = None,
+ max_retries: Optional[int] = None) -> None:
_ParallelRead.__init__(self, id=id, description=description, file_pattern=file_pattern,
read_mode=read_mode, target_table=target_table, file_dependencies=file_dependencies,
date_regex=date_regex, partition_target_table_by_day_id=partition_target_table_by_day_id,
truncate_partitions=truncate_partitions,
commands_before=commands_before, commands_after=commands_after, db_alias=db_alias,
storage_alias=storage_alias,
- timezone=timezone, max_number_of_parallel_tasks=max_number_of_parallel_tasks)
+                               timezone=timezone, max_number_of_parallel_tasks=max_number_of_parallel_tasks,
+                               max_retries=max_retries)
self.sql_file_name = sql_file_name
def read_command(self, file_name: str) -> List[pipelines.Command]:
diff --git a/mara_pipelines/parallel_tasks/python.py b/mara_pipelines/parallel_tasks/python.py
index b274f77..bf9c4e7 100644
--- a/mara_pipelines/parallel_tasks/python.py
+++ b/mara_pipelines/parallel_tasks/python.py
@@ -27,7 +27,8 @@ def add_parallel_tasks(self, sub_pipeline: 'pipelines.Pipeline') -> None:
sub_pipeline.add(pipelines.Task(
id='_'.join([re.sub('[^0-9a-z\-_]+', '', str(x).lower().replace('-', '_')) for x in parameter_tuple]),
description=f'Runs the script with parameters {repr(parameter_tuple)}',
- commands=[python.ExecutePython(file_name=self.file_name, args=list(parameter_tuple))]))
+ commands=[python.ExecutePython(file_name=self.file_name, args=list(parameter_tuple))],
+ max_retries=self.max_retries))
def html_doc_items(self) -> List[Tuple[str, str]]:
path = self.parent.base_path() / self.file_name
@@ -58,7 +59,8 @@ def add_parallel_tasks(self, sub_pipeline: 'pipelines.Pipeline') -> None:
sub_pipeline.add(pipelines.Task(
id=str(parameter).lower().replace(' ', '_').replace('-', '_'),
description=f'Runs the function with parameters {repr(parameter)}',
- commands=[python.RunFunction(lambda args=parameter: self.function(args))]))
+ commands=[python.RunFunction(lambda args=parameter: self.function(args))],
+ max_retries=self.max_retries))
def html_doc_items(self) -> List[Tuple[str, str]]:
return [('function', _.pre[escape(str(self.function))]),
diff --git a/mara_pipelines/parallel_tasks/sql.py b/mara_pipelines/parallel_tasks/sql.py
index 9ec8f84..1006653 100644
--- a/mara_pipelines/parallel_tasks/sql.py
+++ b/mara_pipelines/parallel_tasks/sql.py
@@ -51,7 +51,8 @@ def add_parallel_tasks(self, sub_pipeline: 'pipelines.Pipeline') -> None:
echo_queries=self.echo_queries, timezone=self.timezone, replace=replace)
if self.sql_file_name else
sql.ExecuteSQL(sql_statement=self.sql_statement, db_alias=self.db_alias,
- echo_queries=self.echo_queries, timezone=self.timezone, replace=replace)]))
+ echo_queries=self.echo_queries, timezone=self.timezone, replace=replace)],
+ max_retries=self.max_retries))
def html_doc_items(self) -> List[Tuple[str, str]]:
return [('db', _.tt[self.db_alias])] \
diff --git a/mara_pipelines/pipelines.py b/mara_pipelines/pipelines.py
index 7fc538f..b021118 100644
--- a/mara_pipelines/pipelines.py
+++ b/mara_pipelines/pipelines.py
@@ -1,7 +1,7 @@
import copy
import pathlib
import re
-from typing import Optional, Dict, Set, List, Tuple, Union
+from typing import Optional, Dict, Set, List, Tuple, Union, Callable
from . import config
@@ -81,24 +81,53 @@ def html_doc_items(self) -> List[Tuple[str, str]]:
class Task(Node):
- def __init__(self, id: str, description: str, commands: Optional[List[Command]] = None, max_retries: Optional[int] = None) -> None:
+ def __init__(self, id: str, description: str, commands: Optional[Union[Callable, List[Command]]] = None, max_retries: Optional[int] = None) -> None:
super().__init__(id, description)
- self.commands = []
self.max_retries = max_retries
- for command in commands or []:
- self.add_command(command)
-
- def add_command(self, command: Command, prepend=False):
+        self.__dynamic_commands_generator_func = commands if callable(commands) else None
+        if callable(commands):
+            self._commands = None
+        else:
+            self._commands = []
+            self._add_commands(commands or [])
+
+ @property
+ def is_dynamic_commands(self) -> bool:
+ """if the command list is generated dynamically via a function"""
+ return self.__dynamic_commands_generator_func is not None
+
+ def _assert_is_not_dynamic(self):
+ if self.is_dynamic_commands:
+ raise Exception('You cannot use add_command when the task is constructed with a callable commands function.')
+
+ @property
+ def commands(self) -> List:
+ if self._commands is None:
+ self._commands = []
+ # execute the callable command function and cache the result
+ for command in self.__dynamic_commands_generator_func() or []:
+ self._add_command(command)
+ return self._commands
+
+ def _add_command(self, command: Command, prepend=False):
if prepend:
- self.commands.insert(0, command)
+ self._commands.insert(0, command)
else:
- self.commands.append(command)
+ self._commands.append(command)
command.parent = self
- def add_commands(self, commands: List[Command]):
+ def _add_commands(self, commands: List[Command]):
for command in commands:
- self.add_command(command)
+ self._add_command(command)
+
+ def add_command(self, command: Command, prepend=False):
+ self._assert_is_not_dynamic()
+ self._add_command(command, prepend=prepend)
+
+ def add_commands(self, commands: List[Command]):
+ self._assert_is_not_dynamic()
+ self._add_commands(commands)
def run(self):
for command in self.commands:
@@ -109,7 +138,8 @@ def run(self):
class ParallelTask(Node):
def __init__(self, id: str, description: str, max_number_of_parallel_tasks: Optional[int] = None,
- commands_before: Optional[List[Command]] = None, commands_after: Optional[List[Command]] = None) -> None:
+ commands_before: Optional[List[Command]] = None, commands_after: Optional[List[Command]] = None,
+ max_retries: Optional[int] = None) -> None:
super().__init__(id, description)
self.commands_before = []
for command in commands_before or []:
@@ -117,6 +147,7 @@ def __init__(self, id: str, description: str, max_number_of_parallel_tasks: Opti
self.commands_after = []
for command in commands_after or []:
self.add_command_after(command)
+ self.max_retries = max_retries
self.max_number_of_parallel_tasks = max_number_of_parallel_tasks
def add_command_before(self, command: Command):
diff --git a/mara_pipelines/ui/cli.py b/mara_pipelines/ui/cli.py
index 15a3b48..c5f0431 100644
--- a/mara_pipelines/ui/cli.py
+++ b/mara_pipelines/ui/cli.py
@@ -1,11 +1,11 @@
-"""Command line interface for running data pipelines"""
-
-import sys
-from typing import Set
+"""(Deprecated) Command line interface for running data pipelines"""
import click
+from warnings import warn
+from typing import Set
-from .. import config, pipelines
+from .. import pipelines
+from .. import cli
def run_pipeline(pipeline: pipelines.Pipeline, nodes: Set[pipelines.Node] = None,
@@ -24,39 +24,11 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: Set[pipelines.Node] = None
Return:
True when the pipeline run succeeded
"""
- from ..logging import logger, pipeline_events
- from .. import execution
-
- RESET_ALL = 'reset_all'
- PATH_COLOR = 'path_color'
- ERROR_COLOR = 'error_color'
-
- # https://godoc.org/github.com/whitedevops/colors
- colorful = {logger.Format.STANDARD: '\033[01m', # bold
- logger.Format.ITALICS: '\033[02m', # dim
- logger.Format.VERBATIM: '',
- PATH_COLOR: '\033[36m', # cyan
- ERROR_COLOR: '\033[91m', # light red
- RESET_ALL: '\033[0m', # reset all
- }
- plain = {key: '' for key in colorful.keys()}
+ warn("This method is deprecated. Please use `mara_pipelines.cli.run_pipeline` instead.")
+ return cli.run_pipeline(pipeline, nodes, with_upstreams, interactively_started, disable_colors)
- theme = plain if disable_colors else colorful
- succeeded = False
- for event in execution.run_pipeline(pipeline, nodes, with_upstreams, interactively_started=interactively_started):
- if isinstance(event, pipeline_events.Output):
- print(f'{theme[PATH_COLOR]}{" / ".join(event.node_path)}{":" if event.node_path else ""}{theme[RESET_ALL]} '
- + theme[event.format] + (theme[ERROR_COLOR] if event.is_error else '')
- + event.message + theme[RESET_ALL])
- elif isinstance(event, pipeline_events.RunFinished):
- if event.succeeded:
- succeeded = True
-
- return succeeded
-
-
-@click.command()
+@click.command("run")
@click.option('--path', default='',
help='The id of of the pipeline to run. Example: "pipeline-id"; "" (default) is the root pipeline.')
@click.option('--nodes',
@@ -65,81 +37,23 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: Set[pipelines.Node] = None
help='Also run all upstreams of --nodes within the pipeline.')
@click.option('--disable-colors', default=False, is_flag=True,
help='Output logs without coloring them.')
-def run(path, nodes, with_upstreams, disable_colors: bool = False):
+def _run(path, nodes, with_upstreams, disable_colors: bool = False):
"""Runs a pipeline or a sub-set of its nodes"""
+ warn("CLI command ` mara_pipelines.ui.run` will be dropped in 4.0. Please use ` mara-pipelines run` instead.")
+    cli.run.callback(path, nodes, with_upstreams, disable_colors)
- # the pipeline to run
- path = path.split(',')
- pipeline, found = pipelines.find_node(path)
- if not found:
- print(f'Pipeline {path} not found', file=sys.stderr)
- sys.exit(-1)
- if not isinstance(pipeline, pipelines.Pipeline):
- print(f'Node {path} is not a pipeline, but a {pipeline.__class__.__name__}', file=sys.stderr)
- sys.exit(-1)
-
- # a list of nodes to run selectively in the pipeline
- _nodes = set()
- for id in (nodes.split(',') if nodes else []):
- node = pipeline.nodes.get(id)
- if not node:
- print(f'Node "{id}" not found in pipeline {path}', file=sys.stderr)
- sys.exit(-1)
- else:
- _nodes.add(node)
- if not run_pipeline(pipeline, _nodes, with_upstreams, interactively_started=False, disable_colors=disable_colors):
- sys.exit(-1)
-
-
-@click.command()
-def run_interactively():
+@click.command("run_interactively")
+def _run_interactively():
"""Select and run data pipelines"""
- from dialog import Dialog
-
- d = Dialog(dialog="dialog", autowidgetsize=True) # see http://pythondialog.sourceforge.net/doc/widgets.html
-
- def run_pipeline_and_notify(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None):
- if not run_pipeline(pipeline, nodes, interactively_started=True):
- sys.exit(-1)
-
- def menu(node: pipelines.Node):
- if isinstance(node, pipelines.Pipeline):
+ warn("CLI command ` mara_pipelines.ui.run_interactively` will be dropped in 4.0. Please use ` mara-pipelines run_interactively` instead.")
+    cli.run_interactively.callback()
- code, choice = d.menu(
- text='Pipeline ' + '.'.join(node.path()) if node.parent else 'Root pipeline',
- choices=[('▶ ', 'Run'), ('>> ', 'Run selected')]
- + [(child.id, '→' if isinstance(child, pipelines.Pipeline) else 'Run')
- for child in node.nodes.values()])
- if code == d.CANCEL:
- return
- if choice == '▶ ':
- run_pipeline_and_notify(node)
- elif choice == '>> ':
- code, node_ids = d.checklist('Select sub-nodes to run. If you want to run all, then select none.',
- choices=[(node_id, '', False) for node_id in node.nodes.keys()])
- if code == d.OK:
- run_pipeline_and_notify(node, {node.nodes[id] for id in node_ids})
- else:
- menu(node.nodes[choice])
- return
- else:
- run_pipeline_and_notify(pipeline=node.parent, nodes=[node])
-
- menu(config.root_pipeline())
-
-
-@click.command()
+@click.command("reset_incremental_processing")
@click.option('--path', default='',
help='The parent ids of of the node to reset. Example: "pipeline-id,sub-pipeline-id".')
-def reset_incremental_processing(path):
+def _reset_incremental_processing(path):
"""Reset status of incremental processing for a node"""
- from ..incremental_processing import reset
-
- path = path.split(',') if path else []
- node, found = pipelines.find_node(path)
- if not found:
- print(f'Node {path} not found', file=sys.stderr)
- sys.exit(-1)
- reset.reset_incremental_processing(path)
+ warn("CLI command ` mara_pipelines.ui.reset_incremental_processing` will be dropped in 4.0. Please use ` mara-pipelines reset_incremental_processing` instead.")
+    cli.reset_incremental_processing.callback(path)
diff --git a/mara_pipelines/ui/node_page.py b/mara_pipelines/ui/node_page.py
index c680527..c88f463 100644
--- a/mara_pipelines/ui/node_page.py
+++ b/mara_pipelines/ui/node_page.py
@@ -80,6 +80,10 @@ def __(pipeline: pipelines.Pipeline):
def __(task: pipelines.Task):
if not acl.current_user_has_permission(views.acl_resource):
return bootstrap.card(header_left='Commands', body=acl.inline_permission_denied_message())
+ elif task.is_dynamic_commands:
+ return bootstrap.card(
+ header_left='Commands',
+ body=f""" {"... are defined dynamically during execution"}""")
else:
commands_card = bootstrap.card(
header_left='Commands',
diff --git a/setup.cfg b/setup.cfg
index 3c1250e..3fabd00 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -34,3 +34,7 @@ test =
pytest-dependency
mara_app>=1.5.2
mara-db[postgres,mssql]
+
+[options.entry_points]
+mara.commands =
+ pipelines = mara_pipelines.cli:mara_pipelines
diff --git a/tests/mssql/test_mssql.py b/tests/mssql/test_mssql.py
index b062719..fe91581 100644
--- a/tests/mssql/test_mssql.py
+++ b/tests/mssql/test_mssql.py
@@ -10,7 +10,7 @@
from mara_pipelines.commands.files import WriteFile
from mara_pipelines.commands.sql import ExecuteSQL
from mara_pipelines.pipelines import Pipeline, Task
-from mara_pipelines.ui.cli import run_pipeline
+from mara_pipelines.cli import run_pipeline
from tests.db_test_helper import db_is_responsive, db_replace_placeholders
from tests.local_config import MSSQL_SQLCMD_DB
diff --git a/tests/postgres/test_postgres.py b/tests/postgres/test_postgres.py
index 0820326..bdacacf 100644
--- a/tests/postgres/test_postgres.py
+++ b/tests/postgres/test_postgres.py
@@ -10,7 +10,7 @@
from mara_pipelines.commands.files import WriteFile
from mara_pipelines.commands.sql import ExecuteSQL
from mara_pipelines.pipelines import Pipeline, Task
-from mara_pipelines.ui.cli import run_pipeline
+from mara_pipelines.cli import run_pipeline
from tests.db_test_helper import db_is_responsive, db_replace_placeholders
from tests.local_config import POSTGRES_DB
diff --git a/tests/test_execute_pipeline.py b/tests/test_execute_pipeline.py
index 0b69094..2a864c3 100644
--- a/tests/test_execute_pipeline.py
+++ b/tests/test_execute_pipeline.py
@@ -12,7 +12,7 @@ def test_execute_without_db_success():
"""
from mara_pipelines.commands.python import RunFunction
from mara_pipelines.pipelines import Pipeline, Task
- from mara_pipelines.ui.cli import run_pipeline
+ from mara_pipelines.cli import run_pipeline
pipeline = Pipeline(
id='test_execute_without_db',
@@ -35,7 +35,7 @@ def test_execute_without_db_failed():
"""
from mara_pipelines.commands.python import RunFunction
from mara_pipelines.pipelines import Pipeline, Task
- from mara_pipelines.ui.cli import run_pipeline
+ from mara_pipelines.cli import run_pipeline
pipeline = Pipeline(
id='test_execute_without_db',
@@ -57,7 +57,7 @@ def test_demo_pipeline():
Run the demo pipeline
"""
from mara_pipelines.pipelines import demo_pipeline
- from mara_pipelines.ui.cli import run_pipeline
+ from mara_pipelines.cli import run_pipeline
pipeline = demo_pipeline()
diff --git a/tests/test_pipeline_tasks.py b/tests/test_pipeline_tasks.py
new file mode 100644
index 0000000..4b462a5
--- /dev/null
+++ b/tests/test_pipeline_tasks.py
@@ -0,0 +1,64 @@
+import pytest
+from typing import List
+
+from mara_pipelines.pipelines import Task
+from mara_pipelines.commands.python import RunFunction
+
+class _PythonFuncTestResult:
+ has_run = False
+
+
+def test_run_task():
+ """
+ A simple test executing a task.
+ """
+ test_result = _PythonFuncTestResult()
+
+ def python_test_function(result: _PythonFuncTestResult):
+ result.has_run = True # noqa: F841
+
+ assert not test_result.has_run
+
+ task = Task(
+ id='run_task',
+ description="Unit test test_run_task",
+ commands=[RunFunction(function=python_test_function, args=[test_result])])
+
+ assert not test_result.has_run
+
+ task.run()
+
+ assert test_result.has_run
+
+
+def test_run_task_dynamic_commands():
+ """
+ A simple test executing a task with callable commands
+ """
+ import mara_pipelines.ui.node_page
+
+ test_result = _PythonFuncTestResult()
+
+ def python_test_function(result: _PythonFuncTestResult):
+ result.has_run = True # noqa: F841
+
+ def generate_command_list() -> List:
+ yield RunFunction(function=lambda t: python_test_function(t), args=[test_result])
+
+ assert not test_result.has_run
+
+ task = Task(
+ id='run_task_dynamic_commands',
+ description="Unit test test_run_task_dynamic_commands",
+ commands=generate_command_list)
+
+ assert not test_result.has_run
+
+ content = mara_pipelines.ui.node_page.node_content(task)
+ assert content
+
+ assert not test_result.has_run
+
+ task.run()
+
+ assert test_result.has_run