Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class TaskRun {
private Long id;
private Long jobRunId;
private Long taskId;
private Integer executionOrder;
private String status; // PENDING, RUNNING, SUCCESS, FAILED
private String resultMessage; // 실행 결과 메시지
private LocalDateTime startedAt;
Expand All @@ -27,8 +28,9 @@ private TaskRun(Long jobRunId, Long taskId) {
}

/** Task 실행 시작을 위한 정적 팩토리 메서드 */
public static TaskRun start(Long jobRunId, Long taskId) {
public static TaskRun start(Long jobRunId, Long taskId, Integer executionOrder) {
TaskRun taskRun = new TaskRun(jobRunId, taskId);
taskRun.executionOrder = executionOrder;
taskRun.status = "RUNNING";
taskRun.startedAt = LocalDateTime.now();
return taskRun;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class TaskDto {
private Long id;
private String name;
private String type;
private Integer executionOrder;
private JsonNode parameters;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import lombok.Getter;
import lombok.NoArgsConstructor;

import site.icebang.domain.workflow.dto.TaskDto;

@Getter
@NoArgsConstructor // MyBatis가 객체를 생성하기 위해 필요
public class Task {
Expand All @@ -17,4 +19,11 @@ public class Task {

/** Task 실행에 필요한 파라미터 (JSON) 예: {"url": "http://...", "method": "POST", "body": {...}} */
private JsonNode parameters;

public Task(TaskDto taskDto) {
this.id = taskDto.getId();
this.name = taskDto.getName();
this.type = taskDto.getType();
this.parameters = taskDto.getParameters();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package site.icebang.domain.workflow.service;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -101,58 +101,82 @@ public void executeWorkflow(Long workflowId) {

private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflowContext) {
List<TaskDto> taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId());
List<Task> tasks = taskDtos.stream().map(this::convertToTask).collect(Collectors.toList());

workflowLogger.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size());
// execution_order null 처리 및 중복 처리
taskDtos.sort(
Comparator.comparing(
TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(TaskDto::getId));

for (Task task : tasks) {
TaskRun taskRun = TaskRun.start(jobRun.getId(), task.getId());
workflowLogger.info(
"Job (JobRunId={}) 내 총 {}개의 Task를 execution_order 순으로 실행합니다.",
jobRun.getId(),
taskDtos.size());

for (TaskDto taskDto : taskDtos) {
TaskRun taskRun = TaskRun.start(jobRun.getId(), taskDto.getId(), taskDto.getExecutionOrder());
taskRunMapper.insert(taskRun);

// Task 컨텍스트로 전환
mdcManager.setTaskContext(taskRun.getId());
workflowLogger.info("Task 실행 시작: TaskId={}, TaskRunId={}", task.getId(), taskRun.getId());
workflowLogger.info(
"Task 실행 시작: TaskId={}, ExecutionOrder={}, TaskName={}, TaskRunId={}",
taskDto.getId(),
taskDto.getExecutionOrder(),
taskDto.getName(),
taskRun.getId());

String runnerBeanName = task.getType().toLowerCase() + "TaskRunner";
String runnerBeanName = taskDto.getType().toLowerCase() + "TaskRunner";
TaskRunner runner = taskRunners.get(runnerBeanName);

if (runner == null) {
taskRun.finish("FAILED", "지원하지 않는 Task 타입: " + task.getType());
taskRun.finish("FAILED", "지원하지 않는 Task 타입: " + taskDto.getType());
taskRunMapper.update(taskRun);
workflowLogger.error("Task 실행 실패 (미지원 타입): Type={}", task.getType());
workflowLogger.error(
"Task 실행 실패 (미지원 타입): Type={}, ExecutionOrder={}",
taskDto.getType(),
taskDto.getExecutionOrder());
mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원
return false;
}

// TaskDto에서 직접 Task 생성 (불필요한 변환 제거)
ObjectNode requestBody =
bodyBuilders.stream()
.filter(builder -> builder.supports(task.getName()))
.filter(builder -> builder.supports(taskDto.getName()))
.findFirst()
.map(builder -> builder.build(task, workflowContext))
.map(builder -> builder.build(createTaskFromDto(taskDto), workflowContext))
.orElse(objectMapper.createObjectNode());

TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun, requestBody);
TaskRunner.TaskExecutionResult result =
runner.execute(createTaskFromDto(taskDto), taskRun, requestBody);
taskRun.finish(result.status(), result.message());
taskRunMapper.update(taskRun);

if (result.isFailure()) {
workflowLogger.error("Task 실행 실패: Message={}", result.message());
workflowLogger.error(
"Task 실행 실패: ExecutionOrder={}, Message={}",
taskDto.getExecutionOrder(),
result.message());
mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원
return false;
}

try {
JsonNode resultJson = objectMapper.readTree(result.message());
workflowContext.put(task.getName(), resultJson);
workflowContext.put(taskDto.getName(), resultJson);
} catch (JsonProcessingException e) {
workflowLogger.error("Task 결과 JSON 파싱 실패");
workflowLogger.error("Task 결과 JSON 파싱 실패: ExecutionOrder={}", taskDto.getExecutionOrder());
taskRun.finish("FAILED", "결과 JSON 파싱 실패");
taskRunMapper.update(taskRun);
mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원
return false;
}

workflowLogger.info("Task 실행 성공: TaskRunId={}", taskRun.getId());
workflowLogger.info(
"Task 실행 성공: ExecutionOrder={}, TaskRunId={}",
taskDto.getExecutionOrder(),
taskRun.getId());

// 다시 Job 컨텍스트로 복원
mdcManager.setJobContext(jobRun.getId());
Expand All @@ -161,28 +185,7 @@ private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflow
}

/** TaskDto를 Task 모델로 변환합니다. 📌 주의: Reflection을 사용한 방식은 성능이 느리고 불안정하므로 권장되지 않습니다. */
private Task convertToTask(TaskDto taskDto) {
Task task = new Task();
try {
java.lang.reflect.Field idField = Task.class.getDeclaredField("id");
idField.setAccessible(true);
idField.set(task, taskDto.getId());

java.lang.reflect.Field nameField = Task.class.getDeclaredField("name");
nameField.setAccessible(true);
nameField.set(task, taskDto.getName());

java.lang.reflect.Field typeField = Task.class.getDeclaredField("type");
typeField.setAccessible(true);
typeField.set(task, taskDto.getType());

java.lang.reflect.Field parametersField = Task.class.getDeclaredField("parameters");
parametersField.setAccessible(true);
parametersField.set(task, taskDto.getParameters());

} catch (Exception e) {
throw new RuntimeException("TaskDto to Task 변환 중 오류 발생", e);
}
return task;
private Task createTaskFromDto(TaskDto taskDto) {
return new Task(taskDto); // 생성자 사용
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<id property="id" column="id"/>
<result property="name" column="name"/>
<result property="type" column="type"/>
<result property="executionOrder" column="execution_order"/>
<result property="parameters" column="parameters" javaType="com.fasterxml.jackson.databind.JsonNode" jdbcType="VARCHAR" typeHandler="site.icebang.global.config.mybatis.typehandler.JsonNodeTypeHandler"/>
<result property="createdAt" column="created_at"/>
<result property="updatedAt" column="updated_at"/>
Expand All @@ -36,7 +37,7 @@
</select>

<select id="findTasksByJobId" resultMap="TaskDtoResultMap">
SELECT t.* FROM task t
SELECT t.*, jt.execution_order FROM task t
JOIN job_task jt ON t.id = jt.task_id
WHERE jt.job_id = #{jobId}
ORDER BY jt.execution_order ASC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

<mapper namespace="site.icebang.domain.execution.mapper.TaskRunMapper">
<insert id="insert" useGeneratedKeys="true" keyProperty="id">
INSERT INTO task_run (job_run_id, task_id, status, started_at, created_at)
VALUES (#{jobRunId}, #{taskId}, #{status}, #{startedAt}, #{createdAt})
INSERT INTO task_run (job_run_id, task_id, execution_order, status, started_at, created_at)
VALUES (#{jobRunId}, #{taskId}, #{executionOrder}, #{status}, #{startedAt}, #{createdAt})
</insert>

<update id="update">
Expand Down
Loading