From 208aeaf4b5c85854ae8cdcbc2f19365600bb42a7 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Thu, 5 Feb 2026 12:34:45 +0530 Subject: [PATCH 1/3] [Improvement][Master] Initialize WorkflowGraph/WorkflowExecutionGraph when WorkflowStartLifecycleEvent fired This commit moves the initialization of WorkflowGraph and WorkflowExecutionGraph from command handling to when the WorkflowStartLifecycleEvent is fired. Changes: - Created IWorkflowExecutionGraphAssembler interface for deferred graph assembly - Modified WorkflowExecuteContext to support lazy initialization with assembler - Updated AbstractCommandHandler to use assembler pattern instead of direct initialization - Modified all command handlers to create assemblers instead of immediate graphs - Updated WorkflowRunningStateAction.onStartEvent to initialize the graph and handle failures Benefits: - Reduces transaction time during command processing - Enables proper workflow failure handling if graph initialization fails - Workflow can be properly marked as failed with WorkflowFailedLifecycleEvent Closes #17751 --- .../handler/AbstractCommandHandler.java | 31 ++++++- .../handler/ReRunWorkflowCommandHandler.java | 25 +++--- .../RecoverFailureTaskCommandHandler.java | 80 +++++++++--------- .../RecoverSerialWaitCommandHandler.java | 7 +- .../handler/RunWorkflowCommandHandler.java | 63 ++++++++------- .../WorkflowFailoverCommandHandler.java | 81 ++++++++++--------- .../IWorkflowExecutionGraphAssembler.java | 40 +++++++++ .../WorkflowRunningStateAction.java | 10 +++ .../runner/IWorkflowExecuteContext.java | 12 +++ .../master/runner/WorkflowExecuteContext.java | 59 +++++++++++++- 10 files changed, 284 insertions(+), 124 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraphAssembler.java diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java index 2c9df21a870c..47e1fa3f7b4d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.extract.master.command.ICommandParam; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; import org.apache.dolphinscheduler.server.master.engine.command.ICommandHandler; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphFactory; import org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener; @@ -80,7 +81,7 @@ public WorkflowExecutionRunnable handleCommand(final Command command) { assembleWorkflowInstance(workflowExecuteContextBuilder); assembleWorkflowInstanceLifecycleListeners(workflowExecuteContextBuilder); assembleWorkflowEventBus(workflowExecuteContextBuilder); - assembleWorkflowExecutionGraph(workflowExecuteContextBuilder); + assembleWorkflowExecutionGraphAssembler(workflowExecuteContextBuilder); final WorkflowExecutionRunnableBuilder workflowExecutionRunnableBuilder = WorkflowExecutionRunnableBuilder .builder() @@ -125,8 +126,31 @@ protected void assembleWorkflowGraph( protected abstract void assembleWorkflowInstance( final WorkflowExecuteContextBuilder workflowExecuteContextBuilder); - protected abstract void assembleWorkflowExecutionGraph( - final WorkflowExecuteContextBuilder 
workflowExecuteContextBuilder); + /** + * Assemble the workflow execution graph assembler. + * <p>
+ * The assembler is used to defer the initialization of the WorkflowExecutionGraph + * until the WorkflowStartLifecycleEvent is fired. This reduces transaction time + * during command processing. + */ + protected void assembleWorkflowExecutionGraphAssembler( + final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + final IWorkflowExecutionGraphAssembler assembler = createWorkflowExecutionGraphAssembler( + workflowExecuteContextBuilder); + workflowExecuteContextBuilder.setWorkflowExecutionGraphAssembler(assembler); + } + + /** + * Create the workflow execution graph assembler. + * <p>
+ * Subclasses should implement this method to provide the logic for building + * the WorkflowExecutionGraph. The returned assembler will be invoked when + * the WorkflowStartLifecycleEvent is fired. + * + * @return the assembler for creating the WorkflowExecutionGraph, or null if no graph is needed + */ + protected abstract IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( + final WorkflowExecuteContextBuilder workflowExecuteContextBuilder); protected List parseStartNodesFromWorkflowInstance( final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { @@ -158,5 +182,4 @@ protected void assembleProject( checkArgument(project != null, "Cannot find the project code: " + workflowDefinition.getProjectCode()); workflowExecuteContextBuilder.setProject(project); } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java index 524c7a225a5c..059484d7060d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; import java.util.Date; @@ -79,20 +80,20 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work 
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - /** - * Generate the workflow execution graph. - *

Will clear the history task instance and assembly the start tasks into the WorkflowExecutionGraph. - */ @Override - protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - markAllTaskInstanceInvalid(workflowExecuteContextBuilder); - super.assembleWorkflowExecutionGraph(workflowExecuteContextBuilder); - } - - private void markAllTaskInstanceInvalid(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( + final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + // Capture parent assembler + final IWorkflowExecutionGraphAssembler parentAssembler = + super.createWorkflowExecutionGraphAssembler(workflowExecuteContextBuilder); final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); - final List taskInstances = getValidTaskInstance(workflowInstance); - taskInstanceDao.markTaskInstanceInvalid(taskInstances); + + return () -> { + // Mark all task instances as invalid before creating the new graph + final List taskInstances = getValidTaskInstance(workflowInstance); + taskInstanceDao.markTaskInstanceInvalid(taskInstances); + return parentAssembler.assemble(); + }; } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java index 92f5fb4ed0c7..9606b71a6673 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java @@ -26,6 +26,7 @@ import 
org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; @@ -100,48 +101,51 @@ protected void assembleWorkflowInstance( workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - /** - * Generate the workflow execution graph. - *

Will clear the history failure/killed task. - * <p>
If the task's predecessors exist failure/killed, will also mark the task as failure/killed. - */ @Override - protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - final Map taskInstanceMap = dealWithHistoryTaskInstances(workflowExecuteContextBuilder) - .stream() - .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); - + protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( + final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + // Capture the context needed for deferred graph assembly final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); - - final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { - final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = - TaskExecutionRunnableBuilder - .builder() - .workflowExecutionGraph(workflowExecutionGraph) - .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) - .project(workflowExecuteContextBuilder.getProject()) - .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance()) - .taskDefinition(workflowGraph.getTaskNodeByName(task)) - .taskInstance(taskInstanceMap.get(task)) - .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) - .applicationContext(applicationContext) + final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); + final List startNodes = parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder); + + return () -> { + final Map taskInstanceMap = dealWithHistoryTaskInstances( + workflowExecuteContextBuilder) + .stream() + .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); + + final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); + + final BiConsumer> taskExecutionRunnableCreator = (task, successors) 
-> { + final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = + TaskExecutionRunnableBuilder + .builder() + .workflowExecutionGraph(workflowExecutionGraph) + .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) + .project(workflowExecuteContextBuilder.getProject()) + .workflowInstance(workflowInstance) + .taskDefinition(workflowGraph.getTaskNodeByName(task)) + .taskInstance(taskInstanceMap.get(task)) + .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) + .applicationContext(applicationContext) + .build(); + workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); + workflowExecutionGraph.addEdge(task, successors); + }; + + final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = + WorkflowGraphTopologyLogicalVisitor.builder() + .taskDependType(workflowInstance.getTaskDependType()) + .onWorkflowGraph(workflowGraph) + .fromTask(startNodes) + .doVisitFunction(taskExecutionRunnableCreator) .build(); - workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); - workflowExecutionGraph.addEdge(task, successors); - }; + workflowGraphTopologyLogicalVisitor.visit(); + workflowExecutionGraph.removeUnReachableEdge(); - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType()) - .onWorkflowGraph(workflowGraph) - .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder)) - .doVisitFunction(taskExecutionRunnableCreator) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - workflowExecutionGraph.removeUnReachableEdge(); - - workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph); + return workflowExecutionGraph; + }; } /** diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java index 09486db8baac..63712a2d47b8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext; import org.springframework.beans.factory.annotation.Autowired; @@ -50,8 +51,10 @@ protected void assembleWorkflowInstance(WorkflowExecuteContext.WorkflowExecuteCo } @Override - protected void assembleWorkflowExecutionGraph(WorkflowExecuteContext.WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - + protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( + WorkflowExecuteContext.WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + // No execution graph needed for serial wait recovery + return null; } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java index 5aa3b22e56b7..a92a43305421 100644 --- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; @@ -81,36 +82,44 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work } @Override - protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( + final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + // Capture the context needed for deferred graph assembly final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); - final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { - final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = - TaskExecutionRunnableBuilder - .builder() - .workflowExecutionGraph(workflowExecutionGraph) - .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) - .project(workflowExecuteContextBuilder.getProject()) - .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance()) - .taskDefinition(workflowGraph.getTaskNodeByName(task)) - 
.workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) - .applicationContext(applicationContext) + final WorkflowDefinition workflowDefinition = workflowExecuteContextBuilder.getWorkflowDefinition(); + final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); + final List startNodes = parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder); + + return () -> { + final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); + final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { + final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = + TaskExecutionRunnableBuilder + .builder() + .workflowExecutionGraph(workflowExecutionGraph) + .workflowDefinition(workflowDefinition) + .project(workflowExecuteContextBuilder.getProject()) + .workflowInstance(workflowInstance) + .taskDefinition(workflowGraph.getTaskNodeByName(task)) + .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) + .applicationContext(applicationContext) + .build(); + workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); + workflowExecutionGraph.addEdge(task, successors); + }; + + final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = + WorkflowGraphTopologyLogicalVisitor.builder() + .taskDependType(workflowInstance.getTaskDependType()) + .onWorkflowGraph(workflowGraph) + .fromTask(startNodes) + .doVisitFunction(taskExecutionRunnableCreator) .build(); - workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); - workflowExecutionGraph.addEdge(task, successors); - }; + workflowGraphTopologyLogicalVisitor.visit(); + workflowExecutionGraph.removeUnReachableEdge(); - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType()) - 
.onWorkflowGraph(workflowGraph) - .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder)) - .doVisitFunction(taskExecutionRunnableCreator) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - workflowExecutionGraph.removeUnReachableEdge(); - - workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph); + return workflowExecutionGraph; + }; } /** diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java index e18d02b400f8..cff6492de3b2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; @@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiConsumer; @@ -94,48 +96,51 @@ protected void assembleWorkflowInstance( 
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - /** - * Generate the workflow execution graph. - *

Will rebuild the WorkflowExecutionGraph from the exist task instance. - */ @Override - protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - final Map taskInstanceMap = - getValidTaskInstance(workflowExecuteContextBuilder.getWorkflowInstance()) - .stream() - .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); - + protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( + final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + // Capture the context needed for deferred graph assembly final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); - - final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { - final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = - TaskExecutionRunnableBuilder - .builder() - .workflowExecutionGraph(workflowExecutionGraph) - .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) - .project(workflowExecuteContextBuilder.getProject()) - .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance()) - .taskDefinition(workflowGraph.getTaskNodeByName(task)) - .taskInstance(taskInstanceMap.get(task)) - .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) - .applicationContext(applicationContext) + final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); + final List startNodes = parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder); + + return () -> { + final Map taskInstanceMap = + getValidTaskInstance(workflowInstance) + .stream() + .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); + + final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); + + final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { + final 
TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = + TaskExecutionRunnableBuilder + .builder() + .workflowExecutionGraph(workflowExecutionGraph) + .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) + .project(workflowExecuteContextBuilder.getProject()) + .workflowInstance(workflowInstance) + .taskDefinition(workflowGraph.getTaskNodeByName(task)) + .taskInstance(taskInstanceMap.get(task)) + .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) + .applicationContext(applicationContext) + .build(); + workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); + workflowExecutionGraph.addEdge(task, successors); + }; + + final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = + WorkflowGraphTopologyLogicalVisitor.builder() + .taskDependType(workflowInstance.getTaskDependType()) + .onWorkflowGraph(workflowGraph) + .fromTask(startNodes) + .doVisitFunction(taskExecutionRunnableCreator) .build(); - workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); - workflowExecutionGraph.addEdge(task, successors); - }; + workflowGraphTopologyLogicalVisitor.visit(); + workflowExecutionGraph.removeUnReachableEdge(); - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType()) - .onWorkflowGraph(workflowGraph) - .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder)) - .doVisitFunction(taskExecutionRunnableCreator) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - workflowExecutionGraph.removeUnReachableEdge(); - - workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph); + return workflowExecutionGraph; + }; } @Override diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraphAssembler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraphAssembler.java new file mode 100644 index 000000000000..5620436a17a4 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraphAssembler.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.graph; + +/** + * Interface for deferred assembly of WorkflowExecutionGraph. + *

+ * The implementation captures all the context needed to assemble the execution graph, + * allowing the actual graph construction to be deferred until the WorkflowStartLifecycleEvent + * is fired. This reduces transaction time during command processing. + */ +@FunctionalInterface +public interface IWorkflowExecutionGraphAssembler { + + /** + * Assemble and return the WorkflowExecutionGraph. + * <p>
+ * This method should be called when the workflow is ready to start execution, + * typically during the handling of WorkflowStartLifecycleEvent. + * + * @return the assembled WorkflowExecutionGraph + */ + IWorkflowExecutionGraph assemble(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java index 16839f378ab4..e0e0c75c49e7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java @@ -46,8 +46,18 @@ public class WorkflowRunningStateAction extends AbstractWorkflowStateAction { public void onStartEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable, final WorkflowStartLifecycleEvent workflowStartEvent) { throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); + + // Initialize the workflow execution graph if not already initialized + // This is deferred from command handling to reduce transaction time + workflowExecutionRunnable.getWorkflowExecuteContext().initializeWorkflowExecutionGraph(); + final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowExecutionGraph(); + if (workflowExecutionGraph == null) { + log.info("Workflow execution graph is null, try to emit workflow finished event"); + emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable); + return; + } final List startNodes = workflowExecutionGraph.getStartNodes(); if (startNodes.isEmpty()) { log.info("Workflow start node is empty, try to emit workflow finished event"); diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java index 0fa99c4ae71d..8d596fe5dccf 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java @@ -43,4 +43,16 @@ public interface IWorkflowExecuteContext { List getWorkflowInstanceLifecycleListeners(); + /** + * Initialize the workflow execution graph. + * This method should be called when the workflow is ready to start execution, + * typically during the handling of WorkflowStartLifecycleEvent. + */ + void initializeWorkflowExecutionGraph(); + + /** + * Check if the workflow execution graph has been initialized. + */ + boolean isWorkflowExecutionGraphInitialized(); + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java index 0727ff5e9841..49a867a3ab7f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; import 
org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener; @@ -30,13 +31,11 @@ import java.util.List; import java.util.Optional; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; import lombok.NoArgsConstructor; @Getter -@AllArgsConstructor public class WorkflowExecuteContext implements IWorkflowExecuteContext { private final Command command; @@ -49,12 +48,63 @@ public class WorkflowExecuteContext implements IWorkflowExecuteContext { private final IWorkflowGraph workflowGraph; - private final IWorkflowExecutionGraph workflowExecutionGraph; + private volatile IWorkflowExecutionGraph workflowExecutionGraph; + + private final IWorkflowExecutionGraphAssembler workflowExecutionGraphAssembler; private final WorkflowEventBus workflowEventBus; private final List workflowInstanceLifecycleListeners; + public WorkflowExecuteContext(Command command, + WorkflowDefinition workflowDefinition, + Project project, + WorkflowInstance workflowInstance, + IWorkflowGraph workflowGraph, + IWorkflowExecutionGraph workflowExecutionGraph, + IWorkflowExecutionGraphAssembler workflowExecutionGraphAssembler, + WorkflowEventBus workflowEventBus, + List workflowInstanceLifecycleListeners) { + this.command = command; + this.workflowDefinition = workflowDefinition; + this.project = project; + this.workflowInstance = workflowInstance; + this.workflowGraph = workflowGraph; + this.workflowExecutionGraph = workflowExecutionGraph; + this.workflowExecutionGraphAssembler = workflowExecutionGraphAssembler; + this.workflowEventBus = workflowEventBus; + this.workflowInstanceLifecycleListeners = workflowInstanceLifecycleListeners; + } + + /** + * Initialize the workflow execution graph using the assembler. + * This method should be called when the workflow is ready to start execution, + * typically during the handling of WorkflowStartLifecycleEvent. 
+ * + * @throws IllegalStateException if the execution graph is already initialized + * or no assembler is available + */ + public void initializeWorkflowExecutionGraph() { + if (workflowExecutionGraph != null) { + return; + } + if (workflowExecutionGraphAssembler == null) { + return; + } + synchronized (this) { + if (workflowExecutionGraph == null) { + workflowExecutionGraph = workflowExecutionGraphAssembler.assemble(); + } + } + } + + /** + * Check if the workflow execution graph has been initialized. + */ + public boolean isWorkflowExecutionGraphInitialized() { + return workflowExecutionGraph != null; + } + public static WorkflowExecuteContextBuilder builder() { return new WorkflowExecuteContextBuilder(); } @@ -73,6 +123,8 @@ public static class WorkflowExecuteContextBuilder { private IWorkflowExecutionGraph workflowExecutionGraph; + private IWorkflowExecutionGraphAssembler workflowExecutionGraphAssembler; + private WorkflowEventBus workflowEventBus; private List workflowInstanceLifecycleListeners; @@ -92,6 +144,7 @@ public WorkflowExecuteContext build() { workflowInstance, workflowGraph, workflowExecutionGraph, + workflowExecutionGraphAssembler, workflowEventBus, Optional.ofNullable(workflowInstanceLifecycleListeners).orElse(Collections.emptyList())); } From d7047e73934ac6beb0e70ec18a76915f41ddfa63 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Thu, 5 Feb 2026 13:50:40 +0530 Subject: [PATCH 2/3] Address PR review comments: add try-catch, @Override annotations, and fix Javadoc --- .../statemachine/WorkflowRunningStateAction.java | 10 +++++++++- .../server/master/runner/WorkflowExecuteContext.java | 8 +++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java index e0e0c75c49e7..347cc61c93b2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java @@ -49,7 +49,15 @@ public void onStartEvent(final IWorkflowExecutionRunnable workflowExecutionRunna // Initialize the workflow execution graph if not already initialized // This is deferred from command handling to reduce transaction time - workflowExecutionRunnable.getWorkflowExecuteContext().initializeWorkflowExecutionGraph(); + try { + workflowExecutionRunnable.getWorkflowExecuteContext().initializeWorkflowExecutionGraph(); + } catch (Exception e) { + log.error("Failed to initialize workflow execution graph", e); + final WorkflowEventBus workflowEventBus = + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowEventBus(); + workflowEventBus.publish(WorkflowFailedLifecycleEvent.of(workflowExecutionRunnable)); + return; + } final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowExecutionGraph(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java index 49a867a3ab7f..e793dc9f9aed 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java @@ -80,10 +80,11 @@ public WorkflowExecuteContext(Command command, * Initialize the workflow execution graph 
using the assembler. * This method should be called when the workflow is ready to start execution, * typically during the handling of WorkflowStartLifecycleEvent. - * - * @throws IllegalStateException if the execution graph is already initialized - * or no assembler is available + *

+ * If the execution graph is already initialized or no assembler is available, + * this method returns without making any changes. */ + @Override public void initializeWorkflowExecutionGraph() { if (workflowExecutionGraph != null) { return; @@ -101,6 +102,7 @@ public void initializeWorkflowExecutionGraph() { /** * Check if the workflow execution graph has been initialized. */ + @Override public boolean isWorkflowExecutionGraphInitialized() { return workflowExecutionGraph != null; } From cba95a4eb9d1a6af51a60c9a512176b4b9ca65e8 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 6 Feb 2026 08:41:20 +0530 Subject: [PATCH 3/3] Refactor: Use factory pattern for WorkflowExecutionGraph creation Address reviewer feedback: command handlers should not be concerned with how instances are initialized. Changes: - Add WorkflowExecutionGraphFactory to centralize all graph creation logic - Remove IWorkflowExecutionGraphAssembler interface (no longer needed) - Remove assembleWorkflowExecutionGraphAssembler() from AbstractCommandHandler - Remove createWorkflowExecutionGraphAssembler() from all command handlers - Update WorkflowRunningStateAction to use factory for graph initialization - Update IWorkflowExecuteContext: add getProject(), change to setWorkflowExecutionGraph() - Update WorkflowExecuteContext to remove assembler, add setter The factory handles all command-type-specific graph creation strategies: - START_PROCESS: Fresh start with configured start nodes - REPEAT_RUNNING: Rerun entire workflow - START_FAILURE_TASK_PROCESS: Recover failed/paused tasks - RECOVER_TOLERANCE_FAULT_PROCESS: Failover with existing task instances - RECOVER_SERIAL_WAIT: Serial wait recovery --- .../handler/AbstractCommandHandler.java | 28 -- .../handler/ReRunWorkflowCommandHandler.java | 23 -- .../RecoverFailureTaskCommandHandler.java | 171 --------- .../RecoverSerialWaitCommandHandler.java | 8 - .../handler/RunWorkflowCommandHandler.java | 53 --- 
.../WorkflowFailoverCommandHandler.java | 64 ---- .../IWorkflowExecutionGraphAssembler.java | 40 -- .../graph/WorkflowExecutionGraphFactory.java | 351 ++++++++++++++++++ .../WorkflowRunningStateAction.java | 12 +- .../runner/IWorkflowExecuteContext.java | 9 +- .../master/runner/WorkflowExecuteContext.java | 29 +- 11 files changed, 373 insertions(+), 415 deletions(-) delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraphAssembler.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraphFactory.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java index 47e1fa3f7b4d..310f550d22fe 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.extract.master.command.ICommandParam; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; import org.apache.dolphinscheduler.server.master.engine.command.ICommandHandler; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphFactory; import org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener; @@ -81,7 +80,6 @@ public WorkflowExecutionRunnable handleCommand(final Command command) { 
assembleWorkflowInstance(workflowExecuteContextBuilder); assembleWorkflowInstanceLifecycleListeners(workflowExecuteContextBuilder); assembleWorkflowEventBus(workflowExecuteContextBuilder); - assembleWorkflowExecutionGraphAssembler(workflowExecuteContextBuilder); final WorkflowExecutionRunnableBuilder workflowExecutionRunnableBuilder = WorkflowExecutionRunnableBuilder .builder() @@ -126,32 +124,6 @@ protected void assembleWorkflowGraph( protected abstract void assembleWorkflowInstance( final WorkflowExecuteContextBuilder workflowExecuteContextBuilder); - /** - * Assemble the workflow execution graph assembler. - *

- * The assembler is used to defer the initialization of the WorkflowExecutionGraph - * until the WorkflowStartLifecycleEvent is fired. This reduces transaction time - * during command processing. - */ - protected void assembleWorkflowExecutionGraphAssembler( - final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - final IWorkflowExecutionGraphAssembler assembler = createWorkflowExecutionGraphAssembler( - workflowExecuteContextBuilder); - workflowExecuteContextBuilder.setWorkflowExecutionGraphAssembler(assembler); - } - - /** - * Create the workflow execution graph assembler. - *

- * Subclasses should implement this method to provide the logic for building - * the WorkflowExecutionGraph. The returned assembler will be invoked when - * the WorkflowStartLifecycleEvent is fired. - * - * @return the assembler for creating the WorkflowExecutionGraph, or null if no graph is needed - */ - protected abstract IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( - final WorkflowExecuteContextBuilder workflowExecuteContextBuilder); - protected List parseStartNodesFromWorkflowInstance( final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java index 059484d7060d..439a76f33155 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java @@ -20,16 +20,12 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; import java.util.Date; -import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -43,9 +39,6 @@ public class ReRunWorkflowCommandHandler extends RunWorkflowCommandHandler { @Autowired private WorkflowInstanceDao workflowInstanceDao; - @Autowired - private TaskInstanceDao taskInstanceDao; - @Autowired private MasterConfig masterConfig; @@ -80,22 +73,6 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - @Override - protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( - final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - // Capture parent assembler - final IWorkflowExecutionGraphAssembler parentAssembler = - super.createWorkflowExecutionGraphAssembler(workflowExecuteContextBuilder); - final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); - - return () -> { - // Mark all task instances as invalid before creating the new graph - final List taskInstances = getValidTaskInstance(workflowInstance); - taskInstanceDao.markTaskInstanceInvalid(taskInstances); - return parentAssembler.assemble(); - }; - } - @Override public CommandType commandType() { return CommandType.REPEAT_RUNNING; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java index 9606b71a6673..ad44a3d42f1d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java @@ -20,36 +20,14 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.stream.Collectors; - import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; -import com.google.common.collect.Lists; - /** * This handler 
used to handle {@link CommandType#START_FAILURE_TASK_PROCESS}. *

Will start the failure/pause/killed and other task instance which is behind success tasks instance but not been triggered. @@ -60,15 +38,6 @@ public class RecoverFailureTaskCommandHandler extends AbstractCommandHandler { @Autowired private WorkflowInstanceDao workflowInstanceDao; - @Autowired - private TaskInstanceDao taskInstanceDao; - - @Autowired - private ApplicationContext applicationContext; - - @Autowired - private TaskInstanceFactories taskInstanceFactories; - @Autowired private MasterConfig masterConfig; @@ -101,146 +70,6 @@ protected void assembleWorkflowInstance( workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - @Override - protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( - final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - // Capture the context needed for deferred graph assembly - final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); - final List startNodes = parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder); - - return () -> { - final Map taskInstanceMap = dealWithHistoryTaskInstances( - workflowExecuteContextBuilder) - .stream() - .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); - - final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); - - final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { - final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = - TaskExecutionRunnableBuilder - .builder() - .workflowExecutionGraph(workflowExecutionGraph) - .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) - .project(workflowExecuteContextBuilder.getProject()) - .workflowInstance(workflowInstance) - .taskDefinition(workflowGraph.getTaskNodeByName(task)) - .taskInstance(taskInstanceMap.get(task)) - 
.workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) - .applicationContext(applicationContext) - .build(); - workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); - workflowExecutionGraph.addEdge(task, successors); - }; - - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .taskDependType(workflowInstance.getTaskDependType()) - .onWorkflowGraph(workflowGraph) - .fromTask(startNodes) - .doVisitFunction(taskExecutionRunnableCreator) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - workflowExecutionGraph.removeUnReachableEdge(); - - return workflowExecutionGraph; - }; - } - - /** - * Return the valid task instance which should not be recovered. - *

Will mark the failure/killed task instance as invalid. - */ - private List dealWithHistoryTaskInstances( - final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); - final Map taskInstanceMap = super.getValidTaskInstance(workflowInstance) - .stream() - .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); - - final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - - final Set needRecoverTasks = new HashSet<>(); - final Set markInvalidTasks = new HashSet<>(); - final BiConsumer> historyTaskInstanceMarker = (task, successors) -> { - // If the parent is need recover - // Then the task should mark as invalid, and it's child should be mark as invalidated. - if (markInvalidTasks.contains(task)) { - if (taskInstanceMap.containsKey(task)) { - taskInstanceDao.markTaskInstanceInvalid(Lists.newArrayList(taskInstanceMap.get(task))); - taskInstanceMap.remove(task); - } - markInvalidTasks.addAll(successors); - return; - } - - final TaskInstance taskInstance = taskInstanceMap.get(task); - if (taskInstance == null) { - return; - } - - if (isTaskNeedRecreate(taskInstance) || isTaskCanRecover(taskInstance)) { - needRecoverTasks.add(task); - markInvalidTasks.addAll(successors); - } - }; - - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .onWorkflowGraph(workflowGraph) - .taskDependType(workflowInstance.getTaskDependType()) - .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder)) - .doVisitFunction(historyTaskInstanceMarker) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - - for (String task : needRecoverTasks) { - final TaskInstance taskInstance = taskInstanceMap.get(task); - if (isTaskCanRecover(taskInstance)) { - taskInstanceMap.put(task, createRecoverTaskInstance(taskInstance)); - continue; - } - if 
(isTaskNeedRecreate(taskInstance)) { - taskInstanceMap.put(task, createRecreatedTaskInstance(taskInstance)); - } - } - return new ArrayList<>(taskInstanceMap.values()); - } - - /** - * Whether the task need to be recreated. - *

If the task state is FAILURE and KILL, then will mark the task invalid and recreate the task. - */ - private boolean isTaskNeedRecreate(final TaskInstance taskInstance) { - if (taskInstance == null) { - return false; - } - return taskInstance.getState() == TaskExecutionStatus.FAILURE - || taskInstance.getState() == TaskExecutionStatus.KILL; - } - - private TaskInstance createRecreatedTaskInstance(final TaskInstance taskInstance) { - return taskInstanceFactories.failedRecoverTaskInstanceFactory() - .builder() - .withTaskInstance(taskInstance) - .build(); - } - - private boolean isTaskCanRecover(final TaskInstance taskInstance) { - if (taskInstance == null) { - return false; - } - return taskInstance.getState() == TaskExecutionStatus.PAUSE; - } - - private TaskInstance createRecoverTaskInstance(final TaskInstance taskInstance) { - return taskInstanceFactories.pauseRecoverTaskInstanceFactory() - .builder() - .withTaskInstance(taskInstance) - .build(); - } - @Override public CommandType commandType() { return CommandType.START_FAILURE_TASK_PROCESS; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java index 63712a2d47b8..7abfff57579c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import 
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext; import org.springframework.beans.factory.annotation.Autowired; @@ -50,13 +49,6 @@ protected void assembleWorkflowInstance(WorkflowExecuteContext.WorkflowExecuteCo workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - @Override - protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( - WorkflowExecuteContext.WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - // No execution graph needed for serial wait recovery - return null; - } - @Override public CommandType commandType() { return CommandType.RECOVER_SERIAL_WAIT; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java index a92a43305421..eabf5ade17fc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java @@ -28,12 +28,6 @@ import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; -import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; import org.apache.commons.collections4.CollectionUtils; @@ -42,11 +36,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.function.BiConsumer; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; /** @@ -62,9 +53,6 @@ public class RunWorkflowCommandHandler extends AbstractCommandHandler { @Autowired private MasterConfig masterConfig; - @Autowired - private ApplicationContext applicationContext; - /** * Will generate a new workflow instance based on the command. */ @@ -81,47 +69,6 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - @Override - protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( - final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - // Capture the context needed for deferred graph assembly - final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - final WorkflowDefinition workflowDefinition = workflowExecuteContextBuilder.getWorkflowDefinition(); - final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); - final List startNodes = parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder); - - return () -> { - final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); - final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { - final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = - 
TaskExecutionRunnableBuilder - .builder() - .workflowExecutionGraph(workflowExecutionGraph) - .workflowDefinition(workflowDefinition) - .project(workflowExecuteContextBuilder.getProject()) - .workflowInstance(workflowInstance) - .taskDefinition(workflowGraph.getTaskNodeByName(task)) - .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) - .applicationContext(applicationContext) - .build(); - workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); - workflowExecutionGraph.addEdge(task, successors); - }; - - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .taskDependType(workflowInstance.getTaskDependType()) - .onWorkflowGraph(workflowGraph) - .fromTask(startNodes) - .doVisitFunction(taskExecutionRunnableCreator) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - workflowExecutionGraph.removeUnReachableEdge(); - - return workflowExecutionGraph; - }; - } - /** * Merge the command params with the workflow params. *

If there are duplicate keys, the command params will override the workflow params. diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java index cff6492de3b2..95982be04cec 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java @@ -20,29 +20,15 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; import java.util.Date; -import java.util.List; -import java.util.Map; 
-import java.util.Set; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; /** @@ -55,9 +41,6 @@ public class WorkflowFailoverCommandHandler extends AbstractCommandHandler { @Autowired private WorkflowInstanceDao workflowInstanceDao; - @Autowired - private ApplicationContext applicationContext; - @Autowired private MasterConfig masterConfig; @@ -96,53 +79,6 @@ protected void assembleWorkflowInstance( workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - @Override - protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler( - final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - // Capture the context needed for deferred graph assembly - final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); - final List startNodes = parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder); - - return () -> { - final Map taskInstanceMap = - getValidTaskInstance(workflowInstance) - .stream() - .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); - - final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); - - final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { - final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = - TaskExecutionRunnableBuilder - .builder() - .workflowExecutionGraph(workflowExecutionGraph) - .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) - .project(workflowExecuteContextBuilder.getProject()) - .workflowInstance(workflowInstance) - .taskDefinition(workflowGraph.getTaskNodeByName(task)) - .taskInstance(taskInstanceMap.get(task)) - 
.workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) - .applicationContext(applicationContext) - .build(); - workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); - workflowExecutionGraph.addEdge(task, successors); - }; - - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .taskDependType(workflowInstance.getTaskDependType()) - .onWorkflowGraph(workflowGraph) - .fromTask(startNodes) - .doVisitFunction(taskExecutionRunnableCreator) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - workflowExecutionGraph.removeUnReachableEdge(); - - return workflowExecutionGraph; - }; - } - @Override public CommandType commandType() { return CommandType.RECOVER_TOLERANCE_FAULT_PROCESS; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraphAssembler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraphAssembler.java deleted file mode 100644 index 5620436a17a4..000000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraphAssembler.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.engine.graph; - -/** - * Interface for deferred assembly of WorkflowExecutionGraph. - *

- * The implementation captures all the context needed to assemble the execution graph, - * allowing the actual graph construction to be deferred until the WorkflowStartLifecycleEvent - * is fired. This reduces transaction time during command processing. - */ -@FunctionalInterface -public interface IWorkflowExecutionGraphAssembler { - - /** - * Assemble and return the WorkflowExecutionGraph. - *

- * This method should be called when the workflow is ready to start execution, - * typically during the handling of WorkflowStartLifecycleEvent. - * - * @return the assembled WorkflowExecutionGraph - */ - IWorkflowExecutionGraph assemble(); - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraphFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraphFactory.java new file mode 100644 index 000000000000..1cb2578bb350 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraphFactory.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.engine.graph; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.extract.master.command.ICommandParam; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories; +import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import com.google.common.collect.Lists; + +/** + * Factory for creating WorkflowExecutionGraph based on command type. + *

+ * This factory encapsulates all graph creation logic, ensuring command handlers
+ * are not concerned with how instances are initialized.
+ * <p>
+ * Every supported command type shares the same graph construction routine
+ * ({@link #buildWorkflowExecutionGraph}); the command types only differ in which history
+ * task instances are attached to the created nodes and in the clean-up done beforehand.
+ */
+@Slf4j
+@Component
+public class WorkflowExecutionGraphFactory {
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    @Autowired
+    private TaskInstanceFactories taskInstanceFactories;
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    /**
+     * Create a WorkflowExecutionGraph based on the command type in the context.
+     * <p>
+     * Unknown command types are logged and handled like {@code START_PROCESS}.
+     *
+     * @param context the workflow execute context
+     * @return the WorkflowExecutionGraph, or {@code null} when the context carries no command or the
+     *         command type is {@code RECOVER_SERIAL_WAIT}
+     */
+    public IWorkflowExecutionGraph createWorkflowExecutionGraph(final IWorkflowExecuteContext context) {
+        final Command command = context.getCommand();
+        if (command == null) {
+            log.warn("Command is null, cannot create workflow execution graph");
+            return null;
+        }
+
+        final CommandType commandType = command.getCommandType();
+        switch (commandType) {
+            case START_PROCESS:
+                return createForStartProcess(context);
+            case REPEAT_RUNNING:
+                return createForRepeatRunning(context);
+            case START_FAILURE_TASK_PROCESS:
+                return createForRecoverFailure(context);
+            case RECOVER_TOLERANCE_FAULT_PROCESS:
+                return createForFailover(context);
+            case RECOVER_SERIAL_WAIT:
+                // No execution graph needed
+                return null;
+            default:
+                log.warn("Unsupported command type for graph creation: {}", commandType);
+                return createForStartProcess(context);
+        }
+    }
+
+    /**
+     * Create graph for START_PROCESS command - no history task instance is attached to any node.
+     */
+    private IWorkflowExecutionGraph createForStartProcess(final IWorkflowExecuteContext context) {
+        final List<String> startNodes = parseStartNodes(context);
+        return buildWorkflowExecutionGraph(context, startNodes, Collections.emptyMap());
+    }
+
+    /**
+     * Create graph for REPEAT_RUNNING command - invalidates old task instances and creates new ones.
+     */
+    private IWorkflowExecutionGraph createForRepeatRunning(final IWorkflowExecuteContext context) {
+        // Mark all existing task instances as invalid before creating the new graph
+        final List<TaskInstance> taskInstances = getValidTaskInstances(context.getWorkflowInstance());
+        taskInstanceDao.markTaskInstanceInvalid(taskInstances);
+
+        // Create new graph (same logic as START_PROCESS)
+        return createForStartProcess(context);
+    }
+
+    /**
+     * Create graph for START_FAILURE_TASK_PROCESS command - recovers from failed tasks.
+     * <p>
+     * The history task instances are cleaned up by {@link #dealWithHistoryTaskInstances} first; the
+     * instances it returns are attached to the nodes with the same task name.
+     */
+    private IWorkflowExecutionGraph createForRecoverFailure(final IWorkflowExecuteContext context) {
+        // Start nodes are resolved before the history instances are touched, as any failure here
+        // must surface before the DAO is called.
+        final List<String> startNodes = parseStartNodes(context);
+        final Map<String, TaskInstance> taskInstanceMap = indexByName(dealWithHistoryTaskInstances(context));
+        return buildWorkflowExecutionGraph(context, startNodes, taskInstanceMap);
+    }
+
+    /**
+     * Create graph for RECOVER_TOLERANCE_FAULT_PROCESS command - recovers from failover.
+     * <p>
+     * All valid task instances of the workflow instance are attached to the nodes unchanged.
+     */
+    private IWorkflowExecutionGraph createForFailover(final IWorkflowExecuteContext context) {
+        final List<String> startNodes = parseStartNodes(context);
+        final Map<String, TaskInstance> taskInstanceMap =
+                indexByName(getValidTaskInstances(context.getWorkflowInstance()));
+        return buildWorkflowExecutionGraph(context, startNodes, taskInstanceMap);
+    }
+
+    /**
+     * Index task instances by task name.
+     *
+     * @throws IllegalStateException if two task instances share the same name
+     *                               (behaviour of {@link Collectors#toMap(Function, Function)})
+     */
+    private Map<String, TaskInstance> indexByName(final List<TaskInstance> taskInstances) {
+        return taskInstances
+                .stream()
+                .collect(Collectors.toMap(TaskInstance::getName, Function.identity()));
+    }
+
+    /**
+     * Shared graph construction used by every command type.
+     * <p>
+     * Visits the workflow graph from {@code startNodes}, adds one {@link TaskExecutionRunnable} node
+     * and its outgoing edges per visited task, then removes the unreachable edges.
+     *
+     * @param context         the workflow execute context providing definition, project, instance and event bus
+     * @param startNodes      names of the tasks to start visiting from; may be empty
+     * @param taskInstanceMap history task instances keyed by task name; a task without an entry gets a
+     *                        {@code null} task instance on its builder
+     * @return the assembled execution graph
+     */
+    private IWorkflowExecutionGraph buildWorkflowExecutionGraph(final IWorkflowExecuteContext context,
+                                                                final List<String> startNodes,
+                                                                final Map<String, TaskInstance> taskInstanceMap) {
+        final IWorkflowGraph workflowGraph = context.getWorkflowGraph();
+        final WorkflowInstance workflowInstance = context.getWorkflowInstance();
+
+        final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
+        final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
+            // NOTE(review): the START_PROCESS path previously never called taskInstance(...); passing
+            // null is assumed to be equivalent to leaving the builder field unset - confirm against
+            // TaskExecutionRunnableBuilder.
+            final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
+                    TaskExecutionRunnableBuilder
+                            .builder()
+                            .workflowExecutionGraph(workflowExecutionGraph)
+                            .workflowDefinition(context.getWorkflowDefinition())
+                            .project(context.getProject())
+                            .workflowInstance(workflowInstance)
+                            .taskDefinition(workflowGraph.getTaskNodeByName(task))
+                            .taskInstance(taskInstanceMap.get(task))
+                            .workflowEventBus(context.getWorkflowEventBus())
+                            .applicationContext(applicationContext)
+                            .build();
+            workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder));
+            workflowExecutionGraph.addEdge(task, successors);
+        };
+
+        final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor =
+                WorkflowGraphTopologyLogicalVisitor.builder()
+                        .taskDependType(workflowInstance.getTaskDependType())
+                        .onWorkflowGraph(workflowGraph)
+                        .fromTask(startNodes)
+                        .doVisitFunction(taskExecutionRunnableCreator)
+                        .build();
+        workflowGraphTopologyLogicalVisitor.visit();
+        workflowExecutionGraph.removeUnReachableEdge();
+
+        return workflowExecutionGraph;
+    }
+
+    /**
+     * Parse start nodes from the workflow instance command param.
+     * Converts task codes to task names.
+     *
+     * @return the start task names, or an empty list when the command param is missing or has no start nodes
+     */
+    private List<String> parseStartNodes(final IWorkflowExecuteContext context) {
+        final WorkflowInstance workflowInstance = context.getWorkflowInstance();
+        final ICommandParam commandParam =
+                JSONUtils.parseObject(workflowInstance.getCommandParam(), ICommandParam.class);
+        if (commandParam == null || CollectionUtils.isEmpty(commandParam.getStartNodes())) {
+            return Collections.emptyList();
+        }
+        final IWorkflowGraph workflowGraph = context.getWorkflowGraph();
+        return commandParam.getStartNodes()
+                .stream()
+                .map(workflowGraph::getTaskNodeByCode)
+                .map(TaskDefinition::getName)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Get valid (non-invalid) task instances for a workflow instance.
+     */
+    private List<TaskInstance> getValidTaskInstances(final WorkflowInstance workflowInstance) {
+        return taskInstanceDao.queryValidTaskListByWorkflowInstanceId(
+                workflowInstance.getId());
+    }
+
+    /**
+     * Deal with history task instances for failure recovery.
+     * <p>
+     * While visiting the workflow graph from the start nodes:
+     * <ul>
+     * <li>a task whose instance matches {@link #isTaskNeedRecreate} or {@link #isTaskCanRecover} is
+     * remembered for recovery and all its successors are scheduled to be marked invalid;</li>
+     * <li>a task scheduled to be marked invalid has its instance (if any) marked invalid and dropped,
+     * and propagates the mark to its own successors.</li>
+     * </ul>
+     * Afterwards each remembered task is either recovered through the task instance factory or marked
+     * invalid and dropped.
+     *
+     * @return the task instances which are still kept for the execution graph
+     */
+    private List<TaskInstance> dealWithHistoryTaskInstances(final IWorkflowExecuteContext context) {
+        final WorkflowInstance workflowInstance = context.getWorkflowInstance();
+        final Map<String, TaskInstance> taskInstanceMap = indexByName(getValidTaskInstances(workflowInstance));
+
+        final IWorkflowGraph workflowGraph = context.getWorkflowGraph();
+        final List<String> startNodes = parseStartNodes(context);
+
+        final Set<String> needRecoverTasks = new HashSet<>();
+        final Set<String> markInvalidTasks = new HashSet<>();
+        final BiConsumer<String, Set<String>> historyTaskInstanceMarker = (task, successors) -> {
+            if (markInvalidTasks.contains(task)) {
+                if (taskInstanceMap.containsKey(task)) {
+                    taskInstanceDao.markTaskInstanceInvalid(Lists.newArrayList(taskInstanceMap.get(task)));
+                    taskInstanceMap.remove(task);
+                }
+                markInvalidTasks.addAll(successors);
+                return;
+            }
+
+            final TaskInstance taskInstance = taskInstanceMap.get(task);
+            if (taskInstance == null) {
+                return;
+            }
+
+            if (isTaskNeedRecreate(taskInstance) || isTaskCanRecover(taskInstance)) {
+                needRecoverTasks.add(task);
+                markInvalidTasks.addAll(successors);
+            }
+        };
+
+        final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor =
+                WorkflowGraphTopologyLogicalVisitor.builder()
+                        .onWorkflowGraph(workflowGraph)
+                        .taskDependType(workflowInstance.getTaskDependType())
+                        .fromTask(startNodes)
+                        .doVisitFunction(historyTaskInstanceMarker)
+                        .build();
+        workflowGraphTopologyLogicalVisitor.visit();
+
+        for (String task : needRecoverTasks) {
+            final TaskInstance taskInstance = taskInstanceMap.get(task);
+            // isTaskCanRecover is checked first because PAUSE/KILL also satisfy isTaskNeedRecreate.
+            if (isTaskCanRecover(taskInstance)) {
+                // NOTE(review): the instance produced by the recover factory is discarded, so the map
+                // (and therefore the execution graph) keeps the old instance for this task. If the
+                // graph is expected to run the newly inserted instance, put the factory result into
+                // taskInstanceMap here - confirm the intended behaviour.
+                recoverTaskInstance(taskInstance);
+            } else if (isTaskNeedRecreate(taskInstance)) {
+                taskInstanceDao.markTaskInstanceInvalid(Lists.newArrayList(taskInstance));
+                taskInstanceMap.remove(task);
+            }
+        }
+
+        return Lists.newArrayList(taskInstanceMap.values());
+    }
+
+    /**
+     * Whether the task instance is in a state (PAUSE, KILL, FAILURE, NEED_FAULT_TOLERANCE, DISPATCH,
+     * RUNNING_EXECUTION) that requires it to be handled again on failure recovery.
+     */
+    private boolean isTaskNeedRecreate(final TaskInstance taskInstance) {
+        final TaskExecutionStatus state = taskInstance.getState();
+        return state == TaskExecutionStatus.PAUSE
+                || state == TaskExecutionStatus.KILL
+                || state == TaskExecutionStatus.FAILURE
+                || state == TaskExecutionStatus.NEED_FAULT_TOLERANCE
+                || state == TaskExecutionStatus.DISPATCH
+                || state == TaskExecutionStatus.RUNNING_EXECUTION;
+    }
+
+    /**
+     * Whether the task instance is paused or killed, i.e. handled through the recover factory
+     * instead of being marked invalid.
+     */
+    private boolean isTaskCanRecover(final TaskInstance taskInstance) {
+        final TaskExecutionStatus state = taskInstance.getState();
+        return state == TaskExecutionStatus.PAUSE || state == TaskExecutionStatus.KILL;
+    }
+
+    private void recoverTaskInstance(final TaskInstance taskInstance) {
+        // The factory handles: setting old instance flag to NO, updating it, and inserting the new instance
+        taskInstanceFactories.failedRecoverTaskInstanceFactory()
+                .builder()
+                .withTaskInstance(taskInstance)
+                .build();
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
index 347cc61c93b2..13cff6097331 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java
@@ -20,6 +20,7 @@
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
 import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
+import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraphFactory;
 import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 import
org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent; @@ -36,21 +37,28 @@ import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class WorkflowRunningStateAction extends AbstractWorkflowStateAction { + @Autowired + private WorkflowExecutionGraphFactory workflowExecutionGraphFactory; + @Override public void onStartEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable, final WorkflowStartLifecycleEvent workflowStartEvent) { throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); - // Initialize the workflow execution graph if not already initialized + // Initialize the workflow execution graph using the factory // This is deferred from command handling to reduce transaction time try { - workflowExecutionRunnable.getWorkflowExecuteContext().initializeWorkflowExecutionGraph(); + final IWorkflowExecutionGraph workflowExecutionGraph = + workflowExecutionGraphFactory.createWorkflowExecutionGraph( + workflowExecutionRunnable.getWorkflowExecuteContext()); + workflowExecutionRunnable.getWorkflowExecuteContext().setWorkflowExecutionGraph(workflowExecutionGraph); } catch (Exception e) { log.error("Failed to initialize workflow execution graph", e); final WorkflowEventBus workflowEventBus = diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java index 8d596fe5dccf..e76eef1bf59d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; @@ -35,6 +36,8 @@ public interface IWorkflowExecuteContext { WorkflowInstance getWorkflowInstance(); + Project getProject(); + IWorkflowGraph getWorkflowGraph(); IWorkflowExecutionGraph getWorkflowExecutionGraph(); @@ -44,11 +47,13 @@ public interface IWorkflowExecuteContext { List getWorkflowInstanceLifecycleListeners(); /** - * Initialize the workflow execution graph. + * Set the workflow execution graph. * This method should be called when the workflow is ready to start execution, * typically during the handling of WorkflowStartLifecycleEvent. + * + * @param workflowExecutionGraph the workflow execution graph to set */ - void initializeWorkflowExecutionGraph(); + void setWorkflowExecutionGraph(IWorkflowExecutionGraph workflowExecutionGraph); /** * Check if the workflow execution graph has been initialized. 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java index e793dc9f9aed..c78074ad8d8f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener; @@ -50,8 +49,6 @@ public class WorkflowExecuteContext implements IWorkflowExecuteContext { private volatile IWorkflowExecutionGraph workflowExecutionGraph; - private final IWorkflowExecutionGraphAssembler workflowExecutionGraphAssembler; - private final WorkflowEventBus workflowEventBus; private final List workflowInstanceLifecycleListeners; @@ -62,7 +59,6 @@ public WorkflowExecuteContext(Command command, WorkflowInstance workflowInstance, IWorkflowGraph workflowGraph, IWorkflowExecutionGraph workflowExecutionGraph, - IWorkflowExecutionGraphAssembler workflowExecutionGraphAssembler, WorkflowEventBus workflowEventBus, List workflowInstanceLifecycleListeners) { this.command = command; @@ -71,32 +67,20 @@ public WorkflowExecuteContext(Command command, this.workflowInstance = workflowInstance; this.workflowGraph = workflowGraph; this.workflowExecutionGraph = workflowExecutionGraph; - this.workflowExecutionGraphAssembler = 
workflowExecutionGraphAssembler; this.workflowEventBus = workflowEventBus; this.workflowInstanceLifecycleListeners = workflowInstanceLifecycleListeners; } /** - * Initialize the workflow execution graph using the assembler. + * Set the workflow execution graph. * This method should be called when the workflow is ready to start execution, * typically during the handling of WorkflowStartLifecycleEvent. - *

- * If the execution graph is already initialized or no assembler is available, - * this method returns without making any changes. + * + * @param workflowExecutionGraph the workflow execution graph to set */ @Override - public void initializeWorkflowExecutionGraph() { - if (workflowExecutionGraph != null) { - return; - } - if (workflowExecutionGraphAssembler == null) { - return; - } - synchronized (this) { - if (workflowExecutionGraph == null) { - workflowExecutionGraph = workflowExecutionGraphAssembler.assemble(); - } - } + public void setWorkflowExecutionGraph(final IWorkflowExecutionGraph workflowExecutionGraph) { + this.workflowExecutionGraph = workflowExecutionGraph; } /** @@ -125,8 +109,6 @@ public static class WorkflowExecuteContextBuilder { private IWorkflowExecutionGraph workflowExecutionGraph; - private IWorkflowExecutionGraphAssembler workflowExecutionGraphAssembler; - private WorkflowEventBus workflowEventBus; private List workflowInstanceLifecycleListeners; @@ -146,7 +128,6 @@ public WorkflowExecuteContext build() { workflowInstance, workflowGraph, workflowExecutionGraph, - workflowExecutionGraphAssembler, workflowEventBus, Optional.ofNullable(workflowInstanceLifecycleListeners).orElse(Collections.emptyList())); }