diff --git a/apps/pre-processing-service/Dockerfile b/apps/pre-processing-service/Dockerfile index 2a9d9736..ebc6e465 100644 --- a/apps/pre-processing-service/Dockerfile +++ b/apps/pre-processing-service/Dockerfile @@ -38,21 +38,12 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ && rm -rf /var/lib/apt/lists/* -# Chrome 설치 -RUN wget -q -O - https://dl.google.com/linux/linux_signing_key.pub | apt-key add - && \ - echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" > /etc/apt/sources.list.d/google-chrome.list && \ - apt-get update && \ - apt-get install -y --no-install-recommends google-chrome-stable && \ - rm -rf /var/lib/apt/lists/* - -# ChromeDriver 설치 -RUN LATEST_VERSION=$(curl -s "https://googlechromelabs.github.io/chrome-for-testing/LATEST_RELEASE_STABLE") && \ - wget -O /tmp/chromedriver-linux64.zip "https://storage.googleapis.com/chrome-for-testing-public/${LATEST_VERSION}/linux64/chromedriver-linux64.zip" && \ - unzip /tmp/chromedriver-linux64.zip -d /tmp/ && \ - mv /tmp/chromedriver-linux64/chromedriver /usr/local/bin/chromedriver && \ - chmod +x /usr/local/bin/chromedriver && \ - rm -rf /tmp/* && \ - apt-get clean +# Chrome 설치 (블로그 방식 - 직접 .deb 파일 다운로드) +RUN wget -q https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb \ + && apt-get update \ + && apt-get install -y ./google-chrome-stable_current_amd64.deb \ + && rm ./google-chrome-stable_current_amd64.deb \ + && rm -rf /var/lib/apt/lists/* # MeCab & 사전 설치 (형태소 분석 의존) RUN apt-get update && apt-get install -y --no-install-recommends \ diff --git a/apps/user-service/build.gradle b/apps/user-service/build.gradle index 4c5cb671..7d4d7e0a 100644 --- a/apps/user-service/build.gradle +++ b/apps/user-service/build.gradle @@ -47,6 +47,9 @@ dependencies { // Scheduler implementation 'org.springframework.boot:spring-boot-starter-quartz' + // Retry + implementation 'org.springframework.retry:spring-retry' + implementation 'org.springframework.boot:spring-boot-starter-log4j2' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' implementation 'org.apache.logging.log4j:log4j-layout-template-json' diff --git a/apps/user-service/src/main/java/site/icebang/UserServiceApplication.java b/apps/user-service/src/main/java/site/icebang/UserServiceApplication.java index 29e975ba..e068181f 100644 --- a/apps/user-service/src/main/java/site/icebang/UserServiceApplication.java +++ b/apps/user-service/src/main/java/site/icebang/UserServiceApplication.java @@ -3,7 +3,9 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.retry.annotation.EnableRetry; +@EnableRetry @SpringBootApplication @MapperScan("site.icebang.**.mapper") public class UserServiceApplication { diff --git a/apps/user-service/src/main/java/site/icebang/domain/schedule/service/QuartzScheduleService.java b/apps/user-service/src/main/java/site/icebang/domain/schedule/service/QuartzScheduleService.java index 172ba807..d8348e7e 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/schedule/service/QuartzScheduleService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/schedule/service/QuartzScheduleService.java @@ -11,14 +11,10 @@ @Service @RequiredArgsConstructor public class QuartzScheduleService { - private final Scheduler scheduler; public void addOrUpdateSchedule(Schedule schedule) { try { - // 기존 스케줄 삭제 (있다면) - deleteSchedule(schedule.getWorkflowId()); - JobKey jobKey = JobKey.jobKey("workflow-" + schedule.getWorkflowId()); JobDetail jobDetail = JobBuilder.newJob(WorkflowTriggerJob.class) .withIdentity(jobKey) @@ -34,23 +30,25 @@ public void addOrUpdateSchedule(Schedule schedule) { .withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression())) .build(); + if (scheduler.checkExists(jobKey)) { + scheduler.deleteJob(jobKey); // 기존 Job 삭제 후 재생성 (업데이트) + } scheduler.scheduleJob(jobDetail, trigger); log.info("Quartz 스케줄 등록/업데이트 완료: Workflow ID {}", schedule.getWorkflowId()); } catch (SchedulerException e) { - log.error("Quartz 스케줄 등록 실패", e); + log.error("Quartz 스케줄 등록 실패: Workflow ID " + schedule.getWorkflowId(), e); } } public void deleteSchedule(Long workflowId) { try { - JobKey jobkey = JobKey.jobKey("workflow-" + workflowId); - TriggerKey triggerKey = TriggerKey.triggerKey("trigger-for-workflow-" + workflowId); - - scheduler.unscheduleJob(triggerKey); - scheduler.deleteJob(jobkey); - log.info("Quartz 스케줄 삭제 완료: Workflow ID {}", workflowId); + JobKey jobKey = JobKey.jobKey("workflow-" + workflowId); + if (scheduler.checkExists(jobKey)) { + scheduler.deleteJob(jobKey); + log.info("Quartz 스케줄 삭제 완료: Workflow ID {}", workflowId); + } } catch (SchedulerException e) { - log.error("Quartz 스케줄 삭제 실패: Workflow ID {}", workflowId, e); + log.error("Quartz 스케줄 삭제 실패: Workflow ID " + workflowId, e); } } } \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/JobDto.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/JobDto.java new file mode 100644 index 00000000..6dd40c5d --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/JobDto.java @@ -0,0 +1,19 @@ +package site.icebang.domain.workflow.dto; + +import java.time.LocalDateTime; + +import lombok.Data; + +@Data +public class JobDto { + private Long id; + private String name; + private String description; + private Boolean isEnabled; + private LocalDateTime createdAt; + private Long createdBy; + private LocalDateTime updatedAt; + private Long updatedBy; + + private Integer executionOrder; +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/TaskDto.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/TaskDto.java index 286e7e8c..569e93dc 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/TaskDto.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/TaskDto.java @@ -11,8 +11,9 @@ public class TaskDto { private Long id; private String name; private String type; - private Integer executionOrder; private JsonNode parameters; private LocalDateTime createdAt; private LocalDateTime updatedAt; + + private Integer executionOrder; } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobMapper.java index f3bb69fa..e03ac06d 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobMapper.java @@ -4,12 +4,12 @@ import org.apache.ibatis.annotations.Mapper; +import site.icebang.domain.workflow.dto.JobDto; import site.icebang.domain.workflow.dto.TaskDto; -import site.icebang.domain.workflow.model.Job; @Mapper public interface JobMapper { - List findJobsByWorkflowId(Long workflowId); + List findJobsByWorkflowId(Long workflowId); List findTasksByJobId(Long jobId); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/JobRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java similarity index 60% rename from apps/user-service/src/main/java/site/icebang/domain/execution/mapper/JobRunMapper.java rename to apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java index d5ce7e8f..a3546069 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/JobRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java @@ -1,8 +1,8 @@ -package site.icebang.domain.execution.mapper; +package site.icebang.domain.workflow.mapper; import org.apache.ibatis.annotations.Mapper; -import site.icebang.domain.execution.model.JobRun; +import site.icebang.domain.workflow.model.JobRun; @Mapper public interface JobRunMapper { diff --git a/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/TaskRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java similarity index 61% rename from apps/user-service/src/main/java/site/icebang/domain/execution/mapper/TaskRunMapper.java rename to apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java index 646a7c91..e177dee6 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/TaskRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java @@ -1,8 +1,8 @@ -package site.icebang.domain.execution.mapper; +package site.icebang.domain.workflow.mapper; import org.apache.ibatis.annotations.Mapper; -import site.icebang.domain.execution.model.TaskRun; +import site.icebang.domain.workflow.model.TaskRun; @Mapper public interface TaskRunMapper { diff --git a/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/WorkflowRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java similarity index 63% rename from apps/user-service/src/main/java/site/icebang/domain/execution/mapper/WorkflowRunMapper.java rename to apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java index 776ec4b0..64bbcbc6 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/WorkflowRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java @@ -1,8 +1,8 @@ -package site.icebang.domain.execution.mapper; +package site.icebang.domain.workflow.mapper; import org.apache.ibatis.annotations.Mapper; -import site.icebang.domain.execution.model.WorkflowRun; +import site.icebang.domain.workflow.model.WorkflowRun; @Mapper public interface WorkflowRunMapper { diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Job.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Job.java index 0a3604b5..f0d36d8b 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Job.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Job.java @@ -7,6 +7,8 @@ import lombok.Getter; import lombok.NoArgsConstructor; +import site.icebang.domain.workflow.dto.JobDto; + @Getter @NoArgsConstructor(access = AccessLevel.PROTECTED) @AllArgsConstructor @@ -19,4 +21,13 @@ public class Job { private Long createdBy; private LocalDateTime updatedAt; private Long updatedBy; + + public Job(JobDto dto) { + this.id = dto.getId(); + this.name = dto.getName(); + this.description = dto.getDescription(); + this.isEnabled = dto.getIsEnabled(); + this.createdAt = dto.getCreatedAt(); + this.updatedAt = dto.getUpdatedAt(); + } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/execution/model/JobRun.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/JobRun.java similarity index 95% rename from apps/user-service/src/main/java/site/icebang/domain/execution/model/JobRun.java rename to apps/user-service/src/main/java/site/icebang/domain/workflow/model/JobRun.java index f5310f12..038890dc 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/execution/model/JobRun.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/JobRun.java @@ -1,4 +1,4 @@ -package site.icebang.domain.execution.model; +package site.icebang.domain.workflow.model; import java.time.LocalDateTime; diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Task.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Task.java index 5fed64f9..713e460f 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Task.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Task.java @@ -1,7 +1,10 @@ package site.icebang.domain.workflow.model; +import java.time.LocalDateTime; + import com.fasterxml.jackson.databind.JsonNode; +import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -9,6 +12,7 @@ @Getter @NoArgsConstructor // MyBatis가 객체를 생성하기 위해 필요 +@AllArgsConstructor public class Task { private Long id; @@ -20,6 +24,10 @@ public class Task { /** Task 실행에 필요한 파라미터 (JSON) 예: {"url": "http://...", "method": "POST", "body": {...}} */ private JsonNode parameters; + private LocalDateTime createdAt; + + private LocalDateTime updatedAt; + public Task(TaskDto taskDto) { this.id = taskDto.getId(); this.name = taskDto.getName(); diff --git a/apps/user-service/src/main/java/site/icebang/domain/execution/model/TaskRun.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskRun.java similarity index 96% rename from apps/user-service/src/main/java/site/icebang/domain/execution/model/TaskRun.java rename to apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskRun.java index a58e0c9f..d49542f0 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/execution/model/TaskRun.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskRun.java @@ -1,4 +1,4 @@ -package site.icebang.domain.execution.model; +package site.icebang.domain.workflow.model; import java.time.LocalDateTime; diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Workflow.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Workflow.java index 3ea80388..8b536003 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Workflow.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Workflow.java @@ -4,12 +4,10 @@ import lombok.AccessLevel; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; @Getter -@Builder @NoArgsConstructor(access = AccessLevel.PROTECTED) @AllArgsConstructor public class Workflow { diff --git a/apps/user-service/src/main/java/site/icebang/domain/execution/model/WorkflowRun.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/WorkflowRun.java similarity index 95% rename from apps/user-service/src/main/java/site/icebang/domain/execution/model/WorkflowRun.java rename to apps/user-service/src/main/java/site/icebang/domain/workflow/model/WorkflowRun.java index 6bd5dbc9..011f7ee5 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/execution/model/WorkflowRun.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/WorkflowRun.java @@ -1,4 +1,4 @@ -package site.icebang.domain.execution.model; +package site.icebang.domain.workflow.model; import java.time.LocalDateTime; import java.util.UUID; diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java index 9c6ab224..f8ad27c8 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java @@ -2,8 +2,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; -import site.icebang.domain.execution.model.TaskRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.model.TaskRun; /** 워크플로우의 개별 Task를 실행하는 모든 Runner가 구현해야 할 인터페이스 */ public interface TaskRunner { diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/FastApiTaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/FastApiTaskRunner.java index c7a62c97..136a9d93 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/FastApiTaskRunner.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/FastApiTaskRunner.java @@ -8,8 +8,8 @@ import lombok.RequiredArgsConstructor; -import site.icebang.domain.execution.model.TaskRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.model.TaskRun; import site.icebang.domain.workflow.runner.TaskRunner; import site.icebang.external.fastapi.adapter.FastApiAdapter; diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/TaskExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/TaskExecutionService.java new file mode 100644 index 00000000..80cf44a3 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/TaskExecutionService.java @@ -0,0 +1,55 @@ +package site.icebang.domain.workflow.service; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Recover; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestClientException; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.model.TaskRun; +import site.icebang.domain.workflow.runner.TaskRunner; + +@Slf4j +@Service +@RequiredArgsConstructor +public class TaskExecutionService { // 📌 클래스 이름 변경 + private static final Logger workflowLogger = LoggerFactory.getLogger("WORKFLOW_HISTORY"); + private final Map taskRunners; + + /** RestClientException 발생 시, 5초 간격으로 최대 3번 재시도합니다. */ + @Retryable( + value = {RestClientException.class}, + maxAttempts = 3, + backoff = @Backoff(delay = 5000)) + public TaskRunner.TaskExecutionResult executeWithRetry( + Task task, TaskRun taskRun, ObjectNode requestBody) { + workflowLogger.info("Task 실행 시도: TaskId={}, TaskRunId={}", task.getId(), taskRun.getId()); + + String runnerBeanName = task.getType().toLowerCase() + "TaskRunner"; + TaskRunner runner = taskRunners.get(runnerBeanName); + + if (runner == null) { + throw new IllegalArgumentException("지원하지 않는 Task 타입: " + task.getType()); + } + + return runner.execute(task, taskRun, requestBody); + } + + /** 모든 재시도가 실패했을 때 마지막으로 호출될 복구 메소드입니다. */ + @Recover + public TaskRunner.TaskExecutionResult recover( + RestClientException e, Task task, TaskRun taskRun, ObjectNode requestBody) { + workflowLogger.error("최종 Task 실행 실패 (모든 재시도 소진): TaskRunId={}", taskRun.getId(), e); + return TaskRunner.TaskExecutionResult.failure("최대 재시도 횟수 초과: " + e.getMessage()); + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index c547dd2d..a27807ec 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -11,29 +11,27 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import site.icebang.domain.execution.mapper.JobRunMapper; -import site.icebang.domain.execution.mapper.TaskRunMapper; -import site.icebang.domain.execution.mapper.WorkflowRunMapper; -import site.icebang.domain.execution.model.JobRun; -import site.icebang.domain.execution.model.TaskRun; -import site.icebang.domain.execution.model.WorkflowRun; + +import site.icebang.domain.workflow.dto.JobDto; import site.icebang.domain.workflow.dto.TaskDto; import site.icebang.domain.workflow.manager.ExecutionMdcManager; import site.icebang.domain.workflow.mapper.JobMapper; +import site.icebang.domain.workflow.mapper.JobRunMapper; +import site.icebang.domain.workflow.mapper.TaskRunMapper; +import site.icebang.domain.workflow.mapper.WorkflowRunMapper; import site.icebang.domain.workflow.model.Job; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.model.TaskRun; +import site.icebang.domain.workflow.model.WorkflowRun; import site.icebang.domain.workflow.runner.TaskRunner; import site.icebang.domain.workflow.runner.fastapi.body.TaskBodyBuilder; -@Slf4j @Service @RequiredArgsConstructor public class WorkflowExecutionService { @@ -42,58 +40,62 @@ public class WorkflowExecutionService { private final WorkflowRunMapper workflowRunMapper; private final JobRunMapper jobRunMapper; private final TaskRunMapper taskRunMapper; - private final Map taskRunners; private final ObjectMapper objectMapper; private final List bodyBuilders; private final ExecutionMdcManager mdcManager; + private final TaskExecutionService taskExecutionService; @Transactional @Async("traceExecutor") public void executeWorkflow(Long workflowId) { mdcManager.setWorkflowContext(workflowId); - try { workflowLogger.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId); - WorkflowRun workflowRun = WorkflowRun.start(workflowId); workflowRunMapper.insert(workflowRun); Map workflowContext = new HashMap<>(); - List jobs = jobMapper.findJobsByWorkflowId(workflowId); - workflowLogger.info("총 {}개의 Job을 순차적으로 실행합니다.", jobs.size()); - for (Job job : jobs) { + // 📌 Mapper로부터 JobDto 리스트를 조회합니다. + List jobDtos = jobMapper.findJobsByWorkflowId(workflowId); + // 📌 JobDto를 execution_order 기준으로 정렬합니다. + jobDtos.sort( + Comparator.comparing( + JobDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(JobDto::getId)); + + workflowLogger.info("총 {}개의 Job을 순차적으로 실행합니다.", jobDtos.size()); + boolean hasAnyJobFailed = false; + + // 📌 정렬된 JobDto 리스트를 순회합니다. + for (JobDto jobDto : jobDtos) { + // 📌 DTO로부터 Job 모델을 생성합니다. + Job job = new Job(jobDto); + + mdcManager.setJobContext(job.getId()); JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); jobRunMapper.insert(jobRun); - - // Job 컨텍스트로 전환 - mdcManager.setJobContext(jobRun.getId()); workflowLogger.info( "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext); - jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); jobRunMapper.update(jobRun); if (!jobSucceeded) { - workflowRun.finish("FAILED"); - workflowRunMapper.update(workflowRun); - workflowLogger.error("Job 실패로 인해 워크플로우 실행을 중단합니다: WorkflowRunId={}", workflowRun.getId()); - return; + workflowLogger.error("Job 실행 실패: JobRunId={}", jobRun.getId()); + hasAnyJobFailed = true; + } else { + workflowLogger.info("---------- Job 실행 성공: JobRunId={} ----------", jobRun.getId()); } - - workflowLogger.info("---------- Job 실행 성공: JobRunId={} ----------", jobRun.getId()); - - // 다시 워크플로우 컨텍스트로 복원 mdcManager.setWorkflowContext(workflowId); } - - workflowRun.finish("SUCCESS"); + workflowRun.finish(hasAnyJobFailed ? "FAILED" : "SUCCESS"); workflowRunMapper.update(workflowRun); workflowLogger.info( - "========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId()); - + "========== 워크플로우 실행 {} : WorkflowRunId={} ==========", + hasAnyJobFailed ? "실패" : "성공", + workflowRun.getId()); } finally { mdcManager.clearExecutionContext(); } @@ -101,91 +103,54 @@ public void executeWorkflow(Long workflowId) { private boolean executeTasksForJob(JobRun jobRun, Map workflowContext) { List taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId()); - - // execution_order null 처리 및 중복 처리 taskDtos.sort( Comparator.comparing( TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) .thenComparing(TaskDto::getId)); workflowLogger.info( - "Job (JobRunId={}) 내 총 {}개의 Task를 execution_order 순으로 실행합니다.", - jobRun.getId(), - taskDtos.size()); + "Job (JobRunId={}) 내 총 {}개의 Task를 순차 실행합니다.", jobRun.getId(), taskDtos.size()); + boolean hasAnyTaskFailed = false; for (TaskDto taskDto : taskDtos) { - TaskRun taskRun = TaskRun.start(jobRun.getId(), taskDto.getId(), taskDto.getExecutionOrder()); - taskRunMapper.insert(taskRun); - - // Task 컨텍스트로 전환 - mdcManager.setTaskContext(taskRun.getId()); - workflowLogger.info( - "Task 실행 시작: TaskId={}, ExecutionOrder={}, TaskName={}, TaskRunId={}", - taskDto.getId(), - taskDto.getExecutionOrder(), - taskDto.getName(), - taskRun.getId()); - - String runnerBeanName = taskDto.getType().toLowerCase() + "TaskRunner"; - TaskRunner runner = taskRunners.get(runnerBeanName); - - if (runner == null) { - taskRun.finish("FAILED", "지원하지 않는 Task 타입: " + taskDto.getType()); + try { + TaskRun taskRun = + TaskRun.start(jobRun.getId(), taskDto.getId(), taskDto.getExecutionOrder()); + taskRunMapper.insert(taskRun); + mdcManager.setTaskContext(taskRun.getId()); + workflowLogger.info("Task 실행 시작: TaskId={}, Name={}", taskDto.getId(), taskDto.getName()); + + Task task = new Task(taskDto); + + ObjectNode requestBody = + bodyBuilders.stream() + .filter(builder -> builder.supports(task.getName())) + .findFirst() + .map(builder -> builder.build(task, workflowContext)) + .orElse(objectMapper.createObjectNode()); + + TaskRunner.TaskExecutionResult result = + taskExecutionService.executeWithRetry(task, taskRun, requestBody); + taskRun.finish(result.status(), result.message()); taskRunMapper.update(taskRun); - workflowLogger.error( - "Task 실행 실패 (미지원 타입): Type={}, ExecutionOrder={}", - taskDto.getType(), - taskDto.getExecutionOrder()); - mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원 - return false; - } - - // TaskDto에서 직접 Task 생성 (불필요한 변환 제거) - ObjectNode requestBody = - bodyBuilders.stream() - .filter(builder -> builder.supports(taskDto.getName())) - .findFirst() - .map(builder -> builder.build(createTaskFromDto(taskDto), workflowContext)) - .orElse(objectMapper.createObjectNode()); - TaskRunner.TaskExecutionResult result = - runner.execute(createTaskFromDto(taskDto), taskRun, requestBody); - taskRun.finish(result.status(), result.message()); - taskRunMapper.update(taskRun); - - if (result.isFailure()) { + if (result.isFailure()) { + workflowLogger.error( + "Task 최종 실패: TaskRunId={}, Message={}", taskRun.getId(), result.message()); + hasAnyTaskFailed = true; + } else { + JsonNode resultJson = objectMapper.readTree(result.message()); + workflowContext.put(task.getName(), resultJson); + workflowLogger.info("Task 실행 성공: TaskRunId={}", taskRun.getId()); + } + } catch (Exception e) { workflowLogger.error( - "Task 실행 실패: ExecutionOrder={}, Message={}", - taskDto.getExecutionOrder(), - result.message()); - mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원 - return false; - } - - try { - JsonNode resultJson = objectMapper.readTree(result.message()); - workflowContext.put(taskDto.getName(), resultJson); - } catch (JsonProcessingException e) { - workflowLogger.error("Task 결과 JSON 파싱 실패: ExecutionOrder={}", taskDto.getExecutionOrder()); - taskRun.finish("FAILED", "결과 JSON 파싱 실패"); - taskRunMapper.update(taskRun); - mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원 - return false; + "Task 처리 중 심각한 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), taskDto.getName(), e); + hasAnyTaskFailed = true; + } finally { + mdcManager.setJobContext(jobRun.getId()); } - - workflowLogger.info( - "Task 실행 성공: ExecutionOrder={}, TaskRunId={}", - taskDto.getExecutionOrder(), - taskRun.getId()); - - // 다시 Job 컨텍스트로 복원 - mdcManager.setJobContext(jobRun.getId()); } - return true; - } - - /** TaskDto를 Task 모델로 변환합니다. 📌 주의: Reflection을 사용한 방식은 성능이 느리고 불안정하므로 권장되지 않습니다. */ - private Task createTaskFromDto(TaskDto taskDto) { - return new Task(taskDto); // 생성자 사용 + return !hasAnyTaskFailed; } } diff --git a/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml index f27012ff..cd64ad2c 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml @@ -3,36 +3,36 @@ - + - - - - - - - + - + - + SELECT + j.*, + wj.execution_order + FROM + job j + JOIN + workflow_job wj ON j.id = wj.job_id + WHERE + wj.workflow_id = #{workflowId} ORDER BY wj.execution_order ASC diff --git a/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml index 4fd0ea3d..3a0e17bd 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml @@ -1,9 +1,9 @@ - + - + diff --git a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml index 807144cc..8fb277e2 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml @@ -1,7 +1,7 @@ - + INSERT INTO task_run (job_run_id, task_id, execution_order, status, started_at, created_at) VALUES (#{jobRunId}, #{taskId}, #{executionOrder}, #{status}, #{startedAt}, #{createdAt}) diff --git a/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml index 224abd02..d032da56 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml @@ -1,9 +1,9 @@ - + - +