From 39e04aa48a6b36521d86fb808a9e3ee26907d52f Mon Sep 17 00:00:00 2001 From: can019 Date: Fri, 19 Sep 2025 16:05:34 +0900 Subject: [PATCH 1/4] fix: Set grafana-agent env expand true --- docker/production/docker-compose.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docker/production/docker-compose.yml b/docker/production/docker-compose.yml index 9f0a72ad..d595886f 100644 --- a/docker/production/docker-compose.yml +++ b/docker/production/docker-compose.yml @@ -44,6 +44,9 @@ services: - app-network env_file: - .env.prod + command: + - --config.file=/etc/agent/agent.yml + - --config.expand-env=true promtail: image: grafana/promtail:2.9.0 From 0f7e15e07c2d9120bddd0c347337b194e55f2e1d Mon Sep 17 00:00:00 2001 From: thkim7 Date: Fri, 19 Sep 2025 16:34:31 +0900 Subject: [PATCH 2/4] =?UTF-8?q?feat:=20Quartz=20scheduling=20=EC=9E=91?= =?UTF-8?q?=EC=97=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/core/logging_config.py | 25 ++-- .../app/middleware/rds_logger.py | 42 +++++- .../service/QuartzScheduleService.java | 43 ++++-- .../icebang/domain/workflow/dto/TaskDto.java | 17 +++ .../domain/workflow/mapper/JobMapper.java | 4 +- .../service/WorkflowExecutionService.java | 131 +++++++++++++----- .../site/icebang/global/config/WebConfig.java | 4 +- .../resources/mybatis/mapper/JobMapper.xml | 9 +- .../mybatis/mapper/TaskRunMapper.xml | 3 +- .../main/resources/sql/03-insert-workflow.sql | 53 +++++-- 10 files changed, 252 insertions(+), 79 deletions(-) create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/dto/TaskDto.java diff --git a/apps/pre-processing-service/app/core/logging_config.py b/apps/pre-processing-service/app/core/logging_config.py index 50edd1a6..b2c10d88 100644 --- a/apps/pre-processing-service/app/core/logging_config.py +++ b/apps/pre-processing-service/app/core/logging_config.py @@ -3,9 +3,11 @@ import sys from contextvars import ContextVar +# trace_id context 변수 import try: from app.middleware.ServiceLoggerMiddleware import trace_id_context except ImportError: + # 모듈이 아직 로드되지 않은 경우를 위한 기본값 trace_id_context: ContextVar[str] = ContextVar("trace_id", default="") @@ -16,14 +18,16 @@ def setup_file_logging(): # 기존 loguru 핸들러 제거 (기본 콘솔 출력 제거) logger.remove() - log_dir = "/logs/production" + # 환경변수로 로그 디렉토리 설정 (기본값: logs/develop) + log_dir = "../../docker/local/logs/develop" - os.makedirs(log_dir, exist_ok=True) + # 로그 디렉토리가 없으면 생성 - log_file_path = os.path.join(log_dir, "app.log") - error_log_file_path = os.path.join(log_dir, "error.log") + # 로그 파일 경로 설정 + log_file_path = log_dir + "/pre-processing-app.log" + error_log_file_path = log_dir + "/pre-processing-app-error.log" - # trace_id를 포함한 structured 로그 포맷 + # trace_id를 포함한 간단한 포맷 문자열 사용 def add_trace_id_filter(record): try: current_trace_id = trace_id_context.get() @@ -42,11 +46,10 @@ def exclude_logging_middleware_filter(record): return False return add_trace_id_filter(record) - structured_format = "{time:YYYY-MM-DD HH:mm:ss.SSS} {level: <8} {thread.name: <15} {name}:{function}:{line} [{extra[trace_id]}] {message}" - + # 파일 로깅 핸들러 추가 - trace_id 포함, LoggingMiddleware 제외 logger.add( log_file_path, - format=structured_format, + format="[{extra[trace_id]}] {time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} | {message}", level="DEBUG", rotation="100 MB", # 100MB마다 로테이션 retention="7 days", # 7일간 보관 @@ -58,10 +61,10 @@ def exclude_logging_middleware_filter(record): filter=exclude_logging_middleware_filter, ) - # 에러 레벨 이상은 별도 파일에도 기록 + # 에러 레벨 이상은 별도 파일에도 기록 - trace_id 포함, LoggingMiddleware 제외 logger.add( error_log_file_path, - format=structured_format, + format="[{extra[trace_id]}] {time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} | {message}", level="ERROR", rotation="50 MB", retention="30 days", @@ -79,7 +82,7 @@ def exclude_logging_middleware_filter(record): sys.stdout, format="[{extra[trace_id]}] {time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message}", level="DEBUG", - colorize=False, + colorize=False, # colorize 비활성화하여 태그 충돌 방지 filter=add_trace_id_filter, ) diff --git a/apps/pre-processing-service/app/middleware/rds_logger.py b/apps/pre-processing-service/app/middleware/rds_logger.py index 66bad19c..cca28609 100644 --- a/apps/pre-processing-service/app/middleware/rds_logger.py +++ b/apps/pre-processing-service/app/middleware/rds_logger.py @@ -12,6 +12,43 @@ class RDSLogger: def __init__(self): self.db_manager = MariadbManager() + self.max_log_message_length = 450 + + def _truncate_log_message(self, log_message: str) -> str: + """ + log_message를 VARCHAR(500) 크기에 맞게 자르기 + + Args: + log_message: 원본 로그 메시지 + + Returns: + str: 잘린 로그 메시지 + """ + if not log_message: + return log_message or "" + + # UTF-8 인코딩 기준으로 바이트 길이 체크 + encoded_message = log_message.encode("utf-8") + original_length = len(encoded_message) + + if original_length <= self.max_log_message_length: + return log_message + + # 메시지가 너무 길면 자르기 + truncate_suffix = "... [TRUNCATED]" + available_length = self.max_log_message_length - len( + truncate_suffix.encode("utf-8") + ) + + truncated_message = encoded_message[:available_length].decode( + "utf-8", errors="ignore" + ) + truncated_message += truncate_suffix + + logger.warning( + f"로그 메시지 잘림: {original_length} bytes -> {len(truncated_message.encode('utf-8'))} bytes" + ) + return truncated_message async def log_execution( self, @@ -45,12 +82,15 @@ async def log_execution( bool: 저장 성공 여부 """ try: + # TODO: Issue #XXX - log_message VARCHAR(500) 제한으로 인한 임시 truncation + truncated_log_message = self._truncate_log_message(log_message) + # 향후 TEXT 타입으로 변경하거나 별도 로그 저장소 검토 필요 execution_log = ExecutionLog( execution_type=execution_type, source_id=source_id, log_level=log_level, executed_at=datetime.now(), - log_message=log_message, + log_message=truncated_log_message, trace_id=trace_id, run_id=run_id, status=status, diff --git a/apps/user-service/src/main/java/site/icebang/domain/schedule/service/QuartzScheduleService.java b/apps/user-service/src/main/java/site/icebang/domain/schedule/service/QuartzScheduleService.java index 3a5f1aef..172ba807 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/schedule/service/QuartzScheduleService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/schedule/service/QuartzScheduleService.java @@ -15,21 +15,25 @@ public class QuartzScheduleService { private final Scheduler scheduler; public void addOrUpdateSchedule(Schedule schedule) { - JobKey jobKey = JobKey.jobKey("workflow-" + schedule.getWorkflowId()); - JobDetail jobDetail = JobBuilder.newJob(WorkflowTriggerJob.class) - .withIdentity(jobKey) - .withDescription("Workflow " + schedule.getWorkflowId() + " Trigger Job") - .usingJobData("workflowId", schedule.getWorkflowId()) - .storeDurably() - .build(); - - TriggerKey triggerKey = TriggerKey.triggerKey("trigger-for-workflow-" + schedule.getWorkflowId()); - Trigger trigger = TriggerBuilder.newTrigger() - .forJob(jobDetail) - .withIdentity(triggerKey) - .withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression())) - .build(); try { + // 기존 스케줄 삭제 (있다면) + deleteSchedule(schedule.getWorkflowId()); + + JobKey jobKey = JobKey.jobKey("workflow-" + schedule.getWorkflowId()); + JobDetail jobDetail = JobBuilder.newJob(WorkflowTriggerJob.class) + .withIdentity(jobKey) + .withDescription("Workflow " + schedule.getWorkflowId() + " Trigger Job") + .usingJobData("workflowId", schedule.getWorkflowId()) + .storeDurably() + .build(); + + TriggerKey triggerKey = TriggerKey.triggerKey("trigger-for-workflow-" + schedule.getWorkflowId()); + Trigger trigger = TriggerBuilder.newTrigger() + .forJob(jobDetail) + .withIdentity(triggerKey) + .withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression())) + .build(); + scheduler.scheduleJob(jobDetail, trigger); log.info("Quartz 스케줄 등록/업데이트 완료: Workflow ID {}", schedule.getWorkflowId()); } catch (SchedulerException e) { @@ -38,6 +42,15 @@ public void addOrUpdateSchedule(Schedule schedule) { } public void deleteSchedule(Long workflowId) { - // ... (삭제 로직) + try { + JobKey jobkey = JobKey.jobKey("workflow-" + workflowId); + TriggerKey triggerKey = TriggerKey.triggerKey("trigger-for-workflow-" + workflowId); + + scheduler.unscheduleJob(triggerKey); + scheduler.deleteJob(jobkey); + log.info("Quartz 스케줄 삭제 완료: Workflow ID {}", workflowId); + } catch (SchedulerException e) { + log.error("Quartz 스케줄 삭제 실패: Workflow ID {}", workflowId, e); + } } } \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/TaskDto.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/TaskDto.java new file mode 100644 index 00000000..54a0ca08 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/TaskDto.java @@ -0,0 +1,17 @@ +package site.icebang.domain.workflow.dto; + +import java.time.LocalDateTime; + +import com.fasterxml.jackson.databind.JsonNode; + +import lombok.Data; + +@Data +public class TaskDto { + private Long id; + private String name; + private String type; + private JsonNode parameters; + private LocalDateTime createdAt; + private LocalDateTime updatedAt; +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobMapper.java index a82739f4..f3bb69fa 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobMapper.java @@ -4,12 +4,12 @@ import org.apache.ibatis.annotations.Mapper; +import site.icebang.domain.workflow.dto.TaskDto; import site.icebang.domain.workflow.model.Job; -import site.icebang.domain.workflow.model.Task; @Mapper public interface JobMapper { List findJobsByWorkflowId(Long workflowId); - List findTasksByJobId(Long jobId); + List findTasksByJobId(Long jobId); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 3d0ca0c5..4043fbad 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -21,6 +21,7 @@ import site.icebang.domain.execution.model.JobRun; import site.icebang.domain.execution.model.TaskRun; import site.icebang.domain.execution.model.WorkflowRun; +import site.icebang.domain.workflow.dto.TaskDto; import site.icebang.domain.workflow.mapper.JobMapper; import site.icebang.domain.workflow.model.Job; import site.icebang.domain.workflow.model.Task; @@ -76,8 +77,16 @@ public void executeWorkflow(Long workflowId) { log.info("========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId()); } + /** + * 특정 Job에 속한 Task들을 순차적으로 실행합니다. + * + * @param jobRun 실행중인 Job의 기록 객체 + * @return 모든 Task가 성공하면 true, 하나라도 실패하면 false + */ private boolean executeTasksForJob(JobRun jobRun, Map workflowContext) { - List tasks = jobMapper.findTasksByJobId(jobRun.getJobId()); + // TaskDto를 조회하고 Task로 변환 + List taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId()); + List tasks = taskDtos.stream().map(this::convertToTask).toList(); log.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size()); for (Task task : tasks) { @@ -126,42 +135,96 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow /** 워크플로우 컨텍스트와 Task의 input_mapping 설정을 기반으로 API 요청에 사용할 동적인 Request Body를 생성합니다. */ private ObjectNode prepareRequestBody(Task task, Map context) { - ObjectNode requestBody = objectMapper.createObjectNode(); - JsonNode params = task.getParameters(); - if (params == null) return requestBody; + ObjectNode requestBody = objectMapper.createObjectNode(); + JsonNode params = task.getParameters(); + if (params == null) return requestBody; - JsonNode mappingRules = params.get("input_mapping"); - JsonNode staticBody = params.get("body"); + JsonNode mappingRules = params.get("input_mapping"); + JsonNode staticBody = params.get("body"); - // 정적 body가 있으면 우선적으로 복사 - if (staticBody != null && staticBody.isObject()) { - requestBody.setAll((ObjectNode) staticBody); - } + // 정적 body가 있으면 우선적으로 복사 + if (staticBody != null && staticBody.isObject()) { + requestBody.setAll((ObjectNode) staticBody); + } + + // 📌 디버깅용: 현재 컨텍스트 출력 + log.debug("=== 워크플로우 컨텍스트 확인 ==="); + for (Map.Entry entry : context.entrySet()) { + log.debug("Task: {}, Data: {}", entry.getKey(), entry.getValue().toString()); + } - // input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가 - if (mappingRules != null && mappingRules.isObject()) { - mappingRules - .fields() - .forEachRemaining( - entry -> { - String targetField = entry.getKey(); // 예: "keyword" - String sourcePath = entry.getValue().asText(); // 예: "키워드 검색 태스크.keyword" - - String[] parts = sourcePath.split("\\.", 2); - if (parts.length == 2) { - String sourceTaskName = parts[0]; - String sourceFieldPath = parts[1]; - - JsonNode sourceData = context.get(sourceTaskName); - if (sourceData != null) { - JsonNode valueToSet = sourceData.at("/" + sourceFieldPath.replace('.', '/')); - if (!valueToSet.isMissingNode()) { - requestBody.set(targetField, valueToSet); - } - } - } - }); + // input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가 + if (mappingRules != null && mappingRules.isObject()) { + mappingRules + .fields() + .forEachRemaining( + entry -> { + String targetField = entry.getKey(); // 예: "product_url" + String sourcePath = entry.getValue().asText(); // 예: "상품 유사도 분석 태스크.data.selected_product.product_url" + + log.debug("=== input_mapping 처리 ==="); + log.debug("targetField: {}, sourcePath: {}", targetField, sourcePath); + + String[] parts = sourcePath.split("\\.", 2); + if (parts.length == 2) { + String sourceTaskName = parts[0]; + String sourceFieldPath = parts[1]; + + log.debug("sourceTaskName: {}, sourceFieldPath: {}", sourceTaskName, sourceFieldPath); + + JsonNode sourceData = context.get(sourceTaskName); + log.debug("sourceData found: {}", sourceData != null); + + if (sourceData != null) { + log.debug("sourceData content: {}", sourceData.toString()); + + String jsonPath = "/" + sourceFieldPath.replace('.', '/'); + log.debug("jsonPath: {}", jsonPath); + + JsonNode valueToSet = sourceData.at(jsonPath); + log.debug("valueToSet found: {}, isMissing: {}", valueToSet, valueToSet.isMissingNode()); + + if (!valueToSet.isMissingNode()) { + log.debug("설정할 값: {}", valueToSet.toString()); + requestBody.set(targetField, valueToSet); + } else { + log.warn("값을 찾을 수 없음: jsonPath={}", jsonPath); + } + } else { + log.warn("소스 태스크 데이터를 찾을 수 없음: {}", sourceTaskName); + } + } + }); + } + + log.debug("최종 requestBody: {}", requestBody.toString()); + return requestBody; + } + + /** TaskDto를 Task 모델로 변환합니다. 비즈니스 로직 실행에 필요한 필드만 복사합니다. */ + private Task convertToTask(TaskDto taskDto) { + Task task = new Task(); + try { + java.lang.reflect.Field idField = Task.class.getDeclaredField("id"); + idField.setAccessible(true); + idField.set(task, taskDto.getId()); + + java.lang.reflect.Field nameField = Task.class.getDeclaredField("name"); + nameField.setAccessible(true); + nameField.set(task, taskDto.getName()); + + java.lang.reflect.Field typeField = Task.class.getDeclaredField("type"); + typeField.setAccessible(true); + typeField.set(task, taskDto.getType()); + + java.lang.reflect.Field parametersField = Task.class.getDeclaredField("parameters"); + parametersField.setAccessible(true); + parametersField.set(task, taskDto.getParameters()); + + } catch (Exception e) { + throw new RuntimeException("TaskDto to Task 변환 중 오류 발생", e); } - return requestBody; + + return task; } } diff --git a/apps/user-service/src/main/java/site/icebang/global/config/WebConfig.java b/apps/user-service/src/main/java/site/icebang/global/config/WebConfig.java index 22fd4be8..43cfd8b1 100644 --- a/apps/user-service/src/main/java/site/icebang/global/config/WebConfig.java +++ b/apps/user-service/src/main/java/site/icebang/global/config/WebConfig.java @@ -17,8 +17,8 @@ public RestTemplate restTemplate(RestTemplateBuilder builder) { SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory(); // 2. 타임아웃 설정 (이 메서드들은 deprecated 아님) - requestFactory.setConnectTimeout(Duration.ofSeconds(5)); - requestFactory.setReadTimeout(Duration.ofSeconds(5)); + requestFactory.setConnectTimeout(Duration.ofSeconds(30000)); + requestFactory.setReadTimeout(Duration.ofSeconds(30000)); // 3. 빌더에 직접 생성한 requestFactory를 설정 return builder.requestFactory(() -> requestFactory).build(); diff --git a/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml index 54e29ae4..4d5b1e60 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml @@ -17,6 +17,13 @@ + + + + + + + @@ -28,7 +35,7 @@ ORDER BY wj.execution_order ASC - SELECT t.* FROM task t JOIN job_task jt ON t.id = jt.task_id WHERE jt.job_id = #{jobId} diff --git a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml index 582af278..7333e065 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml @@ -10,8 +10,7 @@ UPDATE task_run SET status = #{status}, - finished_at = #{finishedAt}, - result_message = #{resultMessage} + finished_at = #{finishedAt} WHERE id = #{id} \ No newline at end of file diff --git a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql index bff80749..8e902745 100644 --- a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql +++ b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql @@ -17,31 +17,62 @@ INSERT INTO `job` (`id`, `name`, `description`) VALUES -- Task 생성 (ID: 1 ~ 7) INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES - -- Job 1의 Task들 +-- Job 1의 Task들 (1, '키워드 검색 태스크', 'HTTP', JSON_OBJECT( 'url', 'http://127.0.0.1:8000/keywords/search', 'method', 'POST', - 'body', JSON_OBJECT('tag', 'naver') -- 초기 입력값은 정적으로 정의 + 'body', JSON_OBJECT('tag', 'naver') )), (2, '상품 검색 태스크', 'HTTP', JSON_OBJECT( 'url', 'http://127.0.0.1:8000/products/search', 'method', 'POST', - 'input_mapping', JSON_OBJECT('keyword', '키워드 검색 태스크.keyword') -- "키워드 검색 태스크"의 결과에서 "keyword" 필드를 가져와 매핑 + 'input_mapping', JSON_OBJECT( + 'keyword', '키워드 검색 태스크.data.keyword' + ) )), (3, '상품 매칭 태스크', 'HTTP', JSON_OBJECT( 'url', 'http://127.0.0.1:8000/products/match', 'method', 'POST', 'input_mapping', JSON_OBJECT( - 'keyword', '키워드 검색 태스크.keyword', - 'search_results', '상품 검색 태스크.search_results' + 'keyword', '키워드 검색 태스크.data.keyword', + 'search_results', '상품 검색 태스크.data.search_results' ) )), - (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/products/similarity', 'method', 'POST')), - (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/products/crawl', 'method', 'POST')), + (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT( + 'url', 'http://127.0.0.1:8000/products/similarity', + 'method', 'POST', + 'input_mapping', JSON_OBJECT( + 'keyword', '키워드 검색 태스크.data.keyword', + 'matched_products', '상품 매칭 태스크.data.matched_products' + ) + )), + (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT( + 'url', 'http://127.0.0.1:8000/products/crawl', + 'method', 'POST', + 'input_mapping', JSON_OBJECT( + 'product_url', '상품 유사도 분석 태스크.data.selected_product.url' + ) + )), -- Job 2의 Task들 - (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/blogs/rag/create', 'method', 'POST')), - (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/blogs/publish', 'method', 'POST')); + (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT( + 'url', 'http://127.0.0.1:8000/blogs/rag/create', + 'method', 'POST', + 'input_mapping', JSON_OBJECT( + 'keyword', '키워드 검색 태스크.data.keyword', + 'product_info', '상품 정보 크롤링 태스크.data.product_detail' + ))), + +-- Task 7 설정 확인 필요 + (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT( + 'url', 'http://127.0.0.1:8000/blogs/publish', + 'method', 'POST', + 'body', JSON_OBJECT('tag', 'tistory', 'blog_id', 'test', 'blog_pw', 'test'), + 'input_mapping', JSON_OBJECT( + 'post_title', '블로그 RAG 생성 태스크.data.title', + 'post_content', '블로그 RAG 생성 태스크.data.content', + 'post_tags', '블로그 RAG 생성 태스크.data.tags' + ))); -- 워크플로우-Job 연결 INSERT INTO `workflow_job` (`workflow_id`, `job_id`, `execution_order`) VALUES @@ -53,6 +84,6 @@ INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5), (2, 6, 1), (2, 7, 2); --- 스케줄 설정 (매일 오전 8시) +-- 스케줄 설정 (매분 0초마다 실행) INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `is_active`) VALUES - (1, '0 0 8 * * ?', TRUE); \ No newline at end of file + (1, '0 * * * * ?', TRUE); \ No newline at end of file From 6b1ba4a3dbf68876ee44247639f99c49c16177e2 Mon Sep 17 00:00:00 2001 From: thkim7 Date: Fri, 19 Sep 2025 16:36:54 +0900 Subject: [PATCH 3/4] chore: spotlessApply --- .../service/WorkflowExecutionService.java | 125 +++++++++--------- 1 file changed, 66 insertions(+), 59 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 4043fbad..0f0e316d 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -135,70 +135,77 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow /** 워크플로우 컨텍스트와 Task의 input_mapping 설정을 기반으로 API 요청에 사용할 동적인 Request Body를 생성합니다. */ private ObjectNode prepareRequestBody(Task task, Map context) { - ObjectNode requestBody = objectMapper.createObjectNode(); - JsonNode params = task.getParameters(); - if (params == null) return requestBody; + ObjectNode requestBody = objectMapper.createObjectNode(); + JsonNode params = task.getParameters(); + if (params == null) return requestBody; - JsonNode mappingRules = params.get("input_mapping"); - JsonNode staticBody = params.get("body"); + JsonNode mappingRules = params.get("input_mapping"); + JsonNode staticBody = params.get("body"); - // 정적 body가 있으면 우선적으로 복사 - if (staticBody != null && staticBody.isObject()) { - requestBody.setAll((ObjectNode) staticBody); - } + // 정적 body가 있으면 우선적으로 복사 + if (staticBody != null && staticBody.isObject()) { + requestBody.setAll((ObjectNode) staticBody); + } - // 📌 디버깅용: 현재 컨텍스트 출력 - log.debug("=== 워크플로우 컨텍스트 확인 ==="); - for (Map.Entry entry : context.entrySet()) { - log.debug("Task: {}, Data: {}", entry.getKey(), entry.getValue().toString()); - } + // 📌 디버깅용: 현재 컨텍스트 출력 + log.debug("=== 워크플로우 컨텍스트 확인 ==="); + for (Map.Entry entry : context.entrySet()) { + log.debug("Task: {}, Data: {}", entry.getKey(), entry.getValue().toString()); + } - // input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가 - if (mappingRules != null && mappingRules.isObject()) { - mappingRules - .fields() - .forEachRemaining( - entry -> { - String targetField = entry.getKey(); // 예: "product_url" - String sourcePath = entry.getValue().asText(); // 예: "상품 유사도 분석 태스크.data.selected_product.product_url" - - log.debug("=== input_mapping 처리 ==="); - log.debug("targetField: {}, sourcePath: {}", targetField, sourcePath); - - String[] parts = sourcePath.split("\\.", 2); - if (parts.length == 2) { - String sourceTaskName = parts[0]; - String sourceFieldPath = parts[1]; - - log.debug("sourceTaskName: {}, sourceFieldPath: {}", sourceTaskName, sourceFieldPath); - - JsonNode sourceData = context.get(sourceTaskName); - log.debug("sourceData found: {}", sourceData != null); - - if (sourceData != null) { - log.debug("sourceData content: {}", sourceData.toString()); - - String jsonPath = "/" + sourceFieldPath.replace('.', '/'); - log.debug("jsonPath: {}", jsonPath); - - JsonNode valueToSet = sourceData.at(jsonPath); - log.debug("valueToSet found: {}, isMissing: {}", valueToSet, valueToSet.isMissingNode()); - - if (!valueToSet.isMissingNode()) { - log.debug("설정할 값: {}", valueToSet.toString()); - requestBody.set(targetField, valueToSet); - } else { - log.warn("값을 찾을 수 없음: jsonPath={}", jsonPath); - } - } else { - log.warn("소스 태스크 데이터를 찾을 수 없음: {}", sourceTaskName); - } - } - }); - } + // input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가 + if (mappingRules != null && mappingRules.isObject()) { + mappingRules + .fields() + .forEachRemaining( + entry -> { + String targetField = entry.getKey(); // 예: "product_url" + String sourcePath = + entry + .getValue() + .asText(); // 예: "상품 유사도 분석 태스크.data.selected_product.product_url" + + log.debug("=== input_mapping 처리 ==="); + log.debug("targetField: {}, sourcePath: {}", targetField, sourcePath); + + String[] parts = sourcePath.split("\\.", 2); + if (parts.length == 2) { + String sourceTaskName = parts[0]; + String sourceFieldPath = parts[1]; + + log.debug( + "sourceTaskName: {}, sourceFieldPath: {}", sourceTaskName, sourceFieldPath); + + JsonNode sourceData = context.get(sourceTaskName); + log.debug("sourceData found: {}", sourceData != null); + + if (sourceData != null) { + log.debug("sourceData content: {}", sourceData.toString()); + + String jsonPath = "/" + sourceFieldPath.replace('.', '/'); + log.debug("jsonPath: {}", jsonPath); + + JsonNode valueToSet = sourceData.at(jsonPath); + log.debug( + "valueToSet found: {}, isMissing: {}", + valueToSet, + valueToSet.isMissingNode()); + + if (!valueToSet.isMissingNode()) { + log.debug("설정할 값: {}", valueToSet.toString()); + requestBody.set(targetField, valueToSet); + } else { + log.warn("값을 찾을 수 없음: jsonPath={}", jsonPath); + } + } else { + log.warn("소스 태스크 데이터를 찾을 수 없음: {}", sourceTaskName); + } + } + }); + } - log.debug("최종 requestBody: {}", requestBody.toString()); - return requestBody; + log.debug("최종 requestBody: {}", requestBody.toString()); + return requestBody; } /** TaskDto를 Task 모델로 변환합니다. 비즈니스 로직 실행에 필요한 필드만 복사합니다. */ From 36755fba453e56b5723dfbdf0b5f560232e822bd Mon Sep 17 00:00:00 2001 From: can019 Date: Fri, 19 Sep 2025 16:40:15 +0900 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20Actuator=20=EB=AA=A8=EB=93=A0=20targ?= =?UTF-8?q?et=EC=97=90=20=EB=8C=80=ED=95=B4=20open?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/user-service/src/main/resources/application.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/user-service/src/main/resources/application.yml b/apps/user-service/src/main/resources/application.yml index c689ab1a..6578d868 100644 --- a/apps/user-service/src/main/resources/application.yml +++ b/apps/user-service/src/main/resources/application.yml @@ -27,7 +27,7 @@ management: export: enabled: true server: - address: 127.0.0.1 # localhost에서만 접근 + address: 0.0.0.0 port: 8081 security: enabled: false