diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java index 2c9df21a870c..310f550d22fe 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java @@ -80,7 +80,6 @@ public WorkflowExecutionRunnable handleCommand(final Command command) { assembleWorkflowInstance(workflowExecuteContextBuilder); assembleWorkflowInstanceLifecycleListeners(workflowExecuteContextBuilder); assembleWorkflowEventBus(workflowExecuteContextBuilder); - assembleWorkflowExecutionGraph(workflowExecuteContextBuilder); final WorkflowExecutionRunnableBuilder workflowExecutionRunnableBuilder = WorkflowExecutionRunnableBuilder .builder() @@ -125,9 +124,6 @@ protected void assembleWorkflowGraph( protected abstract void assembleWorkflowInstance( final WorkflowExecuteContextBuilder workflowExecuteContextBuilder); - protected abstract void assembleWorkflowExecutionGraph( - final WorkflowExecuteContextBuilder workflowExecuteContextBuilder); - protected List parseStartNodesFromWorkflowInstance( final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); @@ -158,5 +154,4 @@ protected void assembleProject( checkArgument(project != null, "Cannot find the project code: " + workflowDefinition.getProjectCode()); workflowExecuteContextBuilder.setProject(project); } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java index 524c7a225a5c..439a76f33155 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java @@ -20,15 +20,12 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; import java.util.Date; -import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -42,9 +39,6 @@ public class ReRunWorkflowCommandHandler extends RunWorkflowCommandHandler { @Autowired private WorkflowInstanceDao workflowInstanceDao; - @Autowired - private TaskInstanceDao taskInstanceDao; - @Autowired private MasterConfig masterConfig; @@ -79,22 +73,6 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - /** - * Generate the workflow execution graph. - *

Will clear the history task instance and assembly the start tasks into the WorkflowExecutionGraph. - */ - @Override - protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - markAllTaskInstanceInvalid(workflowExecuteContextBuilder); - super.assembleWorkflowExecutionGraph(workflowExecuteContextBuilder); - } - - private void markAllTaskInstanceInvalid(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); - final List taskInstances = getValidTaskInstance(workflowInstance); - taskInstanceDao.markTaskInstanceInvalid(taskInstances); - } - @Override public CommandType commandType() { return CommandType.REPEAT_RUNNING; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java index 92f5fb4ed0c7..ad44a3d42f1d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java @@ -20,35 +20,14 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.stream.Collectors; - import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; -import com.google.common.collect.Lists; - /** * This handler used to handle {@link CommandType#START_FAILURE_TASK_PROCESS}. *

Will start the failure/pause/killed and other task instance which is behind success tasks instance but not been triggered. @@ -59,15 +38,6 @@ public class RecoverFailureTaskCommandHandler extends AbstractCommandHandler { @Autowired private WorkflowInstanceDao workflowInstanceDao; - @Autowired - private TaskInstanceDao taskInstanceDao; - - @Autowired - private ApplicationContext applicationContext; - - @Autowired - private TaskInstanceFactories taskInstanceFactories; - @Autowired private MasterConfig masterConfig; @@ -100,143 +70,6 @@ protected void assembleWorkflowInstance( workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - /** - * Generate the workflow execution graph. - *

Will clear the history failure/killed task. - *

If the task's predecessors exist failure/killed, will also mark the task as failure/killed. - */ - @Override - protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - final Map taskInstanceMap = dealWithHistoryTaskInstances(workflowExecuteContextBuilder) - .stream() - .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); - - final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); - - final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { - final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = - TaskExecutionRunnableBuilder - .builder() - .workflowExecutionGraph(workflowExecutionGraph) - .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) - .project(workflowExecuteContextBuilder.getProject()) - .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance()) - .taskDefinition(workflowGraph.getTaskNodeByName(task)) - .taskInstance(taskInstanceMap.get(task)) - .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) - .applicationContext(applicationContext) - .build(); - workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); - workflowExecutionGraph.addEdge(task, successors); - }; - - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType()) - .onWorkflowGraph(workflowGraph) - .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder)) - .doVisitFunction(taskExecutionRunnableCreator) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - workflowExecutionGraph.removeUnReachableEdge(); - - workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph); - } - - /** - * Return the valid task instance which should not be recovered. - *

Will mark the failure/killed task instance as invalid. - */ - private List dealWithHistoryTaskInstances( - final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance(); - final Map taskInstanceMap = super.getValidTaskInstance(workflowInstance) - .stream() - .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); - - final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - - final Set needRecoverTasks = new HashSet<>(); - final Set markInvalidTasks = new HashSet<>(); - final BiConsumer> historyTaskInstanceMarker = (task, successors) -> { - // If the parent is need recover - // Then the task should mark as invalid, and it's child should be mark as invalidated. - if (markInvalidTasks.contains(task)) { - if (taskInstanceMap.containsKey(task)) { - taskInstanceDao.markTaskInstanceInvalid(Lists.newArrayList(taskInstanceMap.get(task))); - taskInstanceMap.remove(task); - } - markInvalidTasks.addAll(successors); - return; - } - - final TaskInstance taskInstance = taskInstanceMap.get(task); - if (taskInstance == null) { - return; - } - - if (isTaskNeedRecreate(taskInstance) || isTaskCanRecover(taskInstance)) { - needRecoverTasks.add(task); - markInvalidTasks.addAll(successors); - } - }; - - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .onWorkflowGraph(workflowGraph) - .taskDependType(workflowInstance.getTaskDependType()) - .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder)) - .doVisitFunction(historyTaskInstanceMarker) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - - for (String task : needRecoverTasks) { - final TaskInstance taskInstance = taskInstanceMap.get(task); - if (isTaskCanRecover(taskInstance)) { - taskInstanceMap.put(task, createRecoverTaskInstance(taskInstance)); - continue; - } - if (isTaskNeedRecreate(taskInstance)) { - taskInstanceMap.put(task, createRecreatedTaskInstance(taskInstance)); - } - } - return new ArrayList<>(taskInstanceMap.values()); - } - - /** - * Whether the task need to be recreated. - *

If the task state is FAILURE and KILL, then will mark the task invalid and recreate the task. - */ - private boolean isTaskNeedRecreate(final TaskInstance taskInstance) { - if (taskInstance == null) { - return false; - } - return taskInstance.getState() == TaskExecutionStatus.FAILURE - || taskInstance.getState() == TaskExecutionStatus.KILL; - } - - private TaskInstance createRecreatedTaskInstance(final TaskInstance taskInstance) { - return taskInstanceFactories.failedRecoverTaskInstanceFactory() - .builder() - .withTaskInstance(taskInstance) - .build(); - } - - private boolean isTaskCanRecover(final TaskInstance taskInstance) { - if (taskInstance == null) { - return false; - } - return taskInstance.getState() == TaskExecutionStatus.PAUSE; - } - - private TaskInstance createRecoverTaskInstance(final TaskInstance taskInstance) { - return taskInstanceFactories.pauseRecoverTaskInstanceFactory() - .builder() - .withTaskInstance(taskInstance) - .build(); - } - @Override public CommandType commandType() { return CommandType.START_FAILURE_TASK_PROCESS; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java index 09486db8baac..7abfff57579c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java @@ -49,11 +49,6 @@ protected void assembleWorkflowInstance(WorkflowExecuteContext.WorkflowExecuteCo workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - @Override - protected void assembleWorkflowExecutionGraph(WorkflowExecuteContext.WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - - } - @Override public CommandType commandType() { return CommandType.RECOVER_SERIAL_WAIT; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java index 5aa3b22e56b7..eabf5ade17fc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java @@ -28,11 +28,6 @@ import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; import org.apache.commons.collections4.CollectionUtils; @@ -41,11 +36,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.function.BiConsumer; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; /** @@ -61,9 +53,6 @@ public class RunWorkflowCommandHandler extends AbstractCommandHandler { @Autowired private MasterConfig masterConfig; - @Autowired - private ApplicationContext applicationContext; - /** * Will generate a new workflow instance based on the command. */ @@ -80,39 +69,6 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - @Override - protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); - final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { - final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = - TaskExecutionRunnableBuilder - .builder() - .workflowExecutionGraph(workflowExecutionGraph) - .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) - .project(workflowExecuteContextBuilder.getProject()) - .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance()) - .taskDefinition(workflowGraph.getTaskNodeByName(task)) - .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) - .applicationContext(applicationContext) - .build(); - workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); - workflowExecutionGraph.addEdge(task, successors); - }; - - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType()) - .onWorkflowGraph(workflowGraph) - .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder)) - .doVisitFunction(taskExecutionRunnableCreator) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - workflowExecutionGraph.removeUnReachableEdge(); - - workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph); - } - /** * Merge the command params with the workflow params. *

If there are duplicate keys, the command params will override the workflow params. diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java index e18d02b400f8..95982be04cec 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java @@ -20,27 +20,15 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.extract.master.command.WorkflowFailoverCommandParam; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; -import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; -import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; import java.util.Date; -import java.util.Map; -import java.util.Set; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; /** @@ -53,9 +41,6 @@ public class WorkflowFailoverCommandHandler extends AbstractCommandHandler { @Autowired private WorkflowInstanceDao workflowInstanceDao; - @Autowired - private ApplicationContext applicationContext; - @Autowired private MasterConfig masterConfig; @@ -94,50 +79,6 @@ protected void assembleWorkflowInstance( workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } - /** - * Generate the workflow execution graph. - *

Will rebuild the WorkflowExecutionGraph from the exist task instance. - */ - @Override - protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { - final Map taskInstanceMap = - getValidTaskInstance(workflowExecuteContextBuilder.getWorkflowInstance()) - .stream() - .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); - - final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph(); - final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); - - final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { - final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = - TaskExecutionRunnableBuilder - .builder() - .workflowExecutionGraph(workflowExecutionGraph) - .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition()) - .project(workflowExecuteContextBuilder.getProject()) - .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance()) - .taskDefinition(workflowGraph.getTaskNodeByName(task)) - .taskInstance(taskInstanceMap.get(task)) - .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus()) - .applicationContext(applicationContext) - .build(); - workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); - workflowExecutionGraph.addEdge(task, successors); - }; - - final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = - WorkflowGraphTopologyLogicalVisitor.builder() - .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType()) - .onWorkflowGraph(workflowGraph) - .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder)) - .doVisitFunction(taskExecutionRunnableCreator) - .build(); - workflowGraphTopologyLogicalVisitor.visit(); - workflowExecutionGraph.removeUnReachableEdge(); - - workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph); - } - @Override public CommandType commandType() { return CommandType.RECOVER_TOLERANCE_FAULT_PROCESS; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraphFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraphFactory.java new file mode 100644 index 000000000000..1cb2578bb350 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraphFactory.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.graph; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.extract.master.command.ICommandParam; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories; +import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import com.google.common.collect.Lists; + +/** + * Factory for creating WorkflowExecutionGraph based on command type. + *

+ * This factory encapsulates all graph creation logic, ensuring command handlers + * are not concerned with how instances are initialized. + */ +@Slf4j +@Component +public class WorkflowExecutionGraphFactory { + + @Autowired + private TaskInstanceDao taskInstanceDao; + + @Autowired + private TaskInstanceFactories taskInstanceFactories; + + @Autowired + private ApplicationContext applicationContext; + + /** + * Create a WorkflowExecutionGraph based on the command type in the context. + * + * @param context the workflow execute context + * @return the WorkflowExecutionGraph, or null if not applicable + */ + public IWorkflowExecutionGraph createWorkflowExecutionGraph(final IWorkflowExecuteContext context) { + final Command command = context.getCommand(); + if (command == null) { + log.warn("Command is null, cannot create workflow execution graph"); + return null; + } + + final CommandType commandType = command.getCommandType(); + switch (commandType) { + case START_PROCESS: + return createForStartProcess(context); + case REPEAT_RUNNING: + return createForRepeatRunning(context); + case START_FAILURE_TASK_PROCESS: + return createForRecoverFailure(context); + case RECOVER_TOLERANCE_FAULT_PROCESS: + return createForFailover(context); + case RECOVER_SERIAL_WAIT: + return null; // No execution graph needed + default: + log.warn("Unsupported command type for graph creation: {}", commandType); + return createForStartProcess(context); + } + } + + /** + * Create graph for START_PROCESS command - creates new task instances. + */ + private IWorkflowExecutionGraph createForStartProcess(final IWorkflowExecuteContext context) { + final IWorkflowGraph workflowGraph = context.getWorkflowGraph(); + final WorkflowInstance workflowInstance = context.getWorkflowInstance(); + final List startNodes = parseStartNodes(context); + + final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); + final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { + final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = + TaskExecutionRunnableBuilder + .builder() + .workflowExecutionGraph(workflowExecutionGraph) + .workflowDefinition(context.getWorkflowDefinition()) + .project(context.getProject()) + .workflowInstance(workflowInstance) + .taskDefinition(workflowGraph.getTaskNodeByName(task)) + .workflowEventBus(context.getWorkflowEventBus()) + .applicationContext(applicationContext) + .build(); + workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); + workflowExecutionGraph.addEdge(task, successors); + }; + + final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = + WorkflowGraphTopologyLogicalVisitor.builder() + .taskDependType(workflowInstance.getTaskDependType()) + .onWorkflowGraph(workflowGraph) + .fromTask(startNodes) + .doVisitFunction(taskExecutionRunnableCreator) + .build(); + workflowGraphTopologyLogicalVisitor.visit(); + workflowExecutionGraph.removeUnReachableEdge(); + + return workflowExecutionGraph; + } + + /** + * Create graph for REPEAT_RUNNING command - invalidates old task instances and creates new ones. + */ + private IWorkflowExecutionGraph createForRepeatRunning(final IWorkflowExecuteContext context) { + final WorkflowInstance workflowInstance = context.getWorkflowInstance(); + + // Mark all existing task instances as invalid before creating the new graph + final List taskInstances = getValidTaskInstances(workflowInstance); + taskInstanceDao.markTaskInstanceInvalid(taskInstances); + + // Create new graph (same logic as START_PROCESS) + return createForStartProcess(context); + } + + /** + * Create graph for START_FAILURE_TASK_PROCESS command - recovers from failed tasks. + */ + private IWorkflowExecutionGraph createForRecoverFailure(final IWorkflowExecuteContext context) { + final IWorkflowGraph workflowGraph = context.getWorkflowGraph(); + final WorkflowInstance workflowInstance = context.getWorkflowInstance(); + final List startNodes = parseStartNodes(context); + + final Map taskInstanceMap = dealWithHistoryTaskInstances(context) + .stream() + .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); + + final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); + + final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { + final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = + TaskExecutionRunnableBuilder + .builder() + .workflowExecutionGraph(workflowExecutionGraph) + .workflowDefinition(context.getWorkflowDefinition()) + .project(context.getProject()) + .workflowInstance(workflowInstance) + .taskDefinition(workflowGraph.getTaskNodeByName(task)) + .taskInstance(taskInstanceMap.get(task)) + .workflowEventBus(context.getWorkflowEventBus()) + .applicationContext(applicationContext) + .build(); + workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); + workflowExecutionGraph.addEdge(task, successors); + }; + + final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = + WorkflowGraphTopologyLogicalVisitor.builder() + .taskDependType(workflowInstance.getTaskDependType()) + .onWorkflowGraph(workflowGraph) + .fromTask(startNodes) + .doVisitFunction(taskExecutionRunnableCreator) + .build(); + workflowGraphTopologyLogicalVisitor.visit(); + workflowExecutionGraph.removeUnReachableEdge(); + + return workflowExecutionGraph; + } + + /** + * Create graph for RECOVER_TOLERANCE_FAULT_PROCESS command - recovers from failover. + */ + private IWorkflowExecutionGraph createForFailover(final IWorkflowExecuteContext context) { + final IWorkflowGraph workflowGraph = context.getWorkflowGraph(); + final WorkflowInstance workflowInstance = context.getWorkflowInstance(); + final List startNodes = parseStartNodes(context); + + final Map taskInstanceMap = + getValidTaskInstances(workflowInstance) + .stream() + .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); + + final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph(); + + final BiConsumer> taskExecutionRunnableCreator = (task, successors) -> { + final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = + TaskExecutionRunnableBuilder + .builder() + .workflowExecutionGraph(workflowExecutionGraph) + .workflowDefinition(context.getWorkflowDefinition()) + .project(context.getProject()) + .workflowInstance(workflowInstance) + .taskDefinition(workflowGraph.getTaskNodeByName(task)) + .taskInstance(taskInstanceMap.get(task)) + .workflowEventBus(context.getWorkflowEventBus()) + .applicationContext(applicationContext) + .build(); + workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder)); + workflowExecutionGraph.addEdge(task, successors); + }; + + final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = + WorkflowGraphTopologyLogicalVisitor.builder() + .taskDependType(workflowInstance.getTaskDependType()) + .onWorkflowGraph(workflowGraph) + .fromTask(startNodes) + .doVisitFunction(taskExecutionRunnableCreator) + .build(); + workflowGraphTopologyLogicalVisitor.visit(); + workflowExecutionGraph.removeUnReachableEdge(); + + return workflowExecutionGraph; + } + + /** + * Parse start nodes from the workflow instance command param. + * Converts task codes to task names. + */ + private List parseStartNodes(final IWorkflowExecuteContext context) { + final WorkflowInstance workflowInstance = context.getWorkflowInstance(); + final ICommandParam commandParam = + JSONUtils.parseObject(workflowInstance.getCommandParam(), ICommandParam.class); + if (commandParam == null || CollectionUtils.isEmpty(commandParam.getStartNodes())) { + return Collections.emptyList(); + } + final IWorkflowGraph workflowGraph = context.getWorkflowGraph(); + return commandParam.getStartNodes() + .stream() + .map(workflowGraph::getTaskNodeByCode) + .map(TaskDefinition::getName) + .collect(Collectors.toList()); + } + + /** + * Get valid (non-invalid) task instances for a workflow instance. + */ + private List getValidTaskInstances(final WorkflowInstance workflowInstance) { + return taskInstanceDao.queryValidTaskListByWorkflowInstanceId( + workflowInstance.getId()); + } + + /** + * Deal with history task instances for failure recovery. + * Mark failure/killed tasks and their children as invalid. + */ + private List dealWithHistoryTaskInstances(final IWorkflowExecuteContext context) { + final WorkflowInstance workflowInstance = context.getWorkflowInstance(); + final Map taskInstanceMap = getValidTaskInstances(workflowInstance) + .stream() + .collect(Collectors.toMap(TaskInstance::getName, Function.identity())); + + final IWorkflowGraph workflowGraph = context.getWorkflowGraph(); + final List startNodes = parseStartNodes(context); + + final Set needRecoverTasks = new HashSet<>(); + final Set markInvalidTasks = new HashSet<>(); + final BiConsumer> historyTaskInstanceMarker = (task, successors) -> { + if (markInvalidTasks.contains(task)) { + if (taskInstanceMap.containsKey(task)) { + taskInstanceDao.markTaskInstanceInvalid(Lists.newArrayList(taskInstanceMap.get(task))); + taskInstanceMap.remove(task); + } + markInvalidTasks.addAll(successors); + return; + } + + final TaskInstance taskInstance = taskInstanceMap.get(task); + if (taskInstance == null) { + return; + } + + if (isTaskNeedRecreate(taskInstance) || isTaskCanRecover(taskInstance)) { + needRecoverTasks.add(task); + markInvalidTasks.addAll(successors); + } + }; + + final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor = + WorkflowGraphTopologyLogicalVisitor.builder() + .onWorkflowGraph(workflowGraph) + .taskDependType(workflowInstance.getTaskDependType()) + .fromTask(startNodes) + .doVisitFunction(historyTaskInstanceMarker) + .build(); + workflowGraphTopologyLogicalVisitor.visit(); + + for (String task : needRecoverTasks) { + final TaskInstance taskInstance = taskInstanceMap.get(task); + if (isTaskCanRecover(taskInstance)) { + recoverTaskInstance(taskInstance); + } else if (isTaskNeedRecreate(taskInstance)) { + taskInstanceDao.markTaskInstanceInvalid(Lists.newArrayList(taskInstance)); + taskInstanceMap.remove(task); + } + } + + return Lists.newArrayList(taskInstanceMap.values()); + } + + private boolean isTaskNeedRecreate(final TaskInstance taskInstance) { + final TaskExecutionStatus state = taskInstance.getState(); + return state == TaskExecutionStatus.PAUSE + || state == TaskExecutionStatus.KILL + || state == TaskExecutionStatus.FAILURE + || state == TaskExecutionStatus.NEED_FAULT_TOLERANCE + || state == TaskExecutionStatus.DISPATCH + || state == TaskExecutionStatus.RUNNING_EXECUTION; + } + + private boolean isTaskCanRecover(final TaskInstance taskInstance) { + final TaskExecutionStatus state = taskInstance.getState(); + return state == TaskExecutionStatus.PAUSE || state == TaskExecutionStatus.KILL; + } + + private void recoverTaskInstance(final TaskInstance taskInstance) { + // The factory handles: setting old instance flag to NO, updating it, and inserting the new instance + taskInstanceFactories.failedRecoverTaskInstanceFactory() + .builder() + .withTaskInstance(taskInstance) + .build(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java index 16839f378ab4..13cff6097331 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph; +import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraphFactory; import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent; @@ -36,18 +37,43 @@ import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class WorkflowRunningStateAction extends AbstractWorkflowStateAction { + @Autowired + private WorkflowExecutionGraphFactory workflowExecutionGraphFactory; + @Override public void onStartEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable, final WorkflowStartLifecycleEvent workflowStartEvent) { throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); + + // Initialize the workflow execution graph using the factory + // This is deferred from command handling to reduce transaction time + try { + final IWorkflowExecutionGraph workflowExecutionGraph = + workflowExecutionGraphFactory.createWorkflowExecutionGraph( + workflowExecutionRunnable.getWorkflowExecuteContext()); + workflowExecutionRunnable.getWorkflowExecuteContext().setWorkflowExecutionGraph(workflowExecutionGraph); + } catch (Exception e) { + log.error("Failed to initialize workflow execution graph", e); + final WorkflowEventBus workflowEventBus = + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowEventBus(); + workflowEventBus.publish(WorkflowFailedLifecycleEvent.of(workflowExecutionRunnable)); + return; + } + final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowExecutionGraph(); + if (workflowExecutionGraph == null) { + log.info("Workflow execution graph is null, try to emit workflow finished event"); + emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable); + return; + } final List startNodes = workflowExecutionGraph.getStartNodes(); if (startNodes.isEmpty()) { log.info("Workflow start node is empty, try to emit workflow finished event"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java index 0fa99c4ae71d..e76eef1bf59d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; @@ -35,6 +36,8 @@ public interface IWorkflowExecuteContext { WorkflowInstance getWorkflowInstance(); + Project getProject(); + IWorkflowGraph getWorkflowGraph(); IWorkflowExecutionGraph getWorkflowExecutionGraph(); @@ -43,4 +46,18 @@ public interface IWorkflowExecuteContext { List getWorkflowInstanceLifecycleListeners(); + /** + * Set the workflow execution graph. + * This method should be called when the workflow is ready to start execution, + * typically during the handling of WorkflowStartLifecycleEvent. + * + * @param workflowExecutionGraph the workflow execution graph to set + */ + void setWorkflowExecutionGraph(IWorkflowExecutionGraph workflowExecutionGraph); + + /** + * Check if the workflow execution graph has been initialized. + */ + boolean isWorkflowExecutionGraphInitialized(); + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java index 0727ff5e9841..c78074ad8d8f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java @@ -30,13 +30,11 @@ import java.util.List; import java.util.Optional; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; import lombok.NoArgsConstructor; @Getter -@AllArgsConstructor public class WorkflowExecuteContext implements IWorkflowExecuteContext { private final Command command; @@ -49,12 +47,50 @@ public class WorkflowExecuteContext implements IWorkflowExecuteContext { private final IWorkflowGraph workflowGraph; - private final IWorkflowExecutionGraph workflowExecutionGraph; + private volatile IWorkflowExecutionGraph workflowExecutionGraph; private final WorkflowEventBus workflowEventBus; private final List workflowInstanceLifecycleListeners; + public WorkflowExecuteContext(Command command, + WorkflowDefinition workflowDefinition, + Project project, + WorkflowInstance workflowInstance, + IWorkflowGraph workflowGraph, + IWorkflowExecutionGraph workflowExecutionGraph, + WorkflowEventBus workflowEventBus, + List workflowInstanceLifecycleListeners) { + this.command = command; + this.workflowDefinition = workflowDefinition; + this.project = project; + this.workflowInstance = workflowInstance; + this.workflowGraph = workflowGraph; + this.workflowExecutionGraph = workflowExecutionGraph; + this.workflowEventBus = workflowEventBus; + this.workflowInstanceLifecycleListeners = workflowInstanceLifecycleListeners; + } + + /** + * Set the workflow execution graph. + * This method should be called when the workflow is ready to start execution, + * typically during the handling of WorkflowStartLifecycleEvent. + * + * @param workflowExecutionGraph the workflow execution graph to set + */ + @Override + public void setWorkflowExecutionGraph(final IWorkflowExecutionGraph workflowExecutionGraph) { + this.workflowExecutionGraph = workflowExecutionGraph; + } + + /** + * Check if the workflow execution graph has been initialized. + */ + @Override + public boolean isWorkflowExecutionGraphInitialized() { + return workflowExecutionGraph != null; + } + public static WorkflowExecuteContextBuilder builder() { return new WorkflowExecuteContextBuilder(); }