Skip to content

Commit

Permalink
Merge pull request #50 from chriscardillo/v0.16.0
Browse files Browse the repository at this point in the history
v0.16.0
  • Loading branch information
chriscardillo authored Nov 20, 2022
2 parents 6b8e5f0 + 51a6603 commit 2322d09
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 32 deletions.
1 change: 1 addition & 0 deletions gusty/building.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def build_task(spec, level_id, schematic):
if k
in inspect.signature(airflow.models.BaseOperator.__init__).parameters.keys()
or k in _get_operator_parameters(operator)
or k in _get_operator_parameters(operator.__base__)
}
args["task_id"] = spec["task_id"]
args["dag"] = get_top_level_dag(schematic)
Expand Down
45 changes: 14 additions & 31 deletions gusty/parsing/parsers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import yaml, ast, importlib.util, nbformat, jupytext
import yaml, nbformat, jupytext
from gusty.parsing.loaders import generate_loader
from gusty.importing import airflow_version
from absql.files.parsers import frontmatter_load
from gusty.parsing.utils import render_frontmatter
from gusty.parsing.utils import render_frontmatter, get_callable_from_file


def parse_generic(file_path, loader=None, runner=None):
Expand Down Expand Up @@ -49,38 +49,21 @@ def parse_py(file_path, loader=None, runner=None):

# search for a python callable if one is specified
if "python_callable" in job_spec.keys():
with open(file_path) as f:
tree = ast.parse(f.read())

class Visitor(ast.NodeVisitor):
def __init__(self):
self.has_callable = None

def visit_FunctionDef(self, node):
ast.NodeVisitor.generic_visit(self, node)
if node.name == job_spec["python_callable"]:
self.has_callable = True

v = Visitor()
v.visit(tree)
if v.has_callable:
mod_file = importlib.util.spec_from_file_location(
"".join(i for i in file_path if i.isalnum()), file_path
)
mod = importlib.util.module_from_spec(mod_file)
mod_file.loader.exec_module(mod)
job_spec.update(
{"python_callable": getattr(mod, job_spec["python_callable"])}
)
else:
assert (
False
), "{file_path} specifies python_callable {callable} but {callable} not found in {file_path}".format( # noqa
file_path=file_path, callable=job_spec["python_callable"]
)
callable_name = job_spec["python_callable"]
job_spec.update(
{"python_callable": get_callable_from_file(file_path, callable_name)}
)
# Default to sourcing this file for a PythonOperator
else:
job_spec.update({"python_callable": lambda: exec(open(file_path).read())})

# Support additional callables
if "extra_callables" in job_spec.keys():
for arg_name, callable_name in job_spec["extra_callables"].items():
job_spec.update(
{arg_name: get_callable_from_file(file_path, callable_name)}
)

# If no metadata then we also default to sourcing this file for a PythonOperator
else:
job_spec.update({"python_callable": lambda: exec(open(file_path).read())})
Expand Down
32 changes: 32 additions & 0 deletions gusty/parsing/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ast, importlib.util
from absql import Runner as r


Expand All @@ -11,3 +12,34 @@ def render_frontmatter(frontmatter, runner=None, exclude=["sql"]):
)
frontmatter.update(rendered_frontmatter)
return frontmatter


def get_callable_from_file(file_path, callable_name):
    """Load ``file_path`` as a module and return the function ``callable_name``.

    The file is first parsed (without being executed) to confirm that a
    ``def`` named ``callable_name`` exists in it; only then is the file
    executed as a module and the attribute looked up.

    Args:
        file_path: Path (as a string) of the Python source file to inspect.
        callable_name: Name of the function to retrieve from that file.

    Returns:
        The object bound to ``callable_name`` in the executed module.

    Raises:
        AssertionError: If no function definition named ``callable_name``
            is found in the file.
    """
    finder = CallableFinder(callable_name)
    # Read raw bytes so ast.parse honours any PEP 263 encoding declaration,
    # rather than decoding with the platform's locale default.
    with open(file_path, "rb") as f:
        tree = ast.parse(f.read(), filename=file_path)
    finder.visit(tree)

    if not finder.has_callable:
        # Raised explicitly (rather than ``assert False``) so the check is
        # not stripped when Python runs with -O, which would otherwise make
        # this function silently return None.
        raise AssertionError(
            "{file_path} specifies python_callable {callable} but {callable} not found in {file_path}".format(  # noqa
                file_path=file_path, callable=callable_name
            )
        )

    # Module name is derived from the path's alphanumeric characters only.
    spec = importlib.util.spec_from_file_location(
        "".join(i for i in file_path if i.isalnum()), file_path
    )
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return getattr(mod, callable_name)


class CallableFinder(ast.NodeVisitor):
    """AST visitor that records whether a ``def`` with a given name exists.

    After ``visit(tree)``, ``has_callable`` is ``True`` if any synchronous
    function definition in the tree (at any nesting depth) is named
    ``callable_name``; otherwise it keeps its initial value of ``None``.
    """

    def __init__(self, callable_name):
        # Name of the function definition to look for.
        self.callable_name = callable_name
        # Stays None until a matching definition is seen.
        self.has_callable = None

    def visit_FunctionDef(self, node):
        """Descend into ``node`` first, then compare its own name."""
        # Walk children so definitions nested inside this function are
        # also checked.
        self.generic_visit(node)
        if self.callable_name != node.name:
            return
        self.has_callable = True
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="gusty",
version="0.15.1",
version="0.16.0",
author="Chris Cardillo, Michael Chow, David Robinson",
author_email="cfcardillo23@gmail.com",
description="Making DAG construction easier",
Expand Down
2 changes: 2 additions & 0 deletions tests/dags/pydag/py_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# external_dependencies:
# - a_whole_dag: all
# python_callable: hello_world
# extra_callables:
# python_callable: hello_world
# ---


Expand Down

0 comments on commit 2322d09

Please sign in to comment.