Skip to content

Commit

Permalink
feat: Workflow hooks #60 (#170)
Browse files Browse the repository at this point in the history
Signed-off-by: Enzo Hamelin <enzo.hamelin@ovhcloud.com>
  • Loading branch information
zozoh94 committed Dec 8, 2022
1 parent 03a8d15 commit bccd812
Show file tree
Hide file tree
Showing 13 changed files with 340 additions and 45 deletions.
45 changes: 39 additions & 6 deletions director/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from director.models import StatusType
from director.models.tasks import Task
from director.models.workflows import Workflow
from director.tasks.workflows import start, end
from director.tasks.workflows import start, end, failure_hooks_launcher


class WorkflowBuilder(object):
Expand All @@ -20,6 +20,12 @@ def __init__(self, workflow_id):
self.tasks = cel_workflows.get_tasks(str(self.workflow))
self.canvas = []

self.failure_hook = cel_workflows.get_failure_hook_task(str(self.workflow))
self.failure_hook_canvas = []

self.success_hook = cel_workflows.get_success_hook_task(str(self.workflow))
self.success_hook_canvas = []

# Pointer to the previous task(s)
self.previous = []

Expand All @@ -29,7 +35,7 @@ def workflow(self):
self._workflow = Workflow.query.filter_by(id=self.workflow_id).first()
return self._workflow

def new_task(self, task_name, single=True):
def new_task(self, task_name, is_hook, single=True):
task_id = uuid()

queue = self.custom_queues.get(task_name, self.queue)
Expand All @@ -48,6 +54,7 @@ def new_task(self, task_name, single=True):
previous=self.previous,
workflow_id=self.workflow.id,
status=StatusType.pending,
is_hook=is_hook,
)
task.save()

Expand All @@ -63,20 +70,20 @@ def parse_queues(self):
if type(self.queue) is not str or type(self.custom_queues) is not dict:
raise WorkflowSyntaxError()

def parse(self, tasks):
def parse(self, tasks, is_hook=False):
canvas = []

for task in tasks:
if type(task) is str:
signature = self.new_task(task)
signature = self.new_task(task, is_hook)
canvas.append(signature)
elif type(task) is dict:
name = list(task)[0]
if "type" not in task[name] and task[name]["type"] != "group":
raise WorkflowSyntaxError()

sub_canvas_tasks = [
self.new_task(t, single=False) for t in task[name]["tasks"]
self.new_task(t, is_hook, single=False) for t in task[name]["tasks"]
]

sub_canvas = group(*sub_canvas_tasks, task_id=uuid())
Expand All @@ -93,14 +100,40 @@ def build(self):
self.canvas.insert(0, start.si(self.workflow.id).set(queue=self.queue))
self.canvas.append(end.si(self.workflow.id).set(queue=self.queue))

def build_hooks(self):
initial_previous = self.previous

if self.failure_hook and not self.failure_hook_canvas:
self.previous = None
self.failure_hook_canvas = [
failure_hooks_launcher.si(
self.workflow.id,
self.queue,
[self.failure_hook],
self.workflow.payload,
).set(queue=self.queue),
]

if self.success_hook and not self.success_hook_canvas:
self.previous = None
self.success_hook_canvas = [self.parse([self.success_hook], True)[0]]

self.previous = initial_previous

def run(self):
if not self.canvas:
self.build()

canvas = chain(*self.canvas, task_id=uuid())

self.build_hooks()

try:
return canvas.apply_async()
return canvas.apply_async(
link=self.success_hook_canvas,
link_error=self.failure_hook_canvas,
)

except Exception as e:
self.workflow.status = StatusType.error
self.workflow.save()
Expand Down
15 changes: 12 additions & 3 deletions director/commands/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from director.utils import validate, format_schema_errors, build_celery_schedule


def tasks_to_ascii(tasks):
def tasks_to_ascii(tasks, hooks):
tasks_str = ""
# Wrap the tasks list
for task in tasks:
Expand All @@ -26,6 +26,11 @@ def tasks_to_ascii(tasks):
else:
tasks_str += f"{task}\n"

if "failure" in hooks:
tasks_str += f"Failure hook: {hooks['failure']}\n"
if "success" in hooks:
tasks_str += f"Success hook: {hooks['success']}\n"

# Just remove the last newline
if tasks_str:
tasks_str = tasks_str[:-1]
Expand Down Expand Up @@ -54,7 +59,9 @@ def list_workflow(ctx):
periodic = "--"
if conf.get("periodic"):
periodic, _ = build_celery_schedule(name, conf["periodic"])
tasks_str = tasks_to_ascii(conf["tasks"])
tasks_str = tasks_to_ascii(
conf["tasks"], conf["hooks"] if "hooks" in conf else {}
)
data.append([name, periodic, tasks_str])

table = AsciiTable(data)
Expand All @@ -74,7 +81,9 @@ def show_workflow(ctx, name):
click.echo(f"Error: {e}")
raise click.Abort()

tasks_str = tasks_to_ascii(_workflow["tasks"])
tasks_str = tasks_to_ascii(
_workflow["tasks"], _workflow["hooks"] if "hooks" in _workflow else {}
)
periodic = "--"
if _workflow.get("periodic"):
periodic, _ = build_celery_schedule(name, _workflow["periodic"])
Expand Down
14 changes: 14 additions & 0 deletions director/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ def get_by_name(self, name):
def get_tasks(self, name):
return self.get_by_name(name)["tasks"]

def get_hook_task(self, name, hook_name):
if (
"hooks" in self.get_by_name(name)
and hook_name in self.get_by_name(name)["hooks"]
):
return self.get_by_name(name)["hooks"][hook_name]
return None

def get_failure_hook_task(self, name):
return self.get_hook_task(name, "failure")

def get_success_hook_task(self, name):
return self.get_hook_task(name, "success")

def get_queue(self, name):
try:
return self.get_by_name(name)["queue"]
Expand Down
24 changes: 24 additions & 0 deletions director/migrations/versions/9817ccf13cb5_add_is_hook_on_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Add is_hook on Task
Revision ID: 9817ccf13cb5
Revises: 46e4acde004e
Create Date: 2022-10-21 15:32:26.186650
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "9817ccf13cb5"
down_revision = "46e4acde004e"
branch_labels = None
depends_on = None


def upgrade():
op.add_column("tasks", sa.Column("is_hook", sa.Boolean(), nullable=True))


def downgrade():
op.drop_column("tasks", "is_hook")
2 changes: 2 additions & 0 deletions director/models/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Task(BaseModel):
status = db.Column(db.Enum(StatusType), default=StatusType.pending, nullable=False)
previous = db.Column(JSONBType, default=[])
result = db.Column(PickleType)
is_hook = db.Column(db.Boolean, default=False)

# Relationship
workflow_id = db.Column(
Expand All @@ -39,6 +40,7 @@ def to_dict(self):
"task": self.id,
"previous": self.previous,
"result": self.result,
"is_hook": self.is_hook,
}
)
return d
88 changes: 57 additions & 31 deletions director/static/script.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const store = new Vuex.Store({
selectedTask: null,
taskIndex: null,
loading: true,
hideHooks: false
},
actions: {
listWorkflows({ commit }) {
Expand Down Expand Up @@ -88,9 +89,24 @@ const store = new Vuex.Store({
state.selectedTask = task;
},
refreshNetwork(state, tasks) {
var g = new dagreD3.graphlib.Graph().setGraph({});
var graphMain = new dagreD3.graphlib.Graph().setGraph({});
var graphHook = new dagreD3.graphlib.Graph().setGraph({});

var terminatedStatus = ["success", "cancel", "error"];

var haveHook = false;

for (let i = 0; i < tasks.length; i++) {
if(tasks[i].is_hook && !terminatedStatus.includes(tasks[i].status)) {
continue;
}

var graph = graphMain;
if(tasks[i].is_hook) {
graph = graphHook;
haveHook = true;
}

var className = tasks[i].status;
var html = "<div class=pointer>";
html += "<span class=status></span>";
Expand All @@ -99,7 +115,7 @@ const store = new Vuex.Store({
html += "<span class=details>" + tasks[i].status + "</span>";
html += "</div>";

g.setNode(tasks[i].id, {
graph.setNode(tasks[i].id, {
labelType: "html",
label: html,
rx: 3,
Expand All @@ -109,43 +125,52 @@ const store = new Vuex.Store({
});

for (let j = 0; j < tasks[i].previous.length; j++) {
g.setEdge(tasks[i].previous[j], tasks[i].id, {});
graph.setEdge(tasks[i].previous[j], tasks[i].id, {});
}
}

// Set some general styles
g.nodes().forEach(function (v) {
var node = g.node(v);
node.rx = node.ry = 5;
});
function initGraph(graph, svgClass) {
// Set some general styles
graph.nodes().forEach(function (v) {
var node = graph.node(v);
node.rx = node.ry = 5;
});

var svg = d3.select("svg"),
inner = svg.select("g");
var svg = d3.select("svg."+svgClass),
inner = svg.select('g');

// Set up zoom support
var zoom = d3.zoom().on("zoom", function () {
inner.attr("transform", d3.event.transform);
});
inner.call(zoom.transform, d3.zoomIdentity);
svg.call(zoom);
// Set up zoom support
var zoom = d3.zoom().on("zoom", function () {
inner.attr("transform", d3.event.transform);
});
inner.call(zoom.transform, d3.zoomIdentity);
svg.call(zoom);

// Create the renderer
var render = new dagreD3.render();
render(inner, g);
// Create the renderer
var render = new dagreD3.render();
render(inner, graph);

// Handle the click
var nodes = inner.selectAll("g.node");
nodes.on("click", function (task_id) {
g.nodes().forEach(function (v) {
if (v == task_id) {
g.node(v).style = "fill: #f0f0f0; stroke-width: 2px; stroke: #777;";
} else {
g.node(v).style = "fill: white";
}
});
// Handle the click
var nodes = inner.selectAll("g.node");
nodes.on("click", function (task_id) {
graph.nodes().forEach(function (v) {
if (v == task_id) {
graph.node(v).style = "fill: #f0f0f0; stroke-width: 2px; stroke: #777;";
} else {
graph.node(v).style = "fill: white";
}
});

render(inner, graph);
state.selectedTask = tasks.find((c) => c.id == task_id);
});
}

state.hideHooks = !haveHook;

render(inner, g);
state.selectedTask = tasks.find((c) => c.id == task_id);
Vue.nextTick(function () {
initGraph(graphMain, "svg-main");
initGraph(graphHook, "svg-hooks");
});
},
changeLoadingState(state, loading) {
Expand Down Expand Up @@ -230,6 +255,7 @@ new Vue({
"taskIndex",
"network",
"loading",
"hideHooks"
]),
},
store,
Expand Down
Loading

0 comments on commit bccd812

Please sign in to comment.