Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,13 @@
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
Expand Down Expand Up @@ -144,7 +148,9 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
Expand Down Expand Up @@ -2116,22 +2122,10 @@ protected void doBatchOperateWorkflowDefinition(User loginUser,
taskDefinitionLog.setVersion(0);
taskDefinitionLog.setName(taskDefinitionLog.getName());
if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) {
final String taskParams = taskDefinitionLog.getTaskParams();
final SwitchParameters switchParameters =
JSONUtils.parseObject(taskParams, SwitchParameters.class);
if (switchParameters == null) {
throw new IllegalArgumentException(
"Switch task params: " + taskParams + " is invalid.");
}
SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult();
switchResult.getDependTaskList().forEach(switchResultVo -> {
switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode()));
});
if (switchResult.getNextNode() != null) {
switchResult.setNextNode(
taskCodeMap.get(switchResult.getNextNode()));
}
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters));
replaceTaskCodeForSwitchTaskParams(taskDefinitionLog, taskCodeMap);
}
if (TaskTypeUtils.isConditionTask(taskDefinitionLog.getTaskType())) {
replaceTaskCodeForConditionTaskParams(taskDefinitionLog, taskCodeMap);
}
}
for (WorkflowTaskRelationLog workflowTaskRelationLog : taskRelationList) {
Expand Down Expand Up @@ -2205,6 +2199,109 @@ protected void doBatchOperateWorkflowDefinition(User loginUser,
}
}

/**
* Replaces old task codes with new ones in the parameters of a Switch task.
* Used during workflow duplication or import to preserve correct task dependencies.
*/
private void replaceTaskCodeForSwitchTaskParams(TaskDefinitionLog taskDefLog, Map<Long, Long> taskCodeMap) {
String taskParams = taskDefLog.getTaskParams();
SwitchParameters params;

try {
params = JSONUtils.parseObject(taskParams, SwitchParameters.class);
} catch (Exception e) {
log.warn("Invalid Switch task params: {}", taskParams, e);
throw new IllegalArgumentException("Failed to parse Switch task params: " + taskParams, e);
}

if (params == null) {
log.warn("Parsed Switch task params is null: {}", taskParams);
throw new IllegalArgumentException("Failed to parse Switch task params: " + taskParams);
}

// Update nextBranch if mapped
Long nextBranch = params.getNextBranch();
if (nextBranch != null && taskCodeMap.containsKey(nextBranch)) {
params.setNextBranch(taskCodeMap.get(nextBranch));
}

// Update switch result nodes
SwitchParameters.SwitchResult result = params.getSwitchResult();
if (result != null) {
Long nextNode = result.getNextNode();
if (nextNode != null && taskCodeMap.containsKey(nextNode)) {
result.setNextNode(taskCodeMap.get(nextNode));
}

// Update depend task list in result
for (SwitchResultVo vo : result.getDependTaskList()) {
Long original = vo.getNextNode();
if (original != null && taskCodeMap.containsKey(original)) {
vo.setNextNode(taskCodeMap.get(original));
}
}
}

taskDefLog.setTaskParams(JSONUtils.toJsonString(params));
}

/**
* Replaces old task codes with new ones in the parameters of a Condition task.
* Used during workflow duplication or import to preserve correct task dependencies.
*/
private void replaceTaskCodeForConditionTaskParams(TaskDefinitionLog taskDefLog, Map<Long, Long> taskCodeMap) {
String taskParams = taskDefLog.getTaskParams();
ConditionsParameters params;

try {
params = JSONUtils.parseObject(taskParams, ConditionsParameters.class);
} catch (Exception e) {
log.warn("Invalid Condition task params: {}", taskParams, e);
throw new IllegalArgumentException("Failed to parse Condition task params: " + taskParams, e);
}

if (params == null) {
log.warn("Parsed Condition task params is null: {}", taskParams);
throw new IllegalArgumentException("Failed to parse Condition task params: " + taskParams);
}

// Update dependency task codes
ConditionsParameters.ConditionDependency dep = params.getDependence();
if (dep != null) {
for (ConditionDependentTaskModel taskModel : dep.getDependTaskList()) {
for (ConditionDependentItem item : taskModel.getDependItemList()) {
Long oldCode = item.getDepTaskCode();
if (taskCodeMap.containsKey(oldCode)) {
item.setDepTaskCode(taskCodeMap.get(oldCode));
}
}
}
}

// Update success/failed node lists
ConditionsParameters.ConditionResult result = params.getConditionResult();
if (result != null) {
replaceInNodeList(result::getSuccessNode, result::setSuccessNode, taskCodeMap);
replaceInNodeList(result::getFailedNode, result::setFailedNode, taskCodeMap);
}

taskDefLog.setTaskParams(JSONUtils.toJsonString(params));
}

// Helper to avoid duplication for success/failed node lists
private void replaceInNodeList(Supplier<List<Long>> getter, Consumer<List<Long>> setter,
Map<Long, Long> taskCodeMap) {
List<Long> original = getter.get();
if (CollectionUtils.isEmpty(original))
return;

List<Long> updated = original.stream()
.map(code -> code != null && taskCodeMap.containsKey(code) ? taskCodeMap.get(code) : code)
.collect(Collectors.toList());

setter.accept(updated);
}

/**
* get new task name or workflow name when copy or import operate
*
Expand Down
Loading
Loading