diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ExecuteTaskCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ExecuteTaskCommandHandler.java new file mode 100644 index 000000000000..43afdb09e733 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ExecuteTaskCommandHandler.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.engine.command.handler; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; +import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph; +import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + 
+import com.google.common.base.Splitter;
+
+/**
+ * This handler is used to handle {@link CommandType#EXECUTE_TASK}.
+ * <p>
+ * It will rerun the given start task and all its downstream tasks in the same workflow instance.
+ */
+@Component
+public class ExecuteTaskCommandHandler extends AbstractCommandHandler {
+
+    @Autowired
+    private WorkflowInstanceDao workflowInstanceDao;
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    @Override
+    protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
+        final Command command = workflowExecuteContextBuilder.getCommand();
+        final int workflowInstanceId = command.getWorkflowInstanceId();
+        final WorkflowInstance workflowInstance = workflowInstanceDao.queryOptionalById(workflowInstanceId)
+                .orElseThrow(() -> new IllegalArgumentException("Cannot find WorkflowInstance:" + workflowInstanceId));
+        workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, command.getCommandType().name());
+        workflowInstance.setCommandType(command.getCommandType());
+        if (command.getTaskDependType() != null) {
+            workflowInstance.setTaskDependType(command.getTaskDependType());
+        }
+        workflowInstance.setHost(masterConfig.getMasterAddress());
+        workflowInstanceDao.updateById(workflowInstance);
+        workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
+    }
+
+    @Override
+    protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
+        final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph();
+        final List<String> startNodes = parseStartNodesFromCommand(workflowExecuteContextBuilder, workflowGraph);
+        final Map<String, TaskInstance> taskInstanceMap = getValidTaskInstance(workflowExecuteContextBuilder
+                .getWorkflowInstance())
+                        .stream()
+                        .collect(Collectors.toMap(TaskInstance::getName, Function.identity()));
+
+        // Mark the selected task and all its downstream task instances as invalid, then trigger them again.
+        final Set<String> taskNamesNeedRerun = new HashSet<>();
+        final WorkflowGraphTopologyLogicalVisitor markInvalidTaskVisitor =
+                WorkflowGraphTopologyLogicalVisitor.builder()
+                        .onWorkflowGraph(workflowGraph)
+                        .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType())
+                        .fromTask(startNodes)
+                        .doVisitFunction((task, successors) -> taskNamesNeedRerun.add(task))
+                        .build();
+        markInvalidTaskVisitor.visit();
+
+        final List<TaskInstance> taskInstancesNeedRerun = taskNamesNeedRerun.stream()
+                .map(taskInstanceMap::remove)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+        if (CollectionUtils.isNotEmpty(taskInstancesNeedRerun)) {
+            taskInstanceDao.markTaskInstanceInvalid(taskInstancesNeedRerun);
+        }
+
+        final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
+        final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
+            final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
+                    TaskExecutionRunnableBuilder
+                            .builder()
+                            .workflowExecutionGraph(workflowExecutionGraph)
+                            .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
+                            .project(workflowExecuteContextBuilder.getProject())
+                            .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
+                            .taskDefinition(workflowGraph.getTaskNodeByName(task))
+                            .taskInstance(taskInstanceMap.get(task))
+                            .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
+                            .applicationContext(applicationContext)
+                            .build();
+            workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder));
+            workflowExecutionGraph.addEdge(task, successors);
+        };
+
+        final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor =
+                WorkflowGraphTopologyLogicalVisitor.builder()
+                        .taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType())
+                        .onWorkflowGraph(workflowGraph)
+                        .fromTask(startNodes)
+                        .doVisitFunction(taskExecutionRunnableCreator)
+                        .build();
+        workflowGraphTopologyLogicalVisitor.visit();
+        workflowExecutionGraph.removeUnReachableEdge();
+        workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
+    }
+
+    private List<String> parseStartNodesFromCommand(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder,
+                                                    final IWorkflowGraph workflowGraph) {
+        final Command command = workflowExecuteContextBuilder.getCommand();
+        final String startNodes = JSONUtils.getNodeString(command.getCommandParam(), CMD_PARAM_START_NODES);
+        checkArgument(StringUtils.isNotBlank(startNodes),
+                "Invalid command param, the start nodes is empty: " + command.getCommandParam());
+        final List<Long> startNodeCodes = Splitter.on(',')
+                .trimResults()
+                .omitEmptyStrings()
+                .splitToStream(startNodes)
+                .map(Long::parseLong)
+                .collect(Collectors.toList());
+        checkArgument(CollectionUtils.isNotEmpty(startNodeCodes),
+                "Invalid command param, cannot parse start nodes from command param: " + command.getCommandParam());
+        return startNodeCodes
+                .stream()
+                .map(workflowGraph::getTaskNodeByCode)
+                .map(TaskDefinition::getName)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public CommandType commandType() {
+        return CommandType.EXECUTE_TASK;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java
index cf88c86234a8..d36a88a8a52a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java
@@ -49,10 +49,9 @@
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -87,7 +86,8 @@ public TaskExecutionContext createTaskExecutionContext(TaskExecutionContextCreat
         final Project project = request.getProject();
 
         final List<Property> varPools =
-                generateTaskInstanceVarPool(request.getTaskDefinition(), request.getWorkflowExecutionGraph());
+                generateTaskInstanceVarPool(workflowInstance, request.getTaskDefinition(),
+                        request.getWorkflowExecutionGraph());
         taskInstance.setVarPool(VarPoolUtils.serializeVarPool(varPools));
 
         return TaskExecutionContextBuilder.get()
@@ -192,19 +192,26 @@ private Optional<String> getEnvironmentConfigFromDB(final TaskInstance taskInsta
         return Optional.ofNullable(environmentOptional.get().getConfig());
     }
 
-    // The successors of the task instance will be used to generate the var pool
-    // All out varPool from the successors will be merged into the var pool of the task instance
-    private List<Property> generateTaskInstanceVarPool(TaskDefinition taskDefinition,
+    // The predecessors of the task instance will be used to generate the var pool.
+    // In execute-task(TASK_ONLY) scenario, the predecessor might be outside current execution sub-graph.
+    // For this case, fallback to workflow varPool to keep compatibility with historical behavior.
+    private List<Property> generateTaskInstanceVarPool(WorkflowInstance workflowInstance,
+                                                       TaskDefinition taskDefinition,
                                                        IWorkflowExecutionGraph workflowExecutionGraph) {
-        List<ITaskExecutionRunnable> predecessors = workflowExecutionGraph.getPredecessors(taskDefinition.getName());
-        if (CollectionUtils.isEmpty(predecessors)) {
-            return Collections.emptyList();
+        final boolean isStartNode = workflowExecutionGraph.getStartNodes()
+                .stream()
+                .anyMatch(node -> node.getTaskDefinition().getCode() == taskDefinition.getCode());
+        if (isStartNode) {
+            return VarPoolUtils.deserializeVarPool(workflowInstance.getVarPool());
         }
-        List<String> varPoolsFromPredecessors = predecessors
+
+        List<String> varPoolsFromPredecessors = workflowExecutionGraph.getPredecessors(taskDefinition.getName())
                 .stream()
                 .filter(ITaskExecutionRunnable::isTaskInstanceInitialized)
                 .map(ITaskExecutionRunnable::getTaskInstance)
+                .sorted(Comparator.comparing(TaskInstance::getEndTime, Comparator.nullsLast(Comparator.naturalOrder())))
                 .map(TaskInstance::getVarPool)
+                .filter(StringUtils::isNotBlank)
                 .collect(Collectors.toList());
         return VarPoolUtils.mergeVarPoolJsonString(varPoolsFromPredecessors);
     }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ExecuteTaskCommandHandlerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ExecuteTaskCommandHandlerTest.java
new file mode 100644
index 000000000000..76e5a2ef8e30
--- /dev/null
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ExecuteTaskCommandHandlerTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.command.handler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.common.constants.CommandKeyConstants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; +import 
org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.context.ApplicationContext; +import org.springframework.test.util.ReflectionTestUtils; + +@ExtendWith(MockitoExtension.class) +class ExecuteTaskCommandHandlerTest { + + private ExecuteTaskCommandHandler executeTaskCommandHandler; + + @Mock + private WorkflowInstanceDao workflowInstanceDao; + + @Mock + private TaskInstanceDao taskInstanceDao; + + @Mock + private ApplicationContext applicationContext; + + @Mock + private IWorkflowGraph workflowGraph; + + private MasterConfig masterConfig; + + @BeforeEach + void setUp() { + executeTaskCommandHandler = new ExecuteTaskCommandHandler(); + masterConfig = new MasterConfig(); + masterConfig.setMasterAddress("127.0.0.1:5678"); + ReflectionTestUtils.setField(executeTaskCommandHandler, "workflowInstanceDao", workflowInstanceDao); + ReflectionTestUtils.setField(executeTaskCommandHandler, "taskInstanceDao", taskInstanceDao); + ReflectionTestUtils.setField(executeTaskCommandHandler, AbstractCommandHandler.class, "taskInstanceDao", + taskInstanceDao, TaskInstanceDao.class); + ReflectionTestUtils.setField(executeTaskCommandHandler, "applicationContext", applicationContext); + ReflectionTestUtils.setField(executeTaskCommandHandler, "masterConfig", masterConfig); + } + + @Test + void testExecuteTaskCommandType() { + assertEquals(CommandType.EXECUTE_TASK, executeTaskCommandHandler.commandType()); + } + + @Test + void 
testAssembleWorkflowInstance() {
+        Command command = new Command();
+        command.setWorkflowInstanceId(1);
+        command.setCommandType(CommandType.EXECUTE_TASK);
+        command.setTaskDependType(TaskDependType.TASK_POST);
+        WorkflowExecuteContextBuilder contextBuilder = WorkflowExecuteContext.builder().withCommand(command);
+
+        WorkflowInstance workflowInstance = new WorkflowInstance();
+        workflowInstance.setId(1);
+        workflowInstance.setTaskDependType(TaskDependType.TASK_ONLY);
+        when(workflowInstanceDao.queryOptionalById(1)).thenReturn(Optional.of(workflowInstance));
+
+        executeTaskCommandHandler.assembleWorkflowInstance(contextBuilder);
+
+        assertSame(workflowInstance, contextBuilder.getWorkflowInstance());
+        assertEquals(WorkflowExecutionStatus.RUNNING_EXECUTION, workflowInstance.getState());
+        assertEquals(CommandType.EXECUTE_TASK, workflowInstance.getCommandType());
+        assertEquals(TaskDependType.TASK_POST, workflowInstance.getTaskDependType());
+        assertEquals("127.0.0.1:5678", workflowInstance.getHost());
+        verify(workflowInstanceDao).updateById(workflowInstance);
+    }
+
+    @Test
+    void testThrowExceptionWhenWorkflowInstanceNotExists() {
+        Command command = new Command();
+        command.setWorkflowInstanceId(100);
+        WorkflowExecuteContextBuilder contextBuilder = WorkflowExecuteContext.builder().withCommand(command);
+        when(workflowInstanceDao.queryOptionalById(100)).thenReturn(Optional.empty());
+
+        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
+                () -> executeTaskCommandHandler.assembleWorkflowInstance(contextBuilder));
+
+        assertEquals("Cannot find WorkflowInstance:100", ex.getMessage());
+    }
+
+    @Test
+    void testAssembleWorkflowExecutionGraph() {
+        Command command = new Command();
+        command.setWorkflowInstanceId(1);
+        command.setCommandType(CommandType.EXECUTE_TASK);
+        Map<String, String> commandParam = new HashMap<>();
+        commandParam.put(CommandKeyConstants.CMD_PARAM_START_NODES, "101");
+        command.setCommandParam(JSONUtils.toJsonString(commandParam));
+
WorkflowExecuteContextBuilder contextBuilder = WorkflowExecuteContext.builder().withCommand(command); + + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(1); + workflowInstance.setTaskDependType(TaskDependType.TASK_POST); + contextBuilder.setWorkflowInstance(workflowInstance); + contextBuilder.setWorkflowDefinition(new WorkflowDefinition()); + contextBuilder.setProject(new Project()); + contextBuilder.setWorkflowEventBus(new WorkflowEventBus()); + contextBuilder.setWorkflowGraph(workflowGraph); + + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setCode(101L); + taskDefinition.setName("shell1"); + when(workflowGraph.getTaskNodeByCode(101L)).thenReturn(taskDefinition); + when(workflowGraph.getTaskNodeByName("shell1")).thenReturn(taskDefinition); + when(workflowGraph.getAllTaskNodes()).thenReturn(Collections.singletonList(taskDefinition)); + when(workflowGraph.getPredecessors("shell1")).thenReturn(Collections.emptySet()); + when(workflowGraph.getSuccessors("shell1")).thenReturn(Collections.emptySet()); + when(taskInstanceDao.queryValidTaskListByWorkflowInstanceId(1)).thenReturn(Collections.emptyList()); + + executeTaskCommandHandler.assembleWorkflowExecutionGraph(contextBuilder); + + assertNotNull(contextBuilder.getWorkflowExecutionGraph()); + assertEquals(1, contextBuilder.getWorkflowExecutionGraph().getAllTaskExecutionRunnable().size()); + } + + @Test + void testThrowExceptionWhenStartNodesMissing() { + Command command = new Command(); + command.setCommandType(CommandType.EXECUTE_TASK); + command.setCommandParam("{}"); + WorkflowExecuteContextBuilder contextBuilder = WorkflowExecuteContext.builder().withCommand(command); + contextBuilder.setWorkflowGraph(workflowGraph); + + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setTaskDependType(TaskDependType.TASK_POST); + contextBuilder.setWorkflowInstance(workflowInstance); + + assertThrows(IllegalArgumentException.class, + () -> 
executeTaskCommandHandler.assembleWorkflowExecutionGraph(contextBuilder)); + } +}