Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected String getObjectNameFromIdentity(Object identity) {
return "";
}

WorkflowInstance obj = workflowInstanceMapper.queryDetailById(objId);
WorkflowInstance obj = workflowInstanceMapper.selectById(objId);
return obj == null ? "" : obj.getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.dolphinscheduler.api.controller;

import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKFLOW_INSTANCE_LIST_PAGING_ERROR;

import org.apache.dolphinscheduler.api.audit.OperatorLog;
import org.apache.dolphinscheduler.api.audit.enums.AuditType;
import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
Expand Down Expand Up @@ -422,22 +420,4 @@ public Result batchDeleteWorkflowInstanceByIds(@RequestAttribute(value = Constan
return returnDataList(result);
}

// Todo: This is unstable, in some case the command trigger failed, we cannot get workflow instance
// And it's a bad design to use trigger code to get workflow instance why not directly get by workflow instanceId or
// inject the trigger id into workflow instance?
@Deprecated
@Operation(summary = "queryWorkflowInstanceListByTrigger", description = "QUERY_WORKFLOW_INSTANCE_BY_TRIGGER_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = Long.class)),
@Parameter(name = "triggerCode", description = "TRIGGER_CODE", required = true, schema = @Schema(implementation = Long.class))
})
@GetMapping("/trigger")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_WORKFLOW_INSTANCE_LIST_PAGING_ERROR)
public Result queryWorkflowInstancesByTriggerCode(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable long projectCode,
@RequestParam(value = "triggerCode") Long triggerCode) {
Map<String, Object> result = workflowInstanceService.queryByTriggerCode(loginUser, projectCode, triggerCode);
return returnDataList(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,6 @@ List<WorkflowInstance> queryByWorkflowCodeVersionStatus(Long workflowDefinitionC
List<WorkflowInstance> queryByWorkflowDefinitionCode(Long workflowDefinitionCode,
int size);

/**
* query workflow instance list bt trigger code
*
* @param loginUser
* @param projectCode
* @param triggerCode
* @return
*/
Map<String, Object> queryByTriggerCode(User loginUser, long projectCode, Long triggerCode);

void deleteWorkflowInstanceByWorkflowDefinitionCode(long workflowDefinitionCode);

void deleteWorkflowInstanceById(int workflowInstanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public WorkflowExecuteResponse executeTask(User loginUser,
projectService.checkProjectAndAuthThrowException(loginUser, projectCode,
ApiFuncIdentificationConstant.map.get(ExecuteType.EXECUTE_TASK));

WorkflowInstance workflowInstance = processService.findWorkflowInstanceDetailById(workflowInstanceId)
WorkflowInstance workflowInstance = workflowInstanceDao.queryOptionalById(workflowInstanceId)
.orElseThrow(() -> new ServiceException(Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId));

if (!workflowInstance.getState().isFinished()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,21 @@
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -83,6 +86,8 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService

@Autowired(required = false)
private StorageOperator storageOperator;
@Autowired
private WorkflowInstanceDao workflowInstanceDao;

/**
* Check the tenant new object valid or not
Expand Down Expand Up @@ -129,10 +134,10 @@ && checkTenantExists(updateTenant.getTenantCode())) {
/**
* create tenant
*
* @param loginUser login user
* @param loginUser login user
* @param tenantCode tenant code
* @param queueId queue id
* @param desc description
* @param queueId queue id
* @param desc description
* @return create result code
*/
@Override
Expand Down Expand Up @@ -179,11 +184,11 @@ public PageInfo<Tenant> queryTenantList(User loginUser, String searchVal, Intege
/**
* updateWorkflowInstance tenant
*
* @param loginUser login user
* @param id tenant id
* @param loginUser login user
* @param id tenant id
* @param tenantCode tenant code
* @param queueId queue id
* @param desc description
* @param queueId queue id
* @param desc description
* @return update result code
* @throws Exception exception
*/
Expand Down Expand Up @@ -256,9 +261,11 @@ public void deleteTenantById(User loginUser, int id) throws Exception {
}

private List<WorkflowInstance> getWorkflowInstancesByTenant(Tenant tenant) {
return workflowInstanceMapper.queryByTenantCodeAndStatus(
tenant.getTenantCode(),
WorkflowExecutionStatus.getNotTerminalStatus());
return workflowInstanceDao.queryByCondition(
queryWrapper -> queryWrapper.eq(WorkflowInstance::getTenantCode, tenant.getTenantCode())
.in(WorkflowInstance::getState,
Arrays.stream(WorkflowExecutionStatus.getNotTerminalStatus()).boxed()
.collect(Collectors.toList())));
}

/**
Expand Down Expand Up @@ -324,9 +331,9 @@ public Map<String, Object> queryByTenantCode(String tenantCode) {
* ONLY for python gateway server, and should not use this in web ui function
*
* @param tenantCode tenant code
* @param desc The description of tenant object
* @param queue The value of queue which current tenant belong
* @param queueName The name of queue which current tenant belong
* @param desc The description of tenant object
* @param queue The value of queue which current tenant belong
* @param queueName The name of queue which current tenant belong
* @return Tenant object
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IMasterContainerService;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
Expand All @@ -56,6 +56,7 @@
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand All @@ -81,9 +82,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Autowired
private WorkerGroupDao workerGroupDao;

@Autowired
private WorkflowInstanceMapper workflowInstanceMapper;

@Autowired
private RegistryClient registryClient;

Expand All @@ -98,6 +96,8 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro

@Autowired
private WorkflowDefinitionMapper workflowDefinitionMapper;
@Autowired
private WorkflowInstanceDao workflowInstanceDao;

/**
* create or update a worker group
Expand Down Expand Up @@ -344,9 +344,9 @@ public Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id) {
putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST);
return result;
}
List<WorkflowInstance> workflowInstances = workflowInstanceMapper.queryByWorkerGroupNameAndStatus(
workerGroup.getName(),
WorkflowExecutionStatus.getNotTerminalStatus());
List<WorkflowInstance> workflowInstances = workflowInstanceDao.queryByCondition(queryWrapper -> queryWrapper
.eq(WorkflowInstance::getWorkerGroup, workerGroup.getName()).in(WorkflowInstance::getState, Arrays
.stream(WorkflowExecutionStatus.getNotTerminalStatus()).boxed().collect(Collectors.toList())));
if (CollectionUtils.isNotEmpty(workflowInstances)) {
List<Integer> workflowInstanceIds =
workflowInstances.stream().map(WorkflowInstance::getId).collect(Collectors.toList());
Expand Down
Loading
Loading