diff --git a/director/builder.py b/director/builder.py index f10b467a..ff54bdfd 100644 --- a/director/builder.py +++ b/director/builder.py @@ -6,7 +6,7 @@ from director.models import StatusType from director.models.tasks import Task from director.models.workflows import Workflow -from director.tasks.workflows import start, end +from director.tasks.workflows import start, end, failure_hooks_launcher class WorkflowBuilder(object): @@ -20,6 +20,12 @@ def __init__(self, workflow_id): self.tasks = cel_workflows.get_tasks(str(self.workflow)) self.canvas = [] + self.failure_hook = cel_workflows.get_failure_hook_task(str(self.workflow)) + self.failure_hook_canvas = [] + + self.success_hook = cel_workflows.get_success_hook_task(str(self.workflow)) + self.success_hook_canvas = [] + # Pointer to the previous task(s) self.previous = [] @@ -29,7 +35,7 @@ def workflow(self): self._workflow = Workflow.query.filter_by(id=self.workflow_id).first() return self._workflow - def new_task(self, task_name, single=True): + def new_task(self, task_name, is_hook, single=True): task_id = uuid() queue = self.custom_queues.get(task_name, self.queue) @@ -48,6 +54,7 @@ def new_task(self, task_name, single=True): previous=self.previous, workflow_id=self.workflow.id, status=StatusType.pending, + is_hook=is_hook, ) task.save() @@ -63,12 +70,12 @@ def parse_queues(self): if type(self.queue) is not str or type(self.custom_queues) is not dict: raise WorkflowSyntaxError() - def parse(self, tasks): + def parse(self, tasks, is_hook=False): canvas = [] for task in tasks: if type(task) is str: - signature = self.new_task(task) + signature = self.new_task(task, is_hook) canvas.append(signature) elif type(task) is dict: name = list(task)[0] @@ -76,7 +83,7 @@ def parse(self, tasks): raise WorkflowSyntaxError() sub_canvas_tasks = [ - self.new_task(t, single=False) for t in task[name]["tasks"] + self.new_task(t, is_hook, single=False) for t in task[name]["tasks"] ] sub_canvas = group(*sub_canvas_tasks, task_id=uuid()) @@ -93,14 +100,40 @@ def build(self): self.canvas.insert(0, start.si(self.workflow.id).set(queue=self.queue)) self.canvas.append(end.si(self.workflow.id).set(queue=self.queue)) + def build_hooks(self): + initial_previous = self.previous + + if self.failure_hook and not self.failure_hook_canvas: + self.previous = None + self.failure_hook_canvas = [ + failure_hooks_launcher.si( + self.workflow.id, + self.queue, + [self.failure_hook], + self.workflow.payload, + ).set(queue=self.queue), + ] + + if self.success_hook and not self.success_hook_canvas: + self.previous = None + self.success_hook_canvas = [self.parse([self.success_hook], True)[0]] + + self.previous = initial_previous + def run(self): if not self.canvas: self.build() canvas = chain(*self.canvas, task_id=uuid()) + self.build_hooks() + try: - return canvas.apply_async() + return canvas.apply_async( + link=self.success_hook_canvas, + link_error=self.failure_hook_canvas, + ) + except Exception as e: self.workflow.status = StatusType.error self.workflow.save() diff --git a/director/commands/workflows.py b/director/commands/workflows.py index 52f651e6..99d468a0 100644 --- a/director/commands/workflows.py +++ b/director/commands/workflows.py @@ -14,7 +14,7 @@ from director.utils import validate, format_schema_errors, build_celery_schedule -def tasks_to_ascii(tasks): +def tasks_to_ascii(tasks, hooks): tasks_str = "" # Wrap the tasks list for task in tasks: @@ -26,6 +26,11 @@ def tasks_to_ascii(tasks): else: tasks_str += f"{task}\n" + if "failure" in hooks: + tasks_str += f"Failure hook: {hooks['failure']}\n" + if "success" in hooks: + tasks_str += f"Success hook: {hooks['success']}\n" + # Just remove the last newline if tasks_str: tasks_str = tasks_str[:-1] @@ -54,7 +59,9 @@ def list_workflow(ctx): periodic = "--" if conf.get("periodic"): periodic, _ = build_celery_schedule(name, conf["periodic"]) - tasks_str = tasks_to_ascii(conf["tasks"]) + tasks_str = tasks_to_ascii( + conf["tasks"], conf["hooks"] if "hooks" in conf else {} + ) data.append([name, periodic, tasks_str]) table = AsciiTable(data) @@ -74,7 +81,9 @@ def show_workflow(ctx, name): click.echo(f"Error: {e}") raise click.Abort() - tasks_str = tasks_to_ascii(_workflow["tasks"]) + tasks_str = tasks_to_ascii( + _workflow["tasks"], _workflow["hooks"] if "hooks" in _workflow else {} + ) periodic = "--" if _workflow.get("periodic"): periodic, _ = build_celery_schedule(name, _workflow["periodic"]) diff --git a/director/extensions.py b/director/extensions.py index 0e22b263..5d233c73 100644 --- a/director/extensions.py +++ b/director/extensions.py @@ -41,6 +41,20 @@ def get_by_name(self, name): def get_tasks(self, name): return self.get_by_name(name)["tasks"] + def get_hook_task(self, name, hook_name): + if ( + "hooks" in self.get_by_name(name) + and hook_name in self.get_by_name(name)["hooks"] + ): + return self.get_by_name(name)["hooks"][hook_name] + return None + + def get_failure_hook_task(self, name): + return self.get_hook_task(name, "failure") + + def get_success_hook_task(self, name): + return self.get_hook_task(name, "success") + def get_queue(self, name): try: return self.get_by_name(name)["queue"] diff --git a/director/migrations/versions/9817ccf13cb5_add_is_hook_on_task.py b/director/migrations/versions/9817ccf13cb5_add_is_hook_on_task.py new file mode 100644 index 00000000..b924193e --- /dev/null +++ b/director/migrations/versions/9817ccf13cb5_add_is_hook_on_task.py @@ -0,0 +1,24 @@ +"""Add is_hook on Task + +Revision ID: 9817ccf13cb5 +Revises: 46e4acde004e +Create Date: 2022-10-21 15:32:26.186650 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "9817ccf13cb5" +down_revision = "46e4acde004e" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("tasks", sa.Column("is_hook", sa.Boolean(), nullable=True)) + + +def downgrade(): + op.drop_column("tasks", "is_hook") diff --git a/director/models/tasks.py b/director/models/tasks.py index 9068241e..461a45e0 100644 --- a/director/models/tasks.py +++ b/director/models/tasks.py @@ -15,6 +15,7 @@ class Task(BaseModel): status = db.Column(db.Enum(StatusType), default=StatusType.pending, nullable=False) previous = db.Column(JSONBType, default=[]) result = db.Column(PickleType) + is_hook = db.Column(db.Boolean, default=False) # Relationship workflow_id = db.Column( @@ -39,6 +40,7 @@ def to_dict(self): "task": self.id, "previous": self.previous, "result": self.result, + "is_hook": self.is_hook, } ) return d diff --git a/director/static/script.js b/director/static/script.js index 41d1e325..72a440f5 100644 --- a/director/static/script.js +++ b/director/static/script.js @@ -26,6 +26,7 @@ const store = new Vuex.Store({ selectedTask: null, taskIndex: null, loading: true, + hideHooks: false }, actions: { listWorkflows({ commit }) { @@ -88,9 +89,24 @@ const store = new Vuex.Store({ state.selectedTask = task; }, refreshNetwork(state, tasks) { - var g = new dagreD3.graphlib.Graph().setGraph({}); + var graphMain = new dagreD3.graphlib.Graph().setGraph({}); + var graphHook = new dagreD3.graphlib.Graph().setGraph({}); + + var terminatedStatus = ["success", "cancel", "error"]; + + var haveHook = false; for (let i = 0; i < tasks.length; i++) { + if(tasks[i].is_hook && !terminatedStatus.includes(tasks[i].status)) { + continue; + } + + var graph = graphMain; + if(tasks[i].is_hook) { + graph = graphHook; + haveHook = true; + } + var className = tasks[i].status; var html = "
"; html += ""; @@ -99,7 +115,7 @@ const store = new Vuex.Store({ html += "" + tasks[i].status + ""; html += "
"; - g.setNode(tasks[i].id, { + graph.setNode(tasks[i].id, { labelType: "html", label: html, rx: 3, @@ -109,43 +125,52 @@ const store = new Vuex.Store({ }); for (let j = 0; j < tasks[i].previous.length; j++) { - g.setEdge(tasks[i].previous[j], tasks[i].id, {}); + graph.setEdge(tasks[i].previous[j], tasks[i].id, {}); } } - // Set some general styles - g.nodes().forEach(function (v) { - var node = g.node(v); - node.rx = node.ry = 5; - }); + function initGraph(graph, svgClass) { + // Set some general styles + graph.nodes().forEach(function (v) { + var node = graph.node(v); + node.rx = node.ry = 5; + }); - var svg = d3.select("svg"), - inner = svg.select("g"); + var svg = d3.select("svg."+svgClass), + inner = svg.select('g'); - // Set up zoom support - var zoom = d3.zoom().on("zoom", function () { - inner.attr("transform", d3.event.transform); - }); - inner.call(zoom.transform, d3.zoomIdentity); - svg.call(zoom); + // Set up zoom support + var zoom = d3.zoom().on("zoom", function () { + inner.attr("transform", d3.event.transform); + }); + inner.call(zoom.transform, d3.zoomIdentity); + svg.call(zoom); - // Create the renderer - var render = new dagreD3.render(); - render(inner, g); + // Create the renderer + var render = new dagreD3.render(); + render(inner, graph); - // Handle the click - var nodes = inner.selectAll("g.node"); - nodes.on("click", function (task_id) { - g.nodes().forEach(function (v) { - if (v == task_id) { - g.node(v).style = "fill: #f0f0f0; stroke-width: 2px; stroke: #777;"; - } else { - g.node(v).style = "fill: white"; - } - }); + // Handle the click + var nodes = inner.selectAll("g.node"); + nodes.on("click", function (task_id) { + graph.nodes().forEach(function (v) { + if (v == task_id) { + graph.node(v).style = "fill: #f0f0f0; stroke-width: 2px; stroke: #777;"; + } else { + graph.node(v).style = "fill: white"; + } + }); + + render(inner, graph); + state.selectedTask = tasks.find((c) => c.id == task_id); + }); + } + + state.hideHooks = !haveHook; - render(inner, g); - state.selectedTask = tasks.find((c) => c.id == task_id); + Vue.nextTick(function () { + initGraph(graphMain, "svg-main"); + initGraph(graphHook, "svg-hooks"); }); }, changeLoadingState(state, loading) { @@ -230,6 +255,7 @@ new Vue({ "taskIndex", "network", "loading", + "hideHooks" ]), }, store, diff --git a/director/tasks/workflows.py b/director/tasks/workflows.py index 8c112e98..a58e93b3 100644 --- a/director/tasks/workflows.py +++ b/director/tasks/workflows.py @@ -1,10 +1,14 @@ import time from celery.utils.log import get_task_logger +from celery import chain +from celery.utils import uuid + from director.extensions import cel from director.models import StatusType from director.models.workflows import Workflow +from director.models.tasks import Task logger = get_task_logger(__name__) @@ -37,3 +41,56 @@ def end(workflow_id): if workflow.status != StatusType.error: workflow.status = StatusType.success workflow.save() + + +@cel.task() +def mark_as_canceled_pending_tasks(workflow_id): + logger.info(f"Mark as cancelled pending tasks of the workflow {workflow_id}") + tasks = Task.query.filter_by(workflow_id=workflow_id, status=StatusType.pending) + for task in tasks: + task.status = StatusType.canceled + task.save() + + +@cel.task() +def failure_hooks_launcher(workflow_id, queue, tasks_names, payload): + canvas = [] + + for task_name in tasks_names: + task_id = uuid() + + # We create the Celery task specifying its UID + signature = cel.tasks.get(task_name).subtask( + kwargs={"workflow_id": workflow_id, "payload": payload}, + task_id=task_id, + ) + + # Director task has the same UID + task = Task( + id=task_id, + key=task_name, + workflow_id=workflow_id, + status=StatusType.pending, + is_hook=True, + ) + task.save() + + canvas.append(signature) + + canvas = chain(*canvas, task_id=uuid()) + + result = canvas.apply_async() + + try: + result.get() + except Exception: + pass + + task_id = uuid() + signature_mark_as_canceled = cel.tasks.get( + "director.tasks.workflows.mark_as_canceled_pending_tasks" + ).subtask( + args=(workflow_id,), + task_id=task_id, + ) + signature_mark_as_canceled.apply_async() diff --git a/director/templates/index.html b/director/templates/index.html index bec2a7ff..7e201737 100644 --- a/director/templates/index.html +++ b/director/templates/index.html @@ -229,7 +229,18 @@
- + + + +
+
+
+ + +
+ + Hooks triggered +
@@ -433,7 +444,9 @@ -