From 50b00c56a23609d10d90a767ad5a07f04ebd4ebb Mon Sep 17 00:00:00 2001 From: jihukimme Date: Mon, 29 Sep 2025 11:27:24 +0900 Subject: [PATCH 1/9] =?UTF-8?q?feat:=20TaskIoData=20model=20=EC=83=9D?= =?UTF-8?q?=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/workflow/model/TaskIoData.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java new file mode 100644 index 00000000..0de7c7b1 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java @@ -0,0 +1,29 @@ +package site.icebang.domain.workflow.model; + +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.time.Instant; + +@Getter +@NoArgsConstructor +public class TaskIoData { + private Long id; + private Long taskRunId; + private String ioType; + private String name; + private String dataType; + private String dataValue; // JSON을 문자열로 저장 + private Long dataSize; + private Instant createdAt; + + public TaskIoData(Long taskRunId, String ioType, String name, String dataType, String dataValue, Long dataSize) { + this.taskRunId = taskRunId; + this.ioType = ioType; + this.name = name; + this.dataType = dataType; + this.dataValue = dataValue; + this.dataSize = dataSize; + this.createdAt = Instant.now(); + } +} From e63d08be4765e5d7d13726460bc0d8007d091642 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Mon, 29 Sep 2025 11:27:51 +0900 Subject: [PATCH 2/9] =?UTF-8?q?feat:=20TaskIoData=EA=B4=80=EB=A0=A8=20Mapp?= =?UTF-8?q?er=20=EC=9E=91=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/workflow/mapper/TaskIoDataMapper.java | 11 +++++++++++ .../mybatis/mapper/TaskIoDataMapper.xml | 16 ++++++++++++++++ 2 files changed, 27 insertions(+) create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java create mode 100644 apps/user-service/src/main/resources/mybatis/mapper/TaskIoDataMapper.xml diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java new file mode 100644 index 00000000..2160c152 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java @@ -0,0 +1,11 @@ +package site.icebang.domain.workflow.mapper; + +import org.apache.ibatis.annotations.Mapper; +import site.icebang.domain.workflow.model.TaskIoData; +import java.util.Optional; + +@Mapper +public interface TaskIoDataMapper { + void insert(TaskIoData taskIoData); + Optional findOutputByTaskRunId(Long taskRunId); +} \ No newline at end of file diff --git a/apps/user-service/src/main/resources/mybatis/mapper/TaskIoDataMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/TaskIoDataMapper.xml new file mode 100644 index 00000000..fa0dd73c --- /dev/null +++ b/apps/user-service/src/main/resources/mybatis/mapper/TaskIoDataMapper.xml @@ -0,0 +1,16 @@ + + + + + + INSERT INTO task_io_data (task_run_id, io_type, name, data_type, data_value, data_size, created_at) + VALUES (#{taskRunId}, #{ioType}, #{name}, #{dataType}, #{dataValue}, #{dataSize}, #{createdAt}) + + + + \ No newline at end of file From 6cf528f2cd045ba15d5ea803cc3a0ea04c993819 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Mon, 29 Sep 2025 11:28:20 +0900 Subject: [PATCH 3/9] =?UTF-8?q?feat:=20TaskRun=EA=B4=80=EB=A0=A8=20Mapper?= =?UTF-8?q?=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../icebang/domain/workflow/mapper/TaskRunMapper.java | 4 ++++ .../main/resources/mybatis/mapper/TaskRunMapper.xml | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java index e177dee6..428a168d 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java @@ -4,9 +4,13 @@ import site.icebang.domain.workflow.model.TaskRun; +import java.util.Optional; + @Mapper public interface TaskRunMapper { void insert(TaskRun taskRun); void update(TaskRun taskRun); + + Optional findLatestSuccessRunInJob(Long jobRunId, String taskName); } diff --git a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml index 61ec3cf0..322a9f04 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml @@ -26,4 +26,15 @@ finished_at = #{finishedAt} WHERE id = #{id} + + \ No newline at end of file From bc4eee11170770981103cb20145b33f333d17a80 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Mon, 29 Sep 2025 11:31:24 +0900 Subject: [PATCH 4/9] =?UTF-8?q?feat:=20=EC=9B=8C=ED=81=AC=ED=94=8C?= =?UTF-8?q?=EB=A1=9C=EC=9A=B0=20=EC=BB=A8=ED=85=8D=EC=8A=A4=ED=8A=B8=20?= =?UTF-8?q?=EC=A1=B0=ED=9A=8C=20=EC=84=9C=EB=B9=84=EC=8A=A4=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 퍼싸드 패턴 도입 --- .../service/WorkflowContextService.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java new file mode 100644 index 00000000..74b7dd67 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java @@ -0,0 +1,45 @@ +package site.icebang.domain.workflow.service; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import site.icebang.domain.workflow.mapper.TaskIoDataMapper; +import site.icebang.domain.workflow.mapper.TaskRunMapper; +import site.icebang.domain.workflow.model.JobRun; +import java.util.Optional; + +@Slf4j +@Service +@RequiredArgsConstructor +public class WorkflowContextService { + + private final TaskRunMapper taskRunMapper; + private final TaskIoDataMapper taskIoDataMapper; + private final ObjectMapper objectMapper; + + /** + * 특정 Job 실행 내에서, 이전에 성공한 Task의 이름으로 결과(Output)를 조회합니다. + * @param jobRun 현재 실행중인 JobRun + * @param sourceTaskName 결과를 조회할 이전 Task의 이름 + * @return 조회된 결과 데이터 (JsonNode) + */ + public Optional getPreviousTaskOutput(JobRun jobRun, String sourceTaskName) { + try { + return taskRunMapper.findLatestSuccessRunInJob(jobRun.getId(), sourceTaskName) + .flatMap(taskRun -> taskIoDataMapper.findOutputByTaskRunId(taskRun.getId())) + .map(ioData -> { + try { + return objectMapper.readTree(ioData.getDataValue()); + } catch (Exception e) { + log.error("TaskIoData JSON 파싱 실패: TaskIoDataId={}", ioData.getId(), e); + return null; + } + }); + } catch (Exception e) { + log.error("이전 Task 결과 조회 중 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), sourceTaskName, e); + return Optional.empty(); + } + } +} \ No newline at end of file From a16cdc719c3de541231bc40314eb4c3ae22e3709 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Mon, 29 Sep 2025 11:32:16 +0900 Subject: [PATCH 5/9] =?UTF-8?q?feat:=20=EC=9D=B8=EB=A9=94=EB=AA=A8?= =?UTF-8?q?=EB=A6=AC=20=EC=BB=A8=ED=85=8D=EC=8A=A4=ED=8A=B8=20=EB=B0=A9?= =?UTF-8?q?=EC=8B=9D=EC=97=90=EC=84=9C=20=EC=98=81=EC=86=8D=EC=A0=81=20?= =?UTF-8?q?=EC=BB=A8=ED=85=8D=EC=8A=A4=ED=8A=B8=20=EB=B0=A9=EC=8B=9D?= =?UTF-8?q?=EC=9C=BC=EB=A1=9C=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../runner/fastapi/body/TaskBodyBuilder.java | 12 +- .../service/WorkflowExecutionService.java | 179 ++++++++---------- 2 files changed, 80 insertions(+), 111 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java index 04dacef4..e5f8b8b1 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; public interface TaskBodyBuilder { @@ -17,12 +18,7 @@ public interface TaskBodyBuilder { */ boolean supports(String taskName); - /** - * 실제 API 요청에 사용될 Body를 생성합니다. - * - * @param task DB에 저장된 Task의 원본 정의 - * @param workflowContext 이전 Task들의 결과가 담긴 컨텍스트 - * @return 생성된 JSON Body - */ - ObjectNode build(Task task, Map workflowContext); + + // 📌 workflowContext(Map) 대신 JobRun 객체를 받도록 변경 + ObjectNode build(Task task, JobRun jobRun); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 4d781cee..91d634ee 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -1,37 +1,40 @@ package site.icebang.domain.workflow.service; -import java.math.BigInteger; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Service; - import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.RequiredArgsConstructor; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import site.icebang.domain.workflow.mapper.JobRunMapper; +import site.icebang.domain.workflow.mapper.TaskIoDataMapper; +import site.icebang.domain.workflow.mapper.TaskRunMapper; +import site.icebang.domain.workflow.mapper.WorkflowRunMapper; +import site.icebang.domain.workflow.model.JobRun; +import site.icebang.domain.workflow.model.TaskIoData; +import site.icebang.domain.workflow.model.TaskRun; +import site.icebang.domain.workflow.model.WorkflowRun; import site.icebang.domain.workflow.dto.JobDto; import site.icebang.domain.workflow.dto.TaskDto; import site.icebang.domain.workflow.dto.WorkflowDetailCardDto; import site.icebang.domain.workflow.manager.ExecutionMdcManager; -import site.icebang.domain.workflow.mapper.*; +import site.icebang.domain.workflow.mapper.JobMapper; +import site.icebang.domain.workflow.mapper.WorkflowMapper; import site.icebang.domain.workflow.model.Job; -import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; -import site.icebang.domain.workflow.model.TaskRun; -import site.icebang.domain.workflow.model.WorkflowRun; import site.icebang.domain.workflow.runner.TaskRunner; import site.icebang.domain.workflow.runner.fastapi.body.TaskBodyBuilder; +import java.math.BigInteger; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + @Service @RequiredArgsConstructor public class WorkflowExecutionService { @@ -40,155 +43,125 @@ public class WorkflowExecutionService { private final WorkflowRunMapper workflowRunMapper; private final JobRunMapper jobRunMapper; private final TaskRunMapper taskRunMapper; + private final TaskIoDataMapper taskIoDataMapper; private final ObjectMapper objectMapper; private final List bodyBuilders; private final ExecutionMdcManager mdcManager; private final TaskExecutionService taskExecutionService; private final WorkflowMapper workflowMapper; + @Transactional @Async("traceExecutor") public void executeWorkflow(Long workflowId) { - WorkflowRun workflowRun = WorkflowRun.start(workflowId); - workflowRunMapper.insert(workflowRun); - - mdcManager.setWorkflowContext(workflowId, workflowRun.getTraceId()); + mdcManager.setWorkflowContext(workflowId); + WorkflowRun workflowRun = null; try { workflowLogger.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId); + workflowRun = WorkflowRun.start(workflowId); + workflowRunMapper.insert(workflowRun); - Map workflowContext = new HashMap<>(); - WorkflowDetailCardDto settings = - workflowMapper.selectWorkflowDetailById(BigInteger.valueOf(workflowId)); - workflowLogger.info("Workflow 정보 로드 성공"); + // 📌 1. 워크플로우 상세 정보를 DTO로 조회합니다. + WorkflowDetailCardDto settingsDto = workflowMapper.selectWorkflowDetailById(BigInteger.valueOf(workflowId)); + if (settingsDto == null) { + throw new IllegalStateException("실행할 워크플로우를 찾을 수 없습니다: ID " + workflowId); + } - workflowLogger.info("Default config 로드 시도"); - JsonNode setting = objectMapper.readTree(settings.getDefaultConfig()); - workflowLogger.info("Default config 로드 성공"); + // 📌 2. DTO에서 defaultConfig JSON 문자열을 가져와 파싱합니다. + String defaultConfigJson = settingsDto.getDefaultConfig(); + JsonNode setting = (defaultConfigJson != null && !defaultConfigJson.isEmpty()) + ? objectMapper.readTree(defaultConfigJson) + : objectMapper.createObjectNode(); - workflowLogger.info("Job 목록 로드 시도"); List jobDtos = jobMapper.findJobsByWorkflowId(workflowId); - workflowLogger.info("Job 목록 로드 성공"); - - workflowLogger.info("execution_order 기준으로 정렬 시도"); - jobDtos.sort( - Comparator.comparing( - JobDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) + jobDtos.sort(Comparator.comparing(JobDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) .thenComparing(JobDto::getId)); - workflowLogger.info("execution_order 기준으로 성공"); - - workflowLogger.info("총 {}개의 Job을 순차적으로 실행합니다.", jobDtos.size()); boolean hasAnyJobFailed = false; - // 📌 정렬된 JobDto 리스트를 순회합니다. for (JobDto jobDto : jobDtos) { - // 📌 DTO로부터 Job 모델을 생성합니다. Job job = new Job(jobDto); - mdcManager.setJobContext(job.getId()); JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); jobRunMapper.insert(jobRun); - workflowLogger.info( - "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); + workflowLogger.info("---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); - boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext, setting); + boolean jobSucceeded = executeTasksForJob(jobRun, setting); jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); jobRunMapper.update(jobRun); - if (!jobSucceeded) { - workflowLogger.error("Job 실행 실패: JobRunId={}", jobRun.getId()); - hasAnyJobFailed = true; - } else { - workflowLogger.info("---------- Job 실행 성공: JobRunId={} ----------", jobRun.getId()); - } + if (!jobSucceeded) hasAnyJobFailed = true; + mdcManager.setWorkflowContext(workflowId); } workflowRun.finish(hasAnyJobFailed ? "FAILED" : "SUCCESS"); workflowRunMapper.update(workflowRun); - workflowLogger.info( - "========== 워크플로우 실행 {} : WorkflowRunId={} ==========", - hasAnyJobFailed ? "실패" : "성공", - workflowRun.getId()); - } catch (JsonMappingException e) { - throw new RuntimeException(e); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); + workflowLogger.info("========== 워크플로우 실행 {} : ...", hasAnyJobFailed ? "실패" : "성공"); + } catch (Exception e) { + workflowLogger.error("워크플로우 실행 중 심각한 오류 발생: WorkflowId={}", workflowId, e); + if (workflowRun != null) { + workflowRun.finish("FAILED"); + workflowRunMapper.update(workflowRun); + } } finally { mdcManager.clearExecutionContext(); } } - private boolean executeTasksForJob( - JobRun jobRun, Map workflowContext, JsonNode setting) { + private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { List taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId()); - for (TaskDto taskDto : taskDtos) { - String taskId = taskDto.getId().toString(); - JsonNode settingForTask = setting.get(taskId); - if (settingForTask != null) { - taskDto.setSettings(settingForTask); - } - } - taskDtos.sort( - Comparator.comparing( - TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) - .thenComparing(TaskDto::getId)); + taskDtos.forEach(dto -> { + JsonNode s = setting.get(String.valueOf(dto.getId())); + if (s != null) dto.setSettings(s); + }); + taskDtos.sort(Comparator.comparing(TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())).thenComparing(TaskDto::getId)); - workflowLogger.info( - "Job (JobRunId={}) 내 총 {}개의 Task를 순차 실행합니다.", jobRun.getId(), taskDtos.size()); boolean hasAnyTaskFailed = false; - Long s3UploadTaskRunId = null; // S3 업로드 태스크의 task_run_id 저장용 for (TaskDto taskDto : taskDtos) { + TaskRun taskRun = null; try { - TaskRun taskRun = - TaskRun.start(jobRun.getId(), taskDto.getId(), taskDto.getExecutionOrder()); + taskRun = TaskRun.start(jobRun.getId(), taskDto.getId(), taskDto.getExecutionOrder()); taskRunMapper.insert(taskRun); mdcManager.setTaskContext(taskRun.getId()); workflowLogger.info("Task 실행 시작: TaskId={}, Name={}", taskDto.getId(), taskDto.getName()); Task task = new Task(taskDto); - ObjectNode requestBody = - bodyBuilders.stream() + ObjectNode requestBody = bodyBuilders.stream() .filter(builder -> builder.supports(task.getName())) .findFirst() - .map(builder -> builder.build(task, workflowContext)) + .map(builder -> builder.build(task, jobRun)) // jobRun을 컨텍스트로 전달 .orElse(objectMapper.createObjectNode()); - if ("S3 업로드 태스크".equals(task.getName())) { - requestBody.put("task_run_id", taskRun.getId()); - s3UploadTaskRunId = taskRun.getId(); // S3 업로드의 task_run_id 저장 - } else if ("상품 선택 태스크".equals(task.getName())) { - // S3 업로드에서 사용한 task_run_id를 사용 - if (s3UploadTaskRunId != null) { - requestBody.put("task_run_id", s3UploadTaskRunId); - } else { - workflowLogger.error("S3 업로드 태스크가 먼저 실행되지 않아 task_run_id를 찾을 수 없습니다."); - // 또는 이전 Job에서 S3 업로드를 찾는 로직 추가 가능 - } - } - - TaskRunner.TaskExecutionResult result = - taskExecutionService.executeWithRetry(task, taskRun, requestBody); + saveIoData(taskRun.getId(), "INPUT", "request_body", requestBody); + TaskRunner.TaskExecutionResult result = taskExecutionService.executeWithRetry(task, taskRun, requestBody); taskRun.finish(result.status(), result.message()); - taskRunMapper.update(taskRun); if (result.isFailure()) { - workflowLogger.error( - "Task 최종 실패: TaskRunId={}, Message={}", taskRun.getId(), result.message()); hasAnyTaskFailed = true; } else { JsonNode resultJson = objectMapper.readTree(result.message()); - workflowContext.put(task.getName(), resultJson); - workflowLogger.info("Task 실행 성공: TaskRunId={}", taskRun.getId()); + saveIoData(taskRun.getId(), "OUTPUT", "response_body", resultJson); } } catch (Exception e) { - workflowLogger.error( - "Task 처리 중 심각한 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), taskDto.getName(), e); + workflowLogger.error("Task 처리 중 심각한 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), taskDto.getName(), e); hasAnyTaskFailed = true; + if (taskRun != null) taskRun.finish("FAILED", e.getMessage()); } finally { + if (taskRun != null) taskRunMapper.update(taskRun); mdcManager.setJobContext(jobRun.getId()); } } return !hasAnyTaskFailed; } -} + + private void saveIoData(Long taskRunId, String ioType, String name, JsonNode data) { + try { + String dataValue = data.toString(); + TaskIoData ioData = new TaskIoData(taskRunId, ioType, name, "JSON", dataValue, (long) dataValue.getBytes().length); + taskIoDataMapper.insert(ioData); + } catch (Exception e) { + workflowLogger.error("Task IO 데이터 저장 실패: TaskRunId={}, Type={}", taskRunId, ioType, e); + } + } +} \ No newline at end of file From 7ad44f19448f8c1646b504e108ccd5f42999663d Mon Sep 17 00:00:00 2001 From: jihukimme Date: Mon, 29 Sep 2025 11:33:21 +0900 Subject: [PATCH 6/9] =?UTF-8?q?feat:=20=EC=9D=B8=ED=84=B0=ED=8E=98?= =?UTF-8?q?=EC=9D=B4=EC=8A=A4=20=EB=B3=80=EA=B2=BD=EC=97=90=20=EB=94=B0?= =?UTF-8?q?=EB=A5=B8=20=EA=B5=AC=ED=98=84=EC=B2=B4=20=EB=B3=80=EA=B2=BD=20?= =?UTF-8?q?=EB=B0=8F=20=EC=BB=A8=ED=85=8D=EC=8A=A4=ED=8A=B8=20=EC=A1=B0?= =?UTF-8?q?=ED=9A=8C=20=EC=84=9C=EB=B9=84=EC=8A=A4=20=EC=82=AC=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fastapi/body/BlogPublishBodyBuilder.java | 99 ++++++++++--------- .../fastapi/body/BlogRagBodyBuilder.java | 56 ++++++----- .../fastapi/body/ImageOcrBodyBuilder.java | 38 ++++--- .../body/KeywordSearchBodyBuilder.java | 19 +++- .../fastapi/body/ProductCrawlBodyBuilder.java | 90 ++++++++--------- .../fastapi/body/ProductMatchBodyBuilder.java | 45 +++++---- .../body/ProductSearchBodyBuilder.java | 16 ++- .../body/ProductSelectBodyBuilder.java | 21 ++-- .../body/ProductSimilarityBodyBuilder.java | 54 ++++++---- .../fastapi/body/S3UploadBodyBuilder.java | 55 ++++++----- 10 files changed, 288 insertions(+), 205 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java index ed148061..f3ab15a0 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java @@ -1,60 +1,63 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; -import java.util.Optional; - -import org.springframework.stereotype.Component; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.RequiredArgsConstructor; - +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; +import java.util.Optional; @Component @RequiredArgsConstructor public class BlogPublishBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final String TASK_NAME = "블로그 발행 태스크"; - private static final String RAG_SOURCE_TASK = "블로그 RAG 생성 태스크"; - - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } - - @Override - public ObjectNode build(Task task, Map workflowContext) { - ObjectNode body = objectMapper.createObjectNode(); - - // RAG에서 생성된 블로그 콘텐츠 가져오기 - Optional.ofNullable(workflowContext.get(RAG_SOURCE_TASK)) - .ifPresent( - ragResult -> { - JsonNode data = ragResult.path("data"); - - // 제목, 내용, 태그 설정 - Optional.ofNullable(data.path("title")) - .filter(node -> !node.isMissingNode()) - .ifPresent(titleNode -> body.set("post_title", titleNode)); - - Optional.ofNullable(data.path("content")) - .filter(node -> !node.isMissingNode()) - .ifPresent(contentNode -> body.set("post_content", contentNode)); - - Optional.ofNullable(data.path("tags")) - .filter(node -> !node.isMissingNode()) - .ifPresent(tagsNode -> body.set("post_tags", tagsNode)); - }); - String blog_name = task.getSettings().path("blog_name").asText(""); - body.put("tag", task.getSettings().get("tag").asText()); - body.put("blog_name", blog_name); - body.put("blog_id", task.getSettings().get("blog_id").asText()); - body.put("blog_pw", task.getSettings().get("blog_pw").asText()); - - return body; - } -} + private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; + private static final String TASK_NAME = "블로그 발행 태스크"; + private static final String RAG_SOURCE_TASK = "블로그 RAG 생성 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, JobRun jobRun) { + ObjectNode body = objectMapper.createObjectNode(); + + Optional ragResultOpt = contextService.getPreviousTaskOutput(jobRun, RAG_SOURCE_TASK); + ragResultOpt.ifPresent(ragResult -> { + JsonNode data = ragResult.path("data"); + + // 📌 1. .path()로 노드를 가져옵니다. + JsonNode titleNode = data.path("title"); + // 📌 2. .isMissingNode()로 노드가 존재하는지 확인합니다. + if (!titleNode.isMissingNode()) { + body.set("post_title", titleNode); + } + + JsonNode contentNode = data.path("content"); + if (!contentNode.isMissingNode()) { + body.set("post_content", contentNode); + } + + JsonNode tagsNode = data.path("tags"); + if (!tagsNode.isMissingNode()) { + body.set("post_tags", tagsNode); + } + }); + + Optional settingsOpt = Optional.ofNullable(task.getSettings()); + settingsOpt.ifPresent(settings -> { + body.put("tag", settings.path("tag").asText()); + body.put("blog_name", settings.path("blog_name").asText()); + body.put("blog_id", settings.path("blog_id").asText()); + body.put("blog_pw", settings.path("blog_pw").asText()); + }); + + return body; + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java index ed528629..a79ddf89 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java @@ -1,24 +1,25 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; -import java.util.Optional; - -import org.springframework.stereotype.Component; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.RequiredArgsConstructor; - +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; + +import java.util.Optional; @Component @RequiredArgsConstructor public class BlogRagBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "블로그 RAG 생성 태스크"; + + // 📌 데이터 소스가 되는 이전 Task들의 이름 private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; private static final String PRODUCT_SELECT_SOURCE_TASK = "상품 선택 태스크"; private static final String OCR_SOURCE_TASK = "이미지 OCR 태스크"; @@ -28,26 +29,37 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 + * '블로그 RAG 생성'을 위한 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @return 생성된 JSON Body + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("data").path("keyword")) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + // 1. '키워드 검색 태스크' 결과에서 키워드 정보 가져오기 + Optional keywordResult = contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + keywordResult + .map(node -> node.path("data").path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - // OCR 번역 결과 가져오기 (새로 추가) - Optional.ofNullable(workflowContext.get(OCR_SOURCE_TASK)) - .map(node -> node.path("data").path("translation_language")) - .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) - .ifPresent(translationNode -> body.set("translation_language", translationNode)); + // 2. '이미지 OCR 태스크' 결과에서 번역 언어 정보 가져오기 + Optional ocrResult = contextService.getPreviousTaskOutput(jobRun, OCR_SOURCE_TASK); + ocrResult + .map(node -> node.path("data").path("translation_language")) + .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) + .ifPresent(translationNode -> body.set("translation_language", translationNode)); - // 선택된 상품 정보 가져오기 - Optional.ofNullable(workflowContext.get(PRODUCT_SELECT_SOURCE_TASK)) - .map(node -> node.path("data").path("selected_product")) - .ifPresent(productNode -> body.set("product_info", productNode)); + // 3. '상품 선택 태스크' 결과에서 선택된 상품 정보 가져오기 + Optional productSelectResult = contextService.getPreviousTaskOutput(jobRun, PRODUCT_SELECT_SOURCE_TASK); + productSelectResult + .map(node -> node.path("data").path("selected_product")) + .ifPresent(productNode -> body.set("product_info", productNode)); return body; } -} +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java index b045c0fe..4c554cc7 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java @@ -1,41 +1,49 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; -import java.util.Optional; - -import org.springframework.stereotype.Component; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.RequiredArgsConstructor; - +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; + +import java.util.Optional; @Component @RequiredArgsConstructor public class ImageOcrBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "이미지 OCR 태스크"; - private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; + private static final String SOURCE_TASK_NAME = "키워드 검색 태스크"; @Override public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 이전 Task 결과(키워드)를 DB에서 조회하여 OCR Task의 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 + * @return 생성된 JSON Body + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 (OCR 처리용) - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("data").path("keyword")) - .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + // 📌 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회합니다. + Optional sourceResult = contextService.getPreviousTaskOutput(jobRun, SOURCE_TASK_NAME); + + sourceResult + .map(result -> result.path("data").path("keyword")) + .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); return body; } -} +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java index 597ab0b7..5c3e8cf5 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java @@ -1,6 +1,7 @@ package site.icebang.domain.workflow.runner.fastapi.body; import java.util.Map; +import java.util.Optional; import org.springframework.stereotype.Component; @@ -10,6 +11,7 @@ import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; @Component @@ -24,10 +26,21 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * Task에 주입된 사용자 정의 설정(settings)을 기반으로 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 (settings 포함) + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이 빌더에서는 사용되지 않음) + * @return 생성된 JSON Body (예: {"tag": "google"}) + */ @Override - public ObjectNode build(Task task, Map workflowContext) { - // 이 Task는 항상 정적인 Body를 가집니다. - String tag = task.getSettings().get("tag").asText(); + public ObjectNode build(Task task, JobRun jobRun) { + // 📌 Task에 동적으로 주입된 settings에서 'tag' 값을 가져옵니다. + // settings가 없거나 'tag' 필드가 없으면 기본값으로 "naver"를 사용합니다. + String tag = Optional.ofNullable(task.getSettings()) + .map(settings -> settings.path("tag").asText("naver")) + .orElse("naver"); + return objectMapper.createObjectNode().put("tag", tag); } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java index 4c90e31a..4396168f 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java @@ -1,58 +1,60 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; -import java.util.Optional; - -import org.springframework.stereotype.Component; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.RequiredArgsConstructor; - +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; + +import java.util.Optional; @Component @RequiredArgsConstructor public class ProductCrawlBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final String TASK_NAME = "상품 정보 크롤링 태스크"; - private static final String SIMILARITY_SOURCE_TASK = "상품 유사도 분석 태스크"; - - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } - - @Override - public ObjectNode build(Task task, Map workflowContext) { - ObjectNode body = objectMapper.createObjectNode(); - - // ArrayNode 준비 (product_urls 배열로 변경) - ArrayNode productUrls = objectMapper.createArrayNode(); - - // 유사도 분석에서 선택된 상품들의 URL 가져오기 (복수로 변경) - Optional.ofNullable(workflowContext.get(SIMILARITY_SOURCE_TASK)) - .ifPresent( - node -> { - JsonNode topProducts = node.path("data").path("top_products"); - if (topProducts.isArray()) { - // top_products 배열에서 각 상품의 URL 추출 - topProducts.forEach( - product -> { - JsonNode urlNode = product.path("url"); - if (!urlNode.isMissingNode() && !urlNode.asText().isEmpty()) { + private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 + private static final String TASK_NAME = "상품 정보 크롤링 태스크"; + private static final String SIMILARITY_SOURCE_TASK = "상품 유사도 분석 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + /** + * 이전 Task 결과(유사도 분석 결과)를 DB에서 조회하여 크롤링할 상품 URL 목록으로 구성된 + * Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 + * @return 생성된 JSON Body (예: {"product_urls": ["url1", "url2", ...]}) + */ + @Override + public ObjectNode build(Task task, JobRun jobRun) { + ObjectNode body = objectMapper.createObjectNode(); + ArrayNode productUrls = objectMapper.createArrayNode(); + + // 📌 컨텍스트 서비스를 통해 DB에서 '상품 유사도 분석 태스크'의 결과를 조회합니다. + Optional sourceResult = contextService.getPreviousTaskOutput(jobRun, SIMILARITY_SOURCE_TASK); + + sourceResult.ifPresent(node -> { + JsonNode topProducts = node.path("data").path("top_products"); + if (topProducts.isArray()) { + topProducts.forEach(product -> { + JsonNode urlNode = product.path("url"); + if (!urlNode.isMissingNode() && urlNode.isTextual() && !urlNode.asText().isEmpty()) { productUrls.add(urlNode.asText()); - } - }); - } - }); - - body.set("product_urls", productUrls); - - return body; - } -} + } + }); + } + }); + + body.set("product_urls", productUrls); + return body; + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java index 65e693f3..964bf17d 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java @@ -1,24 +1,25 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; -import java.util.Optional; - -import org.springframework.stereotype.Component; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.RequiredArgsConstructor; - +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; + +import java.util.Optional; @Component @RequiredArgsConstructor public class ProductMatchBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "상품 매칭 태스크"; + + // 📌 데이터 소스가 되는 이전 Task들의 이름 private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; private static final String SEARCH_SOURCE_TASK = "상품 검색 태스크"; @@ -27,20 +28,30 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 + * '상품 매칭'을 위한 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @return 생성된 JSON Body + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("data").path("keyword")) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + // 📌 1. 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회 + Optional keywordResult = contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + keywordResult + .map(node -> node.path("data").path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - // 상품 검색 결과 정보 가져오기 - Optional.ofNullable(workflowContext.get(SEARCH_SOURCE_TASK)) - .map(node -> node.path("data").path("search_results")) - .ifPresent(resultsNode -> body.set("search_results", resultsNode)); + // 📌 2. 컨텍스트 서비스를 통해 DB에서 '상품 검색 태스크'의 결과를 조회 + Optional searchResult = contextService.getPreviousTaskOutput(jobRun, SEARCH_SOURCE_TASK); + searchResult + .map(node -> node.path("data").path("search_results")) + .ifPresent(resultsNode -> body.set("search_results", resultsNode)); return body; } -} +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java index d594e4e2..3e70bf19 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java @@ -1,6 +1,7 @@ package site.icebang.domain.workflow.runner.fastapi.body; import java.util.Map; +import java.util.Optional; import org.springframework.stereotype.Component; @@ -10,13 +11,17 @@ import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; @Component @RequiredArgsConstructor public class ProductSearchBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; + private static final String TASK_NAME = "상품 검색 태스크"; private static final String SOURCE_TASK_NAME = "키워드 검색 태스크"; @@ -26,10 +31,13 @@ public boolean supports(String taskName) { } @Override - public ObjectNode build(Task task, Map workflowContext) { - JsonNode sourceResult = workflowContext.get(SOURCE_TASK_NAME); - String keyword = - sourceResult != null ? sourceResult.path("data").path("keyword").asText("") : ""; + public ObjectNode build(Task task, JobRun jobRun) { + Optional sourceResult = contextService.getPreviousTaskOutput(jobRun, SOURCE_TASK_NAME); + + String keyword = sourceResult + .map(result -> result.path("data").path("keyword").asText("")) + .orElse(""); + return objectMapper.createObjectNode().put("keyword", keyword); } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java index a8a885ed..56276e3f 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java @@ -1,15 +1,10 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; - -import org.springframework.stereotype.Component; - -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.RequiredArgsConstructor; - +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; @Component @@ -24,12 +19,20 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * '상품 선택' Task를 위한 정적인 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 (이 빌더에서는 사용되지 않음) + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이 빌더에서는 사용되지 않음) + * @return 생성된 JSON Body (예: {"selection_criteria": "image_count_priority"}) + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); + // 이 Task는 항상 고정된 선택 기준을 Body에 담아 보냅니다. body.put("selection_criteria", "image_count_priority"); return body; } -} +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java index 45f19ad8..7feb39a1 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java @@ -1,24 +1,25 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; -import java.util.Optional; - -import org.springframework.stereotype.Component; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.RequiredArgsConstructor; - +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; + +import java.util.Optional; @Component @RequiredArgsConstructor public class ProductSimilarityBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "상품 유사도 분석 태스크"; + + // 📌 데이터 소스가 되는 이전 Task들의 이름 private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; private static final String MATCH_SOURCE_TASK = "상품 매칭 태스크"; private static final String SEARCH_SOURCE_TASK = "상품 검색 태스크"; @@ -28,25 +29,36 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 + * '상품 유사도 분석'을 위한 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @return 생성된 JSON Body + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("data").path("keyword")) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + // 1. 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회 + Optional keywordResult = contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + keywordResult + .map(node -> node.path("data").path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - // 매칭된 상품 정보 가져오기 - Optional.ofNullable(workflowContext.get(MATCH_SOURCE_TASK)) - .map(node -> node.path("data").path("matched_products")) - .ifPresent(matchedNode -> body.set("matched_products", matchedNode)); + // 2. 컨텍스트 서비스를 통해 DB에서 '상품 매칭 태스크'의 결과를 조회 + Optional matchResult = contextService.getPreviousTaskOutput(jobRun, MATCH_SOURCE_TASK); + matchResult + .map(node -> node.path("data").path("matched_products")) + .ifPresent(matchedNode -> body.set("matched_products", matchedNode)); - // 상품 검색 결과 정보 가져오기 - Optional.ofNullable(workflowContext.get(SEARCH_SOURCE_TASK)) - .map(node -> node.path("data").path("search_results")) - .ifPresent(resultsNode -> body.set("search_results", resultsNode)); + // 3. 컨텍스트 서비스를 통해 DB에서 '상품 검색 태스크'의 결과를 조회 + Optional searchResult = contextService.getPreviousTaskOutput(jobRun, SEARCH_SOURCE_TASK); + searchResult + .map(node -> node.path("data").path("search_results")) + .ifPresent(resultsNode -> body.set("search_results", resultsNode)); return body; } -} +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java index 7548452a..60b08133 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java @@ -1,24 +1,25 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; -import java.util.Optional; - -import org.springframework.stereotype.Component; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.RequiredArgsConstructor; - +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; + +import java.util.Optional; @Component @RequiredArgsConstructor public class S3UploadBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "S3 업로드 태스크"; + + // 📌 데이터 소스가 되는 이전 Task들의 이름 private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; private static final String CRAWL_SOURCE_TASK = "상품 정보 크롤링 태스크"; @@ -27,25 +28,35 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 + * 'S3 업로드'를 위한 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @return 생성된 JSON Body + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 (폴더명 생성용 - 스키마 주석 참조) - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("data").path("keyword")) - .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - - // 크롤링된 상품 데이터 가져오기 - Optional.ofNullable(workflowContext.get(CRAWL_SOURCE_TASK)) - .map(node -> node.path("data").path("crawled_products")) - .filter(node -> !node.isMissingNode()) - .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode)); - - // 기본 폴더 설정 (스키마의 기본값과 일치) + // 1. 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회 + Optional keywordResult = contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + keywordResult + .map(node -> node.path("data").path("keyword")) + .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + + // 2. 컨텍스트 서비스를 통해 DB에서 '상품 정보 크롤링 태스크'의 결과를 조회 + Optional crawlResult = contextService.getPreviousTaskOutput(jobRun, CRAWL_SOURCE_TASK); + crawlResult + .map(node -> node.path("data").path("crawled_products")) + .filter(node -> !node.isMissingNode()) + .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode)); + + // 3. 정적 데이터 설정 body.put("base_folder", "product"); return body; } -} +} \ No newline at end of file From 557ba9d59f6945b8d48cb8aad4e6ddd5ae87d7d7 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Mon, 29 Sep 2025 11:39:20 +0900 Subject: [PATCH 7/9] refactor: Code Formatting --- .../workflow/mapper/TaskIoDataMapper.java | 11 ++- .../domain/workflow/mapper/TaskRunMapper.java | 4 +- .../domain/workflow/model/TaskIoData.java | 44 +++++---- .../fastapi/body/BlogPublishBodyBuilder.java | 96 ++++++++++--------- .../fastapi/body/BlogRagBodyBuilder.java | 38 ++++---- .../fastapi/body/ImageOcrBodyBuilder.java | 24 +++-- .../body/KeywordSearchBodyBuilder.java | 9 +- .../fastapi/body/ProductCrawlBodyBuilder.java | 91 ++++++++++-------- .../fastapi/body/ProductMatchBodyBuilder.java | 32 ++++--- .../body/ProductSearchBodyBuilder.java | 9 +- .../body/ProductSelectBodyBuilder.java | 11 ++- .../body/ProductSimilarityBodyBuilder.java | 39 ++++---- .../fastapi/body/S3UploadBodyBuilder.java | 36 +++---- .../runner/fastapi/body/TaskBodyBuilder.java | 4 - .../service/WorkflowContextService.java | 66 +++++++------ .../service/WorkflowExecutionService.java | 80 +++++++++------- 16 files changed, 328 insertions(+), 266 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java index 2160c152..6f44de02 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java @@ -1,11 +1,14 @@ package site.icebang.domain.workflow.mapper; +import java.util.Optional; + import org.apache.ibatis.annotations.Mapper; + import site.icebang.domain.workflow.model.TaskIoData; -import java.util.Optional; @Mapper public interface TaskIoDataMapper { - void insert(TaskIoData taskIoData); - Optional findOutputByTaskRunId(Long taskRunId); -} \ No newline at end of file + void insert(TaskIoData taskIoData); + + Optional findOutputByTaskRunId(Long taskRunId); +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java index 428a168d..267e931a 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java @@ -1,11 +1,11 @@ package site.icebang.domain.workflow.mapper; +import java.util.Optional; + import org.apache.ibatis.annotations.Mapper; import site.icebang.domain.workflow.model.TaskRun; -import java.util.Optional; - @Mapper public interface TaskRunMapper { void insert(TaskRun taskRun); diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java index 0de7c7b1..16bbd7c2 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java @@ -1,29 +1,35 @@ package site.icebang.domain.workflow.model; +import java.time.Instant; + import lombok.Getter; import lombok.NoArgsConstructor; -import java.time.Instant; - @Getter @NoArgsConstructor public class TaskIoData { - private Long id; - private Long taskRunId; - private String ioType; - private String name; - private String dataType; - private String dataValue; // JSON을 문자열로 저장 - private Long dataSize; - private Instant createdAt; + private Long id; + private Long taskRunId; + private String ioType; + private String name; + private String dataType; + private String dataValue; // JSON을 문자열로 저장 + private Long dataSize; + private Instant createdAt; - public TaskIoData(Long taskRunId, String ioType, String name, String dataType, String dataValue, Long dataSize) { - this.taskRunId = taskRunId; - this.ioType = ioType; - this.name = name; - this.dataType = dataType; - this.dataValue = dataValue; - this.dataSize = dataSize; - this.createdAt = Instant.now(); - } + public TaskIoData( + Long taskRunId, + String ioType, + String name, + String dataType, + String dataValue, + Long dataSize) { + this.taskRunId = taskRunId; + this.ioType = ioType; + this.name = name; + this.dataType = dataType; + this.dataValue = dataValue; + this.dataSize = dataSize; + this.createdAt = Instant.now(); + } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java index f3ab15a0..39cb9378 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java @@ -1,63 +1,69 @@ package site.icebang.domain.workflow.runner.fastapi.body; +import java.util.Optional; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.service.WorkflowContextService; -import java.util.Optional; @Component @RequiredArgsConstructor public class BlogPublishBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private final WorkflowContextService contextService; - private static final String TASK_NAME = "블로그 발행 태스크"; - private static final String RAG_SOURCE_TASK = "블로그 RAG 생성 태스크"; - - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } - - @Override - public ObjectNode build(Task task, JobRun jobRun) { - ObjectNode body = objectMapper.createObjectNode(); - - Optional ragResultOpt = contextService.getPreviousTaskOutput(jobRun, RAG_SOURCE_TASK); - ragResultOpt.ifPresent(ragResult -> { - JsonNode data = ragResult.path("data"); - - // 📌 1. .path()로 노드를 가져옵니다. - JsonNode titleNode = data.path("title"); - // 📌 2. .isMissingNode()로 노드가 존재하는지 확인합니다. - if (!titleNode.isMissingNode()) { - body.set("post_title", titleNode); - } - - JsonNode contentNode = data.path("content"); - if (!contentNode.isMissingNode()) { - body.set("post_content", contentNode); - } - - JsonNode tagsNode = data.path("tags"); - if (!tagsNode.isMissingNode()) { - body.set("post_tags", tagsNode); - } + private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; + private static final String TASK_NAME = "블로그 발행 태스크"; + private static final String RAG_SOURCE_TASK = "블로그 RAG 생성 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, JobRun jobRun) { + ObjectNode body = objectMapper.createObjectNode(); + + Optional ragResultOpt = contextService.getPreviousTaskOutput(jobRun, RAG_SOURCE_TASK); + ragResultOpt.ifPresent( + ragResult -> { + JsonNode data = ragResult.path("data"); + + // 📌 1. .path()로 노드를 가져옵니다. + JsonNode titleNode = data.path("title"); + // 📌 2. .isMissingNode()로 노드가 존재하는지 확인합니다. + if (!titleNode.isMissingNode()) { + body.set("post_title", titleNode); + } + + JsonNode contentNode = data.path("content"); + if (!contentNode.isMissingNode()) { + body.set("post_content", contentNode); + } + + JsonNode tagsNode = data.path("tags"); + if (!tagsNode.isMissingNode()) { + body.set("post_tags", tagsNode); + } }); - Optional settingsOpt = Optional.ofNullable(task.getSettings()); - settingsOpt.ifPresent(settings -> { - body.put("tag", settings.path("tag").asText()); - body.put("blog_name", settings.path("blog_name").asText()); - body.put("blog_id", settings.path("blog_id").asText()); - body.put("blog_pw", settings.path("blog_pw").asText()); + Optional settingsOpt = Optional.ofNullable(task.getSettings()); + settingsOpt.ifPresent( + settings -> { + body.put("tag", settings.path("tag").asText()); + body.put("blog_name", settings.path("blog_name").asText()); + body.put("blog_id", settings.path("blog_id").asText()); + body.put("blog_pw", settings.path("blog_pw").asText()); }); - return body; - } -} \ No newline at end of file + return body; + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java index a79ddf89..33b3ca55 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java @@ -1,16 +1,19 @@ package site.icebang.domain.workflow.runner.fastapi.body; +import java.util.Optional; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.service.WorkflowContextService; -import java.util.Optional; - @Component @RequiredArgsConstructor public class BlogRagBodyBuilder implements TaskBodyBuilder { @@ -30,11 +33,10 @@ public boolean supports(String taskName) { } /** - * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 - * '블로그 RAG 생성'을 위한 Request Body를 생성합니다. + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 '블로그 RAG 생성'을 위한 Request Body를 생성합니다. * - * @param task 실행할 Task의 도메인 모델 - * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) * @return 생성된 JSON Body */ @Override @@ -42,24 +44,26 @@ public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); // 1. '키워드 검색 태스크' 결과에서 키워드 정보 가져오기 - Optional keywordResult = contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + Optional keywordResult = + contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); keywordResult - .map(node -> node.path("data").path("keyword")) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + .map(node -> node.path("data").path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); // 2. '이미지 OCR 태스크' 결과에서 번역 언어 정보 가져오기 Optional ocrResult = contextService.getPreviousTaskOutput(jobRun, OCR_SOURCE_TASK); ocrResult - .map(node -> node.path("data").path("translation_language")) - .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) - .ifPresent(translationNode -> body.set("translation_language", translationNode)); + .map(node -> node.path("data").path("translation_language")) + .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) + .ifPresent(translationNode -> body.set("translation_language", translationNode)); // 3. '상품 선택 태스크' 결과에서 선택된 상품 정보 가져오기 - Optional productSelectResult = contextService.getPreviousTaskOutput(jobRun, PRODUCT_SELECT_SOURCE_TASK); + Optional productSelectResult = + contextService.getPreviousTaskOutput(jobRun, PRODUCT_SELECT_SOURCE_TASK); productSelectResult - .map(node -> node.path("data").path("selected_product")) - .ifPresent(productNode -> body.set("product_info", productNode)); + .map(node -> node.path("data").path("selected_product")) + .ifPresent(productNode -> body.set("product_info", productNode)); return body; } -} \ No newline at end of file +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java index 4c554cc7..5d819dc6 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java @@ -1,16 +1,19 @@ package site.icebang.domain.workflow.runner.fastapi.body; +import java.util.Optional; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.service.WorkflowContextService; -import java.util.Optional; - @Component @RequiredArgsConstructor public class ImageOcrBodyBuilder implements TaskBodyBuilder { @@ -28,8 +31,8 @@ public boolean supports(String taskName) { /** * 이전 Task 결과(키워드)를 DB에서 조회하여 OCR Task의 Request Body를 생성합니다. * - * @param task 실행할 Task의 도메인 모델 - * @param jobRun 현재 실행 중인 Job의 기록 객체 + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 * @return 생성된 JSON Body */ @Override @@ -37,13 +40,14 @@ public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); // 📌 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회합니다. - Optional sourceResult = contextService.getPreviousTaskOutput(jobRun, SOURCE_TASK_NAME); + Optional sourceResult = + contextService.getPreviousTaskOutput(jobRun, SOURCE_TASK_NAME); sourceResult - .map(result -> result.path("data").path("keyword")) - .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + .map(result -> result.path("data").path("keyword")) + .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); return body; } -} \ No newline at end of file +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java index 5c3e8cf5..00c9551e 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java @@ -1,11 +1,9 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; import java.util.Optional; import org.springframework.stereotype.Component; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -29,15 +27,16 @@ public boolean supports(String taskName) { /** * Task에 주입된 사용자 정의 설정(settings)을 기반으로 Request Body를 생성합니다. * - * @param task 실행할 Task의 도메인 모델 (settings 포함) - * @param jobRun 현재 실행 중인 Job의 기록 객체 (이 빌더에서는 사용되지 않음) + * @param task 실행할 Task의 도메인 모델 (settings 포함) + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이 빌더에서는 사용되지 않음) * @return 생성된 JSON Body (예: {"tag": "google"}) */ @Override public ObjectNode build(Task task, JobRun jobRun) { // 📌 Task에 동적으로 주입된 settings에서 'tag' 값을 가져옵니다. // settings가 없거나 'tag' 필드가 없으면 기본값으로 "naver"를 사용합니다. - String tag = Optional.ofNullable(task.getSettings()) + String tag = + Optional.ofNullable(task.getSettings()) .map(settings -> settings.path("tag").asText("naver")) .orElse("naver"); diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java index 4396168f..7cc9c005 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java @@ -1,60 +1,67 @@ package site.icebang.domain.workflow.runner.fastapi.body; +import java.util.Optional; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.service.WorkflowContextService; -import java.util.Optional; - @Component @RequiredArgsConstructor public class ProductCrawlBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 - private static final String TASK_NAME = "상품 정보 크롤링 태스크"; - private static final String SIMILARITY_SOURCE_TASK = "상품 유사도 분석 태스크"; - - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } - - /** - * 이전 Task 결과(유사도 분석 결과)를 DB에서 조회하여 크롤링할 상품 URL 목록으로 구성된 - * Request Body를 생성합니다. - * - * @param task 실행할 Task의 도메인 모델 - * @param jobRun 현재 실행 중인 Job의 기록 객체 - * @return 생성된 JSON Body (예: {"product_urls": ["url1", "url2", ...]}) - */ - @Override - public ObjectNode build(Task task, JobRun jobRun) { - ObjectNode body = objectMapper.createObjectNode(); - ArrayNode productUrls = objectMapper.createArrayNode(); - - // 📌 컨텍스트 서비스를 통해 DB에서 '상품 유사도 분석 태스크'의 결과를 조회합니다. - Optional sourceResult = contextService.getPreviousTaskOutput(jobRun, SIMILARITY_SOURCE_TASK); - - sourceResult.ifPresent(node -> { - JsonNode topProducts = node.path("data").path("top_products"); - if (topProducts.isArray()) { - topProducts.forEach(product -> { - JsonNode urlNode = product.path("url"); - if (!urlNode.isMissingNode() && urlNode.isTextual() && !urlNode.asText().isEmpty()) { - productUrls.add(urlNode.asText()); - } + private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 + private static final String TASK_NAME = "상품 정보 크롤링 태스크"; + private static final String SIMILARITY_SOURCE_TASK = "상품 유사도 분석 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + /** + * 이전 Task 결과(유사도 분석 결과)를 DB에서 조회하여 크롤링할 상품 URL 목록으로 구성된 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 + * @return 생성된 JSON Body (예: {"product_urls": ["url1", "url2", ...]}) + */ + @Override + public ObjectNode build(Task task, JobRun jobRun) { + ObjectNode body = objectMapper.createObjectNode(); + ArrayNode productUrls = objectMapper.createArrayNode(); + + // 📌 컨텍스트 서비스를 통해 DB에서 '상품 유사도 분석 태스크'의 결과를 조회합니다. + Optional sourceResult = + contextService.getPreviousTaskOutput(jobRun, SIMILARITY_SOURCE_TASK); + + sourceResult.ifPresent( + node -> { + JsonNode topProducts = node.path("data").path("top_products"); + if (topProducts.isArray()) { + topProducts.forEach( + product -> { + JsonNode urlNode = product.path("url"); + if (!urlNode.isMissingNode() + && urlNode.isTextual() + && !urlNode.asText().isEmpty()) { + productUrls.add(urlNode.asText()); + } }); - } + } }); - body.set("product_urls", productUrls); - return body; - } -} \ No newline at end of file + body.set("product_urls", productUrls); + return body; + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java index 964bf17d..a1b55970 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java @@ -1,16 +1,19 @@ package site.icebang.domain.workflow.runner.fastapi.body; +import java.util.Optional; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.service.WorkflowContextService; -import java.util.Optional; - @Component @RequiredArgsConstructor public class ProductMatchBodyBuilder implements TaskBodyBuilder { @@ -29,11 +32,10 @@ public boolean supports(String taskName) { } /** - * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 - * '상품 매칭'을 위한 Request Body를 생성합니다. + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 '상품 매칭'을 위한 Request Body를 생성합니다. * - * @param task 실행할 Task의 도메인 모델 - * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) * @return 생성된 JSON Body */ @Override @@ -41,17 +43,19 @@ public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); // 📌 1. 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회 - Optional keywordResult = contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + Optional keywordResult = + contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); keywordResult - .map(node -> node.path("data").path("keyword")) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + .map(node -> node.path("data").path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); // 📌 2. 컨텍스트 서비스를 통해 DB에서 '상품 검색 태스크'의 결과를 조회 - Optional searchResult = contextService.getPreviousTaskOutput(jobRun, SEARCH_SOURCE_TASK); + Optional searchResult = + contextService.getPreviousTaskOutput(jobRun, SEARCH_SOURCE_TASK); searchResult - .map(node -> node.path("data").path("search_results")) - .ifPresent(resultsNode -> body.set("search_results", resultsNode)); + .map(node -> node.path("data").path("search_results")) + .ifPresent(resultsNode -> body.set("search_results", resultsNode)); return body; } -} \ No newline at end of file +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java index 3e70bf19..9056b1fd 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java @@ -1,6 +1,5 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; import java.util.Optional; import org.springframework.stereotype.Component; @@ -32,11 +31,11 @@ public boolean supports(String taskName) { @Override public ObjectNode build(Task task, JobRun jobRun) { - Optional sourceResult = contextService.getPreviousTaskOutput(jobRun, SOURCE_TASK_NAME); + Optional sourceResult = + contextService.getPreviousTaskOutput(jobRun, SOURCE_TASK_NAME); - String keyword = sourceResult - .map(result -> result.path("data").path("keyword").asText("")) - .orElse(""); + String keyword = + sourceResult.map(result -> result.path("data").path("keyword").asText("")).orElse(""); return objectMapper.createObjectNode().put("keyword", keyword); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java index 56276e3f..b25cd966 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java @@ -1,9 +1,12 @@ package site.icebang.domain.workflow.runner.fastapi.body; +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; @@ -22,8 +25,8 @@ public boolean supports(String taskName) { /** * '상품 선택' Task를 위한 정적인 Request Body를 생성합니다. * - * @param task 실행할 Task의 도메인 모델 (이 빌더에서는 사용되지 않음) - * @param jobRun 현재 실행 중인 Job의 기록 객체 (이 빌더에서는 사용되지 않음) + * @param task 실행할 Task의 도메인 모델 (이 빌더에서는 사용되지 않음) + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이 빌더에서는 사용되지 않음) * @return 생성된 JSON Body (예: {"selection_criteria": "image_count_priority"}) */ @Override @@ -35,4 +38,4 @@ public ObjectNode build(Task task, JobRun jobRun) { return body; } -} \ No newline at end of file +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java index 7feb39a1..d4857602 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java @@ -1,16 +1,19 @@ package site.icebang.domain.workflow.runner.fastapi.body; +import java.util.Optional; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.service.WorkflowContextService; -import java.util.Optional; - @Component @RequiredArgsConstructor public class ProductSimilarityBodyBuilder implements TaskBodyBuilder { @@ -30,11 +33,10 @@ public boolean supports(String taskName) { } /** - * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 - * '상품 유사도 분석'을 위한 Request Body를 생성합니다. + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 '상품 유사도 분석'을 위한 Request Body를 생성합니다. * - * @param task 실행할 Task의 도메인 모델 - * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) * @return 생성된 JSON Body */ @Override @@ -42,23 +44,26 @@ public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); // 1. 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회 - Optional keywordResult = contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + Optional keywordResult = + contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); keywordResult - .map(node -> node.path("data").path("keyword")) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + .map(node -> node.path("data").path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); // 2. 컨텍스트 서비스를 통해 DB에서 '상품 매칭 태스크'의 결과를 조회 - Optional matchResult = contextService.getPreviousTaskOutput(jobRun, MATCH_SOURCE_TASK); + Optional matchResult = + contextService.getPreviousTaskOutput(jobRun, MATCH_SOURCE_TASK); matchResult - .map(node -> node.path("data").path("matched_products")) - .ifPresent(matchedNode -> body.set("matched_products", matchedNode)); + .map(node -> node.path("data").path("matched_products")) + .ifPresent(matchedNode -> body.set("matched_products", matchedNode)); // 3. 컨텍스트 서비스를 통해 DB에서 '상품 검색 태스크'의 결과를 조회 - Optional searchResult = contextService.getPreviousTaskOutput(jobRun, SEARCH_SOURCE_TASK); + Optional searchResult = + contextService.getPreviousTaskOutput(jobRun, SEARCH_SOURCE_TASK); searchResult - .map(node -> node.path("data").path("search_results")) - .ifPresent(resultsNode -> body.set("search_results", resultsNode)); + .map(node -> node.path("data").path("search_results")) + .ifPresent(resultsNode -> body.set("search_results", resultsNode)); return body; } -} \ No newline at end of file +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java index 60b08133..ddd3c296 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java @@ -1,16 +1,19 @@ package site.icebang.domain.workflow.runner.fastapi.body; +import java.util.Optional; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.service.WorkflowContextService; -import java.util.Optional; - @Component @RequiredArgsConstructor public class S3UploadBodyBuilder implements TaskBodyBuilder { @@ -29,11 +32,10 @@ public boolean supports(String taskName) { } /** - * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 - * 'S3 업로드'를 위한 Request Body를 생성합니다. + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 'S3 업로드'를 위한 Request Body를 생성합니다. * - * @param task 실행할 Task의 도메인 모델 - * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) * @return 생성된 JSON Body */ @Override @@ -41,22 +43,24 @@ public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); // 1. 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회 - Optional keywordResult = contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + Optional keywordResult = + contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); keywordResult - .map(node -> node.path("data").path("keyword")) - .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + .map(node -> node.path("data").path("keyword")) + .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); // 2. 컨텍스트 서비스를 통해 DB에서 '상품 정보 크롤링 태스크'의 결과를 조회 - Optional crawlResult = contextService.getPreviousTaskOutput(jobRun, CRAWL_SOURCE_TASK); + Optional crawlResult = + contextService.getPreviousTaskOutput(jobRun, CRAWL_SOURCE_TASK); crawlResult - .map(node -> node.path("data").path("crawled_products")) - .filter(node -> !node.isMissingNode()) - .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode)); + .map(node -> node.path("data").path("crawled_products")) + .filter(node -> !node.isMissingNode()) + .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode)); // 3. 정적 데이터 설정 body.put("base_folder", "product"); return body; } -} \ No newline at end of file +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java index e5f8b8b1..ffd76457 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java @@ -1,8 +1,5 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; - -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import site.icebang.domain.workflow.model.JobRun; @@ -18,7 +15,6 @@ public interface TaskBodyBuilder { */ boolean supports(String taskName); - // 📌 workflowContext(Map) 대신 JobRun 객체를 받도록 변경 ObjectNode build(Task task, JobRun jobRun); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java index 74b7dd67..bbdff181 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java @@ -1,45 +1,53 @@ package site.icebang.domain.workflow.service; +import java.util.Optional; + +import org.springframework.stereotype.Service; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; + import site.icebang.domain.workflow.mapper.TaskIoDataMapper; import site.icebang.domain.workflow.mapper.TaskRunMapper; import site.icebang.domain.workflow.model.JobRun; -import java.util.Optional; @Slf4j @Service @RequiredArgsConstructor public class WorkflowContextService { - private final TaskRunMapper taskRunMapper; - private final TaskIoDataMapper taskIoDataMapper; - private final ObjectMapper objectMapper; - - /** - * 특정 Job 실행 내에서, 이전에 성공한 Task의 이름으로 결과(Output)를 조회합니다. - * @param jobRun 현재 실행중인 JobRun - * @param sourceTaskName 결과를 조회할 이전 Task의 이름 - * @return 조회된 결과 데이터 (JsonNode) - */ - public Optional getPreviousTaskOutput(JobRun jobRun, String sourceTaskName) { - try { - return taskRunMapper.findLatestSuccessRunInJob(jobRun.getId(), sourceTaskName) - .flatMap(taskRun -> taskIoDataMapper.findOutputByTaskRunId(taskRun.getId())) - .map(ioData -> { - try { - return objectMapper.readTree(ioData.getDataValue()); - } catch (Exception e) { - log.error("TaskIoData JSON 파싱 실패: TaskIoDataId={}", ioData.getId(), e); - return null; - } - }); - } catch (Exception e) { - log.error("이전 Task 결과 조회 중 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), sourceTaskName, e); - return Optional.empty(); - } + private final TaskRunMapper taskRunMapper; + private final TaskIoDataMapper taskIoDataMapper; + private final ObjectMapper objectMapper; + + /** + * 특정 Job 실행 내에서, 이전에 성공한 Task의 이름으로 결과(Output)를 조회합니다. + * + * @param jobRun 현재 실행중인 JobRun + * @param sourceTaskName 결과를 조회할 이전 Task의 이름 + * @return 조회된 결과 데이터 (JsonNode) + */ + public Optional getPreviousTaskOutput(JobRun jobRun, String sourceTaskName) { + try { + return taskRunMapper + .findLatestSuccessRunInJob(jobRun.getId(), sourceTaskName) + .flatMap(taskRun -> taskIoDataMapper.findOutputByTaskRunId(taskRun.getId())) + .map( + ioData -> { + try { + return objectMapper.readTree(ioData.getDataValue()); + } catch (Exception e) { + log.error("TaskIoData JSON 파싱 실패: TaskIoDataId={}", ioData.getId(), e); + return null; + } + }); + } catch (Exception e) { + log.error( + "이전 Task 결과 조회 중 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), sourceTaskName, e); + return Optional.empty(); } -} \ No newline at end of file + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 91d634ee..221c4524 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -1,40 +1,40 @@ package site.icebang.domain.workflow.service; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import lombok.RequiredArgsConstructor; +import java.math.BigInteger; +import java.util.Comparator; +import java.util.List; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import site.icebang.domain.workflow.mapper.JobRunMapper; -import site.icebang.domain.workflow.mapper.TaskIoDataMapper; -import site.icebang.domain.workflow.mapper.TaskRunMapper; -import site.icebang.domain.workflow.mapper.WorkflowRunMapper; -import site.icebang.domain.workflow.model.JobRun; -import site.icebang.domain.workflow.model.TaskIoData; -import site.icebang.domain.workflow.model.TaskRun; -import site.icebang.domain.workflow.model.WorkflowRun; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import lombok.RequiredArgsConstructor; + import site.icebang.domain.workflow.dto.JobDto; import site.icebang.domain.workflow.dto.TaskDto; import site.icebang.domain.workflow.dto.WorkflowDetailCardDto; import site.icebang.domain.workflow.manager.ExecutionMdcManager; import site.icebang.domain.workflow.mapper.JobMapper; +import site.icebang.domain.workflow.mapper.JobRunMapper; +import site.icebang.domain.workflow.mapper.TaskIoDataMapper; +import site.icebang.domain.workflow.mapper.TaskRunMapper; import site.icebang.domain.workflow.mapper.WorkflowMapper; +import site.icebang.domain.workflow.mapper.WorkflowRunMapper; import site.icebang.domain.workflow.model.Job; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.model.TaskIoData; +import site.icebang.domain.workflow.model.TaskRun; +import site.icebang.domain.workflow.model.WorkflowRun; import site.icebang.domain.workflow.runner.TaskRunner; import site.icebang.domain.workflow.runner.fastapi.body.TaskBodyBuilder; -import java.math.BigInteger; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - @Service @RequiredArgsConstructor public class WorkflowExecutionService { @@ -61,19 +61,23 @@ public void executeWorkflow(Long workflowId) { workflowRunMapper.insert(workflowRun); // 📌 1. 워크플로우 상세 정보를 DTO로 조회합니다. - WorkflowDetailCardDto settingsDto = workflowMapper.selectWorkflowDetailById(BigInteger.valueOf(workflowId)); + WorkflowDetailCardDto settingsDto = + workflowMapper.selectWorkflowDetailById(BigInteger.valueOf(workflowId)); if (settingsDto == null) { throw new IllegalStateException("실행할 워크플로우를 찾을 수 없습니다: ID " + workflowId); } // 📌 2. DTO에서 defaultConfig JSON 문자열을 가져와 파싱합니다. String defaultConfigJson = settingsDto.getDefaultConfig(); - JsonNode setting = (defaultConfigJson != null && !defaultConfigJson.isEmpty()) + JsonNode setting = + (defaultConfigJson != null && !defaultConfigJson.isEmpty()) ? objectMapper.readTree(defaultConfigJson) : objectMapper.createObjectNode(); List jobDtos = jobMapper.findJobsByWorkflowId(workflowId); - jobDtos.sort(Comparator.comparing(JobDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) + jobDtos.sort( + Comparator.comparing( + JobDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) .thenComparing(JobDto::getId)); boolean hasAnyJobFailed = false; @@ -83,7 +87,8 @@ public void executeWorkflow(Long workflowId) { mdcManager.setJobContext(job.getId()); JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); jobRunMapper.insert(jobRun); - workflowLogger.info("---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); + workflowLogger.info( + "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); boolean jobSucceeded = executeTasksForJob(jobRun, setting); jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); @@ -109,11 +114,15 @@ public void executeWorkflow(Long workflowId) { private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { List taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId()); - taskDtos.forEach(dto -> { - JsonNode s = setting.get(String.valueOf(dto.getId())); - if (s != null) dto.setSettings(s); - }); - taskDtos.sort(Comparator.comparing(TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())).thenComparing(TaskDto::getId)); + taskDtos.forEach( + dto -> { + JsonNode s = setting.get(String.valueOf(dto.getId())); + if (s != null) dto.setSettings(s); + }); + taskDtos.sort( + Comparator.comparing( + TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(TaskDto::getId)); boolean hasAnyTaskFailed = false; @@ -127,14 +136,16 @@ private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { Task task = new Task(taskDto); - ObjectNode requestBody = bodyBuilders.stream() + ObjectNode requestBody = + bodyBuilders.stream() .filter(builder -> builder.supports(task.getName())) .findFirst() .map(builder -> builder.build(task, jobRun)) // jobRun을 컨텍스트로 전달 .orElse(objectMapper.createObjectNode()); saveIoData(taskRun.getId(), "INPUT", "request_body", requestBody); - TaskRunner.TaskExecutionResult result = taskExecutionService.executeWithRetry(task, taskRun, requestBody); + TaskRunner.TaskExecutionResult result = + taskExecutionService.executeWithRetry(task, taskRun, requestBody); taskRun.finish(result.status(), result.message()); if (result.isFailure()) { @@ -144,7 +155,8 @@ private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { saveIoData(taskRun.getId(), "OUTPUT", "response_body", resultJson); } } catch (Exception e) { - workflowLogger.error("Task 처리 중 심각한 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), taskDto.getName(), e); + workflowLogger.error( + "Task 처리 중 심각한 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), taskDto.getName(), e); hasAnyTaskFailed = true; if (taskRun != null) taskRun.finish("FAILED", e.getMessage()); } finally { @@ -158,10 +170,12 @@ private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { private void saveIoData(Long taskRunId, String ioType, String name, JsonNode data) { try { String dataValue = data.toString(); - TaskIoData ioData = new TaskIoData(taskRunId, ioType, name, "JSON", dataValue, (long) dataValue.getBytes().length); + TaskIoData ioData = + new TaskIoData( + taskRunId, ioType, name, "JSON", dataValue, (long) dataValue.getBytes().length); taskIoDataMapper.insert(ioData); } catch (Exception e) { workflowLogger.error("Task IO 데이터 저장 실패: TaskRunId={}, Type={}", taskRunId, ioType, e); } } -} \ No newline at end of file +} From 5dbe63cbee26cf9e873f2e1712ffc85d136d633a Mon Sep 17 00:00:00 2001 From: jihukimme Date: Mon, 29 Sep 2025 14:57:44 +0900 Subject: [PATCH 8/9] =?UTF-8?q?refactor:=20WorkflowContext=EA=B0=80=20?= =?UTF-8?q?=EC=98=81=EC=86=8D=EC=A0=81=20=EC=BB=A8=ED=85=8D=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=EB=A5=BC=20=EA=B8=B0=EB=B0=98=EC=9C=BC=EB=A1=9C=20?= =?UTF-8?q?=EB=8F=99=EC=9E=91=ED=95=98=EB=8F=84=EB=A1=9D=20=EB=A6=AC?= =?UTF-8?q?=ED=8C=A9=ED=86=A0=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/WorkflowExecutionService.java | 110 ++++++++---------- 1 file changed, 50 insertions(+), 60 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 221c4524..2db5d259 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -1,40 +1,39 @@ package site.icebang.domain.workflow.service; -import java.math.BigInteger; -import java.util.Comparator; -import java.util.List; - +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import lombok.RequiredArgsConstructor; - -import site.icebang.domain.workflow.dto.JobDto; -import site.icebang.domain.workflow.dto.TaskDto; import site.icebang.domain.workflow.dto.WorkflowDetailCardDto; -import site.icebang.domain.workflow.manager.ExecutionMdcManager; -import site.icebang.domain.workflow.mapper.JobMapper; import site.icebang.domain.workflow.mapper.JobRunMapper; import site.icebang.domain.workflow.mapper.TaskIoDataMapper; import site.icebang.domain.workflow.mapper.TaskRunMapper; -import site.icebang.domain.workflow.mapper.WorkflowMapper; import site.icebang.domain.workflow.mapper.WorkflowRunMapper; -import site.icebang.domain.workflow.model.Job; import site.icebang.domain.workflow.model.JobRun; -import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.model.TaskIoData; import site.icebang.domain.workflow.model.TaskRun; import site.icebang.domain.workflow.model.WorkflowRun; +import site.icebang.domain.workflow.dto.JobDto; +import site.icebang.domain.workflow.dto.RequestContext; +import site.icebang.domain.workflow.dto.TaskDto; +import site.icebang.domain.workflow.manager.ExecutionMdcManager; +import site.icebang.domain.workflow.mapper.JobMapper; +import site.icebang.domain.workflow.mapper.WorkflowMapper; +import site.icebang.domain.workflow.model.Job; +import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.runner.TaskRunner; import site.icebang.domain.workflow.runner.fastapi.body.TaskBodyBuilder; +import java.math.BigInteger; +import java.util.Comparator; +import java.util.List; + @Service @RequiredArgsConstructor public class WorkflowExecutionService { @@ -52,32 +51,28 @@ public class WorkflowExecutionService { @Transactional @Async("traceExecutor") - public void executeWorkflow(Long workflowId) { - mdcManager.setWorkflowContext(workflowId); - WorkflowRun workflowRun = null; + public void executeWorkflow(Long workflowId, RequestContext context) { + WorkflowRun workflowRun = WorkflowRun.start(workflowId, context.getTraceId()); + workflowRunMapper.insert(workflowRun); + + mdcManager.setWorkflowContext(workflowId, context.getTraceId(), context.getClientIp(), context.getUserAgent()); try { workflowLogger.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId); - workflowRun = WorkflowRun.start(workflowId); - workflowRunMapper.insert(workflowRun); - // 📌 1. 워크플로우 상세 정보를 DTO로 조회합니다. - WorkflowDetailCardDto settingsDto = - workflowMapper.selectWorkflowDetailById(BigInteger.valueOf(workflowId)); + // 📌 1. selectWorkflowDetailById를 호출하여 워크플로우의 모든 상세 정보를 가져옵니다. + WorkflowDetailCardDto settingsDto = workflowMapper.selectWorkflowDetailById(BigInteger.valueOf(workflowId)); if (settingsDto == null) { throw new IllegalStateException("실행할 워크플로우를 찾을 수 없습니다: ID " + workflowId); } - // 📌 2. DTO에서 defaultConfig JSON 문자열을 가져와 파싱합니다. + // 📌 2. 가져온 DTO 객체에서 getDefaultConfig() 메소드를 호출하여 값을 얻습니다. String defaultConfigJson = settingsDto.getDefaultConfig(); - JsonNode setting = - (defaultConfigJson != null && !defaultConfigJson.isEmpty()) + JsonNode setting = (defaultConfigJson != null && !defaultConfigJson.isEmpty()) ? objectMapper.readTree(defaultConfigJson) : objectMapper.createObjectNode(); List jobDtos = jobMapper.findJobsByWorkflowId(workflowId); - jobDtos.sort( - Comparator.comparing( - JobDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) + jobDtos.sort(Comparator.comparing(JobDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) .thenComparing(JobDto::getId)); boolean hasAnyJobFailed = false; @@ -87,20 +82,20 @@ public void executeWorkflow(Long workflowId) { mdcManager.setJobContext(job.getId()); JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); jobRunMapper.insert(jobRun); - workflowLogger.info( - "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); + workflowLogger.info("---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); boolean jobSucceeded = executeTasksForJob(jobRun, setting); jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); jobRunMapper.update(jobRun); - if (!jobSucceeded) hasAnyJobFailed = true; - - mdcManager.setWorkflowContext(workflowId); + if (!jobSucceeded) { + hasAnyJobFailed = true; + } + mdcManager.setWorkflowContext(workflowId, context.getTraceId(), context.getClientIp(), context.getUserAgent()); } workflowRun.finish(hasAnyJobFailed ? "FAILED" : "SUCCESS"); workflowRunMapper.update(workflowRun); - workflowLogger.info("========== 워크플로우 실행 {} : ...", hasAnyJobFailed ? "실패" : "성공"); + workflowLogger.info("========== 워크플로우 실행 {} : WorkflowRunId={} ==========", hasAnyJobFailed ? "실패" : "성공", workflowRun.getId()); } catch (Exception e) { workflowLogger.error("워크플로우 실행 중 심각한 오류 발생: WorkflowId={}", workflowId, e); if (workflowRun != null) { @@ -114,15 +109,11 @@ public void executeWorkflow(Long workflowId) { private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { List taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId()); - taskDtos.forEach( - dto -> { - JsonNode s = setting.get(String.valueOf(dto.getId())); - if (s != null) dto.setSettings(s); - }); - taskDtos.sort( - Comparator.comparing( - TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) - .thenComparing(TaskDto::getId)); + taskDtos.forEach(dto -> { + JsonNode s = setting.get(String.valueOf(dto.getId())); + if (s != null) dto.setSettings(s); + }); + taskDtos.sort(Comparator.comparing(TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())).thenComparing(TaskDto::getId)); boolean hasAnyTaskFailed = false; @@ -132,33 +123,34 @@ private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { taskRun = TaskRun.start(jobRun.getId(), taskDto.getId(), taskDto.getExecutionOrder()); taskRunMapper.insert(taskRun); mdcManager.setTaskContext(taskRun.getId()); - workflowLogger.info("Task 실행 시작: TaskId={}, Name={}", taskDto.getId(), taskDto.getName()); Task task = new Task(taskDto); + workflowLogger.info("Task 실행 시작: TaskId={}, Name={}", task.getId(), task.getName()); - ObjectNode requestBody = - bodyBuilders.stream() + ObjectNode requestBody = bodyBuilders.stream() .filter(builder -> builder.supports(task.getName())) .findFirst() - .map(builder -> builder.build(task, jobRun)) // jobRun을 컨텍스트로 전달 + .map(builder -> builder.build(task, jobRun)) .orElse(objectMapper.createObjectNode()); saveIoData(taskRun.getId(), "INPUT", "request_body", requestBody); - TaskRunner.TaskExecutionResult result = - taskExecutionService.executeWithRetry(task, taskRun, requestBody); + TaskRunner.TaskExecutionResult result = taskExecutionService.executeWithRetry(task, taskRun, requestBody); taskRun.finish(result.status(), result.message()); if (result.isFailure()) { hasAnyTaskFailed = true; + saveIoData(taskRun.getId(), "OUTPUT", "error_message", objectMapper.valueToTree(result.message())); } else { JsonNode resultJson = objectMapper.readTree(result.message()); saveIoData(taskRun.getId(), "OUTPUT", "response_body", resultJson); } } catch (Exception e) { - workflowLogger.error( - "Task 처리 중 심각한 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), taskDto.getName(), e); + workflowLogger.error("Task 처리 중 심각한 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), taskDto.getName(), e); hasAnyTaskFailed = true; - if (taskRun != null) taskRun.finish("FAILED", e.getMessage()); + if (taskRun != null) { + taskRun.finish("FAILED", e.getMessage()); + saveIoData(taskRun.getId(), "OUTPUT", "error_message", objectMapper.valueToTree(e.getMessage())); + } } finally { if (taskRun != null) taskRunMapper.update(taskRun); mdcManager.setJobContext(jobRun.getId()); @@ -170,12 +162,10 @@ private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { private void saveIoData(Long taskRunId, String ioType, String name, JsonNode data) { try { String dataValue = data.toString(); - TaskIoData ioData = - new TaskIoData( - taskRunId, ioType, name, "JSON", dataValue, (long) dataValue.getBytes().length); + TaskIoData ioData = new TaskIoData(taskRunId, ioType, name, "JSON", dataValue, (long) dataValue.getBytes().length); taskIoDataMapper.insert(ioData); } catch (Exception e) { workflowLogger.error("Task IO 데이터 저장 실패: TaskRunId={}, Type={}", taskRunId, ioType, e); } } -} +} \ No newline at end of file From 076ff0285afcf03591c79540313a2fba8783b92e Mon Sep 17 00:00:00 2001 From: jihukimme Date: Mon, 29 Sep 2025 16:44:23 +0900 Subject: [PATCH 9/9] =?UTF-8?q?refactor:=20s3=20=ED=83=9C=EC=8A=A4?= =?UTF-8?q?=ED=81=AC=20=EA=B4=80=EB=A0=A8=20=EB=A1=9C=EC=A7=81=EC=9D=80=20?= =?UTF-8?q?=EC=9E=84=EC=8B=9C=EB=A1=9C=20service=EC=97=90=20=EB=91=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 추후에 리팩토링 필요 --- .../service/WorkflowExecutionService.java | 113 ++++++++++++------ 1 file changed, 77 insertions(+), 36 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 2db5d259..2e1ca782 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -1,39 +1,41 @@ package site.icebang.domain.workflow.service; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import lombok.RequiredArgsConstructor; +import java.math.BigInteger; +import java.util.Comparator; +import java.util.List; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import lombok.RequiredArgsConstructor; + +import site.icebang.domain.workflow.dto.JobDto; +import site.icebang.domain.workflow.dto.RequestContext; +import site.icebang.domain.workflow.dto.TaskDto; import site.icebang.domain.workflow.dto.WorkflowDetailCardDto; +import site.icebang.domain.workflow.manager.ExecutionMdcManager; +import site.icebang.domain.workflow.mapper.JobMapper; import site.icebang.domain.workflow.mapper.JobRunMapper; import site.icebang.domain.workflow.mapper.TaskIoDataMapper; import site.icebang.domain.workflow.mapper.TaskRunMapper; +import site.icebang.domain.workflow.mapper.WorkflowMapper; import site.icebang.domain.workflow.mapper.WorkflowRunMapper; +import site.icebang.domain.workflow.model.Job; import site.icebang.domain.workflow.model.JobRun; +import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.model.TaskIoData; import site.icebang.domain.workflow.model.TaskRun; import site.icebang.domain.workflow.model.WorkflowRun; -import site.icebang.domain.workflow.dto.JobDto; -import site.icebang.domain.workflow.dto.RequestContext; -import site.icebang.domain.workflow.dto.TaskDto; -import site.icebang.domain.workflow.manager.ExecutionMdcManager; -import site.icebang.domain.workflow.mapper.JobMapper; -import site.icebang.domain.workflow.mapper.WorkflowMapper; -import site.icebang.domain.workflow.model.Job; -import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.runner.TaskRunner; import site.icebang.domain.workflow.runner.fastapi.body.TaskBodyBuilder; -import java.math.BigInteger; -import java.util.Comparator; -import java.util.List; - @Service @RequiredArgsConstructor public class WorkflowExecutionService { @@ -55,24 +57,29 @@ public void executeWorkflow(Long workflowId, RequestContext context) { WorkflowRun workflowRun = WorkflowRun.start(workflowId, context.getTraceId()); workflowRunMapper.insert(workflowRun); - mdcManager.setWorkflowContext(workflowId, context.getTraceId(), context.getClientIp(), context.getUserAgent()); + mdcManager.setWorkflowContext( + workflowId, context.getTraceId(), context.getClientIp(), context.getUserAgent()); try { workflowLogger.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId); // 📌 1. selectWorkflowDetailById를 호출하여 워크플로우의 모든 상세 정보를 가져옵니다. - WorkflowDetailCardDto settingsDto = workflowMapper.selectWorkflowDetailById(BigInteger.valueOf(workflowId)); + WorkflowDetailCardDto settingsDto = + workflowMapper.selectWorkflowDetailById(BigInteger.valueOf(workflowId)); if (settingsDto == null) { throw new IllegalStateException("실행할 워크플로우를 찾을 수 없습니다: ID " + workflowId); } // 📌 2. 가져온 DTO 객체에서 getDefaultConfig() 메소드를 호출하여 값을 얻습니다. String defaultConfigJson = settingsDto.getDefaultConfig(); - JsonNode setting = (defaultConfigJson != null && !defaultConfigJson.isEmpty()) + JsonNode setting = + (defaultConfigJson != null && !defaultConfigJson.isEmpty()) ? objectMapper.readTree(defaultConfigJson) : objectMapper.createObjectNode(); List jobDtos = jobMapper.findJobsByWorkflowId(workflowId); - jobDtos.sort(Comparator.comparing(JobDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) + jobDtos.sort( + Comparator.comparing( + JobDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) .thenComparing(JobDto::getId)); boolean hasAnyJobFailed = false; @@ -82,7 +89,8 @@ public void executeWorkflow(Long workflowId, RequestContext context) { mdcManager.setJobContext(job.getId()); JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); jobRunMapper.insert(jobRun); - workflowLogger.info("---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); + workflowLogger.info( + "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); boolean jobSucceeded = executeTasksForJob(jobRun, setting); jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); @@ -91,11 +99,15 @@ public void executeWorkflow(Long workflowId, RequestContext context) { if (!jobSucceeded) { hasAnyJobFailed = true; } - mdcManager.setWorkflowContext(workflowId, context.getTraceId(), context.getClientIp(), context.getUserAgent()); + mdcManager.setWorkflowContext( + workflowId, context.getTraceId(), context.getClientIp(), context.getUserAgent()); } workflowRun.finish(hasAnyJobFailed ? "FAILED" : "SUCCESS"); workflowRunMapper.update(workflowRun); - workflowLogger.info("========== 워크플로우 실행 {} : WorkflowRunId={} ==========", hasAnyJobFailed ? "실패" : "성공", workflowRun.getId()); + workflowLogger.info( + "========== 워크플로우 실행 {} : WorkflowRunId={} ==========", + hasAnyJobFailed ? "실패" : "성공", + workflowRun.getId()); } catch (Exception e) { workflowLogger.error("워크플로우 실행 중 심각한 오류 발생: WorkflowId={}", workflowId, e); if (workflowRun != null) { @@ -109,13 +121,18 @@ public void executeWorkflow(Long workflowId, RequestContext context) { private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { List taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId()); - taskDtos.forEach(dto -> { - JsonNode s = setting.get(String.valueOf(dto.getId())); - if (s != null) dto.setSettings(s); - }); - taskDtos.sort(Comparator.comparing(TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())).thenComparing(TaskDto::getId)); + taskDtos.forEach( + dto -> { + JsonNode s = setting.get(String.valueOf(dto.getId())); + if (s != null) dto.setSettings(s); + }); + taskDtos.sort( + Comparator.comparing( + TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(TaskDto::getId)); boolean hasAnyTaskFailed = false; + Long s3UploadTaskRunId = null; // S3 업로드 태스크의 task_run_id 저장용 for (TaskDto taskDto : taskDtos) { TaskRun taskRun = null; @@ -127,29 +144,51 @@ private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { Task task = new Task(taskDto); workflowLogger.info("Task 실행 시작: TaskId={}, Name={}", task.getId(), task.getName()); - ObjectNode requestBody = bodyBuilders.stream() + ObjectNode requestBody = + bodyBuilders.stream() .filter(builder -> builder.supports(task.getName())) .findFirst() .map(builder -> builder.build(task, jobRun)) .orElse(objectMapper.createObjectNode()); + // TODO: 아래 로직 다른 곳으로 분리시키기 + if ("S3 업로드 태스크".equals(task.getName())) { + requestBody.put("task_run_id", taskRun.getId()); + s3UploadTaskRunId = taskRun.getId(); // S3 업로드의 task_run_id 저장 + } else if ("상품 선택 태스크".equals(task.getName())) { + // S3 업로드에서 사용한 task_run_id를 사용 + if (s3UploadTaskRunId != null) { + requestBody.put("task_run_id", s3UploadTaskRunId); + } else { + workflowLogger.error("S3 업로드 태스크가 먼저 실행되지 않아 task_run_id를 찾을 수 없습니다."); + // 또는 이전 Job에서 S3 업로드를 찾는 로직 추가 가능 + } + } + saveIoData(taskRun.getId(), "INPUT", "request_body", requestBody); - TaskRunner.TaskExecutionResult result = taskExecutionService.executeWithRetry(task, taskRun, requestBody); + TaskRunner.TaskExecutionResult result = + taskExecutionService.executeWithRetry(task, taskRun, requestBody); taskRun.finish(result.status(), result.message()); if (result.isFailure()) { hasAnyTaskFailed = true; - saveIoData(taskRun.getId(), "OUTPUT", "error_message", objectMapper.valueToTree(result.message())); + saveIoData( + taskRun.getId(), + "OUTPUT", + "error_message", + objectMapper.valueToTree(result.message())); } else { JsonNode resultJson = objectMapper.readTree(result.message()); saveIoData(taskRun.getId(), "OUTPUT", "response_body", resultJson); } } catch (Exception e) { - workflowLogger.error("Task 처리 중 심각한 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), taskDto.getName(), e); + workflowLogger.error( + "Task 처리 중 심각한 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), taskDto.getName(), e); hasAnyTaskFailed = true; if (taskRun != null) { taskRun.finish("FAILED", e.getMessage()); - saveIoData(taskRun.getId(), "OUTPUT", "error_message", objectMapper.valueToTree(e.getMessage())); + saveIoData( + taskRun.getId(), "OUTPUT", "error_message", objectMapper.valueToTree(e.getMessage())); } } finally { if (taskRun != null) taskRunMapper.update(taskRun); @@ -162,10 +201,12 @@ private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { private void saveIoData(Long taskRunId, String ioType, String name, JsonNode data) { try { String dataValue = data.toString(); - TaskIoData ioData = new TaskIoData(taskRunId, ioType, name, "JSON", dataValue, (long) dataValue.getBytes().length); + TaskIoData ioData = + new TaskIoData( + taskRunId, ioType, name, "JSON", dataValue, (long) dataValue.getBytes().length); taskIoDataMapper.insert(ioData); } catch (Exception e) { workflowLogger.error("Task IO 데이터 저장 실패: TaskRunId={}, Type={}", taskRunId, ioType, e); } } -} \ No newline at end of file +}