diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java index 9f497b97..861edd5a 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java @@ -6,6 +6,7 @@ import org.springframework.web.client.RestTemplate; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -14,35 +15,52 @@ import site.icebang.domain.workflow.model.Task; @Slf4j -@Component("httpTaskRunner") +@Component("httpTaskRunner") // "httpTaskRunner"라는 이름의 Bean으로 등록 @RequiredArgsConstructor public class HttpTaskRunner implements TaskRunner { + private final RestTemplate restTemplate; + // private final TaskIoDataRepository taskIoDataRepository; // TODO: 입출력 저장을 위해 주입 + @Override - public TaskExecutionResult execute(Task task, TaskRun taskRun) { + public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) { JsonNode params = task.getParameters(); - String url = params.get("url").asText(); - String method = params.get("method").asText(); - JsonNode body = params.get("body"); + if (params == null) { + return TaskExecutionResult.failure("Task에 파라미터가 정의되지 않았습니다."); + } + + String url = params.path("url").asText(); + String method = params.path("method").asText("POST"); // 기본값 POST + + if (url.isEmpty()) { + return TaskExecutionResult.failure("Task 파라미터에 'url'이 없습니다."); + } try { - HttpEntity requestEntity = - new HttpEntity<>( - body.toString(), - new HttpHeaders() { - { - setContentType(MediaType.APPLICATION_JSON); - } - }); - - ResponseEntity response = + // 1. HTTP 헤더 설정 + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + // 2. HTTP 요청 엔티티 생성 (헤더 + 동적 Body) + HttpEntity requestEntity = new HttpEntity<>(requestBody.toString(), headers); + + log.debug("HTTP Task 요청: URL={}, Method={}, Body={}", url, method, requestBody.toString()); + + // 3. RestTemplate으로 API 호출 + ResponseEntity responseEntity = restTemplate.exchange( url, HttpMethod.valueOf(method.toUpperCase()), requestEntity, String.class); - return TaskExecutionResult.success(response.getBody()); + String responseBody = responseEntity.getBody(); + log.debug("HTTP Task 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody); + + // TODO: taskIoDataRepository를 사용하여 requestBody와 responseBody를 DB에 저장 + + return TaskExecutionResult.success(responseBody); + } catch (RestClientException e) { - log.error("HTTP Task 실행 실패: TaskRunId={}, Error={}", taskRun.getId(), e.getMessage()); + log.error("HTTP Task 실행 중 에러 발생: TaskRunId={}, Error={}", taskRun.getId(), e.getMessage()); return TaskExecutionResult.failure(e.getMessage()); } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java index a2b820bb..9c6ab224 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java @@ -1,9 +1,14 @@ package site.icebang.domain.workflow.runner; +import com.fasterxml.jackson.databind.node.ObjectNode; + import site.icebang.domain.execution.model.TaskRun; import site.icebang.domain.workflow.model.Task; +/** 워크플로우의 개별 Task를 실행하는 모든 Runner가 구현해야 할 인터페이스 */ public interface TaskRunner { + + /** Task 실행 결과를 담는 Record. status: SUCCESS 또는 FAILED message: 실행 결과 또는 에러 메시지 */ record TaskExecutionResult(String status, String message) { public static TaskExecutionResult success(String message) { return new TaskExecutionResult("SUCCESS", message); @@ -14,9 +19,17 @@ public static TaskExecutionResult failure(String message) { } public boolean isFailure() { - return "FAILED".equals(status); + return "FAILED".equals(this.status); } } - TaskExecutionResult execute(Task task, TaskRun taskRun); + /** + * 특정 Task를 실행합니다. + * + * @param task 실행할 Task의 정적 정의 + * @param taskRun 현재 실행에 대한 기록 객체 + * @param requestBody 동적으로 생성된 요청 데이터 + * @return Task 실행 결과 + */ + TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 086b00de..3d0ca0c5 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -1,11 +1,17 @@ package site.icebang.domain.workflow.service; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -30,18 +36,17 @@ public class WorkflowExecutionService { private final JobRunMapper jobRunMapper; private final TaskRunMapper taskRunMapper; private final Map taskRunners; + private final ObjectMapper objectMapper; // 📌 JSON 처리를 위해 ObjectMapper 주입 - /** - * 워크플로우 실행의 시작점. 전체 과정은 하나의 트랜잭션으로 묶입니다. - * - * @param workflowId 실행할 워크플로우의 ID - */ @Transactional public void executeWorkflow(Long workflowId) { log.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId); WorkflowRun workflowRun = WorkflowRun.start(workflowId); workflowRunMapper.insert(workflowRun); + // 📌 1. 워크플로우 전체 실행 동안 데이터를 공유할 컨텍스트 생성 + Map workflowContext = new HashMap<>(); + List jobs = jobMapper.findJobsByWorkflowId(workflowId); log.info("총 {}개의 Job을 순차적으로 실행합니다.", jobs.size()); @@ -51,7 +56,8 @@ public void executeWorkflow(Long workflowId) { log.info( "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); - boolean jobSucceeded = executeTasksForJob(jobRun); + // 📌 2. Job 내의 Task들을 실행하고, 컨텍스트를 전달하여 데이터 파이프라이닝 수행 + boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext); jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); jobRunMapper.update(jobRun); @@ -60,7 +66,7 @@ public void executeWorkflow(Long workflowId) { workflowRun.finish("FAILED"); workflowRunMapper.update(workflowRun); log.error("Job 실패로 인해 워크플로우 실행을 중단합니다: WorkflowRunId={}", workflowRun.getId()); - return; // Job이 실패하면 전체 워크플로우를 중단 + return; } log.info("---------- Job 실행 성공: JobRunId={} ----------", jobRun.getId()); } @@ -70,13 +76,7 @@ public void executeWorkflow(Long workflowId) { log.info("========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId()); } - /** - * 특정 Job에 속한 Task들을 순차적으로 실행합니다. - * - * @param jobRun 실행중인 Job의 기록 객체 - * @return 모든 Task가 성공하면 true, 하나라도 실패하면 false - */ - private boolean executeTasksForJob(JobRun jobRun) { + private boolean executeTasksForJob(JobRun jobRun, Map workflowContext) { List tasks = jobMapper.findTasksByJobId(jobRun.getJobId()); log.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size()); @@ -92,20 +92,76 @@ private boolean executeTasksForJob(JobRun jobRun) { taskRun.finish("FAILED", "지원하지 않는 Task 타입: " + task.getType()); taskRunMapper.update(taskRun); log.error("Task 실행 실패 (미지원 타입): TaskRunId={}, Type={}", taskRun.getId(), task.getType()); - return false; // 실행할 Runner가 없으므로 실패 + return false; } - TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun); + // 📌 3. Task 실행 전, 컨텍스트를 이용해 동적으로 Request Body를 생성 + ObjectNode requestBody = prepareRequestBody(task, workflowContext); + + // 📌 4. 동적으로 생성된 Request Body를 전달하여 Task 실행 + TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun, requestBody); taskRun.finish(result.status(), result.message()); taskRunMapper.update(taskRun); if (result.isFailure()) { log.error("Task 실행 실패: TaskRunId={}, Message={}", taskRun.getId(), result.message()); - return false; // Task가 실패하면 즉시 중단하고 실패 반환 + return false; + } + + // 📌 5. 성공한 Task의 결과를 다음 Task가 사용할 수 있도록 컨텍스트에 저장 + try { + JsonNode resultJson = objectMapper.readTree(result.message()); + workflowContext.put(task.getName(), resultJson); + // TODO: task_io_data 테이블에 requestBody(INPUT)와 resultJson(OUTPUT) 저장 + } catch (JsonProcessingException e) { + log.error("Task 결과 JSON 파싱 실패: TaskRunId={}", taskRun.getId(), e); + taskRun.finish("FAILED", "결과 JSON 파싱 실패"); + taskRunMapper.update(taskRun); + return false; } log.info("Task 실행 성공: TaskRunId={}", taskRun.getId()); } + return true; + } + + /** 워크플로우 컨텍스트와 Task의 input_mapping 설정을 기반으로 API 요청에 사용할 동적인 Request Body를 생성합니다. */ + private ObjectNode prepareRequestBody(Task task, Map context) { + ObjectNode requestBody = objectMapper.createObjectNode(); + JsonNode params = task.getParameters(); + if (params == null) return requestBody; + + JsonNode mappingRules = params.get("input_mapping"); + JsonNode staticBody = params.get("body"); - return true; // 모든 Task가 성공적으로 완료됨 + // 정적 body가 있으면 우선적으로 복사 + if (staticBody != null && staticBody.isObject()) { + requestBody.setAll((ObjectNode) staticBody); + } + + // input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가 + if (mappingRules != null && mappingRules.isObject()) { + mappingRules + .fields() + .forEachRemaining( + entry -> { + String targetField = entry.getKey(); // 예: "keyword" + String sourcePath = entry.getValue().asText(); // 예: "키워드 검색 태스크.keyword" + + String[] parts = sourcePath.split("\\.", 2); + if (parts.length == 2) { + String sourceTaskName = parts[0]; + String sourceFieldPath = parts[1]; + + JsonNode sourceData = context.get(sourceTaskName); + if (sourceData != null) { + JsonNode valueToSet = sourceData.at("/" + sourceFieldPath.replace('.', '/')); + if (!valueToSet.isMissingNode()) { + requestBody.set(targetField, valueToSet); + } + } + } + }); + } + return requestBody; } } diff --git a/apps/user-service/src/main/resources/application.yml b/apps/user-service/src/main/resources/application.yml index 706eceea..7fd5893d 100644 --- a/apps/user-service/src/main/resources/application.yml +++ b/apps/user-service/src/main/resources/application.yml @@ -16,5 +16,5 @@ mybatis: # 외부 API 연동을 위한 설정 섹션 api: fastapi: - url: http://pre-processing-service:8000 # FastAPI 서버의 기본 URL + url: http://127.0.0.1:8000 # FastAPI 서버의 기본 URL timeout: 10000 # API 요청 타임아웃 (밀리초 단위) \ No newline at end of file diff --git a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql index dd2ddb15..bff80749 100644 --- a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql +++ b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql @@ -1,120 +1,58 @@ --- 워크플로우 관련 데이터 삽입 - --- 카테고리 삽입 -INSERT INTO `category` (`name`, `description`) VALUES - ('마케팅', '마케팅 관련 자동화 워크플로우'), - ('콘텐츠', '콘텐츠 생성 및 관리'), - ('데이터 수집', '웹 크롤링 및 데이터 수집 관련'); - --- 워크플로우 생성 -INSERT INTO `workflow` (`name`, `description`, `is_enabled`, `created_by`) VALUES - ('트렌드_블로그_자동화', '트렌드 검색부터 블로그 글 작성까지 전체 자동화 프로세스', TRUE, 1); - --- Job 생성 -INSERT INTO `job` (`name`, `description`, `is_enabled`, `created_by`) VALUES - ('트렌드_검색_작업', '최신 트렌드 키워드 검색 및 분석', TRUE, 1), - ('싸다구_크롤링_작업', '싸다구 사이트에서 관련 상품 정보 크롤링', TRUE, 1), - ('블로그_글_작성_작업', '수집된 데이터를 바탕으로 블로그 글 자동 생성', TRUE, 1); - --- Task 생성 -INSERT INTO `task` (`name`, `type`, `parameters`) VALUES --- 트렌드 검색 관련 태스크 -('구글_트렌드_검색', 'API_CALL', JSON_OBJECT( - 'api_endpoint', 'https://trends.googleapis.com/trends/api', - 'search_region', 'KR', - 'timeframe', 'now 7-d', - 'category', '0' - )), -('네이버_트렌드_검색', 'API_CALL', JSON_OBJECT( - 'api_endpoint', 'https://datalab.naver.com/keyword/trendSearch.naver', - 'period', 'week', - 'device', 'pc' - )), -('키워드_분석_및_필터링', 'DATA_PROCESSING', JSON_OBJECT( - 'min_score', 50, - 'max_keywords', 10, - 'filter_rules', JSON_ARRAY('adult_content', 'spam_keywords') - )), - --- 싸다구 크롤링 관련 태스크 -('싸다구_상품_검색', 'WEB_SCRAPING', JSON_OBJECT( - 'base_url', 'https://www.ssg.com', - 'search_path', '/search.ssg', - 'max_pages', 3, - 'delay_ms', 2000 - )), -('상품_정보_추출', 'DATA_EXTRACTION', JSON_OBJECT( - 'extract_fields', JSON_ARRAY('title', 'price', 'rating', 'review_count', 'image_url'), - 'data_validation', true - )), -('가격_비교_분석', 'DATA_ANALYSIS', JSON_OBJECT( - 'comparison_sites', JSON_ARRAY('쿠팡', '11번가', '옥션'), - 'price_threshold', 0.1 - )), - --- 블로그 글 작성 관련 태스크 -('블로그_템플릿_선택', 'TEMPLATE_PROCESSING', JSON_OBJECT( - 'template_type', 'product_review', - 'style', 'conversational', - 'target_length', 1500 - )), -('AI_콘텐츠_생성', 'AI_GENERATION', JSON_OBJECT( - 'model', 'gpt-4', - 'temperature', 0.7, - 'max_tokens', 2000, - 'prompt_template', '트렌드 키워드와 상품 정보를 바탕으로 자연스러운 블로그 글을 작성해주세요.' - )), -('콘텐츠_검수_및_최적화', 'CONTENT_REVIEW', JSON_OBJECT( - 'seo_optimization', true, - 'readability_check', true, - 'plagiarism_check', true - )), -('블로그_플랫폼_발행', 'PUBLISHING', JSON_OBJECT( - 'platforms', JSON_ARRAY('네이버 블로그', '티스토리', '브런치'), - 'schedule_publish', false, - 'auto_tags', true - )); +-- 기존 워크플로우 관련 데이터 삭제 +DELETE FROM `job_task`; +DELETE FROM `workflow_job`; +DELETE FROM `task`; +DELETE FROM `job`; +DELETE FROM `workflow`; +DELETE FROM `schedule`; + +-- 워크플로우 생성 (ID: 1) +INSERT INTO `workflow` (`id`, `name`, `description`) VALUES + (1, '상품 분석 및 블로그 자동 발행', '키워드 검색부터 상품 분석 후 블로그 발행까지의 자동화 프로세스'); + +-- Job 생성 (ID: 1, 2) +INSERT INTO `job` (`id`, `name`, `description`) VALUES + (1, '상품 분석', '키워드 검색, 상품 크롤링 및 유사도 분석 작업'), + (2, '블로그 콘텐츠 생성', '분석 데이터를 기반으로 RAG 콘텐츠 생성 및 발행 작업'); + +-- Task 생성 (ID: 1 ~ 7) +INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES + -- Job 1의 Task들 + (1, '키워드 검색 태스크', 'HTTP', JSON_OBJECT( + 'url', 'http://127.0.0.1:8000/keywords/search', + 'method', 'POST', + 'body', JSON_OBJECT('tag', 'naver') -- 초기 입력값은 정적으로 정의 + )), + (2, '상품 검색 태스크', 'HTTP', JSON_OBJECT( + 'url', 'http://127.0.0.1:8000/products/search', + 'method', 'POST', + 'input_mapping', JSON_OBJECT('keyword', '키워드 검색 태스크.keyword') -- "키워드 검색 태스크"의 결과에서 "keyword" 필드를 가져와 매핑 + )), + (3, '상품 매칭 태스크', 'HTTP', JSON_OBJECT( + 'url', 'http://127.0.0.1:8000/products/match', + 'method', 'POST', + 'input_mapping', JSON_OBJECT( + 'keyword', '키워드 검색 태스크.keyword', + 'search_results', '상품 검색 태스크.search_results' + ) + )), + (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/products/similarity', 'method', 'POST')), + (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/products/crawl', 'method', 'POST')), + + -- Job 2의 Task들 + (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/blogs/rag/create', 'method', 'POST')), + (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/blogs/publish', 'method', 'POST')); -- 워크플로우-Job 연결 -INSERT INTO `workflow_job` (`workflow_id`, `job_id`) VALUES - (1, 1), -- 트렌드_블로그_자동화 + 트렌드_검색_작업 - (1, 2), -- 트렌드_블로그_자동화 + 싸다구_크롤링_작업 - (1, 3); -- 트렌드_블로그_자동화 + 블로그_글_작성_작업 +INSERT INTO `workflow_job` (`workflow_id`, `job_id`, `execution_order`) VALUES + (1, 1, 1), + (1, 2, 2); --- Job-Task 연결 (실행 순서 포함) --- 트렌드 검색 작업의 태스크들 +-- Job-Task 연결 INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES - (1, 1, 1), -- 구글_트렌드_검색 - (1, 2, 2), -- 네이버_트렌드_검색 - (1, 3, 3); -- 키워드_분석_및_필터링 - --- 싸다구 크롤링 작업의 태스크들 -INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES - (2, 4, 1), -- 싸다구_상품_검색 - (2, 5, 2), -- 상품_정보_추출 - (2, 6, 3); -- 가격_비교_분석 - --- 블로그 글 작성 작업의 태스크들 -INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES - (3, 7, 1), -- 블로그_템플릿_선택 - (3, 8, 2), -- AI_콘텐츠_생성 - (3, 9, 3), -- 콘텐츠_검수_및_최적화 - (3, 10, 4); -- 블로그_플랫폼_발행 - --- 스케줄 설정 (매일 오전 8시 실행) -INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `parameters`, `is_active`, `created_by`) VALUES - (1, '0 0 8 * * *', JSON_OBJECT( - 'timezone', 'Asia/Seoul', - 'retry_count', 3, - 'timeout_minutes', 60, - 'notification_email', 'admin@icebang.site' - ), TRUE, 1); + (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5), + (2, 6, 1), (2, 7, 2); --- 사용자별 설정 (관리자용) -INSERT INTO `user_config` (`user_id`, `type`, `name`, `json`, `is_active`) VALUES - (1, 'workflow_preference', '트렌드_블로그_설정', JSON_OBJECT( - 'preferred_keywords', JSON_ARRAY('테크', 'IT', '트렌드', '리뷰'), - 'blog_style', 'casual', - 'auto_publish', false, - 'notification_enabled', true - ), TRUE); \ No newline at end of file +-- 스케줄 설정 (매일 오전 8시) +INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `is_active`) VALUES + (1, '0 0 8 * * ?', TRUE); \ No newline at end of file