Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public WorkflowExecutionRunnable handleCommand(final Command command) {
assembleWorkflowInstance(workflowExecuteContextBuilder);
assembleWorkflowInstanceLifecycleListeners(workflowExecuteContextBuilder);
assembleWorkflowEventBus(workflowExecuteContextBuilder);
assembleWorkflowExecutionGraph(workflowExecuteContextBuilder);

final WorkflowExecutionRunnableBuilder workflowExecutionRunnableBuilder = WorkflowExecutionRunnableBuilder
.builder()
Expand Down Expand Up @@ -125,9 +124,6 @@ protected void assembleWorkflowGraph(
protected abstract void assembleWorkflowInstance(
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder);

protected abstract void assembleWorkflowExecutionGraph(
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder);

protected List<String> parseStartNodesFromWorkflowInstance(
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance();
Expand Down Expand Up @@ -158,5 +154,4 @@ protected void assembleProject(
checkArgument(project != null, "Cannot find the project code: " + workflowDefinition.getProjectCode());
workflowExecuteContextBuilder.setProject(project);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;

import java.util.Date;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -42,9 +39,6 @@ public class ReRunWorkflowCommandHandler extends RunWorkflowCommandHandler {
@Autowired
private WorkflowInstanceDao workflowInstanceDao;

@Autowired
private TaskInstanceDao taskInstanceDao;

@Autowired
private MasterConfig masterConfig;

Expand Down Expand Up @@ -79,22 +73,6 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
}

/**
 * Generate the workflow execution graph for a re-run.
 * <p> Marks all existing valid task instances of the workflow instance as invalid,
 * then delegates to the parent handler to assemble a fresh WorkflowExecutionGraph
 * from the start tasks.
 */
@Override
protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
    // Invalidate history task instances first so the re-run starts from a clean slate.
    markAllTaskInstanceInvalid(workflowExecuteContextBuilder);
    super.assembleWorkflowExecutionGraph(workflowExecuteContextBuilder);
}

/**
 * Mark every currently valid task instance of the workflow instance as invalid.
 * <p> The instances are fetched via {@code getValidTaskInstance} and invalidated
 * in bulk through {@code taskInstanceDao.markTaskInstanceInvalid}.
 */
private void markAllTaskInstanceInvalid(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
    final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance();
    final List<TaskInstance> taskInstances = getValidTaskInstance(workflowInstance);
    taskInstanceDao.markTaskInstanceInvalid(taskInstances);
}

@Override
public CommandType commandType() {
return CommandType.REPEAT_RUNNING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,14 @@
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import com.google.common.collect.Lists;

/**
* This handler is used to handle {@link CommandType#START_FAILURE_TASK_PROCESS}.
* <p> Restarts the failed/paused/killed task instances, as well as the task instances that sit behind successful ones but have not yet been triggered.
Expand All @@ -59,15 +38,6 @@ public class RecoverFailureTaskCommandHandler extends AbstractCommandHandler {
@Autowired
private WorkflowInstanceDao workflowInstanceDao;

@Autowired
private TaskInstanceDao taskInstanceDao;

@Autowired
private ApplicationContext applicationContext;

@Autowired
private TaskInstanceFactories taskInstanceFactories;

@Autowired
private MasterConfig masterConfig;

Expand Down Expand Up @@ -100,143 +70,6 @@ protected void assembleWorkflowInstance(
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
}

/**
 * Generate the workflow execution graph for failure recovery.
 * <p> Clears the history failure/killed task instances first (via
 * {@code dealWithHistoryTaskInstances}); a task whose predecessor failed or was
 * killed is also invalidated so it will be re-run.
 */
@Override
protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
    // Surviving history task instances after recovery processing, keyed by task name.
    final Map<String, TaskInstance> taskInstanceMap = dealWithHistoryTaskInstances(workflowExecuteContextBuilder)
            .stream()
            .collect(Collectors.toMap(TaskInstance::getName, Function.identity()));

    final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph();
    final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();

    // For each visited task: build a TaskExecutionRunnable (re-attaching the surviving
    // history instance, if any) and record the task plus its outgoing edges in the graph.
    final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
        final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
                TaskExecutionRunnableBuilder
                        .builder()
                        .workflowExecutionGraph(workflowExecutionGraph)
                        .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
                        .project(workflowExecuteContextBuilder.getProject())
                        .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
                        .taskDefinition(workflowGraph.getTaskNodeByName(task))
                        // May be null when no history instance survived for this task.
                        .taskInstance(taskInstanceMap.get(task))
                        .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
                        .applicationContext(applicationContext)
                        .build();
        workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder));
        workflowExecutionGraph.addEdge(task, successors);
    };

    // Walk the definition graph in topological order from the configured start nodes.
    final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor =
            WorkflowGraphTopologyLogicalVisitor.builder()
                    .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType())
                    .onWorkflowGraph(workflowGraph)
                    .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder))
                    .doVisitFunction(taskExecutionRunnableCreator)
                    .build();
    workflowGraphTopologyLogicalVisitor.visit();
    // Drop edges that cannot be reached from the start nodes.
    workflowExecutionGraph.removeUnReachableEdge();

    workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
}

/**
 * Return the surviving task instances for the recovered run.
 * <p> Failure/killed instances are replaced with freshly recreated instances,
 * paused instances are converted to recover instances, and every instance
 * downstream of such a recovery point is marked invalid in the DB.
 */
private List<TaskInstance> dealWithHistoryTaskInstances(
        final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
    final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance();
    // Snapshot of the currently valid task instances, keyed by task name.
    final Map<String, TaskInstance> taskInstanceMap = super.getValidTaskInstance(workflowInstance)
            .stream()
            .collect(Collectors.toMap(TaskInstance::getName, Function.identity()));

    final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph();

    final Set<String> needRecoverTasks = new HashSet<>();
    final Set<String> markInvalidTasks = new HashSet<>();
    final BiConsumer<String, Set<String>> historyTaskInstanceMarker = (task, successors) -> {
        // If an upstream task needs recovery, this task's history instance is stale:
        // invalidate it in the DB and propagate the invalidation to its successors.
        if (markInvalidTasks.contains(task)) {
            if (taskInstanceMap.containsKey(task)) {
                taskInstanceDao.markTaskInstanceInvalid(Lists.newArrayList(taskInstanceMap.get(task)));
                taskInstanceMap.remove(task);
            }
            markInvalidTasks.addAll(successors);
            return;
        }

        // No history instance for this task: nothing to recover or invalidate.
        final TaskInstance taskInstance = taskInstanceMap.get(task);
        if (taskInstance == null) {
            return;
        }

        // Failure/kill (recreate) or pause (resume) instances are recovery points;
        // everything downstream of a recovery point must be invalidated.
        if (isTaskNeedRecreate(taskInstance) || isTaskCanRecover(taskInstance)) {
            needRecoverTasks.add(task);
            markInvalidTasks.addAll(successors);
        }
    };

    // Visit the graph in topological order so a parent is always marked before its children.
    final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor =
            WorkflowGraphTopologyLogicalVisitor.builder()
                    .onWorkflowGraph(workflowGraph)
                    .taskDependType(workflowInstance.getTaskDependType())
                    .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder))
                    .doVisitFunction(historyTaskInstanceMarker)
                    .build();
    workflowGraphTopologyLogicalVisitor.visit();

    // Replace each recovery point with a fresh instance of the appropriate kind.
    for (String task : needRecoverTasks) {
        final TaskInstance taskInstance = taskInstanceMap.get(task);
        if (isTaskCanRecover(taskInstance)) {
            taskInstanceMap.put(task, createRecoverTaskInstance(taskInstance));
            continue;
        }
        if (isTaskNeedRecreate(taskInstance)) {
            taskInstanceMap.put(task, createRecreatedTaskInstance(taskInstance));
        }
    }
    return new ArrayList<>(taskInstanceMap.values());
}

/**
 * Whether the task instance has to be recreated from scratch.
 * <p> A task that ended in FAILURE or KILL is marked invalid and rebuilt rather
 * than resumed.
 */
private boolean isTaskNeedRecreate(final TaskInstance taskInstance) {
    if (taskInstance == null) {
        return false;
    }
    final TaskExecutionStatus state = taskInstance.getState();
    return state == TaskExecutionStatus.FAILURE || state == TaskExecutionStatus.KILL;
}

/**
 * Build a brand-new task instance that replaces a failed/killed history instance.
 */
private TaskInstance createRecreatedTaskInstance(final TaskInstance taskInstance) {
    return taskInstanceFactories
            .failedRecoverTaskInstanceFactory()
            .builder()
            .withTaskInstance(taskInstance)
            .build();
}

/**
 * Whether the task instance can be resumed in place.
 * <p> Only a PAUSE instance is resumable; everything else is either kept as-is
 * or recreated.
 */
private boolean isTaskCanRecover(final TaskInstance taskInstance) {
    if (taskInstance == null) {
        return false;
    }
    final TaskExecutionStatus state = taskInstance.getState();
    return state == TaskExecutionStatus.PAUSE;
}

/**
 * Build a recover task instance that resumes a paused history instance.
 */
private TaskInstance createRecoverTaskInstance(final TaskInstance taskInstance) {
    return taskInstanceFactories
            .pauseRecoverTaskInstanceFactory()
            .builder()
            .withTaskInstance(taskInstance)
            .build();
}

@Override
public CommandType commandType() {
return CommandType.START_FAILURE_TASK_PROCESS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ protected void assembleWorkflowInstance(WorkflowExecuteContext.WorkflowExecuteCo
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
}

@Override
protected void assembleWorkflowExecutionGraph(WorkflowExecuteContext.WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
    // NOTE(review): intentionally a no-op — no execution graph is assembled here.
    // Presumably the serial-wait recovery path builds/reuses its graph elsewhere; confirm.
}

@Override
public CommandType commandType() {
return CommandType.RECOVER_SERIAL_WAIT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@
import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;

import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -41,11 +36,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

/**
Expand All @@ -61,9 +53,6 @@ public class RunWorkflowCommandHandler extends AbstractCommandHandler {
@Autowired
private MasterConfig masterConfig;

@Autowired
private ApplicationContext applicationContext;

/**
* Will generate a new workflow instance based on the command.
*/
Expand All @@ -80,39 +69,6 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
}

/**
 * Generate the workflow execution graph for a fresh run.
 * <p> Wraps every reachable task definition in a TaskExecutionRunnable (no history
 * task instance is attached) and wires the edges by visiting the workflow graph in
 * topological order from the configured start nodes.
 */
@Override
protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
    final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph();
    final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
    // For each visited task, create its runnable and record the task plus its outgoing edges.
    final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
        final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
                TaskExecutionRunnableBuilder
                        .builder()
                        .workflowExecutionGraph(workflowExecutionGraph)
                        .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
                        .project(workflowExecuteContextBuilder.getProject())
                        .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
                        .taskDefinition(workflowGraph.getTaskNodeByName(task))
                        .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
                        .applicationContext(applicationContext)
                        .build();
        workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder));
        workflowExecutionGraph.addEdge(task, successors);
    };

    // Walk the definition graph in topological order from the configured start nodes.
    final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor =
            WorkflowGraphTopologyLogicalVisitor.builder()
                    .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType())
                    .onWorkflowGraph(workflowGraph)
                    .fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder))
                    .doVisitFunction(taskExecutionRunnableCreator)
                    .build();
    workflowGraphTopologyLogicalVisitor.visit();
    // Drop edges that cannot be reached from the start nodes.
    workflowExecutionGraph.removeUnReachableEdge();

    workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
}

/**
* Merge the command params with the workflow params.
* <p> If there are duplicate keys, the command params will override the workflow params.
Expand Down
Loading