-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtasks.py
More file actions
144 lines (112 loc) · 4.98 KB
/
tasks.py
File metadata and controls
144 lines (112 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import argparse
import functools
import inspect
import json
import logging
import sys
import time
from jsonschema import validate, ValidationError, SchemaError
# Configure the root logger so every record is printed as
# "<timestamp> - <logger name> - <level> - <message>".
# No level is passed here, so the logging module's default threshold applies.
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger used by the functions in this file.
log = logging.getLogger(__name__)
class BaseTask:
    """Base for tasks implemented as classes.

    ``run_cli()`` inspects classes for a ``name`` attribute (the CLI command)
    and a ``run`` method; this base only supplies an empty default ``name``.
    """

    # CLI command name; empty by default.
    name = ""
def parse_user_params(params: str) -> dict:
    """Decode a JSON-like parameter string into a dict of keyword arguments.

    Single quotes are replaced with double quotes before decoding, so
    ``"{'a': 1}"`` is accepted as well as strict JSON. Note that this also
    rewrites apostrophes that appear inside string values.

    Args:
        params: Raw text of the parameters (e.g. the ``-p/--params`` value).

    Returns:
        The decoded dict. An empty dict is returned when ``params`` is not a
        non-empty string, when it cannot be decoded, or when it decodes to
        something other than a JSON object, so the result can always be
        unpacked with ``**``.
    """
    if not isinstance(params, str) or params == "":
        log.error("Incorrect function params. Must be a non-empty string.")
        return {}

    try:
        args_dict = json.loads(params.replace("'", '"'))
    except json.JSONDecodeError:
        # log.exception already appends the active exception and traceback.
        log.exception("Incorrect input data structure. Check your params!")
        log.info("I will try to run function with its default args, if any")
        return {}

    if not isinstance(args_dict, dict):
        # Valid JSON, but a list/number/string cannot become keyword arguments.
        log.error(
            "Incorrect input data structure. Expected a JSON object, got %s.",
            type(args_dict).__name__,
        )
        return {}

    log.info("%s %r", type(args_dict), args_dict)
    return args_dict
def validate_schema(user_params: dict, schema: dict):
    """Validate decoded parameters against a JSON schema.

    Args:
        user_params: Decoded parameters to check.
        schema: JSON schema handed to ``jsonschema.validate``.

    Raises:
        ValidationError: ``user_params`` does not conform to ``schema``.
        SchemaError: ``schema`` itself is not a valid JSON schema.
    """
    try:
        validate(user_params, schema)
    except ValidationError as e:
        # Use %-style placeholders: extra positional args without placeholders
        # make the logging module fail to format (and drop) the record.
        log.error(
            "Incorrect input data values.\nValid schema: %s\nError: %s",
            schema,
            e,
        )
        raise
    except SchemaError as e:
        log.error("Incorrect validation schema: %s", e)
        raise
def _register_command(subparser, registered, command_name):
    """Add a sub-command accepting ``-p/--params`` unless it already exists."""
    if command_name in registered:
        # Registering the same sub-command twice is rejected as a conflict by
        # newer argparse versions, so each name is added only once.
        return
    registered.add(command_name)
    command_parser = subparser.add_parser(command_name)
    command_parser.add_argument("-p", "--params", dest="params", type=str)


def run_cli():
    """Run a task from the ``__main__`` module selected on the command line.

    Every function and class found in ``__main__`` gets a sub-command with a
    ``-p/--params`` option. Only objects exposing a ``name`` attribute are
    runnable: functions are called directly, classes are instantiated without
    arguments and their ``run`` method is called. Non-empty parameters are
    validated against the ``json_schema`` attribute of the function or of the
    class instance before the call.

    Returns:
        Whatever the selected callable returns, or ``None`` when the given
        command does not correspond to a runnable task.

    Raises:
        ValidationError, SchemaError: propagated from ``validate_schema``.
    """
    parser = argparse.ArgumentParser()
    subparser = parser.add_subparsers(dest="command")
    registered = set()
    func_callables = dict()
    cls_callables = dict()
    main_module = sys.modules["__main__"]

    # Functions: a "name" attribute marks the ones that can be run.
    for func_name, func_obj in inspect.getmembers(main_module, inspect.isfunction):
        _register_command(subparser, registered, func_name)
        if hasattr(func_obj, "name"):
            func_callables[func_obj.name] = func_obj
            _register_command(subparser, registered, func_obj.name)

    # Classes: need both a "name" attribute and a run() method.
    for cls_name, cls_obj in inspect.getmembers(main_module, inspect.isclass):
        _register_command(subparser, registered, cls_name)
        if not hasattr(cls_obj, "name"):
            continue
        if not hasattr(cls_obj, "run"):
            log.error("Class %s has no run() method! Skipping.", cls_name)
            continue
        cls_instance = cls_obj()
        cls_callables[cls_obj.name] = cls_instance.run, cls_instance
        _register_command(subparser, registered, cls_obj.name)

    args = parser.parse_args()

    # Functions take precedence over classes registered under the same name.
    if args.command in func_callables:
        func_to_run, cls_inst = func_callables[args.command], None
    else:
        # Default to a pair so an unknown or missing command reaches the
        # error branch below instead of failing while unpacking None.
        func_to_run, cls_inst = cls_callables.get(args.command, (None, None))
    if not func_to_run:
        log.error("No any callable found with specified 'name'. Exiting...")
        return None

    # Treat any falsy result (None or empty) as "no arguments" so the callable
    # runs with its defaults.
    user_params = parse_user_params(args.params) or {}
    if user_params:
        if cls_inst is not None:
            log.info("validate class method schema %s", cls_inst.json_schema)
            validate_schema(user_params, cls_inst.json_schema)
        elif inspect.isfunction(func_to_run):
            log.info("validate function schema %s", func_to_run.json_schema)
            validate_schema(user_params, func_to_run.json_schema)
    return func_to_run(**user_params)
def task(name, json_schema):
    """Decorator factory that tags a function as a CLI-runnable task.

    The returned decorator produces a pass-through wrapper around the
    function, keeps the function's metadata, and stores ``name`` and
    ``json_schema`` as attributes on that wrapper.
    """
    def decorator(func):
        def wrapper(*call_args, **call_kwargs):
            return func(*call_args, **call_kwargs)

        # Same effect as decorating wrapper with functools.wraps(func).
        tagged = functools.update_wrapper(wrapper, func)
        tagged.name = name
        tagged.json_schema = json_schema
        return tagged

    return decorator
def delayed(attempts, delay_before_retry_sec):
    """Decorator factory that retries the decorated function when it raises.

    Args:
        attempts: Total number of calls allowed, including the first one.
            Values below 1 behave like 1 (a single call, no retry).
        delay_before_retry_sec: Seconds to sleep after a failed call before
            trying again.

    Returns:
        A decorator. The decorated function returns the first successful
        result; once ``attempts`` calls have failed, the last exception is
        re-raised to the caller.
    """
    def internal(func):
        # Keep __name__/__doc__ and custom attributes of the wrapped function
        # (such as "name" and "json_schema") visible on the wrapper.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempts_counter = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    attempts_counter += 1
                    if attempts_counter >= attempts:
                        # Bare raise preserves the original traceback.
                        raise
                    time.sleep(delay_before_retry_sec)

        return wrapper

    return internal