diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java index f0c060562035..d35560f70767 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java @@ -111,9 +111,13 @@ import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentItem; +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; @@ -144,7 +148,9 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.zip.ZipEntry; @@ -2116,22 +2122,10 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, taskDefinitionLog.setVersion(0); taskDefinitionLog.setName(taskDefinitionLog.getName()); if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) { - final String taskParams = taskDefinitionLog.getTaskParams(); - final SwitchParameters switchParameters = - JSONUtils.parseObject(taskParams, SwitchParameters.class); - if (switchParameters == null) { - throw new IllegalArgumentException( - "Switch task params: " + taskParams + " is invalid."); - } - SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult(); - switchResult.getDependTaskList().forEach(switchResultVo -> { - switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode())); - }); - if (switchResult.getNextNode() != null) { - switchResult.setNextNode( - taskCodeMap.get(switchResult.getNextNode())); - } - taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters)); + replaceTaskCodeForSwitchTaskParams(taskDefinitionLog, taskCodeMap); + } + if (TaskTypeUtils.isConditionTask(taskDefinitionLog.getTaskType())) { + replaceTaskCodeForConditionTaskParams(taskDefinitionLog, taskCodeMap); } } for (WorkflowTaskRelationLog workflowTaskRelationLog : taskRelationList) { @@ -2205,6 +2199,109 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, } } + /** + * Replaces old task codes with new ones in the parameters of a Switch task. + * Used during workflow duplication or import to preserve correct task dependencies. + */ + private void replaceTaskCodeForSwitchTaskParams(TaskDefinitionLog taskDefLog, Map taskCodeMap) { + String taskParams = taskDefLog.getTaskParams(); + SwitchParameters params; + + try { + params = JSONUtils.parseObject(taskParams, SwitchParameters.class); + } catch (Exception e) { + log.warn("Invalid Switch task params: {}", taskParams, e); + throw new IllegalArgumentException("Failed to parse Switch task params: " + taskParams, e); + } + + if (params == null) { + log.warn("Parsed Switch task params is null: {}", taskParams); + throw new IllegalArgumentException("Failed to parse Switch task params: " + taskParams); + } + + // Update nextBranch if mapped + Long nextBranch = params.getNextBranch(); + if (nextBranch != null && taskCodeMap.containsKey(nextBranch)) { + params.setNextBranch(taskCodeMap.get(nextBranch)); + } + + // Update switch result nodes + SwitchParameters.SwitchResult result = params.getSwitchResult(); + if (result != null) { + Long nextNode = result.getNextNode(); + if (nextNode != null && taskCodeMap.containsKey(nextNode)) { + result.setNextNode(taskCodeMap.get(nextNode)); + } + + // Update depend task list in result + for (SwitchResultVo vo : result.getDependTaskList()) { + Long original = vo.getNextNode(); + if (original != null && taskCodeMap.containsKey(original)) { + vo.setNextNode(taskCodeMap.get(original)); + } + } + } + + taskDefLog.setTaskParams(JSONUtils.toJsonString(params)); + } + + /** + * Replaces old task codes with new ones in the parameters of a Condition task. + * Used during workflow duplication or import to preserve correct task dependencies. + */ + private void replaceTaskCodeForConditionTaskParams(TaskDefinitionLog taskDefLog, Map taskCodeMap) { + String taskParams = taskDefLog.getTaskParams(); + ConditionsParameters params; + + try { + params = JSONUtils.parseObject(taskParams, ConditionsParameters.class); + } catch (Exception e) { + log.warn("Invalid Condition task params: {}", taskParams, e); + throw new IllegalArgumentException("Failed to parse Condition task params: " + taskParams, e); + } + + if (params == null) { + log.warn("Parsed Condition task params is null: {}", taskParams); + throw new IllegalArgumentException("Failed to parse Condition task params: " + taskParams); + } + + // Update dependency task codes + ConditionsParameters.ConditionDependency dep = params.getDependence(); + if (dep != null) { + for (ConditionDependentTaskModel taskModel : dep.getDependTaskList()) { + for (ConditionDependentItem item : taskModel.getDependItemList()) { + Long oldCode = item.getDepTaskCode(); + if (taskCodeMap.containsKey(oldCode)) { + item.setDepTaskCode(taskCodeMap.get(oldCode)); + } + } + } + } + + // Update success/failed node lists + ConditionsParameters.ConditionResult result = params.getConditionResult(); + if (result != null) { + replaceInNodeList(result::getSuccessNode, result::setSuccessNode, taskCodeMap); + replaceInNodeList(result::getFailedNode, result::setFailedNode, taskCodeMap); + } + + taskDefLog.setTaskParams(JSONUtils.toJsonString(params)); + } + + // Helper to avoid duplication for success/failed node lists + private void replaceInNodeList(Supplier> getter, Consumer> setter, + Map taskCodeMap) { + List original = getter.get(); + if (CollectionUtils.isEmpty(original)) + return; + + List updated = original.stream() + .map(code -> code != null && taskCodeMap.containsKey(code) ? taskCodeMap.get(code) : code) + .collect(Collectors.toList()); + + setter.accept(updated); + } + /** * get new task name or workflow name when copy or import operate * diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java new file mode 100644 index 000000000000..a963bebbafc7 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImplTest.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentItem; +import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentTaskModel; +import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class WorkflowDefinitionServiceImplTest { + + private WorkflowDefinitionServiceImpl workflowDefinitionService; + + @BeforeEach + void setUp() { + workflowDefinitionService = new WorkflowDefinitionServiceImpl(); + } + + @Test + void testReplaceTaskCodeForSwitchTaskParams_replacesAllMappedCodes() throws Exception { + long old1 = 100L, old2 = 200L, old3 = 300L; + long new1 = 1000L, new2 = 2000L, new3 = 3000L; + + SwitchParameters.SwitchResult result = new SwitchParameters.SwitchResult(); + result.setNextNode(old2); + result.setDependTaskList(Arrays.asList( + createSwitchResultVo(old3), + createSwitchResultVo(999L))); + + SwitchParameters params = new SwitchParameters(); + params.setNextBranch(old1); + params.setSwitchResult(result); + + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams(JSONUtils.toJsonString(params)); + + Map codeMap = new HashMap<>(); + codeMap.put(old1, new1); + codeMap.put(old2, new2); + codeMap.put(old3, new3); + + invokePrivateMethod("replaceTaskCodeForSwitchTaskParams", taskDef, codeMap); + + SwitchParameters updated = JSONUtils.parseObject(taskDef.getTaskParams(), SwitchParameters.class); + assert updated != null; + assertThat(updated.getNextBranch()).isEqualTo(new1); + assertThat(updated.getSwitchResult().getNextNode()).isEqualTo(new2); + assertThat(updated.getSwitchResult().getDependTaskList().get(0).getNextNode()).isEqualTo(new3); + assertThat(updated.getSwitchResult().getDependTaskList().get(1).getNextNode()).isEqualTo(999L); + } + + @Test + void testReplaceTaskCodeForSwitchTaskParams_handlesNullFields() throws Exception { + SwitchParameters params = new SwitchParameters(); + params.setNextBranch(null); + params.setSwitchResult(null); + + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams(JSONUtils.toJsonString(params)); + + invokePrivateMethod("replaceTaskCodeForSwitchTaskParams", taskDef, Collections.emptyMap()); + + SwitchParameters result = JSONUtils.parseObject(taskDef.getTaskParams(), SwitchParameters.class); + assert result != null; + assertThat(result.getNextBranch()).isNull(); + assertThat(result.getSwitchResult()).isNull(); + } + + @Test + void testReplaceTaskCodeForSwitchTaskParams_throwsOnInvalidJson() { + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams("{ broken json }"); + + assertThatThrownBy( + () -> invokePrivateMethod("replaceTaskCodeForSwitchTaskParams", taskDef, Collections.emptyMap())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Failed to parse Switch task params"); + } + + @Test + void testReplaceTaskCodeForConditionTaskParams_replacesDepAndResultNodes() throws Exception { + long oldDep = 500L, oldSuc = 600L, oldFail = 700L; + long newDep = 5000L, newSuc = 6000L, newFail = 7000L; + + ConditionDependentItem item = new ConditionDependentItem(); + item.setDepTaskCode(oldDep); + + ConditionDependentTaskModel model = new ConditionDependentTaskModel(); + model.setDependItemList(Collections.singletonList(item)); + + ConditionsParameters.ConditionDependency dep = new ConditionsParameters.ConditionDependency(); + dep.setDependTaskList(Collections.singletonList(model)); + + ConditionsParameters.ConditionResult result = new ConditionsParameters.ConditionResult(); + result.setSuccessNode(Arrays.asList(oldSuc, 888L)); + result.setFailedNode(Collections.singletonList(oldFail)); + + ConditionsParameters params = new ConditionsParameters(); + params.setDependence(dep); + params.setConditionResult(result); + + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams(JSONUtils.toJsonString(params)); + + Map codeMap = new HashMap<>(); + codeMap.put(oldDep, newDep); + codeMap.put(oldSuc, newSuc); + codeMap.put(oldFail, newFail); + + invokePrivateMethod("replaceTaskCodeForConditionTaskParams", taskDef, codeMap); + + ConditionsParameters updated = JSONUtils.parseObject(taskDef.getTaskParams(), ConditionsParameters.class); + + assert updated != null; + long actualDepCode = updated.getDependence() + .getDependTaskList().get(0).getDependItemList().get(0).getDepTaskCode(); + assertThat(actualDepCode).isEqualTo(newDep); + + assertThat(updated.getConditionResult().getSuccessNode()) + .containsExactly(newSuc, 888L); + assertThat(updated.getConditionResult().getFailedNode()) + .containsExactly(newFail); + } + + @Test + void testReplaceTaskCodeForConditionTaskParams_handlesNulls() throws Exception { + ConditionsParameters params = new ConditionsParameters(); + params.setDependence(null); + params.setConditionResult(null); + + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams(JSONUtils.toJsonString(params)); + + invokePrivateMethod("replaceTaskCodeForConditionTaskParams", taskDef, Collections.emptyMap()); + + ConditionsParameters result = JSONUtils.parseObject(taskDef.getTaskParams(), ConditionsParameters.class); + assert result != null; + assertThat(result.getDependence()).isNull(); + assertThat(result.getConditionResult()).isNull(); + } + + @Test + void testReplaceTaskCodeForConditionTaskParams_throwsOnInvalidJson() { + TaskDefinitionLog taskDef = new TaskDefinitionLog(); + taskDef.setTaskParams("{ invalid: , }"); + + Assertions + .assertThatThrownBy(() -> invokePrivateMethod("replaceTaskCodeForConditionTaskParams", taskDef, + Collections.emptyMap())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Failed to parse Condition task params"); + } + + @Test + void testReplaceInNodeList_withNullList() throws Exception { + AtomicReference> stateRef = new AtomicReference<>(null); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + invokeReplaceInNodeList(getter, setter, Collections.emptyMap()); + + assertThat(stateRef.get()).isNull(); + } + + @Test + void testReplaceInNodeList_withEmptyList() throws Exception { + AtomicReference> stateRef = new AtomicReference<>(new ArrayList<>()); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + invokeReplaceInNodeList(getter, setter, Collections.emptyMap()); + + assertThat(stateRef.get()).isEmpty(); + } + + @Test + void testReplaceInNodeList_replacesMappedCodes() throws Exception { + AtomicReference> stateRef = new AtomicReference<>(new ArrayList<>(Arrays.asList(1L, 2L, 3L))); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + Map codeMap = new HashMap<>(); + codeMap.put(1L, 10L); + codeMap.put(3L, 30L); + + invokeReplaceInNodeList(getter, setter, codeMap); + + assertThat(stateRef.get()).containsExactly(10L, 2L, 30L); + } + + @Test + void testReplaceInNodeList_preservesUnmappedAndNullElements() throws Exception { + AtomicReference> stateRef = + new AtomicReference<>(new ArrayList<>(Arrays.asList(null, 4L, 5L, null))); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + Map codeMap = new HashMap<>(); + codeMap.put(4L, 40L); + + invokeReplaceInNodeList(getter, setter, codeMap); + + assertThat(stateRef.get()).containsExactly((Long) null, 40L, 5L, (Long) null); + } + + @Test + void testReplaceInNodeList_noOpWhenCodeMapIsEmpty() throws Exception { + AtomicReference> stateRef = new AtomicReference<>(new ArrayList<>(Arrays.asList(6L, 7L))); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + invokeReplaceInNodeList(getter, setter, Collections.emptyMap()); + + assertThat(stateRef.get()).containsExactly(6L, 7L); + } + + @Test + void testReplaceInNodeList_createsNewListInstance() throws Exception { + List original = Arrays.asList(8L, 9L); + AtomicReference> stateRef = new AtomicReference<>(new ArrayList<>(original)); + Supplier> getter = stateRef::get; + Consumer> setter = stateRef::set; + + invokeReplaceInNodeList(getter, setter, Collections.emptyMap()); + + assertThat(stateRef.get()).isNotSameAs(original); + assertThat(stateRef.get()).isEqualTo(original); + } + + // Reflection Helper to call private replaceInNodeList + private void invokeReplaceInNodeList(Supplier> getter, + Consumer> setter, + Map codeMap) throws Exception { + Method method = WorkflowDefinitionServiceImpl.class + .getDeclaredMethod("replaceInNodeList", Supplier.class, Consumer.class, Map.class); + method.setAccessible(true); + + try { + method.invoke(workflowDefinitionService, getter, setter, codeMap); + } catch (java.lang.reflect.InvocationTargetException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + throw new RuntimeException("Exception in private method", cause); + } + } + + // Reflection Helper to call private replaceTaskCodeForSwitchTaskParams and replaceTaskCodeForConditionTaskParams + private void invokePrivateMethod(String methodName, Object... args) throws Exception { + Class[] argTypes = new Class[args.length]; + for (int i = 0; i < args.length; i++) { + argTypes[i] = args[i].getClass(); + } + if ("replaceTaskCodeForSwitchTaskParams".equals(methodName) || + "replaceTaskCodeForConditionTaskParams".equals(methodName)) { + argTypes[0] = TaskDefinitionLog.class; + argTypes[1] = Map.class; + } + + Method method = WorkflowDefinitionServiceImpl.class.getDeclaredMethod(methodName, argTypes); + method.setAccessible(true); + + try { + method.invoke(workflowDefinitionService, args); + } catch (InvocationTargetException e) { + // Unwrap the actual exception thrown by the private method + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else { + // Wrap checked exceptions in a RuntimeException or rethrow as Exception + throw new RuntimeException("Checked exception thrown from private method", cause); + } + } + } + + // Helper to create SwitchResultVo + private SwitchResultVo createSwitchResultVo(Long nextNode) { + SwitchResultVo vo = new SwitchResultVo(); + vo.setNextNode(nextNode); + return vo; + } +}