From fc80ee1babc8003ec3a74be8be69838af76c3f69 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 6 May 2025 23:02:03 +0800 Subject: [PATCH 1/2] Support query by wrapper in BaseDao --- .../WorkflowInstanceAuditOperatorImpl.java | 2 +- .../WorkflowInstanceController.java | 20 --- .../api/service/WorkflowInstanceService.java | 10 -- .../api/service/impl/ExecutorServiceImpl.java | 2 +- .../api/service/impl/TenantServiceImpl.java | 33 ++-- .../service/impl/WorkerGroupServiceImpl.java | 14 +- .../impl/WorkflowInstanceServiceImpl.java | 103 +++++------- .../controller/WorkerGroupControllerTest.java | 13 +- .../WorkflowInstanceControllerTest.java | 19 --- .../api/service/TaskInstanceServiceTest.java | 2 +- .../api/service/TenantServiceTest.java | 11 +- .../api/service/WorkerGroupServiceTest.java | 26 +-- .../service/WorkflowInstanceServiceTest.java | 50 ++---- .../dao/mapper/WorkflowInstanceMapper.java | 111 ------------ .../dao/repository/BaseDao.java | 29 +++- .../dolphinscheduler/dao/repository/IDao.java | 16 +- .../dao/repository/WorkflowInstanceDao.java | 23 +-- .../impl/WorkflowInstanceDaoImpl.java | 57 ++++--- .../dao/mapper/WorkflowInstanceMapper.xml | 158 +----------------- .../mapper/WorkflowInstanceMapperTest.java | 43 +---- .../impl/WorkflowInstanceDaoImplTest.java | 52 ------ .../master/failover/FailoverCoordinator.java | 2 +- .../service/process/ProcessService.java | 7 - .../service/process/ProcessServiceImpl.java | 50 +----- 24 files changed, 202 insertions(+), 651 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/impl/WorkflowInstanceAuditOperatorImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/impl/WorkflowInstanceAuditOperatorImpl.java index fec46ac1499d..e28e79ce56be 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/impl/WorkflowInstanceAuditOperatorImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/impl/WorkflowInstanceAuditOperatorImpl.java @@ -64,7 +64,7 @@ protected String getObjectNameFromIdentity(Object identity) { return ""; } - WorkflowInstance obj = workflowInstanceMapper.queryDetailById(objId); + WorkflowInstance obj = workflowInstanceMapper.selectById(objId); return obj == null ? "" : obj.getName(); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceController.java index bd7fb4619b3f..83505928bf56 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceController.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.api.controller; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKFLOW_INSTANCE_LIST_PAGING_ERROR; - import org.apache.dolphinscheduler.api.audit.OperatorLog; import org.apache.dolphinscheduler.api.audit.enums.AuditType; import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto; @@ -422,22 +420,4 @@ public Result batchDeleteWorkflowInstanceByIds(@RequestAttribute(value = Constan return returnDataList(result); } - // Todo: This is unstable, in some case the command trigger failed, we cannot get workflow instance - // And it's a bad design to use trigger code to get workflow instance why not directly get by workflow instanceId or - // inject the trigger id into workflow instance? - @Deprecated - @Operation(summary = "queryWorkflowInstanceListByTrigger", description = "QUERY_WORKFLOW_INSTANCE_BY_TRIGGER_NOTES") - @Parameters({ - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = Long.class)), - @Parameter(name = "triggerCode", description = "TRIGGER_CODE", required = true, schema = @Schema(implementation = Long.class)) - }) - @GetMapping("/trigger") - @ResponseStatus(HttpStatus.OK) - @ApiException(QUERY_WORKFLOW_INSTANCE_LIST_PAGING_ERROR) - public Result queryWorkflowInstancesByTriggerCode(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable long projectCode, - @RequestParam(value = "triggerCode") Long triggerCode) { - Map result = workflowInstanceService.queryByTriggerCode(loginUser, projectCode, triggerCode); - return returnDataList(result); - } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceService.java index c5d535d30b8c..cae5f7726705 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceService.java @@ -230,16 +230,6 @@ List queryByWorkflowCodeVersionStatus(Long workflowDefinitionC List queryByWorkflowDefinitionCode(Long workflowDefinitionCode, int size); - /** - * query workflow instance list bt trigger code - * - * @param loginUser - * @param projectCode - * @param triggerCode - * @return - */ - Map queryByTriggerCode(User loginUser, long projectCode, Long triggerCode); - void deleteWorkflowInstanceByWorkflowDefinitionCode(long workflowDefinitionCode); void deleteWorkflowInstanceById(int workflowInstanceId); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 99e55bfab2ee..f59320a219fe 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -326,7 +326,7 @@ public WorkflowExecuteResponse executeTask(User loginUser, projectService.checkProjectAndAuthThrowException(loginUser, projectCode, ApiFuncIdentificationConstant.map.get(ExecuteType.EXECUTE_TASK)); - WorkflowInstance workflowInstance = processService.findWorkflowInstanceDetailById(workflowInstanceId) + WorkflowInstance workflowInstance = workflowInstanceDao.queryOptionalById(workflowInstanceId) .orElseThrow(() -> new ServiceException(Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId)); if (!workflowInstance.getState().isFinished()) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index d318892a4d24..54c5f9877c91 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -40,18 +40,21 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -83,6 +86,8 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService @Autowired(required = false) private StorageOperator storageOperator; + @Autowired + private WorkflowInstanceDao workflowInstanceDao; /** * Check the tenant new object valid or not @@ -129,10 +134,10 @@ && checkTenantExists(updateTenant.getTenantCode())) { /** * create tenant * - * @param loginUser login user + * @param loginUser login user * @param tenantCode tenant code - * @param queueId queue id - * @param desc description + * @param queueId queue id + * @param desc description * @return create result code */ @Override @@ -179,11 +184,11 @@ public PageInfo queryTenantList(User loginUser, String searchVal, Intege /** * updateWorkflowInstance tenant * - * @param loginUser login user - * @param id tenant id + * @param loginUser login user + * @param id tenant id * @param tenantCode tenant code - * @param queueId queue id - * @param desc description + * @param queueId queue id + * @param desc description * @return update result code * @throws Exception exception */ @@ -256,9 +261,11 @@ public void deleteTenantById(User loginUser, int id) throws Exception { } private List getWorkflowInstancesByTenant(Tenant tenant) { - return workflowInstanceMapper.queryByTenantCodeAndStatus( - tenant.getTenantCode(), - WorkflowExecutionStatus.getNotTerminalStatus()); + return workflowInstanceDao.queryByCondition( + queryWrapper -> queryWrapper.eq(WorkflowInstance::getTenantCode, tenant.getTenantCode()) + .in(WorkflowInstance::getState, + Arrays.stream(WorkflowExecutionStatus.getNotTerminalStatus()).boxed() + .collect(Collectors.toList()))); } /** @@ -324,9 +331,9 @@ public Map queryByTenantCode(String tenantCode) { * ONLY for python gateway server, and should not use this in web ui function * * @param tenantCode tenant code - * @param desc The description of tenant object - * @param queue The value of queue which current tenant belong - * @param queueName The name of queue which current tenant belong + * @param desc The description of tenant object + * @param queue The value of queue which current tenant belong + * @param queueName The name of queue which current tenant belong * @return Tenant object */ @Override diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index eefe30767026..57de1ec4bef8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -45,8 +45,8 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.extract.base.client.Clients; import org.apache.dolphinscheduler.extract.master.IMasterContainerService; import org.apache.dolphinscheduler.registry.api.RegistryClient; @@ -56,6 +56,7 @@ import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -81,9 +82,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro @Autowired private WorkerGroupDao workerGroupDao; - @Autowired - private WorkflowInstanceMapper workflowInstanceMapper; - @Autowired private RegistryClient registryClient; @@ -98,6 +96,8 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro @Autowired private WorkflowDefinitionMapper workflowDefinitionMapper; + @Autowired + private WorkflowInstanceDao workflowInstanceDao; /** * create or update a worker group @@ -344,9 +344,9 @@ public Map deleteWorkerGroupById(User loginUser, Integer id) { putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST); return result; } - List workflowInstances = workflowInstanceMapper.queryByWorkerGroupNameAndStatus( - workerGroup.getName(), - WorkflowExecutionStatus.getNotTerminalStatus()); + List workflowInstances = workflowInstanceDao.queryByCondition(queryWrapper -> queryWrapper + .eq(WorkflowInstance::getWorkerGroup, workerGroup.getName()).in(WorkflowInstance::getState, Arrays + .stream(WorkflowExecutionStatus.getNotTerminalStatus()).boxed().collect(Collectors.toList()))); if (CollectionUtils.isNotEmpty(workflowInstances)) { List workflowInstanceIds = workflowInstances.stream().map(WorkflowInstance::getId).collect(Collectors.toList()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java index f92000a3a209..796cd10c896c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java @@ -90,6 +90,7 @@ import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.Date; @@ -232,9 +233,9 @@ public Map queryTopNLongestRunningWorkflowInstance(User loginUse /** * query workflow instance by id * - * @param loginUser login user - * @param projectCode project code - * @param workflowInstanceId workflow instance id + * @param loginUser login user + * @param projectCode project code + * @param workflowInstanceId workflow instance id * @return workflow instance detail */ @Override @@ -247,7 +248,7 @@ public Map queryWorkflowInstanceById(User loginUser, long projec if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - WorkflowInstance workflowInstance = processService.findWorkflowInstanceDetailById(workflowInstanceId) + WorkflowInstance workflowInstance = workflowInstanceDao.queryOptionalById(workflowInstanceId) .orElseThrow(() -> new ServiceException(WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId)); WorkflowDefinition workflowDefinition = @@ -295,17 +296,17 @@ public Map queryWorkflowInstanceById(User loginUser, Integer wor /** * paging query workflow instance list, filtering according to project, workflow definition, time range, keyword, workflow status * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param workflowDefinitionCode workflow definition code - * @param pageNo page number - * @param pageSize page size - * @param searchVal search value - * @param stateType state type - * @param host host - * @param startDate start time - * @param endDate end time - * @param otherParamsJson otherParamsJson handle other params + * @param pageNo page number + * @param pageSize page size + * @param searchVal search value + * @param stateType state type + * @param host host + * @param startDate start time + * @param endDate end time + * @param otherParamsJson otherParamsJson handle other params * @return workflow instance list */ @Override @@ -441,9 +442,9 @@ public Result queryWorkflowInstanceList(User loginUser, WorkflowInstanceQueryReq /** * query task list by workflow instance id * - * @param loginUser login user - * @param projectCode project code - * @param workflowInstanceId workflow instance id + * @param loginUser login user + * @param projectCode project code + * @param workflowInstanceId workflow instance id * @return task list for the workflow instance */ @Override @@ -457,7 +458,7 @@ public Map queryTaskListByWorkflowInstanceId(User loginUser, lon if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - WorkflowInstance workflowInstance = processService.findWorkflowInstanceDetailById(workflowInstanceId) + WorkflowInstance workflowInstance = workflowInstanceDao.queryOptionalById(workflowInstanceId) .orElseThrow(() -> new ServiceException(WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId)); WorkflowDefinition workflowDefinition = workflowDefinitionMapper.queryByCode(workflowInstance.getWorkflowDefinitionCode()); @@ -628,7 +629,7 @@ public Map querySubWorkflowInstanceByTaskId(User loginUser, long * @param projectCode project code * @param taskRelationJson workflow task relation json * @param taskDefinitionJson taskDefinitionJson - * @param workflowInstanceId workflow instance id + * @param workflowInstanceId workflow instance id * @param scheduleTime schedule time * @param syncDefine sync define * @param globalParams global params @@ -649,7 +650,7 @@ public Map updateWorkflowInstance(User loginUser, long projectCo ApiFuncIdentificationConstant.INSTANCE_UPDATE); Map result = new HashMap<>(); // check workflow instance exists - WorkflowInstance workflowInstance = processService.findWorkflowInstanceDetailById(workflowInstanceId) + WorkflowInstance workflowInstance = workflowInstanceDao.queryOptionalById(workflowInstanceId) .orElseThrow(() -> new ServiceException(WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId)); // check workflow instance exists in project WorkflowDefinition workflowDefinition0 = @@ -797,7 +798,7 @@ public Map queryParentInstanceBySubId(User loginUser, long proje return result; } - WorkflowInstance subInstance = processService.findWorkflowInstanceDetailById(subId) + WorkflowInstance subInstance = workflowInstanceDao.queryOptionalById(subId) .orElseThrow(() -> new ServiceException(WORKFLOW_INSTANCE_NOT_EXIST, subId)); if (subInstance.getIsSubWorkflow() == Flag.NO) { log.warn( @@ -824,14 +825,14 @@ public Map queryParentInstanceBySubId(User loginUser, long proje /** * delete workflow instance by id, at the same time,delete task instance and their mapping relation data * - * @param loginUser login user + * @param loginUser login user * @param workflowInstanceId workflow instance id * @return delete result code */ @Override @Transactional public void deleteWorkflowInstanceById(User loginUser, Integer workflowInstanceId) { - WorkflowInstance workflowInstance = processService.findWorkflowInstanceDetailById(workflowInstanceId) + WorkflowInstance workflowInstance = workflowInstanceDao.queryOptionalById(workflowInstanceId) .orElseThrow(() -> new ServiceException(WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId)); WorkflowDefinition workflowDefinition = workflowDefinitionLogMapper.queryByDefinitionCodeAndVersion( workflowInstance.getWorkflowDefinitionCode(), workflowInstance.getWorkflowDefinitionVersion()); @@ -853,7 +854,7 @@ public void deleteWorkflowInstanceById(User loginUser, Integer workflowInstanceI /** * view workflow instance variables * - * @param projectCode project code + * @param projectCode project code * @param workflowInstanceId workflow instance id * @return variables data */ @@ -861,7 +862,7 @@ public void deleteWorkflowInstanceById(User loginUser, Integer workflowInstanceI public Map viewVariables(long projectCode, Integer workflowInstanceId) { Map result = new HashMap<>(); - WorkflowInstance workflowInstance = workflowInstanceMapper.queryDetailById(workflowInstanceId); + WorkflowInstance workflowInstance = workflowInstanceMapper.selectById(workflowInstanceId); if (workflowInstance == null) { log.error("workflow instance does not exist, projectCode:{}, workflowInstanceId:{}.", projectCode, @@ -947,7 +948,7 @@ private Map> getLocalParams(WorkflowInstance workflo /** * encapsulation gantt structure * - * @param projectCode project code + * @param projectCode project code * @param workflowInstanceId workflow instance id * @return gantt tree data * @throws Exception exception when json parse @@ -955,7 +956,7 @@ private Map> getLocalParams(WorkflowInstance workflo @Override public Map viewGantt(long projectCode, Integer workflowInstanceId) throws Exception { Map result = new HashMap<>(); - WorkflowInstance workflowInstance = workflowInstanceMapper.queryDetailById(workflowInstanceId); + WorkflowInstance workflowInstance = workflowInstanceMapper.selectById(workflowInstanceId); if (workflowInstance == null) { log.error("workflow instance does not exist, projectCode:{}, workflowInstanceId:{}.", projectCode, @@ -1020,64 +1021,44 @@ public Map viewGantt(long projectCode, Integer workflowInstanceI * query workflow instance by workflowDefinitionCode and stateArray * * @param workflowDefinitionCode workflowDefinitionCode - * @param states states array + * @param states states array * @return workflow instance list */ @Override public List queryByWorkflowDefinitionCodeAndStatus(Long workflowDefinitionCode, int[] states) { - return workflowInstanceMapper.queryByWorkflowDefinitionCodeAndStatus(workflowDefinitionCode, states); + return workflowInstanceDao.queryByCondition( + queryWrapper -> queryWrapper.eq(WorkflowInstance::getWorkflowDefinitionCode, workflowDefinitionCode) + .in(WorkflowInstance::getState, Arrays.stream(states).boxed().collect(Collectors.toList()))); } @Override public List queryByWorkflowCodeVersionStatus(Long workflowDefinitionCode, int workflowDefinitionVersion, int[] states) { - return workflowInstanceDao.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, workflowDefinitionVersion, - states); + return workflowInstanceDao.queryByCondition( + queryWrapper -> queryWrapper.eq(WorkflowInstance::getWorkflowDefinitionCode, workflowDefinitionCode) + .eq(WorkflowInstance::getWorkflowDefinitionVersion, workflowDefinitionVersion) + .in(WorkflowInstance::getState, states)); } /** * query workflow instance by workflowDefinitionCode * * @param workflowDefinitionCode workflowDefinitionCode - * @param size size + * @param size size * @return workflow instance list */ @Override public List queryByWorkflowDefinitionCode(Long workflowDefinitionCode, int size) { - return workflowInstanceMapper.queryByWorkflowDefinitionCode(workflowDefinitionCode, size); - } - - /** - * query workflow instance list bt trigger code - * - * @param loginUser - * @param projectCode - * @param triggerCode - * @return - */ - @Override - public Map queryByTriggerCode(User loginUser, long projectCode, Long triggerCode) { - - Project project = projectMapper.queryByCode(projectCode); - // check user access for project - Map result = - projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE); - if (result.get(Constants.STATUS) != Status.SUCCESS || triggerCode == null) { - return result; - } - - List workflowInstances = workflowInstanceMapper.queryByTriggerCode( - triggerCode); - result.put(DATA_LIST, workflowInstances); - putMsg(result, Status.SUCCESS); - return result; + return workflowInstanceDao.queryByCondition( + queryWrapper -> queryWrapper.eq(WorkflowInstance::getWorkflowDefinitionCode, workflowDefinitionCode) + .orderByDesc(WorkflowInstance::getStartTime), + size); } @Override public void deleteWorkflowInstanceByWorkflowDefinitionCode(long workflowDefinitionCode) { while (true) { - List workflowInstances = - workflowInstanceMapper.queryByWorkflowDefinitionCode(workflowDefinitionCode, 100); + List workflowInstances = queryByWorkflowDefinitionCode(workflowDefinitionCode, 100); if (CollectionUtils.isEmpty(workflowInstances)) { break; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java index 06ac58fde5ce..b578e148b890 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java @@ -27,13 +27,17 @@ import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -59,6 +63,9 @@ public class WorkerGroupControllerTest extends AbstractControllerTest { @MockBean(name = "processInstanceMapper") private WorkflowInstanceMapper workflowInstanceMapper; + @MockBean(name = "workflowInstanceDao") + private WorkflowInstanceDao workflowInstanceDao; + @MockBean(name = "registryClient") private RegistryClient registryClient; @@ -134,8 +141,10 @@ public void testDeleteById() throws Exception { workerGroup.setId(12); workerGroup.setName("测试"); Mockito.when(workerGroupDao.queryById(12)).thenReturn(workerGroup); - Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus("测试", - WorkflowExecutionStatus.getNotTerminalStatus())) + Mockito.when(workflowInstanceDao.queryByCondition( + queryWrapper -> queryWrapper.eq(WorkflowInstance::getWorkerGroup, "测试").in(WorkflowInstance::getState, + Arrays.stream(WorkflowExecutionStatus.getNotTerminalStatus()).boxed() + .collect(Collectors.toList())))) .thenReturn(null); Mockito.when(workerGroupDao.deleteById(12)).thenReturn(true); Mockito.when(workflowInstanceMapper.updateWorkflowInstanceByWorkerGroupName("测试", "")).thenReturn(1); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceControllerTest.java index 9007446dd12b..66725c323f5b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceControllerTest.java @@ -248,23 +248,4 @@ public void testBatchDeleteWorkflowInstanceByIds() throws Exception { Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); } - @Test - public void queryWorkflowInstancesByTriggerCode() throws Exception { - Map mockResult = new HashMap<>(); - mockResult.put(Constants.STATUS, Status.SUCCESS); - - Mockito.when(workflowInstanceService - .queryByTriggerCode(Mockito.any(), Mockito.anyLong(), Mockito.anyLong())) - .thenReturn(mockResult); - - MvcResult mvcResult = mockMvc.perform(get("/projects/1113/workflow-instances/trigger") - .header("sessionId", sessionId) - .param("triggerCode", "12051206")) - .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.APPLICATION_JSON)) - .andReturn(); - Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); - Assertions.assertNotNull(result); - Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); - } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index 5a8dabaaae04..5d4d0bd5a18a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -179,7 +179,7 @@ public void queryTaskListPaging() { Mockito.any())) .thenReturn(pageReturn); when(usersService.queryUser(workflowInstance.getExecutorId())).thenReturn(loginUser); - when(processService.findWorkflowInstanceDetailById(taskInstance.getWorkflowInstanceId())) + when(workflowInstanceDao.queryOptionalById(taskInstance.getWorkflowInstanceId())) .thenReturn(Optional.of(workflowInstance)); Result successRes = taskInstanceService.queryTaskListPaging(loginUser, diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java index c595415b0b38..f2d9a8776f05 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.UserType; -import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Tenant; @@ -42,6 +41,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator; import org.apache.commons.collections4.CollectionUtils; @@ -89,6 +89,9 @@ public class TenantServiceTest { @Mock private WorkflowInstanceMapper workflowInstanceMapper; + @Mock + private WorkflowInstanceDao workflowInstanceDao; + @Mock private UserMapper userMapper; @@ -191,9 +194,7 @@ public void testDeleteTenantById() { when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.TENANT, null, 0, baseServiceLogger)).thenReturn(true); when(tenantMapper.queryById(1)).thenReturn(getTenant()); - when(workflowInstanceMapper.queryByTenantCodeAndStatus(tenantCode, - WorkflowExecutionStatus.getNotTerminalStatus())) - .thenReturn(getInstanceList()); + when(workflowInstanceDao.queryByCondition(Mockito.any())).thenReturn(getInstanceList()); when(scheduleMapper.queryScheduleListByTenant(tenantCode)).thenReturn(getScheduleList()); when(userMapper.queryUserListByTenant(3)).thenReturn(getUserList()); @@ -205,7 +206,7 @@ public void testDeleteTenantById() { () -> tenantService.deleteTenantById(getLoginUser(), 1)); // DELETE_TENANT_BY_ID_FAIL_DEFINES - when(workflowInstanceMapper.queryByTenantCodeAndStatus(any(), any())).thenReturn(Collections.emptyList()); + when(workflowInstanceDao.queryByCondition(any())).thenReturn(Collections.emptyList()); when(tenantMapper.queryById(2)).thenReturn(getTenant(2)); assertThrowsServiceException(Status.DELETE_TENANT_BY_ID_FAIL_DEFINES, () -> tenantService.deleteTenantById(getLoginUser(), 2)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index 01a140635edf..92b8428bf927 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -39,17 +39,19 @@ import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -64,6 +66,8 @@ import org.slf4j.LoggerFactory; import org.springframework.dao.DuplicateKeyException; +import com.google.common.collect.Lists; + @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class WorkerGroupServiceTest { @@ -79,7 +83,7 @@ public class WorkerGroupServiceTest { private WorkerGroupDao workerGroupDao; @Mock - private WorkflowInstanceMapper workflowInstanceMapper; + private WorkflowInstanceDao workflowInstanceDao; @Mock private RegistryClient registryClient; @@ -230,15 +234,9 @@ public void giveRunningProcess_whenDeleteWorkerGroupById_expectFailed() { WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true); when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, baseServiceLogger)).thenReturn(true); - WorkerGroup workerGroup = getWorkerGroup(1); - when(workerGroupDao.queryById(1)).thenReturn(workerGroup); - WorkflowInstance workflowInstance = new WorkflowInstance(); - workflowInstance.setId(1); - List workflowInstances = new ArrayList(); - workflowInstances.add(workflowInstance); - when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), - WorkflowExecutionStatus.getNotTerminalStatus())) - .thenReturn(workflowInstances); + when(workerGroupDao.queryById(1)).thenReturn(getWorkerGroup(1)); + when(workflowInstanceDao.queryByCondition(Mockito.any())) + .thenReturn(Lists.newArrayList(WorkflowInstance.builder().id(1).build())); Map deleteFailed = workerGroupService.deleteWorkerGroupById(loginUser, 1); Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(), @@ -254,8 +252,10 @@ public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() { baseServiceLogger)).thenReturn(true); WorkerGroup workerGroup = getWorkerGroup(1); when(workerGroupDao.queryById(1)).thenReturn(workerGroup); - when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), - WorkflowExecutionStatus.getNotTerminalStatus())).thenReturn(null); + when(workflowInstanceDao.queryByCondition(queryWrapper -> queryWrapper + .eq(WorkflowInstance::getWorkerGroup, workerGroup.getName()).in(WorkflowInstance::getState, Arrays + .stream(WorkflowExecutionStatus.getNotTerminalStatus()).boxed().collect(Collectors.toList())))) + .thenReturn(null); when(workerGroupDao.deleteById(1)).thenReturn(true); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java index 9395e471ee0b..57e7edc4c041 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java @@ -306,28 +306,6 @@ public void testQueryWorkflowInstanceList() { } - @Test - public void queryByTriggerCode() { - long projectCode = 666L; - User loginUser = getAdminUser(); - Project project = getProject(projectCode); - Map result = new HashMap<>(); - putMsg(result, Status.PROJECT_NOT_FOUND, projectCode); - - // project auth fail - when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - Map proejctAuthFailMap = - workflowInstanceService.queryByTriggerCode(loginUser, projectCode, 999L); - Assertions.assertEquals(Status.PROJECT_NOT_FOUND, proejctAuthFailMap.get(Constants.STATUS)); - // project auth sucess - putMsg(result, Status.SUCCESS, projectCode); - when(workflowInstanceMapper.queryByTriggerCode(projectCode)).thenReturn(new ArrayList()); - proejctAuthFailMap = - workflowInstanceService.queryByTriggerCode(loginUser, projectCode, 999L); - Assertions.assertEquals(Status.SUCCESS, proejctAuthFailMap.get(Constants.STATUS)); - } - @Test public void testQueryTopNLongestRunningWorkflowInstance() { long projectCode = 1L; @@ -426,7 +404,7 @@ public void testQueryWorkflowInstanceById() { workflowDefinition.setProjectCode(projectCode); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - when(processService.findWorkflowInstanceDetailById(workflowInstance.getId())) + when(workflowInstanceDao.queryOptionalById(workflowInstance.getId())) .thenReturn(Optional.of(workflowInstance)); when(processService.findWorkflowDefinition(workflowInstance.getWorkflowDefinitionCode(), workflowInstance.getWorkflowDefinitionVersion())).thenReturn(workflowDefinition); @@ -491,7 +469,7 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException { res.setData("xxx"); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - when(processService.findWorkflowInstanceDetailById(workflowInstance.getId())) + when(workflowInstanceDao.queryOptionalById(workflowInstance.getId())) .thenReturn(Optional.of(workflowInstance)); when(taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstance.getId(), workflowInstance.getTestFlag())) @@ -594,13 +572,13 @@ public void testUpdateWorkflowInstance() { doNothing() .when(projectService) .checkProjectAndAuthThrowException(loginUser, projectCode, INSTANCE_UPDATE); - when(processService.findWorkflowInstanceDetailById(1)).thenReturn(Optional.empty()); + when(workflowInstanceDao.queryOptionalById(1)).thenReturn(Optional.empty()); assertThrows(ServiceException.class, () -> { workflowInstanceService.updateWorkflowInstance(loginUser, projectCode, 1, shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0); }); // process instance not finish - when(processService.findWorkflowInstanceDetailById(1)).thenReturn(Optional.ofNullable(workflowInstance)); + when(workflowInstanceDao.queryOptionalById(1)).thenReturn(Optional.ofNullable(workflowInstance)); workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); putMsg(result, Status.SUCCESS, projectCode); Map processInstanceNotFinishRes = @@ -674,7 +652,7 @@ public void testQueryParentInstanceBySubId() { putMsg(result, Status.SUCCESS, projectCode); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - when(processService.findWorkflowInstanceDetailById(1)).thenReturn(Optional.empty()); + when(workflowInstanceDao.queryOptionalById(1)).thenReturn(Optional.empty()); assertThrows(ServiceException.class, () -> { workflowInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1); }); @@ -683,7 +661,7 @@ public void testQueryParentInstanceBySubId() { WorkflowInstance workflowInstance = getProcessInstance(); workflowInstance.setIsSubWorkflow(Flag.NO); putMsg(result, Status.SUCCESS, projectCode); - when(processService.findWorkflowInstanceDetailById(1)).thenReturn(Optional.ofNullable(workflowInstance)); + when(workflowInstanceDao.queryOptionalById(1)).thenReturn(Optional.ofNullable(workflowInstance)); Map notSubProcessRes = workflowInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1); Assertions.assertEquals(Status.WORKFLOW_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, @@ -724,7 +702,7 @@ public void testDeleteWorkflowInstanceById() { workflowInstance.setIsSubWorkflow(Flag.NO); workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); putMsg(result, Status.SUCCESS, projectCode); - when(processService.findWorkflowInstanceDetailById(1)).thenReturn(Optional.ofNullable(workflowInstance)); + when(workflowInstanceDao.queryOptionalById(1)).thenReturn(Optional.ofNullable(workflowInstance)); when(workflowDefinitionLogMapper.queryByDefinitionCodeAndVersion(Mockito.anyLong(), Mockito.anyInt())) .thenReturn(new WorkflowDefinitionLog()); assertThrows(ServiceException.class, @@ -741,12 +719,12 @@ public void testDeleteWorkflowInstanceById() { workflowDefinition.setUserId(1); workflowDefinition.setProjectCode(0L); when(workflowDefinitionMapper.queryByCode(46L)).thenReturn(workflowDefinition); - when(processService.findWorkflowInstanceDetailById(Mockito.anyInt())).thenReturn(Optional.empty()); + when(workflowInstanceDao.queryOptionalById(Mockito.anyInt())).thenReturn(Optional.empty()); assertThrows(ServiceException.class, () -> workflowInstanceService.deleteWorkflowInstanceById(loginUser, 1)); workflowDefinition.setProjectCode(projectCode); - when(processService.findWorkflowInstanceDetailById(Mockito.anyInt())).thenReturn(Optional.of(workflowInstance)); + when(workflowInstanceDao.queryOptionalById(Mockito.anyInt())).thenReturn(Optional.of(workflowInstance)); when(processService.deleteWorkflowInstanceById(1)).thenReturn(1); workflowInstanceService.deleteWorkflowInstanceById(loginUser, 1); @@ -761,12 +739,12 @@ public void testViewVariables() { workflowInstance.setCommandType(CommandType.SCHEDULER); workflowInstance.setScheduleTime(new Date()); workflowInstance.setGlobalParams(""); - when(workflowInstanceMapper.queryDetailById(1)).thenReturn(workflowInstance); + when(workflowInstanceMapper.selectById(1)).thenReturn(workflowInstance); Map successRes = workflowInstanceService.viewVariables(1L, 1); Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); - when(workflowInstanceMapper.queryDetailById(1)).thenReturn(null); + when(workflowInstanceMapper.selectById(1)).thenReturn(null); Map processNotExist = workflowInstanceService.viewVariables(1L, 1); Assertions.assertEquals(Status.WORKFLOW_INSTANCE_NOT_EXIST, processNotExist.get(Constants.STATUS)); } @@ -777,11 +755,11 @@ public void testViewGantt() throws Exception { TaskInstance taskInstance = getTaskInstance(); taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); - when(workflowInstanceMapper.queryDetailById(1)).thenReturn(workflowInstance); + when(workflowInstanceMapper.selectById(1)).thenReturn(workflowInstance); when(workflowDefinitionLogMapper.queryByDefinitionCodeAndVersion( workflowInstance.getWorkflowDefinitionCode(), workflowInstance.getWorkflowDefinitionVersion())).thenReturn(new WorkflowDefinitionLog()); - when(workflowInstanceMapper.queryDetailById(1)).thenReturn(workflowInstance); + when(workflowInstanceMapper.selectById(1)).thenReturn(workflowInstance); DAG graph = new DAG<>(); for (long i = 1; i <= 7; ++i) { graph.addNode(i, new TaskNode()); @@ -793,7 +771,7 @@ public void testViewGantt() throws Exception { Map successRes = workflowInstanceService.viewGantt(0L, 1); Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); - when(workflowInstanceMapper.queryDetailById(1)).thenReturn(null); + when(workflowInstanceMapper.selectById(1)).thenReturn(null); Map processNotExist = workflowInstanceService.viewVariables(1L, 1); Assertions.assertEquals(Status.WORKFLOW_INSTANCE_NOT_EXIST, processNotExist.get(Constants.STATUS)); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java index a9f743f49ab2..117a6fb1ccb1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java @@ -38,59 +38,6 @@ */ public interface WorkflowInstanceMapper extends BaseMapper { - /** - * query workflow instance detail info by id - * - * @param id id - * @return workflow instance - */ - WorkflowInstance queryDetailById(@Param("id") int id); - - /** - * query workflow instance by host and stateArray - * - * @param host host - * @param stateArray stateArray - * @return workflow instance list - */ - List queryByHostAndStatus(@Param("host") String host, - @Param("states") int[] stateArray); - - /** - * query workflow instance by host and stateArray which is not sub workflow - * - * @param host host - * @param stateArray stateArray - * @return workflow instance list - */ - List queryMainWorkflowByHostAndStatus(@Param("host") String host, - @Param("states") int[] stateArray); - /** - * query workflow instance host by stateArray - * - * @param stateArray - * @return - */ - List queryNeedFailoverWorkflowInstanceHost(@Param("states") int[] stateArray); - - /** - * query workflow instance by tenantCode and stateArray - * - * @param tenantCode tenantCode - * @param states states array - * @return workflow instance list - */ - List queryByTenantCodeAndStatus(@Param("tenantCode") String tenantCode, - @Param("states") int[] states); - - /** - * @param workerGroupName workerGroupName - * @param states states array - * @return workflow instance list - */ - List queryByWorkerGroupNameAndStatus(@Param("workerGroupName") String workerGroupName, - @Param("states") int[] states); - /** * workflow instance page * @param page page @@ -129,16 +76,6 @@ IPage queryWorkflowInstanceListPaging(Page p @Param("startTime") Date startTime, @Param("endTime") Date endTime); - /** - * set failover by host and state array - * - * @param host host - * @param stateArray stateArray - * @return set result - */ - int setFailoverByHostAndStateArray(@Param("host") String host, - @Param("states") int[] stateArray); - /** * Update the workflow instance state from originState to destState */ @@ -182,16 +119,6 @@ List countWorkflowInstanceStateByProjectCodes( @Param("endTime") Date endTime, @Param("projectCodes") Collection projectCodes); - /** - * query workflow instance by workflowDefinitionCode - * - * @param workflowDefinitionCode workflowDefinitionCode - * @param size size - * @return workflow instance list - */ - List queryByWorkflowDefinitionCode(@Param("workflowDefinitionCode") Long workflowDefinitionCode, - @Param("size") int size); - /** * query last scheduler workflow instance * @@ -224,22 +151,6 @@ WorkflowInstance queryLastManualWorkflow(@Param("workflowDefinitionCode") Long w @Param("endTime") Date endTime, @Param("testFlag") int testFlag); - /** - * query first schedule workflow instance - * - * @param workflowDefinitionCode workflowDefinitionCode - * @return workflow instance - */ - WorkflowInstance queryFirstScheduleWorkflowInstance(@Param("workflowDefinitionCode") Long workflowDefinitionCode); - - /** - * query first manual workflow instance - * - * @param workflowDefinitionCode workflowDefinitionCode - * @return workflow instance - */ - WorkflowInstance queryFirstStartWorkflowInstance(@Param("workflowDefinitionCode") Long workflowDefinitionCode); - /** * query top n workflow instance order by running duration * @@ -257,21 +168,6 @@ List queryTopNWorkflowInstance(@Param("size") int size, @Param("status") WorkflowExecutionStatus status, @Param("projectCode") long projectCode); - /** - * query workflow instance by workflowDefinitionCode and stateArray - * - * @param workflowDefinitionCode workflowDefinitionCode - * @param states states array - * @return workflow instance list - */ - - List queryByWorkflowDefinitionCodeAndStatus(@Param("workflowDefinitionCode") Long workflowDefinitionCode, - @Param("states") int[] states); - - List queryByWorkflowCodeVersionStatus(@Param("workflowDefinitionCode") long workflowDefinitionCode, - @Param("workflowDefinitionVersion") int workflowDefinitionVersion, - @Param("states") int[] states); - /** * Filter workflow instance * @@ -313,11 +209,4 @@ List countInstanceStateV2( @Param("model") Integer model, @Param("projectIds") Set projectIds); - /** - * query process list by triggerCode - * - * @param triggerCode - * @return - */ - List queryByTriggerCode(@Param("triggerCode") Long triggerCode); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java index 664b56ee472c..a0f385013118 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java @@ -24,11 +24,14 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.function.Function; import lombok.NonNull; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; public abstract class BaseDao> implements IDao { @@ -62,11 +65,33 @@ public List queryAll() { } @Override - public List queryByCondition(ENTITY queryCondition) { + public List queryByCondition(Function, LambdaQueryWrapper> queryCondition) { if (queryCondition == null) { throw new IllegalArgumentException("queryCondition can not be null"); } - return mybatisMapper.selectList(new QueryWrapper<>(queryCondition)); + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + queryCondition.apply(wrapper); + return mybatisMapper.selectList(wrapper); + } + + @Override + public Optional queryOneByCondition(Function, LambdaQueryWrapper> queryCondition) { + return queryByCondition(queryCondition).stream().findFirst(); + } + + @Override + public List queryByCondition(Function, LambdaQueryWrapper> queryCondition, + int limit) { + if (queryCondition == null) { + throw new IllegalArgumentException("queryCondition can not be null"); + } + if (limit <= 0) { + return Collections.emptyList(); + } + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + queryCondition.apply(wrapper); + Page entityPage = mybatisMapper.selectPage(new Page<>(1, limit), wrapper); + return entityPage.getRecords(); } @Override diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java index ab774196003f..1d3b7a716dc0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java @@ -21,9 +21,12 @@ import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.function.Function; import lombok.NonNull; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; + public interface IDao { /** @@ -49,7 +52,18 @@ public interface IDao { /** * Query the entity by condition. */ - List queryByCondition(Entity queryCondition); + List queryByCondition(Function, LambdaQueryWrapper> queryCondition); + + /** + * Query the entity by condition. + */ + Optional queryOneByCondition(Function, LambdaQueryWrapper> queryCondition); + + /** + * Query the entity by condition. + */ + List queryByCondition(Function, LambdaQueryWrapper> queryCondition, + int limit); /** * Insert the entity. diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java index 5a2fb4e147ef..5582c4e09336 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java @@ -26,7 +26,8 @@ public interface WorkflowInstanceDao extends IDao { /** - * insert or update work workflow instance to database + * Insert or update work workflow instance to database. + *

If the workflow instance id is null, it will be inserted, otherwise it will be updated. * * @param workflowInstance workflowInstance */ @@ -39,19 +40,12 @@ void updateWorkflowInstanceState(Integer workflowInstanceId, WorkflowExecutionStatus originState, WorkflowExecutionStatus targetState); - /** - * performs an "upsert" operation (update or insert) on a WorkflowInstance object within a new transaction - * - * @param workflowInstance workflowInstance - */ - void performTransactionalUpsert(WorkflowInstance workflowInstance); - /** * find last scheduler workflow instance in the date interval * * @param workflowDefinitionCode definitionCode - * @param taskDefinitionCode definitionCode - * @param dateInterval dateInterval + * @param taskDefinitionCode definitionCode + * @param dateInterval dateInterval * @return workflow instance */ WorkflowInstance queryLastSchedulerWorkflowInterval(Long workflowDefinitionCode, Long taskDefinitionCode, @@ -86,11 +80,10 @@ WorkflowInstance queryLastManualWorkflowInterval(Long definitionCode, Long taskC WorkflowInstance querySubWorkflowInstanceByParentId(Integer workflowInstanceId, Integer taskInstanceId); - List queryByWorkflowCodeVersionStatus(Long workflowDefinitionCode, - int workflowDefinitionVersion, - int[] states); - - List queryNeedFailoverMasters(); + /** + * Find the host which contains workflow instances need failover. + */ + List listHostsNeedingFailover(); /** * Query the workflow instances under the master that need to be failover. diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java index 996678c5813f..abab3b625be2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.repository.impl; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation; @@ -26,16 +27,15 @@ import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Isolation; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; @Slf4j @Repository @@ -60,7 +60,8 @@ public void upsertWorkflowInstance(@NonNull WorkflowInstance workflowInstance) { } @Override - public void updateWorkflowInstanceState(Integer workflowInstanceId, WorkflowExecutionStatus originalStatus, + public void updateWorkflowInstanceState(Integer workflowInstanceId, + WorkflowExecutionStatus originalStatus, WorkflowExecutionStatus targetStatus) { int update = mybatisMapper.updateWorkflowInstanceState(workflowInstanceId, originalStatus, targetStatus); if (update != 1) { @@ -75,18 +76,12 @@ public void updateWorkflowInstanceState(Integer workflowInstanceId, WorkflowExec } } - @Override - @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class) - public void performTransactionalUpsert(WorkflowInstance workflowInstance) { - this.upsertWorkflowInstance(workflowInstance); - } - /** * find last scheduler process instance in the date interval * * @param workflowDefinitionCode definitionCode - * @param taskDefinitionCode definitionCode - * @param dateInterval dateInterval + * @param taskDefinitionCode definitionCode + * @param dateInterval dateInterval * @return process instance */ @Override @@ -128,7 +123,10 @@ public WorkflowInstance queryLastManualWorkflowInterval(Long definitionCode, Lon */ @Override public WorkflowInstance queryFirstScheduleWorkflowInstance(Long definitionCode) { - return mybatisMapper.queryFirstScheduleWorkflowInstance(definitionCode); + return queryOneByCondition(queryWrapper -> queryWrapper + .eq(WorkflowInstance::getWorkflowDefinitionCode, definitionCode) + .isNotNull(WorkflowInstance::getScheduleTime) + .orderByDesc(WorkflowInstance::getScheduleTime)).orElse(null); } /** @@ -139,7 +137,10 @@ public WorkflowInstance queryFirstScheduleWorkflowInstance(Long definitionCode) */ @Override public WorkflowInstance queryFirstStartWorkflowInstance(Long definitionCode) { - return mybatisMapper.queryFirstStartWorkflowInstance(definitionCode); + return queryOneByCondition(queryWrapper -> queryWrapper + .eq(WorkflowInstance::getWorkflowDefinitionCode, definitionCode) + .isNotNull(WorkflowInstance::getStartTime) + .orderByDesc(WorkflowInstance::getStartTime)).orElse(null); } @Override @@ -155,22 +156,24 @@ public WorkflowInstance querySubWorkflowInstanceByParentId(Integer workflowInsta } @Override - public List queryByWorkflowCodeVersionStatus(Long workflowDefinitionCode, - int workflowDefinitionVersion, - int[] states) { - return mybatisMapper.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, workflowDefinitionVersion, - states); - } - - @Override - public List queryNeedFailoverMasters() { - return mybatisMapper - .queryNeedFailoverWorkflowInstanceHost(WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState()); + public List listHostsNeedingFailover() { + return queryByCondition(queryWrapper -> queryWrapper + .in(WorkflowInstance::getState, + Arrays.stream(WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState()).boxed() + .collect(Collectors.toList()))) + .stream() + .map(WorkflowInstance::getHost) + .distinct() + .collect(Collectors.toList()); } @Override public List queryNeedFailoverWorkflowInstances(String masterAddress) { - return mybatisMapper.queryMainWorkflowByHostAndStatus(masterAddress, - WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState()); + return queryByCondition(queryWrapper -> queryWrapper + .eq(WorkflowInstance::getHost, masterAddress) + .eq(WorkflowInstance::getIsSubWorkflow, Flag.NO.getCode()) + .in(WorkflowInstance::getState, + Arrays.stream(WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState()).boxed() + .collect(Collectors.toList()))); } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml index 83fba417697c..3dded9854f52 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml @@ -28,55 +28,6 @@ dry_run, test_flag, next_workflow_instance_id, restart_time, state_history - - - - - - - - - - - update t_ds_workflow_instance - set host=null - where host =#{host} - - and state in - - #{i} - - - + update t_ds_workflow_instance set state = #{targetState} @@ -205,13 +112,6 @@ group by state - - - - - - - - - - - diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapperTest.java index ad09d6e858f1..52b94ecfcd51 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapperTest.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.model.WorkflowInstanceStatusCountDto; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import java.util.Date; import java.util.List; @@ -127,31 +126,11 @@ public void testQueryDetailById() { WorkflowInstance workflowInstance = insertOne(); workflowInstanceMapper.updateById(workflowInstance); - WorkflowInstance workflowInstance1 = workflowInstanceMapper.queryDetailById(workflowInstance.getId()); + WorkflowInstance workflowInstance1 = workflowInstanceMapper.selectById(workflowInstance.getId()); Assertions.assertNotNull(workflowInstance1); workflowInstanceMapper.deleteById(workflowInstance.getId()); } - /** - * test query by host and states - */ - @Test - public void testQueryByHostAndStates() { - WorkflowInstance workflowInstance = insertOne(); - workflowInstance.setHost("192.168.2.155"); - workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); - workflowInstanceMapper.updateById(workflowInstance); - - int[] stateArray = new int[]{ - TaskExecutionStatus.RUNNING_EXECUTION.getCode(), - TaskExecutionStatus.SUCCESS.getCode()}; - - List workflowInstances = workflowInstanceMapper.queryByHostAndStatus(null, stateArray); - - workflowInstanceMapper.deleteById(workflowInstance.getId()); - Assertions.assertNotEquals(0, workflowInstances.size()); - } - /** * test query process instance page */ @@ -235,26 +214,6 @@ public void testCountInstanceStateByUser() { workflowInstanceMapper.deleteById(workflowInstance.getId()); } - /** - * test query process instance by process definition id - */ - @Test - public void testQueryByProcessDefineId() { - WorkflowInstance workflowInstance = insertOne(); - WorkflowInstance workflowInstance1 = insertOne(); - - List workflowInstances = - workflowInstanceMapper.queryByWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode(), 1); - Assertions.assertEquals(1, workflowInstances.size()); - - workflowInstances = - workflowInstanceMapper.queryByWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode(), 2); - Assertions.assertEquals(2, workflowInstances.size()); - - workflowInstanceMapper.deleteById(workflowInstance.getId()); - workflowInstanceMapper.deleteById(workflowInstance1.getId()); - } - /** * test query last schedule process instance */ diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java index 520a9b907e78..8d9f16e900fc 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java @@ -17,11 +17,8 @@ package org.apache.dolphinscheduler.dao.repository.impl; -import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.BaseDaoTest; @@ -37,37 +34,6 @@ class WorkflowInstanceDaoImplTest extends BaseDaoTest { @Autowired private WorkflowInstanceDao workflowInstanceDao; - @Test - void queryByWorkflowCodeVersionStatus_EMPTY_INSTANCE() { - long workflowDefinitionCode = 1L; - int workflowDefinitionVersion = 1; - int[] status = WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState(); - - assertTrue(isEmpty(workflowInstanceDao.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, - workflowDefinitionVersion, status))); - } - - @Test - void queryByWorkflowCodeVersionStatus_EXIST_NOT_FINISH_INSTANCE() { - long workflowDefinitionCode = 1L; - int workflowDefinitionVersion = 1; - int[] status = WorkflowExecutionStatus.getNotTerminalStatus(); - - assertTrue(isEmpty(workflowInstanceDao.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, - workflowDefinitionVersion, status))); - - workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, - WorkflowExecutionStatus.RUNNING_EXECUTION)); - workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, - WorkflowExecutionStatus.READY_PAUSE)); - workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, - WorkflowExecutionStatus.READY_STOP)); - workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, - WorkflowExecutionStatus.SERIAL_WAIT)); - assertEquals(4, workflowInstanceDao - .queryByWorkflowCodeVersionStatus(workflowDefinitionCode, workflowDefinitionVersion, status).size()); - } - @Test void updateWorkflowInstanceState_success() { WorkflowInstance workflowInstance = createWorkflowInstance( @@ -97,24 +63,6 @@ void updateWorkflowInstanceState_failed() { unsupportedOperationException.getMessage()); } - @Test - void queryByWorkflowCodeVersionStatus_EXIST_FINISH_INSTANCE() { - long workflowDefinitionCode = 1L; - int workflowDefinitionVersion = 1; - int[] status = WorkflowExecutionStatus.getNotTerminalStatus(); - - workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, - WorkflowExecutionStatus.PAUSE)); - workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, - WorkflowExecutionStatus.STOP)); - workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, - WorkflowExecutionStatus.FAILURE)); - workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, - WorkflowExecutionStatus.SUCCESS)); - assertTrue(isEmpty(workflowInstanceDao.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, - workflowDefinitionVersion, status))); - } - private WorkflowInstance createWorkflowInstance(Long workflowDefinitionCode, int workflowDefinitionVersion, WorkflowExecutionStatus status) { WorkflowInstance workflowInstance = new WorkflowInstance(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java index 30a9ae773ba0..0c2fc9d260fe 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java @@ -72,7 +72,7 @@ public void globalMasterFailover(final GlobalMasterFailoverEvent globalMasterFai final StopWatch failoverTimeCost = StopWatch.createStarted(); log.info("Global master failover starting"); final List masterAddressWhichContainsUnFinishedWorkflow = - workflowInstanceDao.queryNeedFailoverMasters(); + workflowInstanceDao.listHostsNeedingFailover(); for (final String masterAddress : masterAddressWhichContainsUnFinishedWorkflow) { final Optional aliveMasterOptional = clusterManager.getMasterClusters().getServer(masterAddress); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 3d0b317b4119..b63af7f49277 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -41,21 +41,16 @@ import java.util.List; import java.util.Map; -import java.util.Optional; public interface ProcessService { WorkflowInstance constructWorkflowInstance(Command command, String host) throws CronParseException, CodeGenerateUtils.CodeGenerateException; - Optional findWorkflowInstanceDetailById(int workflowInstanceId); - WorkflowInstance findWorkflowInstanceById(int workflowInstanceId); WorkflowDefinition findWorkflowDefinition(Long workflowDefinitionCode, int workflowDefinitionVersion); - WorkflowDefinition findWorkflowDefinitionByCode(Long workflowDefinitionCode); - int deleteWorkflowInstanceById(int workflowInstanceId); int deleteAllSubWorkflowByParentId(int workflowInstanceId); @@ -74,8 +69,6 @@ WorkflowInstance constructWorkflowInstance(Command command, void changeOutParam(TaskInstance taskInstance); - Schedule querySchedule(int id); - List queryReleaseSchedulerListByWorkflowDefinitionCode(long workflowDefinitionCode); DataSource findDataSourceById(int id); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 47dcb676f66d..95523b52e4d0 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -59,7 +59,6 @@ import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation; import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelationLog; import org.apache.dolphinscheduler.dao.mapper.ClusterMapper; -import org.apache.dolphinscheduler.dao.mapper.CommandMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; @@ -77,7 +76,6 @@ import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; -import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao; import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils; import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.extract.base.client.Clients; @@ -89,7 +87,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.SubWorkflowParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter; import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; -import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; @@ -161,18 +158,12 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private TaskDefinitionLogDao taskDefinitionLogDao; - @Autowired - private WorkflowInstanceMapDao workflowInstanceMapDao; - @Autowired private DataSourceMapper dataSourceMapper; @Autowired private WorkflowInstanceRelationMapper workflowInstanceRelationMapper; - @Autowired - private CommandMapper commandMapper; - @Autowired private ScheduleMapper scheduleMapper; @@ -200,20 +191,6 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private CuringParamsService curingGlobalParamsService; - @Autowired - private CommandService commandService; - - /** - * find workflow instance detail by id - * - * @param workflowInstanceId workflowInstanceId - * @return workflow instance - */ - @Override - public Optional findWorkflowInstanceDetailById(int workflowInstanceId) { - return Optional.ofNullable(workflowInstanceMapper.queryDetailById(workflowInstanceId)); - } - /** * find workflow instance by id * @@ -244,17 +221,6 @@ public WorkflowDefinition findWorkflowDefinition(Long workflowDefinitionCode, in return workflowDefinition; } - /** - * find workflow define by code. - * - * @param workflowDefinitionCode workflowDefinitionCode - * @return workflow definition - */ - @Override - public WorkflowDefinition findWorkflowDefinitionByCode(Long workflowDefinitionCode) { - return workflowDefinitionMapper.queryByCode(workflowDefinitionCode); - } - /** * delete work workflow instance by id * @@ -513,7 +479,7 @@ private Boolean checkCmdParam(Command command, Map cmdParam) { if (workflowInstanceId == 0) { workflowInstance = generateNewWorkflowInstance(workflowDefinition, command, cmdParam); } else { - workflowInstance = this.findWorkflowInstanceDetailById(workflowInstanceId).orElse(null); + workflowInstance = workflowInstanceDao.queryOptionalById(workflowInstanceId).orElse(null); setGlobalParamIfCommanded(workflowDefinition, cmdParam); if (workflowInstance == null) { return null; @@ -861,17 +827,6 @@ private List convertIntListToString(List intList) { return result; } - /** - * query schedule by id - * - * @param id id - * @return schedule - */ - @Override - public Schedule querySchedule(int id) { - return scheduleMapper.selectById(id); - } - /** * query Schedule by workflowDefinitionCode * @@ -1377,7 +1332,8 @@ public String findConfigYamlByName(String clusterName) { @Override public void forceWorkflowInstanceSuccessByTaskInstanceId(TaskInstance task) { - WorkflowInstance workflowInstance = findWorkflowInstanceDetailById(task.getWorkflowInstanceId()).orElse(null); + WorkflowInstance workflowInstance = + workflowInstanceDao.queryOptionalById(task.getWorkflowInstanceId()).orElse(null); if (workflowInstance != null && (workflowInstance.getState().isFailure() || workflowInstance.getState().isStop())) { List validTaskList = From 97f374a56171bc73d28c8eecc05fdea17e415efe Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 9 May 2025 21:11:49 +0800 Subject: [PATCH 2/2] Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../api/service/impl/WorkflowInstanceServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java index 796cd10c896c..f55a37579a95 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java @@ -1037,7 +1037,7 @@ public List queryByWorkflowCodeVersionStatus(Long workflowDefi return workflowInstanceDao.queryByCondition( queryWrapper -> queryWrapper.eq(WorkflowInstance::getWorkflowDefinitionCode, workflowDefinitionCode) .eq(WorkflowInstance::getWorkflowDefinitionVersion, workflowDefinitionVersion) - .in(WorkflowInstance::getState, states)); + .in(WorkflowInstance::getState, Arrays.stream(states).boxed().collect(Collectors.toList()))); } /**