Skip to content

Commit

Permalink
Merge branch 'main' into 4.0.x
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-schick committed Dec 15, 2023
2 parents 42bcd33 + 6c8ae5e commit adac92c
Show file tree
Hide file tree
Showing 17 changed files with 337 additions and 146 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12']
steps:
- name: Checkout code
uses: actions/checkout@v3.3.0
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## 3.5.0 (2023-12-06)

- add entry point `mara.commands` (for [mara-cli](https://github.com/mara/mara-cli) support)
- add mara-pipelines click command group (#104)
- add dynamic Task (#106)
- add max_retries for parallel tasks (#105)
- fix html_doc_items tuple of command ReadScriptOutput

## 3.4.0 (2023-05-01)

- upgrade to bootstrap 4.6 (from alpha 4) (#66)<br>
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Here is a pipeline "demo" consisting of three nodes that depend on each other: t
```python
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline, run_interactively
from mara_pipelines.cli import run_pipeline, run_interactively

pipeline = Pipeline(
id='demo',
Expand Down Expand Up @@ -115,7 +115,7 @@ CREATE TABLE data_integration_file_dependency (
This runs a pipeline with output to stdout:

```python
from mara_pipelines.ui.cli import run_pipeline
from mara_pipelines.cli import run_pipeline

run_pipeline(pipeline)
```
Expand All @@ -138,7 +138,7 @@ run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstr
And finally, there is some sort of menu based on [pythondialog](http://pythondialog.sourceforge.net/) that allows to navigate and run pipelines like this:

```python
from mara_pipelines.ui.cli import run_interactively
from mara_pipelines.cli import run_interactively

run_interactively()
```
Expand Down
4 changes: 2 additions & 2 deletions docs/example.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Here is a pipeline "demo" consisting of three nodes that depend on each other: t
```python
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline, run_interactively
from mara_pipelines.cli import run_pipeline, run_interactively

pipeline = Pipeline(
id='demo',
Expand Down Expand Up @@ -68,7 +68,7 @@ CREATE TABLE data_integration_file_dependency (
This runs a pipeline with output to stdout:

```python
from mara_pipelines.ui.cli import run_pipeline
from mara_pipelines.cli import run_pipeline

run_pipeline(pipeline)
```
Expand Down
8 changes: 5 additions & 3 deletions mara_pipelines/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Make the functionalities of this package auto-discoverable by mara-app"""
__version__ = '3.4.0'
__version__ = '3.5.0'


def MARA_CONFIG_MODULES():
Expand Down Expand Up @@ -30,8 +30,10 @@ def MARA_ACL_RESOURCES():


def MARA_CLICK_COMMANDS():
from .ui import cli
return [cli.run, cli.run_interactively, cli.reset_incremental_processing]
from . import cli
from .ui import cli as old_cli
return [cli.mara_pipelines,
old_cli._run, old_cli._run_interactively, old_cli._reset_incremental_processing]


def MARA_NAVIGATION_ENTRIES():
Expand Down
153 changes: 153 additions & 0 deletions mara_pipelines/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""Auto-migrate command line interface"""

import sys
from typing import Optional, Set

import click

from . import config, pipelines


def run_pipeline(pipeline: pipelines.Pipeline, nodes: Optional[Set[pipelines.Node]] = None,
                 with_upstreams: bool = False,
                 interactively_started: bool = False,
                 disable_colors: bool = False) -> bool:
    """
    Runs a pipeline or parts of it with output printed to stdout.

    Args:
        pipeline: The pipeline to run
        nodes: A set of pipeline children that should run; when None, the whole pipeline is run
        with_upstreams: When true and `nodes` are provided, then all upstreams of `nodes`
            in `pipeline` are also run
        interactively_started: Whether or not this run was started interactively, passed on
            in RunStarted and RunFinished events.
        disable_colors: If true, don't use escape sequences to make the log colorful
            (default: colorful logging)

    Returns:
        True when the pipeline run succeeded
    """
    # imported lazily to keep module import cheap and avoid import cycles
    from .logging import logger, pipeline_events
    from . import execution

    RESET_ALL = 'reset_all'
    PATH_COLOR = 'path_color'
    ERROR_COLOR = 'error_color'

    # ANSI escape sequences, see https://godoc.org/github.com/whitedevops/colors
    colorful = {logger.Format.STANDARD: '\033[01m',  # bold
                logger.Format.ITALICS: '\033[02m',  # dim
                logger.Format.VERBATIM: '',
                PATH_COLOR: '\033[36m',  # cyan
                ERROR_COLOR: '\033[91m',  # light red
                RESET_ALL: '\033[0m',  # reset all
                }
    plain = {key: '' for key in colorful.keys()}

    theme = plain if disable_colors else colorful

    succeeded = False
    # drain the event generator completely so the run is executed to the end,
    # printing Output events and remembering whether the final RunFinished succeeded
    for event in execution.run_pipeline(pipeline, nodes, with_upstreams, interactively_started=interactively_started):
        if isinstance(event, pipeline_events.Output):
            print(f'{theme[PATH_COLOR]}{" / ".join(event.node_path)}{":" if event.node_path else ""}{theme[RESET_ALL]} '
                  + theme[event.format] + (theme[ERROR_COLOR] if event.is_error else '')
                  + event.message + theme[RESET_ALL])
        elif isinstance(event, pipeline_events.RunFinished):
            if event.succeeded:
                succeeded = True

    return succeeded


# -----------------------------------------------------------------------------


@click.group()
def mara_pipelines():
    """Mara pipelines commands"""
    # intentionally empty: subcommands are attached via @mara_pipelines.command()


@mara_pipelines.command()
@click.option('--path', default='',
              help='The id of the pipeline to run. Example: "pipeline-id"; "" (default) is the root pipeline.')
@click.option('--nodes',
              help='IDs of sub-nodes of the pipeline to run, separated by comma. When provided, then only these nodes are run. Example: "do-this,do-that".')
@click.option('--with_upstreams', default=False, is_flag=True,
              help='Also run all upstreams of --nodes within the pipeline.')
@click.option('--disable-colors', default=False, is_flag=True,
              help='Output logs without coloring them.')
def run(path, nodes, with_upstreams, disable_colors: bool = False):
    """Runs a pipeline or a sub-set of its nodes"""

    # The pipeline to run. The default '' must map to the empty path (root pipeline):
    # ''.split(',') would yield [''] and the root pipeline would never be found.
    path = path.split(',') if path else []
    pipeline, found = pipelines.find_node(path)
    if not found:
        print(f'Pipeline {path} not found', file=sys.stderr)
        sys.exit(-1)
    if not isinstance(pipeline, pipelines.Pipeline):
        print(f'Node {path} is not a pipeline, but a {pipeline.__class__.__name__}', file=sys.stderr)
        sys.exit(-1)

    # a set of nodes to run selectively in the pipeline
    _nodes = set()
    for node_id in (nodes.split(',') if nodes else []):  # `node_id` to avoid shadowing builtin `id`
        node = pipeline.nodes.get(node_id)
        if not node:
            print(f'Node "{node_id}" not found in pipeline {path}', file=sys.stderr)
            sys.exit(-1)
        else:
            _nodes.add(node)

    if not run_pipeline(pipeline, _nodes, with_upstreams, interactively_started=False, disable_colors=disable_colors):
        sys.exit(-1)


@mara_pipelines.command()
def run_interactively():
    """Select and run data pipelines"""
    # imported lazily so that the pythondialog package is only required for this command
    from dialog import Dialog

    d = Dialog(dialog="dialog", autowidgetsize=True)  # see http://pythondialog.sourceforge.net/doc/widgets.html

    def run_pipeline_and_notify(pipeline: pipelines.Pipeline, nodes: Set[pipelines.Node] = None):
        # exit with a non-zero code so that callers of the CLI see the failed run
        if not run_pipeline(pipeline, nodes, interactively_started=True):
            sys.exit(-1)

    def menu(node: pipelines.Node):
        # Recursively shows a dialog menu for `node`: pipelines can be run (fully or with a
        # selected subset of their sub-nodes) or descended into; plain nodes are run within
        # their parent pipeline.
        if isinstance(node, pipelines.Pipeline):

            code, choice = d.menu(
                text='Pipeline ' + '.'.join(node.path()) if node.parent else 'Root pipeline',
                choices=[('▶ ', 'Run'), ('>> ', 'Run selected')]
                + [(child.id, '→' if isinstance(child, pipelines.Pipeline) else 'Run')
                   for child in node.nodes.values()])
            if code == d.CANCEL:
                return

            if choice == '▶ ':
                run_pipeline_and_notify(node)
            elif choice == '>> ':
                # empty selection means "run everything" (run_pipeline treats an empty set as all nodes)
                code, node_ids = d.checklist('Select sub-nodes to run. If you want to run all, then select none.',
                                             choices=[(node_id, '', False) for node_id in node.nodes.keys()])
                if code == d.OK:
                    run_pipeline_and_notify(node, {node.nodes[id] for id in node_ids})
            else:
                menu(node.nodes[choice])
            return
        else:
            run_pipeline_and_notify(pipeline=node.parent, nodes=[node])

    menu(config.root_pipeline())


@mara_pipelines.command()
@click.option('--path', default='',
              help='The parent ids of the node to reset. Example: "pipeline-id,sub-pipeline-id".')
def reset_incremental_processing(path):
    """Reset status of incremental processing for a node"""
    # imported lazily to keep module import cheap
    from .incremental_processing import reset

    # '' (default) means the root pipeline, i.e. the empty path
    path = path.split(',') if path else []
    node, found = pipelines.find_node(path)
    if not found:
        print(f'Node {path} not found', file=sys.stderr)
        sys.exit(-1)
    # `node` is only looked up to validate the path; the reset itself operates on the path
    reset.reset_incremental_processing(path)
32 changes: 20 additions & 12 deletions mara_pipelines/parallel_tasks/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read
max_number_of_parallel_tasks: Optional[int] = None, file_dependencies: Optional[List[str]] = None, date_regex: Optional[str] = None,
partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False,
commands_before: Optional[List[pipelines.Command]] = None, commands_after: Optional[List[pipelines.Command]] = None,
db_alias: Optional[str] = None, storage_alias: Optional[str] = None, timezone: Optional[str] = None) -> None:
db_alias: Optional[str] = None, storage_alias: Optional[str] = None, timezone: Optional[str] = None,
max_retries: Optional[int] = None) -> None:
pipelines.ParallelTask.__init__(self, id=id, description=description,
max_number_of_parallel_tasks=max_number_of_parallel_tasks,
commands_before=commands_before, commands_after=commands_after)
commands_before=commands_before, commands_after=commands_after,
max_retries=max_retries)
self.file_pattern = file_pattern
self.read_mode = read_mode
self.date_regex = date_regex
Expand Down Expand Up @@ -150,12 +152,14 @@ def update_file_dependencies():
id='create_partitions',
description='Creates required target table partitions',
commands=[sql.ExecuteSQL(sql_statement='\n'.join(slice), echo_queries=False, db_alias=self.db_alias)
for slice in more_itertools.sliced(sql_statements, 50)])
for slice in more_itertools.sliced(sql_statements, 50)],
max_retries=self.max_retries)

sub_pipeline.add(create_partitions_task)

for n, chunk in enumerate(more_itertools.chunked(files_per_day.items(), chunk_size)):
task = pipelines.Task(id=str(n), description='Reads a portion of the files')
task = pipelines.Task(id=str(n), description='Reads a portion of the files',
max_retries=self.max_retries)
for (day, files) in chunk:
target_table = self.target_table + '_' + day.strftime("%Y%m%d")
for file in files:
Expand All @@ -166,7 +170,8 @@ def update_file_dependencies():
for n, chunk in enumerate(more_itertools.chunked(files, chunk_size)):
sub_pipeline.add(
pipelines.Task(id=str(n), description=f'Reads {len(chunk)} files',
commands=sum([self.parallel_commands(x[0]) for x in chunk], [])))
commands=sum([self.parallel_commands(x[0]) for x in chunk], []),
max_retries=self.max_retries))

def parallel_commands(self, file_name: str) -> List[pipelines.Command]:
return [self.read_command(file_name)] + (
Expand All @@ -187,14 +192,16 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read
mapper_script_file_name: Optional[str] = None, make_unique: bool = False, db_alias: Optional[str] = None,
delimiter_char: Optional[str] = None, quote_char: Optional[str] = None, null_value_string: Optional[str] = None,
skip_header: Optional[bool] = None, csv_format: bool = False, storage_alias: Optional[str] = None,
timezone: Optional[str] = None, max_number_of_parallel_tasks: Optional[int] = None) -> None:
timezone: Optional[str] = None, max_number_of_parallel_tasks: Optional[int] = None,
max_retries: Optional[int] = None) -> None:
_ParallelRead.__init__(self, id=id, description=description, file_pattern=file_pattern,
read_mode=read_mode, target_table=target_table, file_dependencies=file_dependencies,
date_regex=date_regex, partition_target_table_by_day_id=partition_target_table_by_day_id,
truncate_partitions=truncate_partitions,
commands_before=commands_before, commands_after=commands_after,
db_alias=db_alias, storage_alias=storage_alias, timezone=timezone,
max_number_of_parallel_tasks=max_number_of_parallel_tasks)
max_number_of_parallel_tasks=max_number_of_parallel_tasks,
max_retries=max_retries)
self.compression = compression
self.mapper_script_file_name = mapper_script_file_name or ''
self.make_unique = make_unique
Expand Down Expand Up @@ -239,18 +246,19 @@ def html_doc_items(self) -> List[Tuple[str, str]]:

class ParallelReadSqlite(_ParallelRead):
def __init__(self, id: str, description: str, file_pattern: str, read_mode: ReadMode, sql_file_name: str,
target_table: str, file_dependencies: List[str] = None, date_regex: str = None,
target_table: str, file_dependencies: Optional[List[str]] = None, date_regex: Optional[str] = None,
partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False,
commands_before: List[pipelines.Command] = None, commands_after: List[pipelines.Command] = None,
db_alias: str = None, timezone=None, max_number_of_parallel_tasks: int = None,
storage_alias: str = None) -> None:
commands_before: Optional[List[pipelines.Command]] = None, commands_after: Optional[List[pipelines.Command]] = None,
db_alias: Optional[str] = None, storage_alias: str = None, timezone=None, max_number_of_parallel_tasks: Optional[int] = None,
max_retries: Optional[int] = None) -> None:
_ParallelRead.__init__(self, id=id, description=description, file_pattern=file_pattern,
read_mode=read_mode, target_table=target_table, file_dependencies=file_dependencies,
date_regex=date_regex, partition_target_table_by_day_id=partition_target_table_by_day_id,
truncate_partitions=truncate_partitions,
commands_before=commands_before, commands_after=commands_after, db_alias=db_alias,
storage_alias=storage_alias,
timezone=timezone, max_number_of_parallel_tasks=max_number_of_parallel_tasks)
timezone=timezone, max_number_of_parallel_tasks=max_number_of_parallel_tasks,
max_retries=max_retries)
self.sql_file_name = sql_file_name

def read_command(self, file_name: str) -> List[pipelines.Command]:
Expand Down
6 changes: 4 additions & 2 deletions mara_pipelines/parallel_tasks/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ def add_parallel_tasks(self, sub_pipeline: 'pipelines.Pipeline') -> None:
sub_pipeline.add(pipelines.Task(
id='_'.join([re.sub('[^0-9a-z\-_]+', '', str(x).lower().replace('-', '_')) for x in parameter_tuple]),
description=f'Runs the script with parameters {repr(parameter_tuple)}',
commands=[python.ExecutePython(file_name=self.file_name, args=list(parameter_tuple))]))
commands=[python.ExecutePython(file_name=self.file_name, args=list(parameter_tuple))],
max_retries=self.max_retries))

def html_doc_items(self) -> List[Tuple[str, str]]:
path = self.parent.base_path() / self.file_name
Expand Down Expand Up @@ -58,7 +59,8 @@ def add_parallel_tasks(self, sub_pipeline: 'pipelines.Pipeline') -> None:
sub_pipeline.add(pipelines.Task(
id=str(parameter).lower().replace(' ', '_').replace('-', '_'),
description=f'Runs the function with parameters {repr(parameter)}',
commands=[python.RunFunction(lambda args=parameter: self.function(args))]))
commands=[python.RunFunction(lambda args=parameter: self.function(args))],
max_retries=self.max_retries))

def html_doc_items(self) -> List[Tuple[str, str]]:
return [('function', _.pre[escape(str(self.function))]),
Expand Down
3 changes: 2 additions & 1 deletion mara_pipelines/parallel_tasks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def add_parallel_tasks(self, sub_pipeline: 'pipelines.Pipeline') -> None:
echo_queries=self.echo_queries, timezone=self.timezone, replace=replace)
if self.sql_file_name else
sql.ExecuteSQL(sql_statement=self.sql_statement, db_alias=self.db_alias,
echo_queries=self.echo_queries, timezone=self.timezone, replace=replace)]))
echo_queries=self.echo_queries, timezone=self.timezone, replace=replace)],
max_retries=self.max_retries))

def html_doc_items(self) -> List[Tuple[str, str]]:
return [('db', _.tt[self.db_alias])] \
Expand Down
Loading

0 comments on commit adac92c

Please sign in to comment.