diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java
deleted file mode 100644
index 5a36afa3..00000000
--- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package site.icebang.domain.workflow.runner;
-
-import org.springframework.http.HttpMethod;
-import org.springframework.stereotype.Component;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import lombok.RequiredArgsConstructor;
-
-import site.icebang.domain.execution.model.TaskRun;
-import site.icebang.domain.workflow.model.Task;
-import site.icebang.external.fastapi.adapter.FastApiAdapter;
-
-@Component("fastapiTaskRunner")
-@RequiredArgsConstructor
-public class FastApiTaskRunner implements TaskRunner {
-
-  private final FastApiAdapter fastApiAdapter;
-
-  @Override
-  public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) {
-    JsonNode params = task.getParameters();
-    String endpoint = params.path("endpoint").asText();
-    HttpMethod method = HttpMethod.valueOf(params.path("method").asText("POST").toUpperCase());
-
-    String responseBody = fastApiAdapter.call(endpoint, method, requestBody.toString());
-
-    if (responseBody == null) {
-      return TaskExecutionResult.failure("FastApiAdapter call failed.");
-    }
-    return TaskExecutionResult.success(responseBody);
-  }
-}
diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java
new file mode 100644
index 00000000..861edd5a
--- /dev/null
+++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java
@@ -0,0 +1,67 @@
+package site.icebang.domain.workflow.runner;
+
+import org.springframework.http.*;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import site.icebang.domain.execution.model.TaskRun;
+import site.icebang.domain.workflow.model.Task;
+
+@Slf4j
+@Component("httpTaskRunner") // registered as a bean named "httpTaskRunner"
+@RequiredArgsConstructor
+public class HttpTaskRunner implements TaskRunner {
+
+  private final RestTemplate restTemplate;
+
+  // private final TaskIoDataRepository taskIoDataRepository; // TODO: inject to persist task I/O
+
+  @Override
+  public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) {
+    JsonNode params = task.getParameters();
+    if (params == null) {
+      return TaskExecutionResult.failure("No parameters are defined for this task.");
+    }
+
+    String url = params.path("url").asText();
+    String method = params.path("method").asText("POST"); // defaults to POST
+
+    if (url.isEmpty()) {
+      return TaskExecutionResult.failure("Task parameters do not contain a 'url'.");
+    }
+
+    try {
+      // 1. Set up the HTTP headers
+      HttpHeaders headers = new HttpHeaders();
+      headers.setContentType(MediaType.APPLICATION_JSON);
+
+      // 2. Build the HTTP request entity (headers + dynamic body)
+      HttpEntity<String> requestEntity = new HttpEntity<>(requestBody.toString(), headers);
+
+      log.debug("HTTP task request: URL={}, Method={}, Body={}", url, method, requestBody.toString());
+
+      // 3. Call the API via RestTemplate
+      ResponseEntity<String> responseEntity =
+          restTemplate.exchange(
+              url, HttpMethod.valueOf(method.toUpperCase()), requestEntity, String.class);
+
+      String responseBody = responseEntity.getBody();
+      log.debug("HTTP task response: Status={}, Body={}", responseEntity.getStatusCode(), responseBody);
+
+      // TODO: persist requestBody and responseBody to the DB via taskIoDataRepository
+
+      return TaskExecutionResult.success(responseBody);
+
+    } catch (RestClientException e) {
+      log.error("Error while executing HTTP task: TaskRunId={}, Error={}", taskRun.getId(), e.getMessage());
+      return TaskExecutionResult.failure(e.getMessage());
+    }
+  }
+}
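Reviewer note: the `TaskRunner` interface and its `TaskExecutionResult` type are not part of this diff. Inferred from the call sites (`result.status()`, `result.message()`, the `success`/`failure` factories, and `taskRun.finish(status, message)`), the contract is presumably close to the sketch below; the exact status strings are an assumption mirrored from the job-level "SUCCESS"/"FAILED" values.

```java
package site.icebang.domain.workflow.runner;

import com.fasterxml.jackson.databind.node.ObjectNode;

import site.icebang.domain.execution.model.TaskRun;
import site.icebang.domain.workflow.model.Task;

public interface TaskRunner {

  TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody);

  // A record matches the result.status()/result.message() accessor calls seen in
  // WorkflowExecutionService; the status literals here are an assumption.
  record TaskExecutionResult(String status, String message) {
    static TaskExecutionResult success(String message) {
      return new TaskExecutionResult("SUCCESS", message);
    }

    static TaskExecutionResult failure(String message) {
      return new TaskExecutionResult("FAILED", message);
    }
  }
}
```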
diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java
deleted file mode 100644
index 2dd3fcb6..00000000
--- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package site.icebang.domain.workflow.runner.body;
-
-import java.util.Map;
-
-import org.springframework.stereotype.Component;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import lombok.RequiredArgsConstructor;
-
-import site.icebang.domain.workflow.model.Task;
-
-@Component
-@RequiredArgsConstructor
-public class ProductSearchBodyBuilder implements TaskBodyBuilder {
-
-  private final ObjectMapper objectMapper;
-  private static final String TASK_NAME = "상품 검색 태스크";
-  private static final String SOURCE_TASK_NAME = "키워드 검색 태스크";
-
-  @Override
-  public boolean supports(String taskName) {
-    return TASK_NAME.equals(taskName);
-  }
-
-  @Override
-  public ObjectNode build(Task task, Map<String, JsonNode> workflowContext) {
-    JsonNode sourceResult = workflowContext.get(SOURCE_TASK_NAME);
-    String keyword = sourceResult != null ? sourceResult.path("keyword").asText("") : "";
-    return objectMapper.createObjectNode().put("keyword", keyword);
-  }
-}
diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java
deleted file mode 100644
index da6f1597..00000000
--- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package site.icebang.domain.workflow.runner.body;
-
-import java.util.Map;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import site.icebang.domain.workflow.model.Task;
-
-public interface TaskBodyBuilder {
-  boolean supports(String taskName);
-
-  ObjectNode build(Task task, Map<String, JsonNode> workflowContext);
-}
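The two deletions above retire the strategy-pattern body builders: wiring that previously required one `@Component` per task now lives in each task's `parameters` JSON and is interpreted generically by `prepareRequestBody` (next file). A minimal side-by-side sketch of the same "keyword" wiring; the sample payload is made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class BodyBuilderMigrationSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    Map<String, JsonNode> workflowContext = new HashMap<>();
    workflowContext.put("키워드 검색 태스크", mapper.readTree("{\"keyword\": \"camping\"}"));

    // Before: ProductSearchBodyBuilder hardcoded the source task and field in Java.
    JsonNode sourceResult = workflowContext.get("키워드 검색 태스크");
    ObjectNode before = mapper.createObjectNode()
        .put("keyword", sourceResult != null ? sourceResult.path("keyword").asText("") : "");
    System.out.println(before); // {"keyword":"camping"}

    // After: the same wiring is declared as data in task.parameters (the seed data in
    // 03-insert-workflow.sql uses "키워드 검색 태스크.data.keyword") and resolved generically,
    // so adding a new task needs no new Java class.
    ObjectNode inputMapping = mapper.createObjectNode()
        .put("keyword", "키워드 검색 태스크.data.keyword");
    System.out.println(inputMapping); // rule consumed by prepareRequestBody
  }
}
```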
diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java
index d142b630..0f0e316d 100644
--- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java
+++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java
@@ -3,7 +3,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -27,7 +26,6 @@
 import site.icebang.domain.workflow.model.Job;
 import site.icebang.domain.workflow.model.Task;
 import site.icebang.domain.workflow.runner.TaskRunner;
-import site.icebang.domain.workflow.runner.body.TaskBodyBuilder;
 
 @Slf4j
 @Service
@@ -39,8 +37,7 @@ public class WorkflowExecutionService {
   private final JobRunMapper jobRunMapper;
   private final TaskRunMapper taskRunMapper;
   private final Map<String, TaskRunner> taskRunners;
-  private final ObjectMapper objectMapper;
-  private final List<TaskBodyBuilder> bodyBuilders;
+  private final ObjectMapper objectMapper; // 📌 ObjectMapper injected for JSON handling
 
   @Transactional
   public void executeWorkflow(Long workflowId) {
@@ -48,7 +45,9 @@ public void executeWorkflow(Long workflowId) {
     WorkflowRun workflowRun = WorkflowRun.start(workflowId);
     workflowRunMapper.insert(workflowRun);
 
+    // 📌 1. Create a context that shares data across the entire workflow run
     Map<String, JsonNode> workflowContext = new HashMap<>();
+
     List<Job> jobs = jobMapper.findJobsByWorkflowId(workflowId);
     log.info("Executing a total of {} jobs sequentially.", jobs.size());
 
@@ -58,6 +57,7 @@
       log.info(
           "---------- Job started: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId());
 
+      // 📌 2. Run the tasks within the job, passing the context along for data pipelining
       boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext);
 
       jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED");
@@ -77,13 +77,16 @@
     log.info("========== Workflow run succeeded: WorkflowRunId={} ==========", workflowRun.getId());
   }
 
+  /**
+   * Executes the tasks that belong to a given job sequentially.
+   *
+   * @param jobRun the run record of the job being executed
+   * @return true if every task succeeds, false as soon as one fails
+   */
   private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflowContext) {
-    // 📌 Fetch the list of TaskDtos from the mapper.
+    // Fetch TaskDtos and convert them into Task models
     List<TaskDto> taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId());
-
-    // 📌 Convert them into a list of Task models using convertToTask.
-    List<Task> tasks = taskDtos.stream().map(this::convertToTask).collect(Collectors.toList());
-
+    List<Task> tasks = taskDtos.stream().map(this::convertToTask).toList();
     log.info("Job (JobRunId={}): executing a total of {} tasks.", jobRun.getId(), tasks.size());
 
     for (Task task : tasks) {
@@ -101,13 +104,10 @@ private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflow
        return false;
      }
 
-      ObjectNode requestBody =
-          bodyBuilders.stream()
-              .filter(builder -> builder.supports(task.getName()))
-              .findFirst()
-              .map(builder -> builder.build(task, workflowContext))
-              .orElse(objectMapper.createObjectNode());
+      // 📌 3. Before running the task, build its request body dynamically from the context
+      ObjectNode requestBody = prepareRequestBody(task, workflowContext);
 
+      // 📌 4. Run the task with the dynamically built request body
       TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun, requestBody);
       taskRun.finish(result.status(), result.message());
       taskRunMapper.update(taskRun);
@@ -117,6 +117,7 @@
         return false;
       }
 
+      // 📌 5. Store the successful task's result in the context so downstream tasks can use it
       try {
         JsonNode resultJson = objectMapper.readTree(result.message());
         workflowContext.put(task.getName(), resultJson);
@@ -132,7 +133,82 @@ private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflow
     return true;
   }
 
-  /** Converts a TaskDto into a Task model. 📌 Note: the reflection-based approach is slow and unreliable, so it is not recommended. */
+  /** Builds the dynamic request body for the API call from the workflow context and the task's input_mapping configuration. */
+  private ObjectNode prepareRequestBody(Task task, Map<String, JsonNode> context) {
+    ObjectNode requestBody = objectMapper.createObjectNode();
+    JsonNode params = task.getParameters();
+    if (params == null) return requestBody;
+
+    JsonNode mappingRules = params.get("input_mapping");
+    JsonNode staticBody = params.get("body");
+
+    // Copy the static body first, if one is present
+    if (staticBody != null && staticBody.isObject()) {
+      requestBody.setAll((ObjectNode) staticBody);
+    }
+
+    // 📌 For debugging: dump the current context
+    log.debug("=== Workflow context ===");
+    for (Map.Entry<String, JsonNode> entry : context.entrySet()) {
+      log.debug("Task: {}, Data: {}", entry.getKey(), entry.getValue().toString());
+    }
+
+    // Overwrite/add values dynamically according to the input_mapping rules
+    if (mappingRules != null && mappingRules.isObject()) {
+      mappingRules
+          .fields()
+          .forEachRemaining(
+              entry -> {
+                String targetField = entry.getKey(); // e.g. "product_url"
+                String sourcePath =
+                    entry
+                        .getValue()
+                        .asText(); // e.g. "상품 유사도 분석 태스크.data.selected_product.product_url"
+
+                log.debug("=== Processing input_mapping ===");
+                log.debug("targetField: {}, sourcePath: {}", targetField, sourcePath);
+
+                String[] parts = sourcePath.split("\\.", 2);
+                if (parts.length == 2) {
+                  String sourceTaskName = parts[0];
+                  String sourceFieldPath = parts[1];
+
+                  log.debug(
+                      "sourceTaskName: {}, sourceFieldPath: {}", sourceTaskName, sourceFieldPath);
+
+                  JsonNode sourceData = context.get(sourceTaskName);
+                  log.debug("sourceData found: {}", sourceData != null);
+
+                  if (sourceData != null) {
+                    log.debug("sourceData content: {}", sourceData.toString());
+
+                    String jsonPath = "/" + sourceFieldPath.replace('.', '/');
+                    log.debug("jsonPath: {}", jsonPath);
+
+                    JsonNode valueToSet = sourceData.at(jsonPath);
+                    log.debug(
+                        "valueToSet found: {}, isMissing: {}",
+                        valueToSet,
+                        valueToSet.isMissingNode());
+
+                    if (!valueToSet.isMissingNode()) {
+                      log.debug("Value to set: {}", valueToSet.toString());
+                      requestBody.set(targetField, valueToSet);
+                    } else {
+                      log.warn("Value not found: jsonPath={}", jsonPath);
+                    }
+                  } else {
+                    log.warn("Source task data not found: {}", sourceTaskName);
+                  }
+                }
+              });
+    }
+
+    log.debug("Final requestBody: {}", requestBody.toString());
+    return requestBody;
+  }
+
+  /** Converts a TaskDto into a Task model, copying only the fields needed to run the business logic. */
   private Task convertToTask(TaskDto taskDto) {
     Task task = new Task();
     try {
@@ -155,6 +231,7 @@ private Task convertToTask(TaskDto taskDto) {
     } catch (Exception e) {
       throw new RuntimeException("Error while converting TaskDto into Task", e);
     }
+
     return task;
   }
 }
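The heart of `prepareRequestBody` is translating a dotted `input_mapping` path into a Jackson JSON Pointer. A self-contained walkthrough of that resolution, using a made-up response payload:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class InputMappingDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // Simulated workflowContext entry, keyed by task name (payload shape is illustrative).
    JsonNode sourceData = mapper.readTree(
        "{\"data\": {\"selected_product\": {\"url\": \"https://example.com/p/1\"}}}");

    String sourcePath = "상품 유사도 분석 태스크.data.selected_product.url";
    String[] parts = sourcePath.split("\\.", 2);        // [task name, "data.selected_product.url"]
    String jsonPath = "/" + parts[1].replace('.', '/'); // "/data/selected_product/url"

    JsonNode value = sourceData.at(jsonPath);           // JSON Pointer lookup
    System.out.println(value.asText());                 // https://example.com/p/1
  }
}
```

One design consequence of `replace('.', '/')`: source field names that themselves contain dots cannot be addressed, and JSON Pointer's escape rules for `~` and `/` (`~0`, `~1`) are not handled by this scheme.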
diff --git a/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java b/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java
deleted file mode 100644
index 2a5bd001..00000000
--- a/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package site.icebang.external.fastapi.adapter;
-
-import org.springframework.http.*;
-import org.springframework.stereotype.Component;
-import org.springframework.web.client.RestClientException;
-import org.springframework.web.client.RestTemplate;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-import site.icebang.global.config.properties.FastApiProperties;
-
-@Slf4j
-@Component
-@RequiredArgsConstructor
-public class FastApiAdapter {
-
-  private final RestTemplate restTemplate;
-  private final FastApiProperties properties;
-
-  // 📌 No remaining dependency on Task or the workflow context
-  public String call(String endpoint, HttpMethod method, String requestBody) {
-    String fullUrl = properties.getUrl() + endpoint;
-    HttpHeaders headers = new HttpHeaders();
-    headers.setContentType(MediaType.APPLICATION_JSON);
-    HttpEntity<String> requestEntity = new HttpEntity<>(requestBody, headers);
-
-    try {
-      log.debug("FastAPI request: URL={}, Method={}, Body={}", fullUrl, method, requestBody);
-      ResponseEntity<String> responseEntity =
-          restTemplate.exchange(fullUrl, method, requestEntity, String.class);
-      String responseBody = responseEntity.getBody();
-      log.debug("FastAPI response: Status={}, Body={}", responseEntity.getStatusCode(), responseBody);
-      return responseBody;
-    } catch (RestClientException e) {
-      log.error("FastAPI call failed: URL={}, Error={}", fullUrl, e.getMessage());
-      return null;
-    }
-  }
-}
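With `FastApiAdapter` and `FastApiProperties` gone, `HttpTaskRunner` takes a bare `RestTemplate` and reads the full URL from each task's parameters. The bean definition is not in this diff; a plausible minimal configuration (class name and timeout values are illustrative assumptions) would be:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestTemplateConfig {

  // Plain RestTemplate; the base URL now lives in each task's `url` parameter
  // instead of FastApiProperties, so no base-URL wiring is needed here.
  @Bean
  public RestTemplate restTemplate() {
    SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
    factory.setConnectTimeout(5_000); // milliseconds; values are illustrative
    factory.setReadTimeout(30_000);
    return new RestTemplate(factory);
  }
}
```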
diff --git a/apps/user-service/src/main/resources/sql/01-schema-h2.sql b/apps/user-service/src/main/resources/sql/01-schema-h2.sql
new file mode 100644
index 00000000..018ebb1d
--- /dev/null
+++ b/apps/user-service/src/main/resources/sql/01-schema-h2.sql
@@ -0,0 +1,328 @@
+-- H2-compatible schema (for tests)
+-- MySQL-specific unsigned, AFTER clauses, etc. replaced with H2-compatible equivalents
+
+CREATE TABLE `permission` (
+    `id` int NOT NULL AUTO_INCREMENT,
+    `resource` varchar(100) NULL,
+    `description` varchar(255) NULL,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    `is_active` boolean DEFAULT TRUE,
+    `updated_by` bigint NULL,
+    `created_by` bigint NULL,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `organization` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `name` varchar(150) NULL,
+    `domain_name` varchar(100) NULL,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `role` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `organization_id` bigint NULL,
+    `name` varchar(100) NULL,
+    `description` varchar(500) NULL,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `user` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `name` varchar(50) NULL,
+    `email` varchar(100) NULL,
+    `password` varchar(255) NULL,
+    `status` varchar(20) NULL,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `department` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `organization_id` bigint NOT NULL,
+    `name` varchar(100) NULL,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `position` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `organization_id` bigint NOT NULL,
+    `title` varchar(100) NULL,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `user_organization` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `user_id` bigint NOT NULL,
+    `organization_id` bigint NOT NULL,
+    `position_id` bigint NOT NULL,
+    `department_id` bigint NOT NULL,
+    `employee_number` varchar(50) NULL,
+    `status` varchar(20) NULL,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `role_permission` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `role_id` bigint NOT NULL,
+    `permission_id` int NOT NULL,
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_role_permission` (`role_id`, `permission_id`)
+);
+
+CREATE TABLE `user_role` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `role_id` bigint NOT NULL,
+    `user_organization_id` bigint NOT NULL,
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_user_role` (`role_id`, `user_organization_id`)
+);
+
+-- Indexes for performance
+CREATE INDEX `idx_user_email` ON `user` (`email`);
+CREATE INDEX `idx_user_status` ON `user` (`status`);
+CREATE INDEX `idx_user_organization_user` ON `user_organization` (`user_id`);
+CREATE INDEX `idx_user_organization_org` ON `user_organization` (`organization_id`);
+CREATE INDEX `idx_user_organization_status` ON `user_organization` (`status`);
+CREATE INDEX `idx_role_org` ON `role` (`organization_id`);
+CREATE INDEX `idx_permission_resource` ON `permission` (`resource`);
+CREATE INDEX `idx_permission_active` ON `permission` (`is_active`);
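The `user` → `user_organization` → `user_role` → `role_permission` → `permission` chain above implies the join path for resolving a user's effective permissions. An illustrative JdbcTemplate query over exactly these tables (class, method, and wiring are hypothetical):

```java
import java.util.List;

import org.springframework.jdbc.core.JdbcTemplate;

public class PermissionQuerySketch {
  private final JdbcTemplate jdbcTemplate;

  public PermissionQuerySketch(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  // Resolves the active permission resources of a user across all of their organizations.
  public List<String> findResourcesByUserId(long userId) {
    return jdbcTemplate.queryForList(
        """
        SELECT DISTINCT p.resource
        FROM user_organization uo
        JOIN user_role ur       ON ur.user_organization_id = uo.id
        JOIN role_permission rp ON rp.role_id = ur.role_id
        JOIN permission p       ON p.id = rp.permission_id
        WHERE uo.user_id = ? AND p.is_active = TRUE
        """,
        String.class,
        userId);
  }
}
```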
+CREATE TABLE `workflow` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `name` varchar(100) NOT NULL UNIQUE,
+    `description` text NULL,
+    `is_enabled` boolean DEFAULT TRUE,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `created_by` bigint NULL,
+    `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    `updated_by` bigint NULL,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `schedule` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `workflow_id` bigint NOT NULL,
+    `cron_expression` varchar(50) NULL,
+    `parameters` json NULL,
+    `is_active` boolean DEFAULT TRUE,
+    `last_run_status` varchar(20) NULL,
+    `last_run_at` timestamp NULL,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `created_by` bigint NULL,
+    `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    `updated_by` bigint NULL,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `job` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `name` varchar(100) NOT NULL UNIQUE,
+    `description` text NULL,
+    `is_enabled` boolean DEFAULT TRUE,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `created_by` bigint NULL,
+    `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    `updated_by` bigint NULL,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `task` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `name` varchar(100) NOT NULL UNIQUE,
+    `type` varchar(50) NULL,
+    `parameters` json NULL,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`)
+);
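The `task.parameters` JSON column above is exactly what `HttpTaskRunner` and `prepareRequestBody` consume at runtime. A short sketch of that round trip; the sample JSON mirrors the seed data inserted in 03-insert-workflow.sql below:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TaskParametersDemo {
  public static void main(String[] args) throws Exception {
    // Shape of task.parameters as stored in the `task` table
    String parameters =
        """
        {
          "url": "http://127.0.0.1:8000/products/search",
          "method": "POST",
          "input_mapping": {"keyword": "키워드 검색 태스크.data.keyword"}
        }
        """;

    JsonNode params = new ObjectMapper().readTree(parameters);
    System.out.println(params.path("url").asText());          // endpoint to call
    System.out.println(params.path("method").asText("POST")); // defaults to POST when absent
    System.out.println(params.get("input_mapping"));          // rules for prepareRequestBody
  }
}
```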
+CREATE TABLE `workflow_job` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `workflow_id` bigint NOT NULL,
+    `job_id` bigint NOT NULL,
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_workflow_job` (`workflow_id`, `job_id`)
+);
+
+CREATE TABLE `job_task` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `job_id` bigint NOT NULL,
+    `task_id` bigint NOT NULL,
+    `execution_order` int NULL,
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_job_task` (`job_id`, `task_id`)
+);
+
+CREATE TABLE `execution_log` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `execution_type` varchar(20) NULL COMMENT 'task, schedule, job, workflow',
+    `source_id` bigint NULL COMMENT 'ID of the source row, e.g. job_id, schedule_id, task_id, ...',
+    `log_level` varchar(20) NULL,
+    `executed_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `log_message` text NULL,
+    `trace_id` char(36) NULL,
+    `config_snapshot` json NULL,
+    PRIMARY KEY (`id`),
+    INDEX `idx_source_id_type` (`source_id`, `execution_type`)
+);
+
+CREATE TABLE `task_io_data` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `task_run_id` bigint NOT NULL,
+    `io_type` varchar(10) NOT NULL COMMENT 'INPUT, OUTPUT',
+    `name` varchar(100) NOT NULL COMMENT 'parameter/variable name',
+    `data_type` varchar(50) NOT NULL COMMENT 'string, number, json, file, etc',
+    `data_value` json NULL COMMENT 'actual data value',
+    `data_size` bigint NULL COMMENT 'data size (bytes)',
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`),
+    INDEX `idx_task_io_task_run_id` (`task_run_id`),
+    INDEX `idx_task_io_type` (`io_type`),
+    INDEX `idx_task_io_name` (`name`)
+);
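`task_io_data` is the table behind the TODO left in `HttpTaskRunner` about persisting request/response bodies. A sketch of what that repository could look like (class and method names are hypothetical; only the table and columns above come from this diff):

```java
import java.nio.charset.StandardCharsets;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

@Repository
public class TaskIoDataRepository {
  private final JdbcTemplate jdbcTemplate;

  public TaskIoDataRepository(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  // io_type is 'INPUT' or 'OUTPUT' per the column comment. The JSON payload is
  // passed as a string here; depending on the H2/MySQL JSON handling in play,
  // an explicit cast or FORMAT JSON clause may be required.
  public void save(long taskRunId, String ioType, String name, String json) {
    jdbcTemplate.update(
        "INSERT INTO task_io_data (task_run_id, io_type, name, data_type, data_value, data_size) "
            + "VALUES (?, ?, ?, 'json', ?, ?)",
        taskRunId,
        ioType,
        name,
        json,
        (long) json.getBytes(StandardCharsets.UTF_8).length);
  }
}
```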
+CREATE TABLE `config` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `target_type` varchar(50) NULL COMMENT 'user, job, workflow',
+    `target_id` bigint NULL,
+    `version` int NULL,
+    `json` json NULL,
+    `is_active` boolean DEFAULT TRUE,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `created_by` bigint NULL,
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_config_target` (`target_type`, `target_id`)
+);
+
+CREATE TABLE `category` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `name` varchar(100) NULL,
+    `description` text NULL,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`)
+);
+
+CREATE TABLE `user_config` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `user_id` bigint NOT NULL,
+    `type` varchar(50) NULL,
+    `name` varchar(100) NULL,
+    `json` json NULL,
+    `is_active` boolean DEFAULT TRUE,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`)
+);
+
+-- Additional indexes (performance)
+CREATE INDEX `idx_schedule_workflow` ON `schedule` (`workflow_id`);
+CREATE INDEX `idx_job_enabled` ON `job` (`is_enabled`);
+CREATE INDEX `idx_task_type` ON `task` (`type`);
+CREATE INDEX `idx_workflow_enabled` ON `workflow` (`is_enabled`);
+CREATE UNIQUE INDEX `uk_schedule_workflow` ON `schedule` (`workflow_id`);
+CREATE UNIQUE INDEX `uk_job_name` ON `job` (`name`);
+CREATE UNIQUE INDEX `uk_task_name` ON `task` (`name`);
+CREATE UNIQUE INDEX `uk_workflow_name` ON `workflow` (`name`);
+CREATE INDEX `idx_user_config_user` ON `user_config` (`user_id`);
+
+-- Workflow run table
+CREATE TABLE `workflow_run` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `workflow_id` bigint NOT NULL,
+    `trace_id` char(36) NOT NULL,
+    `run_number` varchar(20) NULL,
+    `status` varchar(20) NULL COMMENT 'pending, running, success, failed, cancelled',
+    `trigger_type` varchar(20) NULL COMMENT 'manual, schedule, push, pull_request',
+    `started_at` timestamp NULL,
+    `finished_at` timestamp NULL,
+    `created_by` bigint NULL,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_workflow_run_trace` (`trace_id`),
+    INDEX `idx_workflow_run_status` (`status`),
+    INDEX `idx_workflow_run_workflow_id` (`workflow_id`),
+    INDEX `idx_workflow_run_created_at` (`created_at`)
+);
+
+-- Job run table
+CREATE TABLE `job_run` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `workflow_run_id` bigint NOT NULL,
+    `job_id` bigint NOT NULL,
+    `status` varchar(20) NULL COMMENT 'pending, running, success, failed, cancelled, skipped',
+    `started_at` timestamp NULL,
+    `finished_at` timestamp NULL,
+    `execution_order` int NULL,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`),
+    INDEX `idx_job_run_workflow_run_id` (`workflow_run_id`),
+    INDEX `idx_job_run_status` (`status`),
+    INDEX `idx_job_run_job_id` (`job_id`)
+);
+
+-- Task run table
+CREATE TABLE `task_run` (
+    `id` bigint NOT NULL AUTO_INCREMENT,
+    `job_run_id` bigint NOT NULL,
+    `task_id` bigint NOT NULL,
+    `status` varchar(20) NULL COMMENT 'pending, running, success, failed, cancelled, skipped',
+    `started_at` timestamp NULL,
+    `finished_at` timestamp NULL,
+    `execution_order` int NULL,
+    `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (`id`),
+    INDEX `idx_task_run_job_run_id` (`job_run_id`),
+    INDEX `idx_task_run_status` (`status`),
+    INDEX `idx_task_run_task_id` (`task_id`)
+);
+
+-- v0.0.3 - H2-compatible version
+DROP TABLE IF EXISTS `config`;
+
+-- H2 adds columns one at a time
+ALTER TABLE `workflow_job` ADD COLUMN `execution_order` INT NULL;
+
+ALTER TABLE `schedule` ADD COLUMN `schedule_text` varchar(20) NULL;
+
+ALTER TABLE `workflow` ADD COLUMN `default_config` json NULL;
+
+ALTER TABLE `user` ADD COLUMN `joined_at` timestamp NULL;
+
+ALTER TABLE `department` ADD COLUMN `description` varchar(100) NULL;
+
+-- v0.4 - H2-compatible version (AFTER clauses removed, unsigned removed, split into individual ALTER statements)
+-- Add execution_log columns (H2-compatible)
+ALTER TABLE `execution_log` ADD COLUMN `run_id` bigint NULL;
+ALTER TABLE `execution_log` ADD COLUMN `status` varchar(20) NULL;
+ALTER TABLE `execution_log` ADD COLUMN `duration_ms` int NULL;
+ALTER TABLE `execution_log` ADD COLUMN `error_code` varchar(50) NULL;
+ALTER TABLE `execution_log` ADD COLUMN `reserved1` varchar(100) NULL;
+ALTER TABLE `execution_log` ADD COLUMN `reserved2` varchar(100) NULL;
+ALTER TABLE `execution_log` ADD COLUMN `reserved3` int NULL;
+ALTER TABLE `execution_log` ADD COLUMN `reserved4` json NULL;
+ALTER TABLE `execution_log` ADD COLUMN `reserved5` timestamp NULL;
+
+-- Modify existing columns (H2-compatible)
+ALTER TABLE `execution_log` ALTER COLUMN `log_message` varchar(500) NOT NULL;
+ALTER TABLE `execution_log` ALTER COLUMN `executed_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP;
+
+-- Drop obsolete column
+ALTER TABLE `execution_log` DROP COLUMN IF EXISTS `config_snapshot`;
+
+-- New indexes
+CREATE INDEX `idx_run_id` ON `execution_log` (`run_id`);
+CREATE INDEX `idx_log_level_status` ON `execution_log` (`log_level`, `status`);
+CREATE INDEX `idx_error_code` ON `execution_log` (`error_code`);
+CREATE INDEX `idx_duration` ON `execution_log` (`duration_ms`);
+
+-- Rework existing index
+DROP INDEX IF EXISTS `idx_source_id_type`;
+CREATE INDEX `idx_execution_type_source` ON `execution_log` (`execution_type`, `source_id`);
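Since this schema targets H2 for tests, a quick way to validate that the whole script parses is to run it against an in-memory database. A minimal smoke test, assuming H2's MySQL compatibility mode (needed for the backtick quoting and inline `UNIQUE KEY`/`INDEX`/`COMMENT` syntax used above); the classpath location is taken from the file path in this diff:

```java
import java.sql.Connection;
import java.sql.DriverManager;

import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.init.ScriptUtils;

public class SchemaSmokeTest {
  public static void main(String[] args) throws Exception {
    // In-memory H2 in MySQL compatibility mode; dropped when the connection closes.
    try (Connection conn =
        DriverManager.getConnection("jdbc:h2:mem:test;MODE=MySQL", "sa", "")) {
      ScriptUtils.executeSqlScript(conn, new ClassPathResource("sql/01-schema-h2.sql"));
      System.out.println("schema loaded");
    }
  }
}
```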
diff --git a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql
index 9b6db4c0..8e902745 100644
--- a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql
+++ b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql
@@ -1,90 +1,89 @@
--- ===================================================================
--- Initialize workflow-related data
--- ===================================================================
--- Delete data in reverse order of reference relationships
-DELETE FROM `schedule`;
+-- Delete existing workflow-related data
 DELETE FROM `job_task`;
 DELETE FROM `workflow_job`;
 DELETE FROM `task`;
 DELETE FROM `job`;
 DELETE FROM `workflow`;
-
--- ===================================================================
--- Insert static workflow data
--- ===================================================================
+DELETE FROM `schedule`;
 
 -- Create the workflow (ID: 1)
-INSERT INTO `workflow` (`id`, `name`, `description`, `created_by`) VALUES
-    (1, '상품 분석 및 블로그 자동 발행', '키워드 검색부터 상품 분석 후 블로그 발행까지의 자동화 프로세스', 1)
-    ON DUPLICATE KEY UPDATE name = VALUES(name), description = VALUES(description), updated_at = NOW();
+INSERT INTO `workflow` (`id`, `name`, `description`) VALUES
+    (1, '상품 분석 및 블로그 자동 발행', '키워드 검색부터 상품 분석 후 블로그 발행까지의 자동화 프로세스');
 
 -- Create the jobs (IDs: 1, 2)
-INSERT INTO `job` (`id`, `name`, `description`, `created_by`) VALUES
-    (1, '상품 분석', '키워드 검색, 상품 크롤링 및 유사도 분석 작업', 1),
-    (2, '블로그 콘텐츠 생성', '분석 데이터를 기반으로 RAG 콘텐츠 생성 및 발행 작업', 1)
-    ON DUPLICATE KEY UPDATE name = VALUES(name), description = VALUES(description), updated_at = NOW();
+INSERT INTO `job` (`id`, `name`, `description`) VALUES
+    (1, '상품 분석', '키워드 검색, 상품 크롤링 및 유사도 분석 작업'),
+    (2, '블로그 콘텐츠 생성', '분석 데이터를 기반으로 RAG 콘텐츠 생성 및 발행 작업');
 
--- Create the tasks (IDs: 1-7) - reflects the FastAPI request body schemas
+-- Create the tasks (IDs: 1-7)
 INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES
-    (1, '키워드 검색 태스크', 'FastAPI', JSON_OBJECT(
-        'endpoint', '/keywords/search', 'method', 'POST',
-        'body', JSON_OBJECT('tag', 'String') -- { "tag": str }
+-- Tasks for Job 1
+    (1, '키워드 검색 태스크', 'HTTP', JSON_OBJECT(
+        'url', 'http://127.0.0.1:8000/keywords/search',
+        'method', 'POST',
+        'body', JSON_OBJECT('tag', 'naver')
+    )),
+    (2, '상품 검색 태스크', 'HTTP', JSON_OBJECT(
+        'url', 'http://127.0.0.1:8000/products/search',
+        'method', 'POST',
+        'input_mapping', JSON_OBJECT(
+            'keyword', '키워드 검색 태스크.data.keyword'
+        )
+    )),
+    (3, '상품 매칭 태스크', 'HTTP', JSON_OBJECT(
+        'url', 'http://127.0.0.1:8000/products/match',
+        'method', 'POST',
+        'input_mapping', JSON_OBJECT(
+            'keyword', '키워드 검색 태스크.data.keyword',
+            'search_results', '상품 검색 태스크.data.search_results'
+        )
+    )),
+    (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT(
+        'url', 'http://127.0.0.1:8000/products/similarity',
+        'method', 'POST',
+        'input_mapping', JSON_OBJECT(
+            'keyword', '키워드 검색 태스크.data.keyword',
+            'matched_products', '상품 매칭 태스크.data.matched_products'
+        )
     )),
-    (2, '상품 검색 태스크', 'FastAPI', JSON_OBJECT(
-        'endpoint', '/products/search', 'method', 'POST',
-        'body', JSON_OBJECT('keyword', 'String') -- { "keyword": str }
-    )),
-    (3, '상품 매칭 태스크', 'FastAPI', JSON_OBJECT(
-        'endpoint', '/products/match', 'method', 'POST',
-        'body', JSON_OBJECT( -- { keyword: str, search_results: List }
-            'keyword', 'String',
-            'search_results', 'List'
-        )
-    )),
-    (4, '상품 유사도 분석 태스크', 'FastAPI', JSON_OBJECT(
-        'endpoint', '/products/similarity', 'method', 'POST',
-        'body', JSON_OBJECT( -- { keyword: str, matched_products: List, search_results: List }
-            'keyword', 'String',
-            'matched_products', 'List',
-            'search_results', 'List'
-        )
-    )),
-    (5, '상품 정보 크롤링 태스크', 'FastAPI', JSON_OBJECT(
-        'endpoint', '/products/crawl', 'method', 'POST',
-        'body', JSON_OBJECT('product_url', 'String') -- { "product_url": str }
-    )),
-    -- RAG request bodies will be decided later
-    (6, '블로그 RAG 생성 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/blogs/rag/create', 'method', 'POST')),
-    (7, '블로그 발행 태스크', 'FastAPI', JSON_OBJECT(
-        'endpoint', '/blogs/publish', 'method', 'POST',
-        'body', JSON_OBJECT( -- { tag: str, blog_id: str, ... }
-            'tag', 'String',
-            'blog_id', 'String',
-            'blog_pw', 'String',
-            'blog_name', 'String',
-            'post_title', 'String',
-            'post_content', 'String',
-            'post_tags', 'List'
-        )
-    ))
-    ON DUPLICATE KEY UPDATE name = VALUES(name), type = VALUES(type), parameters = VALUES(parameters), updated_at = NOW();
+    (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT(
+        'url', 'http://127.0.0.1:8000/products/crawl',
+        'method', 'POST',
+        'input_mapping', JSON_OBJECT(
+            'product_url', '상품 유사도 분석 태스크.data.selected_product.url'
+        )
+    )),
+
+    -- Tasks for Job 2
+    (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT(
+        'url', 'http://127.0.0.1:8000/blogs/rag/create',
+        'method', 'POST',
+        'input_mapping', JSON_OBJECT(
+            'keyword', '키워드 검색 태스크.data.keyword',
+            'product_info', '상품 정보 크롤링 태스크.data.product_detail'
+        ))),
+
+-- TODO: verify Task 7 configuration
+    (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT(
+        'url', 'http://127.0.0.1:8000/blogs/publish',
+        'method', 'POST',
+        'body', JSON_OBJECT('tag', 'tistory', 'blog_id', 'test', 'blog_pw', 'test'),
+        'input_mapping', JSON_OBJECT(
+            'post_title', '블로그 RAG 생성 태스크.data.title',
+            'post_content', '블로그 RAG 생성 태스크.data.content',
+            'post_tags', '블로그 RAG 생성 태스크.data.tags'
+        )));
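Task 7 above is the only task that combines a static `body` with `input_mapping` rules, which exercises the merge precedence in `prepareRequestBody`: the static body is copied first, then mapped values are added or overwrite on key collision. A self-contained check of that order (the RAG output payload is made up):

```java
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class MergePrecedenceDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // Static part from Task 7's parameters
    ObjectNode requestBody = mapper.createObjectNode();
    requestBody.setAll((ObjectNode) mapper.readTree(
        "{\"tag\": \"tistory\", \"blog_id\": \"test\", \"blog_pw\": \"test\"}"));

    // Mapped part: values resolved from the RAG task's (made-up) output
    JsonNode ragOutput = mapper.readTree(
        "{\"data\": {\"title\": \"Sample title\", \"content\": \"...\", \"tags\": [\"camping\"]}}");
    for (Map.Entry<String, String> rule :
        Map.of(
                "post_title", "/data/title",
                "post_content", "/data/content",
                "post_tags", "/data/tags")
            .entrySet()) {
      requestBody.set(rule.getKey(), ragOutput.at(rule.getValue()));
    }

    // Static fields survive; mapped fields are added (or would overwrite a colliding key)
    System.out.println(requestBody.toPrettyString());
  }
}
```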
 
--- ===================================================================
--- Insert workflow structure and schedule data
--- ===================================================================
 -- Link the workflow to its jobs
 INSERT INTO `workflow_job` (`workflow_id`, `job_id`, `execution_order`) VALUES
     (1, 1, 1),
-    (1, 2, 2)
-    ON DUPLICATE KEY UPDATE execution_order = VALUES(execution_order);
+    (1, 2, 2);
 
 -- Link the jobs to their tasks
 INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES
     (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5),
-    (2, 6, 1), (2, 7, 2)
-    ON DUPLICATE KEY UPDATE execution_order = VALUES(execution_order);
+    (2, 6, 1), (2, 7, 2);
 
--- Schedule (daily at 8 AM)
-INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `is_active`, `created_by`) VALUES
-    (1, '0 0 8 * * ?', TRUE, 1)
-    ON DUPLICATE KEY UPDATE cron_expression = VALUES(cron_expression), is_active = VALUES(is_active), updated_at = NOW();
\ No newline at end of file
+-- Schedule (runs at second 0 of every minute)
+INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `is_active`) VALUES
+    (1, '0 * * * * ?', TRUE);
\ No newline at end of file
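The new seed schedule fires far more often than the old daily-8AM cron, which looks like a debugging convenience. If the six-field, seconds-first expression is consumed by Spring's scheduler (an assumption; Quartz uses the same format), it can be sanity-checked with the Spring 5.3+ `CronExpression` API:

```java
import java.time.LocalDateTime;

import org.springframework.scheduling.support.CronExpression;

public class CronCheck {
  public static void main(String[] args) {
    // New seed value: second 0 of every minute
    CronExpression everyMinute = CronExpression.parse("0 * * * * ?");
    // Old seed value: 08:00:00 every day
    CronExpression dailyAtEight = CronExpression.parse("0 0 8 * * ?");

    LocalDateTime now = LocalDateTime.now();
    System.out.println(everyMinute.next(now));  // at most one minute away
    System.out.println(dailyAtEight.next(now)); // the next 08:00
  }
}
```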