From 2b0dc21eaf6ce9cc105eac12366f48a65897e6d6 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Thu, 22 Jan 2026 16:55:35 +0900 Subject: [PATCH 1/4] refactor: replace findSuccessfulJobByTraceId with findSuccessfulJobByWorkflowRunId --- .../domain/workflow/mapper/JobRunMapper.java | 5 ++++- .../service/WorkflowExecutionService.java | 13 +++++-------- .../resources/mybatis/mapper/JobRunMapper.xml | 15 +++++++-------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java index 6249f259..a916da54 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java @@ -11,5 +11,8 @@ public interface JobRunMapper { void update(JobRun jobRun); - JobRun findSuccessfulJobByTraceId(@Param("traceId") String traceId, @Param("jobId") Long jobId); + JobRun findSuccessfulJobByWorkflowRunId( + @Param("workflowRunId") Long workflowRunId, + @Param("jobId") Long jobId + ); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 1e4c2ab9..02214c6a 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -86,15 +86,12 @@ public void executeWorkflow(Long workflowId, RequestContextDto context) { Job job = new Job(jobDto); mdcManager.setJobContext(job.getId()); - // 이미 성공적으로 수행된 Job인지 확인합니다. - JobRun existingSuccessfulJob = - jobRunMapper.findSuccessfulJobByTraceId(context.getTraceId(), job.getId()); + // 📌 이미 성공한 Job인지 확인하여 중복 실행 방지 (Resume 기능) + JobRun existingSuccessfulJob = jobRunMapper.findSuccessfulJobByWorkflowRunId(workflowRun.getId(), job.getId()); if (existingSuccessfulJob != null) { - workflowLogger.info( - "---------- Job 스킵 (이미 성공함): JobId={}, PreviousJobRunId={} ----------", - job.getId(), - existingSuccessfulJob.getId()); - continue; // 이미 성공했으므로 실행하지 않고 다음 Job으로 넘어갑니다. + workflowLogger.info("---------- Job 스킵 (이미 성공함): JobId={}, PreviousJobRunId={} ----------", + job.getId(), existingSuccessfulJob.getId()); + continue; } JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); diff --git a/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml index 927cbafe..90f20565 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml @@ -25,14 +25,13 @@ WHERE id = #{id} - + SELECT * + FROM job_run + WHERE workflow_run_id = #{workflowRunId} + AND job_id = #{jobId} + AND status = 'SUCCESS' + ORDER BY id DESC LIMIT 1 From 944f31ad946fb3e4907e1f00d1742dc2f6703e11 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Thu, 22 Jan 2026 17:35:33 +0900 Subject: [PATCH 2/4] refactor: update task output retrieval to use workflowRunId scope and refine query methods --- .../domain/workflow/mapper/TaskRunMapper.java | 15 +++++---- .../service/WorkflowContextService.java | 32 ++++++++++--------- .../mybatis/mapper/TaskRunMapper.xml | 11 ++++--- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java index 267e931a..0b7d9059 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java @@ -1,16 +1,17 @@ package site.icebang.domain.workflow.mapper; -import java.util.Optional; - import org.apache.ibatis.annotations.Mapper; - +import org.apache.ibatis.annotations.Param; import site.icebang.domain.workflow.model.TaskRun; @Mapper public interface TaskRunMapper { - void insert(TaskRun taskRun); + void insert(TaskRun taskRun); - void update(TaskRun taskRun); + void update(TaskRun taskRun); - Optional findLatestSuccessRunInJob(Long jobRunId, String taskName); -} + TaskRun findSuccessfulTaskRunByWorkflowRunId( + @Param("workflowRunId") Long workflowRunId, + @Param("taskName") String taskName + ); +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java index bbdff181..0bfecd6d 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java @@ -13,6 +13,7 @@ import site.icebang.domain.workflow.mapper.TaskIoDataMapper; import site.icebang.domain.workflow.mapper.TaskRunMapper; import site.icebang.domain.workflow.model.JobRun; +import site.icebang.domain.workflow.model.TaskIoData; @Slf4j @Service @@ -24,30 +25,31 @@ public class WorkflowContextService { private final ObjectMapper objectMapper; /** - * 특정 Job 실행 내에서, 이전에 성공한 Task의 이름으로 결과(Output)를 조회합니다. + * 전체 워크플로우 실행 범위(WorkflowRun) 내에서, 이전에 성공한 Task의 이름으로 결과(Output)를 조회합니다. + * Resume(이어하기) 시, 이전 Job이 스킵되더라도 DB에서 전체 이력을 조회하여 데이터를 가져옵니다. * - * @param jobRun 현재 실행중인 JobRun + * @param jobRun 현재 실행중인 JobRun (내부의 workflowRunId를 사용하여 전체 범위 조회) * @param sourceTaskName 결과를 조회할 이전 Task의 이름 * @return 조회된 결과 데이터 (JsonNode) */ public Optional getPreviousTaskOutput(JobRun jobRun, String sourceTaskName) { + Long workflowRunId = jobRun.getWorkflowRunId(); try { - return taskRunMapper - .findLatestSuccessRunInJob(jobRun.getId(), sourceTaskName) + return Optional.ofNullable(taskRunMapper.findSuccessfulTaskRunByWorkflowRunId(workflowRunId, sourceTaskName)) .flatMap(taskRun -> taskIoDataMapper.findOutputByTaskRunId(taskRun.getId())) - .map( - ioData -> { - try { - return objectMapper.readTree(ioData.getDataValue()); - } catch (Exception e) { - log.error("TaskIoData JSON 파싱 실패: TaskIoDataId={}", ioData.getId(), e); - return null; - } - }); + .map(this::parseJson); } catch (Exception e) { - log.error( - "이전 Task 결과 조회 중 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), sourceTaskName, e); + log.error("워크플로우 데이터 조회 실패: WorkflowRunId={}, TaskName={}", workflowRunId, sourceTaskName, e); return Optional.empty(); } } + + private JsonNode parseJson(TaskIoData ioData) { + try { + return objectMapper.readTree(ioData.getDataValue()); + } catch (Exception e) { + log.error("TaskIoData JSON 파싱 실패: TaskIoDataId={}", ioData.getId(), e); + return null; + } + } } diff --git a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml index 0bdf8cb0..2a3016b3 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml @@ -23,17 +23,20 @@ UPDATE task_run SET status = #{status}, + message = #{message}, finished_at = #{finishedAt} WHERE id = #{id} - SELECT tr.* FROM task_run tr - JOIN task t ON tr.task_id = t.id - where t.name = #{taskName} + JOIN job_run jr ON tr.job_run_id = jr.id + JOIN task t ON tr.task_id = t.id + WHERE jr.workflow_run_id = #{workflowRunId} + AND t.name = #{taskName} AND tr.status = 'SUCCESS' ORDER BY tr.id DESC - LIMIT 1 + LIMIT 1 \ No newline at end of file From 30e18db8846c40ee80a7f2915ad6d19787a8bef4 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Thu, 22 Jan 2026 22:57:09 +0900 Subject: [PATCH 3/4] docs: update README with environment variables, setup, and deployment steps --- README.md | 199 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 194 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index d6b58260..3155bda2 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,9 @@ 8. [주요 구성 요소 및 역할](#8-주요-구성-요소-및-역할) 9. [프로젝트 디렉토리 구조](#9-프로젝트-디렉토리-구조) 10. [환경 변수 관리 전략](#10-환경-변수-관리-전략) -11. [시연 영상](#11-시연-영상) +11. [실행 방법 (Getting Started)](#11-실행-방법-getting-started) +12. [배포 방법 (Deployment)](#12-배포-방법-deployment) +13. [시연 영상](#13-시연-영상) --- @@ -223,14 +225,201 @@ pre-processing-service/ ## 10. 환경 변수 관리 전략 -추후 작성 예정 -* **FastAPI (`pre-processing-service`)**: -* **Spring Boot (`user-service`)**: + +각 서비스는 환경별 설정 관리를 위해 다음과 같은 전략을 사용합니다. + + + +### 10.1. FastAPI (`pre-processing-service`) + + + +`.env` 파일을 통해 환경 변수를 로드합니다. `apps/pre-processing-service/app/core/config.py`의 `BaseSettings`를 기반으로 동작합니다. + + + +* **필수 변수**: + + * `DB_HOST`, `DB_PORT`, `DB_USER`, `DB_PASS`, `DB_NAME`: MariaDB 연결 정보 + + * `LOKI_HOST`, `LOKI_PORT`: Loki 로깅 서버 정보 + + * `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `S3_BUCKET_NAME`: AWS S3 저장소 설정 (OCR 및 이미지 업로드) + +* **선택 변수**: + + * `OPENAI_API_KEY`: AI 콘텐츠 생성 기능 사용 시 필요 + + * `MODE`: `dev` 또는 `prd` (기본값: `dev`) + + + +### 10.2. Spring Boot (`user-service`) + + + +Spring Profiles(`develop`, `production`)를 사용하여 환경을 분리합니다. + + + +* **Local (Develop)**: `application-develop.yml`을 사용하며, 로컬 Docker 인프라(localhost)에 맞춰져 있습니다. + +* **Production**: `application-production.yml`을 사용하며, 민감한 정보(DB 비밀번호, API 키 등)는 **배포 시점의 환경 변수** 또는 **Docker Compose의 environment** 설정을 통해 주입받습니다. + + --- -## 11. 시연 영상 + + +## 11. 실행 방법 (Getting Started) + + + +로컬 개발 환경에서 프로젝트를 실행하는 방법입니다. + + + +### 11.1. 사전 준비 사항 (Prerequisites) + + + +* **Java 21** (Amazon Corretto 21 권장) + +* **Python 3.11** (Poetry 패키지 매니저 설치 필요) + +* **Docker & Docker Compose** + + + +### 11.2. 1단계: 인프라 실행 (Database & Monitoring) + + + +프로젝트 실행에 필요한 DB(MariaDB)와 모니터링 도구(Loki, Promtail, Grafana)를 Docker로 실행합니다. + + + +```bash + +cd docker/local + +docker-compose up -d + +``` + + + +### 11.3. 2단계: Backend (FastAPI - Worker) 실행 + + + +Python 기반의 AI/전처리 워커 서비스를 실행합니다. + + + +```bash + +cd apps/pre-processing-service + + + +# 1. 의존성 설치 + +poetry install + + + +# 2. 서비스 실행 (Uvicorn) + +poetry run uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload + +``` + + + +### 11.4. 3단계: Backend (Spring Boot - Orchestrator) 실행 + + + +Java 기반의 메인 오케스트레이터 서비스를 실행합니다. + + + +```bash + +cd apps/user-service + + + +# develop 프로파일로 실행 + +./gradlew bootRun --args='--spring.profiles.active=develop' + +``` + + + +* **API 문서 (Swagger)**: + + * Spring Boot: `http://localhost:8081/swagger-ui/index.html` (설정 필요 시) + + * FastAPI: `http://localhost:8000/docs` + + + +--- + + + +## 12. 배포 방법 (Deployment) + + + +본 프로젝트는 **GitHub Actions**를 통해 CI/CD 파이프라인이 구축되어 있습니다. + + + +### 12.1. CI/CD 파이프라인 + + + +1. **CI (Continuous Integration)**: + + * `main` 브랜치에 Push 또는 PR 시 자동으로 Java/Python 테스트 및 빌드가 수행됩니다. + +2. **CD (Continuous Deployment)**: + + * 릴리즈 태그 생성 시 Docker Image를 빌드하여 Docker Hub에 Push 합니다. + + * AWS EC2 인스턴스에 SSH로 접속하여 최신 이미지를 Pull 하고 `docker-compose`를 재실행합니다. + + + +### 12.2. 프로덕션 실행 설정 + + + +프로덕션 환경의 Docker 설정은 `docker/production` 디렉토리에 위치합니다. + + + +```bash + +cd docker/production + +docker-compose up -d + +``` + + + +--- + + + +## 13. 시연 영상 [https://www.youtube.com/watch?v=1vApNttVxVg](https://www.youtube.com/watch?v=1vApNttVxVg) [![Video Label](http://img.youtube.com/vi/1vApNttVxVg/0.jpg)](https://www.youtube.com/watch?v=1vApNttVxVg) From 2a7fb758314d098bf8de84bded04fedd528aa72a Mon Sep 17 00:00:00 2001 From: jihukimme Date: Thu, 22 Jan 2026 22:57:09 +0900 Subject: [PATCH 4/4] refactor: improve code formatting across Workflow services and mappers --- README.md | 199 +++++++++++++++++- .../domain/workflow/mapper/JobRunMapper.java | 4 +- .../domain/workflow/mapper/TaskRunMapper.java | 13 +- .../service/WorkflowContextService.java | 19 +- .../service/WorkflowExecutionService.java | 9 +- 5 files changed, 217 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index d6b58260..3155bda2 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,9 @@ 8. [주요 구성 요소 및 역할](#8-주요-구성-요소-및-역할) 9. [프로젝트 디렉토리 구조](#9-프로젝트-디렉토리-구조) 10. [환경 변수 관리 전략](#10-환경-변수-관리-전략) -11. [시연 영상](#11-시연-영상) +11. [실행 방법 (Getting Started)](#11-실행-방법-getting-started) +12. [배포 방법 (Deployment)](#12-배포-방법-deployment) +13. [시연 영상](#13-시연-영상) --- @@ -223,14 +225,201 @@ pre-processing-service/ ## 10. 환경 변수 관리 전략 -추후 작성 예정 -* **FastAPI (`pre-processing-service`)**: -* **Spring Boot (`user-service`)**: + +각 서비스는 환경별 설정 관리를 위해 다음과 같은 전략을 사용합니다. + + + +### 10.1. FastAPI (`pre-processing-service`) + + + +`.env` 파일을 통해 환경 변수를 로드합니다. `apps/pre-processing-service/app/core/config.py`의 `BaseSettings`를 기반으로 동작합니다. + + + +* **필수 변수**: + + * `DB_HOST`, `DB_PORT`, `DB_USER`, `DB_PASS`, `DB_NAME`: MariaDB 연결 정보 + + * `LOKI_HOST`, `LOKI_PORT`: Loki 로깅 서버 정보 + + * `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `S3_BUCKET_NAME`: AWS S3 저장소 설정 (OCR 및 이미지 업로드) + +* **선택 변수**: + + * `OPENAI_API_KEY`: AI 콘텐츠 생성 기능 사용 시 필요 + + * `MODE`: `dev` 또는 `prd` (기본값: `dev`) + + + +### 10.2. Spring Boot (`user-service`) + + + +Spring Profiles(`develop`, `production`)를 사용하여 환경을 분리합니다. + + + +* **Local (Develop)**: `application-develop.yml`을 사용하며, 로컬 Docker 인프라(localhost)에 맞춰져 있습니다. + +* **Production**: `application-production.yml`을 사용하며, 민감한 정보(DB 비밀번호, API 키 등)는 **배포 시점의 환경 변수** 또는 **Docker Compose의 environment** 설정을 통해 주입받습니다. + + --- -## 11. 시연 영상 + + +## 11. 실행 방법 (Getting Started) + + + +로컬 개발 환경에서 프로젝트를 실행하는 방법입니다. + + + +### 11.1. 사전 준비 사항 (Prerequisites) + + + +* **Java 21** (Amazon Corretto 21 권장) + +* **Python 3.11** (Poetry 패키지 매니저 설치 필요) + +* **Docker & Docker Compose** + + + +### 11.2. 1단계: 인프라 실행 (Database & Monitoring) + + + +프로젝트 실행에 필요한 DB(MariaDB)와 모니터링 도구(Loki, Promtail, Grafana)를 Docker로 실행합니다. + + + +```bash + +cd docker/local + +docker-compose up -d + +``` + + + +### 11.3. 2단계: Backend (FastAPI - Worker) 실행 + + + +Python 기반의 AI/전처리 워커 서비스를 실행합니다. + + + +```bash + +cd apps/pre-processing-service + + + +# 1. 의존성 설치 + +poetry install + + + +# 2. 서비스 실행 (Uvicorn) + +poetry run uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload + +``` + + + +### 11.4. 3단계: Backend (Spring Boot - Orchestrator) 실행 + + + +Java 기반의 메인 오케스트레이터 서비스를 실행합니다. + + + +```bash + +cd apps/user-service + + + +# develop 프로파일로 실행 + +./gradlew bootRun --args='--spring.profiles.active=develop' + +``` + + + +* **API 문서 (Swagger)**: + + * Spring Boot: `http://localhost:8081/swagger-ui/index.html` (설정 필요 시) + + * FastAPI: `http://localhost:8000/docs` + + + +--- + + + +## 12. 배포 방법 (Deployment) + + + +본 프로젝트는 **GitHub Actions**를 통해 CI/CD 파이프라인이 구축되어 있습니다. + + + +### 12.1. CI/CD 파이프라인 + + + +1. **CI (Continuous Integration)**: + + * `main` 브랜치에 Push 또는 PR 시 자동으로 Java/Python 테스트 및 빌드가 수행됩니다. + +2. **CD (Continuous Deployment)**: + + * 릴리즈 태그 생성 시 Docker Image를 빌드하여 Docker Hub에 Push 합니다. + + * AWS EC2 인스턴스에 SSH로 접속하여 최신 이미지를 Pull 하고 `docker-compose`를 재실행합니다. + + + +### 12.2. 프로덕션 실행 설정 + + + +프로덕션 환경의 Docker 설정은 `docker/production` 디렉토리에 위치합니다. + + + +```bash + +cd docker/production + +docker-compose up -d + +``` + + + +--- + + + +## 13. 시연 영상 [https://www.youtube.com/watch?v=1vApNttVxVg](https://www.youtube.com/watch?v=1vApNttVxVg) [![Video Label](http://img.youtube.com/vi/1vApNttVxVg/0.jpg)](https://www.youtube.com/watch?v=1vApNttVxVg) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java index a916da54..55e6197d 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java @@ -12,7 +12,5 @@ public interface JobRunMapper { void update(JobRun jobRun); JobRun findSuccessfulJobByWorkflowRunId( - @Param("workflowRunId") Long workflowRunId, - @Param("jobId") Long jobId - ); + @Param("workflowRunId") Long workflowRunId, @Param("jobId") Long jobId); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java index 0b7d9059..5ac48e81 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java @@ -2,16 +2,15 @@ import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; + import site.icebang.domain.workflow.model.TaskRun; @Mapper public interface TaskRunMapper { - void insert(TaskRun taskRun); + void insert(TaskRun taskRun); - void update(TaskRun taskRun); + void update(TaskRun taskRun); - TaskRun findSuccessfulTaskRunByWorkflowRunId( - @Param("workflowRunId") Long workflowRunId, - @Param("taskName") String taskName - ); -} \ No newline at end of file + TaskRun findSuccessfulTaskRunByWorkflowRunId( + @Param("workflowRunId") Long workflowRunId, @Param("taskName") String taskName); +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java index 0bfecd6d..39fb3aaf 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java @@ -25,8 +25,8 @@ public class WorkflowContextService { private final ObjectMapper objectMapper; /** - * 전체 워크플로우 실행 범위(WorkflowRun) 내에서, 이전에 성공한 Task의 이름으로 결과(Output)를 조회합니다. - * Resume(이어하기) 시, 이전 Job이 스킵되더라도 DB에서 전체 이력을 조회하여 데이터를 가져옵니다. + * 전체 워크플로우 실행 범위(WorkflowRun) 내에서, 이전에 성공한 Task의 이름으로 결과(Output)를 조회합니다. Resume(이어하기) 시, 이전 Job이 + * 스킵되더라도 DB에서 전체 이력을 조회하여 데이터를 가져옵니다. * * @param jobRun 현재 실행중인 JobRun (내부의 workflowRunId를 사용하여 전체 범위 조회) * @param sourceTaskName 결과를 조회할 이전 Task의 이름 @@ -35,7 +35,8 @@ public class WorkflowContextService { public Optional getPreviousTaskOutput(JobRun jobRun, String sourceTaskName) { Long workflowRunId = jobRun.getWorkflowRunId(); try { - return Optional.ofNullable(taskRunMapper.findSuccessfulTaskRunByWorkflowRunId(workflowRunId, sourceTaskName)) + return Optional.ofNullable( + taskRunMapper.findSuccessfulTaskRunByWorkflowRunId(workflowRunId, sourceTaskName)) .flatMap(taskRun -> taskIoDataMapper.findOutputByTaskRunId(taskRun.getId())) .map(this::parseJson); } catch (Exception e) { @@ -45,11 +46,11 @@ public Optional getPreviousTaskOutput(JobRun jobRun, String sourceTask } private JsonNode parseJson(TaskIoData ioData) { - try { - return objectMapper.readTree(ioData.getDataValue()); - } catch (Exception e) { - log.error("TaskIoData JSON 파싱 실패: TaskIoDataId={}", ioData.getId(), e); - return null; - } + try { + return objectMapper.readTree(ioData.getDataValue()); + } catch (Exception e) { + log.error("TaskIoData JSON 파싱 실패: TaskIoDataId={}", ioData.getId(), e); + return null; + } } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 02214c6a..6089c1cc 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -87,10 +87,13 @@ public void executeWorkflow(Long workflowId, RequestContextDto context) { mdcManager.setJobContext(job.getId()); // 📌 이미 성공한 Job인지 확인하여 중복 실행 방지 (Resume 기능) - JobRun existingSuccessfulJob = jobRunMapper.findSuccessfulJobByWorkflowRunId(workflowRun.getId(), job.getId()); + JobRun existingSuccessfulJob = + jobRunMapper.findSuccessfulJobByWorkflowRunId(workflowRun.getId(), job.getId()); if (existingSuccessfulJob != null) { - workflowLogger.info("---------- Job 스킵 (이미 성공함): JobId={}, PreviousJobRunId={} ----------", - job.getId(), existingSuccessfulJob.getId()); + workflowLogger.info( + "---------- Job 스킵 (이미 성공함): JobId={}, PreviousJobRunId={} ----------", + job.getId(), + existingSuccessfulJob.getId()); continue; }