From 093b4729db3aa4a36b7f105fba13449daf0a9bb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Mon, 8 Dec 2025 14:35:46 +0800 Subject: [PATCH 1/5] replace the TaskCode references within the task parameters --- .../impl/WorkflowDefinitionServiceImpl.java | 143 ++++++++++++++++-- 1 file changed, 127 insertions(+), 16 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java index f0c060562035..eb50d1f8df88 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java @@ -111,9 +111,13 @@ import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentItem; +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; @@ -2116,22 +2120,10 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, taskDefinitionLog.setVersion(0); taskDefinitionLog.setName(taskDefinitionLog.getName()); if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) { - final String taskParams = taskDefinitionLog.getTaskParams(); - final SwitchParameters switchParameters = - JSONUtils.parseObject(taskParams, SwitchParameters.class); - if (switchParameters == null) { - throw new IllegalArgumentException( - "Switch task params: " + taskParams + " is invalid."); - } - SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult(); - switchResult.getDependTaskList().forEach(switchResultVo -> { - switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode())); - }); - if (switchResult.getNextNode() != null) { - switchResult.setNextNode( - taskCodeMap.get(switchResult.getNextNode())); - } - taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters)); + updateSwitchTaskParams(taskDefinitionLog, taskCodeMap); + } + if (TaskTypeUtils.isConditionTask(taskDefinitionLog.getTaskType())) { + updateConditionTaskParams(taskDefinitionLog, taskCodeMap); } } for (WorkflowTaskRelationLog workflowTaskRelationLog : taskRelationList) { @@ -2205,6 +2197,125 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, } } + /** + * Updates task code references inside the task parameters of a SWITCH-type task. + * Replaces {@code nextNode} in both the default branch and conditional branches + * using the provided {@code taskCodeMap}. + *

+ * If parsing fails, an error is recorded in {@code result} and the method returns {@code false}. + * + * @param taskDefinitionLog the task log to update + * @param taskCodeMap mapping from old task code to new task code + * @throws IllegalArgumentException if taskParams is invalid or cannot be parsed + */ + private void updateSwitchTaskParams(TaskDefinitionLog taskDefinitionLog, Map taskCodeMap) { + final String taskParams = taskDefinitionLog.getTaskParams(); + final SwitchParameters switchParameters = JSONUtils.parseObject(taskParams, SwitchParameters.class); + + if (switchParameters == null) { + log.warn("Failed to parse SWITCH task params: {}", taskParams); + throw new IllegalArgumentException( + "Switch task params: " + taskParams + " is invalid."); + } + + // Update top-level nextBranch if used + if (switchParameters.getNextBranch() != null && taskCodeMap.containsKey(switchParameters.getNextBranch())) { + switchParameters.setNextBranch(taskCodeMap.get(switchParameters.getNextBranch())); + } + + // Update switchResult block + SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult(); + if (switchResult != null) { + // Default branch + if (switchResult.getNextNode() != null && taskCodeMap.containsKey(switchResult.getNextNode())) { + switchResult.setNextNode(taskCodeMap.get(switchResult.getNextNode())); + } + + // Conditional branches + if (CollectionUtils.isNotEmpty(switchResult.getDependTaskList())) { + for (SwitchResultVo vo : switchResult.getDependTaskList()) { + if (vo != null && vo.getNextNode() != null && taskCodeMap.containsKey(vo.getNextNode())) { + vo.setNextNode(taskCodeMap.get(vo.getNextNode())); + } + } + } + } + + taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters)); + } + + /** + * Updates task code references inside the task parameters of a CONDITIONS-type task. + * Replaces: + * - {@code depTaskCode} in each {@code ConditionDependentItem} + * - node IDs in {@code successNode} and {@code failedNode} + * using the provided {@code taskCodeMap}. + * + * @param taskDefinitionLog the task log to update + * @param taskCodeMap mapping from old task code to new task code + * @throws IllegalArgumentException if taskParams is invalid or cannot be parsed + */ + private void updateConditionTaskParams(TaskDefinitionLog taskDefinitionLog, Map taskCodeMap) { + final String taskParams = taskDefinitionLog.getTaskParams(); + final ConditionsParameters conditionsParameters = JSONUtils.parseObject(taskParams, ConditionsParameters.class); + + if (conditionsParameters == null) { + log.warn("Failed to parse CONDITION task params: {}", taskParams); + throw new IllegalArgumentException("Condition task params: " + taskParams + " is invalid."); + } + + // Update dependence -> depTaskCode + ConditionsParameters.ConditionDependency dependence = conditionsParameters.getDependence(); + if (dependence != null && CollectionUtils.isNotEmpty(dependence.getDependTaskList())) { + for (ConditionDependentTaskModel dependTask : dependence.getDependTaskList()) { + if (CollectionUtils.isEmpty(dependTask.getDependItemList())) { + continue; + } + for (ConditionDependentItem item : dependTask.getDependItemList()) { + if (item == null) { + continue; + } + Long oldCode = item.getDepTaskCode(); + if (taskCodeMap.containsKey(oldCode)) { + item.setDepTaskCode(taskCodeMap.get(oldCode)); + } + } + } + } + + // Update condition result branches + ConditionsParameters.ConditionResult conditionResult = conditionsParameters.getConditionResult(); + if (conditionResult != null) { + // Success branch + if (CollectionUtils.isNotEmpty(conditionResult.getSuccessNode())) { + List updatedSuccess = conditionResult.getSuccessNode().stream() + .map(code -> { + if (code != null && taskCodeMap.containsKey(code)) { + return taskCodeMap.get(code); + } + return code; + }) + .collect(Collectors.toList()); + conditionResult.setSuccessNode(updatedSuccess); + } + + // Failed branch + if (CollectionUtils.isNotEmpty(conditionResult.getFailedNode())) { + List updatedFailed = conditionResult.getFailedNode().stream() + .map(code -> { + if (code != null && taskCodeMap.containsKey(code)) { + return taskCodeMap.get(code); + } + return code; + }) + .collect(Collectors.toList()); + conditionResult.setFailedNode(updatedFailed); + } + } + + taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(conditionsParameters)); + } + /** * get new task name or workflow name when copy or import operate * From 5f6046e6fc9f768f8b5487d1b1192bced56c0d5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Wed, 10 Dec 2025 14:42:39 +0800 Subject: [PATCH 2/5] replace task code for Switch and Condition Task --- .../impl/WorkflowDefinitionServiceImpl.java | 114 ++++++++---------- 1 file changed, 52 insertions(+), 62 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java index eb50d1f8df88..8ffc7d1b13d2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java @@ -116,7 +116,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; @@ -2120,10 +2119,10 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, taskDefinitionLog.setVersion(0); taskDefinitionLog.setName(taskDefinitionLog.getName()); if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) { - updateSwitchTaskParams(taskDefinitionLog, taskCodeMap); + replaceTaskCodeForSwitchTaskParams(taskDefinitionLog, taskCodeMap); } if (TaskTypeUtils.isConditionTask(taskDefinitionLog.getTaskType())) { - updateConditionTaskParams(taskDefinitionLog, taskCodeMap); + replaceTaskCodeForConditionTaskParams(taskDefinitionLog, taskCodeMap); } } for (WorkflowTaskRelationLog workflowTaskRelationLog : taskRelationList) { @@ -2198,118 +2197,109 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, } /** - * Updates task code references inside the task parameters of a SWITCH-type task. - * Replaces {@code nextNode} in both the default branch and conditional branches - * using the provided {@code taskCodeMap}. - *

- * If parsing fails, an error is recorded in {@code result} and the method returns {@code false}. + * replace task code references inside the task parameters of a Switch task. * * @param taskDefinitionLog the task log to update * @param taskCodeMap mapping from old task code to new task code * @throws IllegalArgumentException if taskParams is invalid or cannot be parsed */ - private void updateSwitchTaskParams(TaskDefinitionLog taskDefinitionLog, Map taskCodeMap) { + private void replaceTaskCodeForSwitchTaskParams(TaskDefinitionLog taskDefinitionLog, Map taskCodeMap) { final String taskParams = taskDefinitionLog.getTaskParams(); - final SwitchParameters switchParameters = JSONUtils.parseObject(taskParams, SwitchParameters.class); - + final SwitchParameters switchParameters = + JSONUtils.parseObject(taskParams, SwitchParameters.class); if (switchParameters == null) { - log.warn("Failed to parse SWITCH task params: {}", taskParams); + log.warn("Failed to parse Switch task params: {}", taskParams); throw new IllegalArgumentException( "Switch task params: " + taskParams + " is invalid."); } - // Update top-level nextBranch if used + // SwitchParameters.nextBranch if (switchParameters.getNextBranch() != null && taskCodeMap.containsKey(switchParameters.getNextBranch())) { switchParameters.setNextBranch(taskCodeMap.get(switchParameters.getNextBranch())); } - // Update switchResult block + // SwitchParameters.SwitchResult SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult(); if (switchResult != null) { - // Default branch + // SwitchParameters.SwitchResult.nextNode if (switchResult.getNextNode() != null && taskCodeMap.containsKey(switchResult.getNextNode())) { - switchResult.setNextNode(taskCodeMap.get(switchResult.getNextNode())); + switchResult.setNextNode( + taskCodeMap.get(switchResult.getNextNode())); } - // Conditional branches - if (CollectionUtils.isNotEmpty(switchResult.getDependTaskList())) { - for (SwitchResultVo vo : switchResult.getDependTaskList()) { - if (vo != null && vo.getNextNode() != null && taskCodeMap.containsKey(vo.getNextNode())) { - vo.setNextNode(taskCodeMap.get(vo.getNextNode())); - } - } - } + // SwitchParameters.SwitchResult.SwitchResultVo.nextNode + switchResult.getDependTaskList().forEach(switchResultVo -> { + switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode())); + }); } taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters)); } /** - * Updates task code references inside the task parameters of a CONDITIONS-type task. - * Replaces: - * - {@code depTaskCode} in each {@code ConditionDependentItem} - * - node IDs in {@code successNode} and {@code failedNode} - * using the provided {@code taskCodeMap}. + * replace task code references inside the task parameters of a Condition task. * * @param taskDefinitionLog the task log to update * @param taskCodeMap mapping from old task code to new task code * @throws IllegalArgumentException if taskParams is invalid or cannot be parsed */ - private void updateConditionTaskParams(TaskDefinitionLog taskDefinitionLog, Map taskCodeMap) { + private void replaceTaskCodeForConditionTaskParams(TaskDefinitionLog taskDefinitionLog, + Map taskCodeMap) { final String taskParams = taskDefinitionLog.getTaskParams(); - final ConditionsParameters conditionsParameters = JSONUtils.parseObject(taskParams, ConditionsParameters.class); + final ConditionsParameters conditionsParameters = + JSONUtils.parseObject(taskParams, ConditionsParameters.class); if (conditionsParameters == null) { - log.warn("Failed to parse CONDITION task params: {}", taskParams); - throw new IllegalArgumentException("Condition task params: " + taskParams + " is invalid."); + log.warn("Failed to parse Condition task params: {}", taskParams); + throw new IllegalArgumentException( + "Condition task params: " + taskParams + " is invalid."); } - // Update dependence -> depTaskCode - ConditionsParameters.ConditionDependency dependence = conditionsParameters.getDependence(); - if (dependence != null && CollectionUtils.isNotEmpty(dependence.getDependTaskList())) { - for (ConditionDependentTaskModel dependTask : dependence.getDependTaskList()) { - if (CollectionUtils.isEmpty(dependTask.getDependItemList())) { + // ConditionsParameters.ConditionDependency + ConditionsParameters.ConditionDependency conditionDependency = conditionsParameters.getDependence(); + if (conditionDependency != null && CollectionUtils.isNotEmpty(conditionDependency.getDependTaskList())) { + for (ConditionDependentTaskModel conditionDependentTaskModel : conditionDependency.getDependTaskList()) { + if (CollectionUtils.isEmpty(conditionDependentTaskModel.getDependItemList())) { continue; } - for (ConditionDependentItem item : dependTask.getDependItemList()) { - if (item == null) { + for (ConditionDependentItem conditionDependentItem : conditionDependentTaskModel.getDependItemList()) { + if (conditionDependentItem == null) { continue; } - Long oldCode = item.getDepTaskCode(); - if (taskCodeMap.containsKey(oldCode)) { - item.setDepTaskCode(taskCodeMap.get(oldCode)); + // ConditionsParameters.ConditionDependency.ConditionDependentTaskModel.ConditionDependentItem.depTaskCode + Long depTaskCode = conditionDependentItem.getDepTaskCode(); + if (taskCodeMap.containsKey(depTaskCode)) { + conditionDependentItem.setDepTaskCode(taskCodeMap.get(depTaskCode)); } } } } - // Update condition result branches + // ConditionsParameters.ConditionResult ConditionsParameters.ConditionResult conditionResult = conditionsParameters.getConditionResult(); if (conditionResult != null) { - // Success branch + // ConditionsParameters.ConditionResult.successNode if (CollectionUtils.isNotEmpty(conditionResult.getSuccessNode())) { - List updatedSuccess = conditionResult.getSuccessNode().stream() - .map(code -> { - if (code != null && taskCodeMap.containsKey(code)) { - return taskCodeMap.get(code); + List successNode = conditionResult.getSuccessNode().stream() + .map(taskCode -> { + if (taskCode != null && taskCodeMap.containsKey(taskCode)) { + return taskCodeMap.get(taskCode); } - return code; - }) - .collect(Collectors.toList()); - conditionResult.setSuccessNode(updatedSuccess); + return taskCode; + }).collect(Collectors.toList()); + conditionResult.setSuccessNode(successNode); } - // Failed branch + // ConditionsParameters.ConditionResult.failedNode if (CollectionUtils.isNotEmpty(conditionResult.getFailedNode())) { - List updatedFailed = conditionResult.getFailedNode().stream() - .map(code -> { - if (code != null && taskCodeMap.containsKey(code)) { - return taskCodeMap.get(code); + List failedNode = conditionResult.getFailedNode().stream() + .map(taskCode -> { + if (taskCode != null && taskCodeMap.containsKey(taskCode)) { + return taskCodeMap.get(taskCode); } - return code; - }) - .collect(Collectors.toList()); - conditionResult.setFailedNode(updatedFailed); + return taskCode; + }).collect(Collectors.toList()); + conditionResult.setFailedNode(failedNode); } } From fcbc1de01c59cc2e88a01ef1df79b491171fce0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Mon, 12 Jan 2026 20:45:35 +0800 Subject: [PATCH 3/5] update format --- .../impl/WorkflowDefinitionServiceImpl.java | 162 ++++++++---------- 1 file changed, 71 insertions(+), 91 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java index 8ffc7d1b13d2..35150a5f16d1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java @@ -116,6 +116,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; @@ -147,7 +148,9 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.zip.ZipEntry; @@ -2197,113 +2200,90 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, } /** - * replace task code references inside the task parameters of a Switch task. - * - * @param taskDefinitionLog the task log to update - * @param taskCodeMap mapping from old task code to new task code - * @throws IllegalArgumentException if taskParams is invalid or cannot be parsed + * Replaces old task codes with new ones in the parameters of a Switch task. + * Used during workflow duplication or import to preserve correct task dependencies. */ - private void replaceTaskCodeForSwitchTaskParams(TaskDefinitionLog taskDefinitionLog, Map taskCodeMap) { - final String taskParams = taskDefinitionLog.getTaskParams(); - final SwitchParameters switchParameters = - JSONUtils.parseObject(taskParams, SwitchParameters.class); - if (switchParameters == null) { - log.warn("Failed to parse Switch task params: {}", taskParams); - throw new IllegalArgumentException( - "Switch task params: " + taskParams + " is invalid."); - } - - // SwitchParameters.nextBranch - if (switchParameters.getNextBranch() != null && taskCodeMap.containsKey(switchParameters.getNextBranch())) { - switchParameters.setNextBranch(taskCodeMap.get(switchParameters.getNextBranch())); - } - - // SwitchParameters.SwitchResult - SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult(); - if (switchResult != null) { - // SwitchParameters.SwitchResult.nextNode - if (switchResult.getNextNode() != null && taskCodeMap.containsKey(switchResult.getNextNode())) { - switchResult.setNextNode( - taskCodeMap.get(switchResult.getNextNode())); + private void replaceTaskCodeForSwitchTaskParams(TaskDefinitionLog taskDefLog, Map taskCodeMap) { + String taskParams = taskDefLog.getTaskParams(); + SwitchParameters params = JSONUtils.parseObject(taskParams, SwitchParameters.class); + if (params == null) { + log.warn("Invalid Switch task params: {}", taskParams); + throw new IllegalArgumentException("Failed to parse Switch task params: " + taskParams); + } + + // Update nextBranch if mapped + Long nextBranch = params.getNextBranch(); + if (nextBranch != null && taskCodeMap.containsKey(nextBranch)) { + params.setNextBranch(taskCodeMap.get(nextBranch)); + } + + // Update switch result nodes + SwitchParameters.SwitchResult result = params.getSwitchResult(); + if (result != null) { + Long nextNode = result.getNextNode(); + if (nextNode != null && taskCodeMap.containsKey(nextNode)) { + result.setNextNode(taskCodeMap.get(nextNode)); } - // SwitchParameters.SwitchResult.SwitchResultVo.nextNode - switchResult.getDependTaskList().forEach(switchResultVo -> { - switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode())); - }); + // Update depend task list in result + for (SwitchResultVo vo : result.getDependTaskList()) { + Long original = vo.getNextNode(); + if (original != null && taskCodeMap.containsKey(original)) { + vo.setNextNode(taskCodeMap.get(original)); + } + } } - taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters)); + taskDefLog.setTaskParams(JSONUtils.toJsonString(params)); } /** - * replace task code references inside the task parameters of a Condition task. - * - * @param taskDefinitionLog the task log to update - * @param taskCodeMap mapping from old task code to new task code - * @throws IllegalArgumentException if taskParams is invalid or cannot be parsed + * Replaces old task codes with new ones in the parameters of a Condition task. + * Used during workflow duplication or import to preserve correct task dependencies. */ - private void replaceTaskCodeForConditionTaskParams(TaskDefinitionLog taskDefinitionLog, - Map taskCodeMap) { - final String taskParams = taskDefinitionLog.getTaskParams(); - final ConditionsParameters conditionsParameters = - JSONUtils.parseObject(taskParams, ConditionsParameters.class); - - if (conditionsParameters == null) { - log.warn("Failed to parse Condition task params: {}", taskParams); - throw new IllegalArgumentException( - "Condition task params: " + taskParams + " is invalid."); - } - - // ConditionsParameters.ConditionDependency - ConditionsParameters.ConditionDependency conditionDependency = conditionsParameters.getDependence(); - if (conditionDependency != null && CollectionUtils.isNotEmpty(conditionDependency.getDependTaskList())) { - for (ConditionDependentTaskModel conditionDependentTaskModel : conditionDependency.getDependTaskList()) { - if (CollectionUtils.isEmpty(conditionDependentTaskModel.getDependItemList())) { - continue; - } - for (ConditionDependentItem conditionDependentItem : conditionDependentTaskModel.getDependItemList()) { - if (conditionDependentItem == null) { - continue; - } - // ConditionsParameters.ConditionDependency.ConditionDependentTaskModel.ConditionDependentItem.depTaskCode - Long depTaskCode = conditionDependentItem.getDepTaskCode(); - if (taskCodeMap.containsKey(depTaskCode)) { - conditionDependentItem.setDepTaskCode(taskCodeMap.get(depTaskCode)); + private void replaceTaskCodeForConditionTaskParams(TaskDefinitionLog taskDefLog, Map taskCodeMap) { + String taskParams = taskDefLog.getTaskParams(); + ConditionsParameters params = JSONUtils.parseObject(taskParams, ConditionsParameters.class); + if (params == null) { + log.warn("Invalid Condition task params: {}", taskParams); + throw new IllegalArgumentException("Failed to parse Condition task params: " + taskParams); + } + + // Update dependency task codes + ConditionsParameters.ConditionDependency dep = params.getDependence(); + if (dep != null) { + for (ConditionDependentTaskModel taskModel : dep.getDependTaskList()) { + for (ConditionDependentItem item : taskModel.getDependItemList()) { + Long oldCode = item.getDepTaskCode(); + if (taskCodeMap.containsKey(oldCode)) { + item.setDepTaskCode(taskCodeMap.get(oldCode)); } } } } - // ConditionsParameters.ConditionResult - ConditionsParameters.ConditionResult conditionResult = conditionsParameters.getConditionResult(); - if (conditionResult != null) { - // ConditionsParameters.ConditionResult.successNode - if (CollectionUtils.isNotEmpty(conditionResult.getSuccessNode())) { - List successNode = conditionResult.getSuccessNode().stream() - .map(taskCode -> { - if (taskCode != null && taskCodeMap.containsKey(taskCode)) { - return taskCodeMap.get(taskCode); - } - return taskCode; - }).collect(Collectors.toList()); - conditionResult.setSuccessNode(successNode); - } - - // ConditionsParameters.ConditionResult.failedNode - if (CollectionUtils.isNotEmpty(conditionResult.getFailedNode())) { - List failedNode = conditionResult.getFailedNode().stream() - .map(taskCode -> { - if (taskCode != null && taskCodeMap.containsKey(taskCode)) { - return taskCodeMap.get(taskCode); - } - return taskCode; - }).collect(Collectors.toList()); - conditionResult.setFailedNode(failedNode); - } + // Update success/failed node lists + ConditionsParameters.ConditionResult result = params.getConditionResult(); + if (result != null) { + replaceInNodeList(result::getSuccessNode, result::setSuccessNode, taskCodeMap); + replaceInNodeList(result::getFailedNode, result::setFailedNode, taskCodeMap); } - taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(conditionsParameters)); + taskDefLog.setTaskParams(JSONUtils.toJsonString(params)); + } + + // Helper to avoid duplication for success/failed node lists + private void replaceInNodeList(Supplier> getter, Consumer> setter, + Map taskCodeMap) { + List original = getter.get(); + if (CollectionUtils.isEmpty(original)) + return; + + List updated = original.stream() + .map(code -> code != null && taskCodeMap.containsKey(code) ? taskCodeMap.get(code) : code) + .collect(Collectors.toList()); + + setter.accept(updated); } /** From 4fd8fa38c2de31d67c7bb955d39a0ea08bce9d58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Thu, 15 Jan 2026 10:12:45 +0800 Subject: [PATCH 4/5] add unit test --- .../impl/WorkflowDefinitionServiceImpl.java | 24 +- .../WorkflowDefinitionServiceImplTest.java | 226 ++++++++++++++++++ 2 files changed, 246 insertions(+), 4 deletions(-) create mode 100644 dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java index 35150a5f16d1..d35560f70767 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java @@ -2205,9 +2205,17 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, */ private void replaceTaskCodeForSwitchTaskParams(TaskDefinitionLog taskDefLog, Map taskCodeMap) { String taskParams = taskDefLog.getTaskParams(); - SwitchParameters params = JSONUtils.parseObject(taskParams, SwitchParameters.class); + SwitchParameters params; + + try { + params = JSONUtils.parseObject(taskParams, SwitchParameters.class); + } catch (Exception e) { + log.warn("Invalid Switch task params: {}", taskParams, e); + throw new IllegalArgumentException("Failed to parse Switch task params: " + taskParams, e); + } + if (params == null) { - log.warn("Invalid Switch task params: {}", taskParams); + log.warn("Parsed Switch task params is null: {}", taskParams); throw new IllegalArgumentException("Failed to parse Switch task params: " + taskParams); } @@ -2243,9 +2251,17 @@ private void replaceTaskCodeForSwitchTaskParams(TaskDefinitionLog taskDefLog, Ma */ private void replaceTaskCodeForConditionTaskParams(TaskDefinitionLog taskDefLog, Map taskCodeMap) { String taskParams = taskDefLog.getTaskParams(); - ConditionsParameters params = JSONUtils.parseObject(taskParams, ConditionsParameters.class); + ConditionsParameters params; + + try { + params = JSONUtils.parseObject(taskParams, ConditionsParameters.class); + } catch (Exception e) { + log.warn("Invalid Condition task params: {}", taskParams, e); + throw new IllegalArgumentException("Failed to parse Condition task params: " + taskParams, e); + } + if (params == null) { - log.warn("Invalid Condition task params: {}", taskParams); + log.warn("Parsed Condition task params is null: {}", taskParams); throw new IllegalArgumentException("Failed to parse Condition task params: " + taskParams); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java new file mode 100644 index 000000000000..b85efee48193 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentItem; +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentTaskModel; +import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class WorkflowDefinitionServiceImplTest { + + private WorkflowDefinitionServiceImpl workflowDefinitionService; + + @BeforeEach + void setUp() { + workflowDefinitionService = new WorkflowDefinitionServiceImpl(); + } + + @Test + void testReplaceTaskCodeForSwitchTaskParams_replacesAllMappedCodes() throws Exception { + long old1 = 100L, old2 = 200L, old3 = 300L; + long new1 = 1000L, new2 = 2000L, new3 = 3000L; + + SwitchParameters.SwitchResult result = new SwitchParameters.SwitchResult(); + result.setNextNode(old2); + result.setDependTaskList(Arrays.asList( + createSwitchResultVo(old3), + createSwitchResultVo(999L))); + + SwitchParameters params = new SwitchParameters(); + params.setNextBranch(old1); + params.setSwitchResult(result); + + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams(JSONUtils.toJsonString(params)); + + Map codeMap = new HashMap<>(); + codeMap.put(old1, new1); + codeMap.put(old2, new2); + codeMap.put(old3, new3); + + invokePrivateMethod("replaceTaskCodeForSwitchTaskParams", taskDef, codeMap); + + SwitchParameters updated = JSONUtils.parseObject(taskDef.getTaskParams(), SwitchParameters.class); + assert updated != null; + assertThat(updated.getNextBranch()).isEqualTo(new1); + assertThat(updated.getSwitchResult().getNextNode()).isEqualTo(new2); + assertThat(updated.getSwitchResult().getDependTaskList().get(0).getNextNode()).isEqualTo(new3); + assertThat(updated.getSwitchResult().getDependTaskList().get(1).getNextNode()).isEqualTo(999L); + } + + @Test + void testReplaceTaskCodeForSwitchTaskParams_handlesNullFields() throws Exception { + SwitchParameters params = new SwitchParameters(); + params.setNextBranch(null); + params.setSwitchResult(null); + + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams(JSONUtils.toJsonString(params)); + + invokePrivateMethod("replaceTaskCodeForSwitchTaskParams", taskDef, Collections.emptyMap()); + + SwitchParameters result = JSONUtils.parseObject(taskDef.getTaskParams(), SwitchParameters.class); + assert result != null; + assertThat(result.getNextBranch()).isNull(); + assertThat(result.getSwitchResult()).isNull(); + } + + @Test + void testReplaceTaskCodeForSwitchTaskParams_throwsOnInvalidJson() { + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams("{ broken json }"); + + assertThatThrownBy( + () -> invokePrivateMethod("replaceTaskCodeForSwitchTaskParams", taskDef, Collections.emptyMap())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Failed to parse Switch task params"); + } + + @Test + void testReplaceTaskCodeForConditionTaskParams_replacesDepAndResultNodes() throws Exception { + long oldDep = 500L, oldSuc = 600L, oldFail = 700L; + long newDep = 5000L, newSuc = 6000L, newFail = 7000L; + + ConditionDependentItem item = new ConditionDependentItem(); + item.setDepTaskCode(oldDep); + + ConditionDependentTaskModel model = new ConditionDependentTaskModel(); + model.setDependItemList(Collections.singletonList(item)); + + ConditionsParameters.ConditionDependency dep = new ConditionsParameters.ConditionDependency(); + dep.setDependTaskList(Collections.singletonList(model)); + + ConditionsParameters.ConditionResult result = new ConditionsParameters.ConditionResult(); + result.setSuccessNode(Arrays.asList(oldSuc, 888L)); + result.setFailedNode(Collections.singletonList(oldFail)); + + ConditionsParameters params = new ConditionsParameters(); + params.setDependence(dep); + params.setConditionResult(result); + + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams(JSONUtils.toJsonString(params)); + + Map codeMap = new HashMap<>(); + codeMap.put(oldDep, newDep); + codeMap.put(oldSuc, newSuc); + codeMap.put(oldFail, newFail); + + invokePrivateMethod("replaceTaskCodeForConditionTaskParams", taskDef, codeMap); + + ConditionsParameters updated = JSONUtils.parseObject(taskDef.getTaskParams(), ConditionsParameters.class); + + assert updated != null; + long actualDepCode = updated.getDependence() + .getDependTaskList().get(0).getDependItemList().get(0).getDepTaskCode(); + assertThat(actualDepCode).isEqualTo(newDep); + + assertThat(updated.getConditionResult().getSuccessNode()) + .containsExactly(newSuc, 888L); + assertThat(updated.getConditionResult().getFailedNode()) + .containsExactly(newFail); + } + + @Test + void testReplaceTaskCodeForConditionTaskParams_handlesNulls() throws Exception { + ConditionsParameters params = new ConditionsParameters(); + params.setDependence(null); + params.setConditionResult(null); + + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams(JSONUtils.toJsonString(params)); + + invokePrivateMethod("replaceTaskCodeForConditionTaskParams", taskDef, Collections.emptyMap()); + + ConditionsParameters result = JSONUtils.parseObject(taskDef.getTaskParams(), ConditionsParameters.class); + assert result != null; + assertThat(result.getDependence()).isNull(); + assertThat(result.getConditionResult()).isNull(); + } + + @Test + void testReplaceTaskCodeForConditionTaskParams_throwsOnInvalidJson() { + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams("{ invalid: , }"); + + Assertions + .assertThatThrownBy(() -> invokePrivateMethod("replaceTaskCodeForConditionTaskParams", taskDef, + Collections.emptyMap())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Failed to parse Condition task params"); + } + + // Reflection Helper + private void invokePrivateMethod(String methodName, Object... args) throws Exception { + Class[] argTypes = new Class[args.length]; + for (int i = 0; i < args.length; i++) { + argTypes[i] = args[i].getClass(); + } + if ("replaceTaskCodeForSwitchTaskParams".equals(methodName) || + "replaceTaskCodeForConditionTaskParams".equals(methodName)) { + argTypes[0] = TaskDefinitionLog.class; + argTypes[1] = Map.class; + } + + Method method = WorkflowDefinitionServiceImpl.class.getDeclaredMethod(methodName, argTypes); + method.setAccessible(true); + + try { + method.invoke(workflowDefinitionService, args); + } catch (InvocationTargetException e) { + // Unwrap the actual exception thrown by the private method + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else { + // Wrap checked exceptions in a RuntimeException or rethrow as Exception + throw new RuntimeException("Checked exception thrown from private method", cause); + } + } + } + + // Helper to create SwitchResultVo + private SwitchResultVo createSwitchResultVo(Long nextNode) { + SwitchResultVo vo = new SwitchResultVo(); + vo.setNextNode(nextNode); + return vo; + } +} From b662c2cdfa6f02c83eac09d96ff5ce4f6ab2db15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Thu, 15 Jan 2026 10:53:32 +0800 Subject: [PATCH 5/5] add test for replaceInNodeList --- .../WorkflowDefinitionServiceImplTest.java | 102 +++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java index b85efee48193..a963bebbafc7 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java @@ -30,10 +30,15 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -186,7 +191,102 @@ void testReplaceTaskCodeForConditionTaskParams_throwsOnInvalidJson() { .hasMessageContaining("Failed to parse Condition task params"); } - // Reflection Helper + @Test + void testReplaceInNodeList_withNullList() throws Exception { + AtomicReference> stateRef = new AtomicReference<>(null); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + invokeReplaceInNodeList(getter, setter, Collections.emptyMap()); + + assertThat(stateRef.get()).isNull(); + } + + @Test + void testReplaceInNodeList_withEmptyList() throws Exception { + AtomicReference> stateRef = new AtomicReference<>(new ArrayList<>()); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + invokeReplaceInNodeList(getter, setter, Collections.emptyMap()); + + assertThat(stateRef.get()).isEmpty(); + } + + @Test + void testReplaceInNodeList_replacesMappedCodes() throws Exception { + AtomicReference> stateRef = new AtomicReference<>(new ArrayList<>(Arrays.asList(1L, 2L, 3L))); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + Map codeMap = new HashMap<>(); + codeMap.put(1L, 10L); + codeMap.put(3L, 30L); + + invokeReplaceInNodeList(getter, setter, codeMap); + + assertThat(stateRef.get()).containsExactly(10L, 2L, 30L); + } + + @Test + void testReplaceInNodeList_preservesUnmappedAndNullElements() throws Exception { + AtomicReference> stateRef = + new AtomicReference<>(new ArrayList<>(Arrays.asList(null, 4L, 5L, null))); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + Map codeMap = new HashMap<>(); + codeMap.put(4L, 40L); + + invokeReplaceInNodeList(getter, setter, codeMap); + + assertThat(stateRef.get()).containsExactly((Long) null, 40L, 5L, (Long) null); + } + + @Test + void testReplaceInNodeList_noOpWhenCodeMapIsEmpty() throws Exception { + AtomicReference> stateRef = new AtomicReference<>(new ArrayList<>(Arrays.asList(6L, 7L))); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + invokeReplaceInNodeList(getter, setter, Collections.emptyMap()); + + assertThat(stateRef.get()).containsExactly(6L, 7L); + } + + @Test + void testReplaceInNodeList_createsNewListInstance() throws Exception { + List original = Arrays.asList(8L, 9L); + AtomicReference> stateRef = new AtomicReference<>(new ArrayList<>(original)); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + invokeReplaceInNodeList(getter, setter, Collections.emptyMap()); + + assertThat(stateRef.get()).isNotSameAs(original); + assertThat(stateRef.get()).isEqualTo(original); + } + + // Reflection Helper to call private replaceInNodeList + private void invokeReplaceInNodeList(Supplier> getter, + Consumer> setter, + Map codeMap) throws Exception { + Method method = WorkflowDefinitionServiceImpl.class + .getDeclaredMethod("replaceInNodeList", Supplier.class, Consumer.class, Map.class); + method.setAccessible(true); + + try { + method.invoke(workflowDefinitionService, getter, setter, codeMap); + } catch (java.lang.reflect.InvocationTargetException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + throw new RuntimeException("Exception in private method", cause); + } + } + + // Reflection Helper to call private replaceTaskCodeForSwitchTaskParams and replaceTaskCodeForConditionTaskParams private void invokePrivateMethod(String methodName, Object... args) throws Exception { Class[] argTypes = new Class[args.length]; for (int i = 0; i < args.length; i++) {