Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.springframework.web.client.RestTemplate;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -14,35 +15,52 @@
import site.icebang.domain.workflow.model.Task;

@Slf4j
@Component("httpTaskRunner")
@Component("httpTaskRunner") // "httpTaskRunner"라는 이름의 Bean으로 등록
@RequiredArgsConstructor
public class HttpTaskRunner implements TaskRunner {

private final RestTemplate restTemplate;

// private final TaskIoDataRepository taskIoDataRepository; // TODO: 입출력 저장을 위해 주입

@Override
public TaskExecutionResult execute(Task task, TaskRun taskRun) {
public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) {
JsonNode params = task.getParameters();
String url = params.get("url").asText();
String method = params.get("method").asText();
JsonNode body = params.get("body");
if (params == null) {
return TaskExecutionResult.failure("Task에 파라미터가 정의되지 않았습니다.");
}

String url = params.path("url").asText();
String method = params.path("method").asText("POST"); // 기본값 POST

if (url.isEmpty()) {
return TaskExecutionResult.failure("Task 파라미터에 'url'이 없습니다.");
}

try {
HttpEntity<String> requestEntity =
new HttpEntity<>(
body.toString(),
new HttpHeaders() {
{
setContentType(MediaType.APPLICATION_JSON);
}
});

ResponseEntity<String> response =
// 1. HTTP 헤더 설정
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);

// 2. HTTP 요청 엔티티 생성 (헤더 + 동적 Body)
HttpEntity<String> requestEntity = new HttpEntity<>(requestBody.toString(), headers);

log.debug("HTTP Task 요청: URL={}, Method={}, Body={}", url, method, requestBody.toString());

// 3. RestTemplate으로 API 호출
ResponseEntity<String> responseEntity =
restTemplate.exchange(
url, HttpMethod.valueOf(method.toUpperCase()), requestEntity, String.class);

return TaskExecutionResult.success(response.getBody());
String responseBody = responseEntity.getBody();
log.debug("HTTP Task 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody);

// TODO: taskIoDataRepository를 사용하여 requestBody와 responseBody를 DB에 저장

return TaskExecutionResult.success(responseBody);

} catch (RestClientException e) {
log.error("HTTP Task 실행 실패: TaskRunId={}, Error={}", taskRun.getId(), e.getMessage());
log.error("HTTP Task 실행 중 에러 발생: TaskRunId={}, Error={}", taskRun.getId(), e.getMessage());
return TaskExecutionResult.failure(e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package site.icebang.domain.workflow.runner;

import com.fasterxml.jackson.databind.node.ObjectNode;

import site.icebang.domain.execution.model.TaskRun;
import site.icebang.domain.workflow.model.Task;

/** 워크플로우의 개별 Task를 실행하는 모든 Runner가 구현해야 할 인터페이스 */
public interface TaskRunner {

/** Task 실행 결과를 담는 Record. status: SUCCESS 또는 FAILED message: 실행 결과 또는 에러 메시지 */
record TaskExecutionResult(String status, String message) {
public static TaskExecutionResult success(String message) {
return new TaskExecutionResult("SUCCESS", message);
Expand All @@ -14,9 +19,17 @@ public static TaskExecutionResult failure(String message) {
}

public boolean isFailure() {
return "FAILED".equals(status);
return "FAILED".equals(this.status);
}
}

TaskExecutionResult execute(Task task, TaskRun taskRun);
/**
* 특정 Task를 실행합니다.
*
* @param task 실행할 Task의 정적 정의
* @param taskRun 현재 실행에 대한 기록 객체
* @param requestBody 동적으로 생성된 요청 데이터
* @return Task 실행 결과
*/
TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody);
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package site.icebang.domain.workflow.service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -30,18 +36,17 @@ public class WorkflowExecutionService {
private final JobRunMapper jobRunMapper;
private final TaskRunMapper taskRunMapper;
private final Map<String, TaskRunner> taskRunners;
private final ObjectMapper objectMapper; // 📌 JSON 처리를 위해 ObjectMapper 주입

/**
* 워크플로우 실행의 시작점. 전체 과정은 하나의 트랜잭션으로 묶입니다.
*
* @param workflowId 실행할 워크플로우의 ID
*/
@Transactional
public void executeWorkflow(Long workflowId) {
log.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId);
WorkflowRun workflowRun = WorkflowRun.start(workflowId);
workflowRunMapper.insert(workflowRun);

// 📌 1. 워크플로우 전체 실행 동안 데이터를 공유할 컨텍스트 생성
Map<String, JsonNode> workflowContext = new HashMap<>();

List<Job> jobs = jobMapper.findJobsByWorkflowId(workflowId);
log.info("총 {}개의 Job을 순차적으로 실행합니다.", jobs.size());

Expand All @@ -51,7 +56,8 @@ public void executeWorkflow(Long workflowId) {
log.info(
"---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId());

boolean jobSucceeded = executeTasksForJob(jobRun);
// 📌 2. Job 내의 Task들을 실행하고, 컨텍스트를 전달하여 데이터 파이프라이닝 수행
boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext);

jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED");
jobRunMapper.update(jobRun);
Expand All @@ -60,7 +66,7 @@ public void executeWorkflow(Long workflowId) {
workflowRun.finish("FAILED");
workflowRunMapper.update(workflowRun);
log.error("Job 실패로 인해 워크플로우 실행을 중단합니다: WorkflowRunId={}", workflowRun.getId());
return; // Job이 실패하면 전체 워크플로우를 중단
return;
}
log.info("---------- Job 실행 성공: JobRunId={} ----------", jobRun.getId());
}
Expand All @@ -70,13 +76,7 @@ public void executeWorkflow(Long workflowId) {
log.info("========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId());
}

/**
* 특정 Job에 속한 Task들을 순차적으로 실행합니다.
*
* @param jobRun 실행중인 Job의 기록 객체
* @return 모든 Task가 성공하면 true, 하나라도 실패하면 false
*/
private boolean executeTasksForJob(JobRun jobRun) {
private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflowContext) {
List<Task> tasks = jobMapper.findTasksByJobId(jobRun.getJobId());
log.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size());

Expand All @@ -92,20 +92,76 @@ private boolean executeTasksForJob(JobRun jobRun) {
taskRun.finish("FAILED", "지원하지 않는 Task 타입: " + task.getType());
taskRunMapper.update(taskRun);
log.error("Task 실행 실패 (미지원 타입): TaskRunId={}, Type={}", taskRun.getId(), task.getType());
return false; // 실행할 Runner가 없으므로 실패
return false;
}

TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun);
// 📌 3. Task 실행 전, 컨텍스트를 이용해 동적으로 Request Body를 생성
ObjectNode requestBody = prepareRequestBody(task, workflowContext);

// 📌 4. 동적으로 생성된 Request Body를 전달하여 Task 실행
TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun, requestBody);
taskRun.finish(result.status(), result.message());
taskRunMapper.update(taskRun);

if (result.isFailure()) {
log.error("Task 실행 실패: TaskRunId={}, Message={}", taskRun.getId(), result.message());
return false; // Task가 실패하면 즉시 중단하고 실패 반환
return false;
}

// 📌 5. 성공한 Task의 결과를 다음 Task가 사용할 수 있도록 컨텍스트에 저장
try {
JsonNode resultJson = objectMapper.readTree(result.message());
workflowContext.put(task.getName(), resultJson);
// TODO: task_io_data 테이블에 requestBody(INPUT)와 resultJson(OUTPUT) 저장
} catch (JsonProcessingException e) {
log.error("Task 결과 JSON 파싱 실패: TaskRunId={}", taskRun.getId(), e);
taskRun.finish("FAILED", "결과 JSON 파싱 실패");
taskRunMapper.update(taskRun);
return false;
}
log.info("Task 실행 성공: TaskRunId={}", taskRun.getId());
}
return true;
}

/** 워크플로우 컨텍스트와 Task의 input_mapping 설정을 기반으로 API 요청에 사용할 동적인 Request Body를 생성합니다. */
private ObjectNode prepareRequestBody(Task task, Map<String, JsonNode> context) {
ObjectNode requestBody = objectMapper.createObjectNode();
JsonNode params = task.getParameters();
if (params == null) return requestBody;

JsonNode mappingRules = params.get("input_mapping");
JsonNode staticBody = params.get("body");

return true; // 모든 Task가 성공적으로 완료됨
// 정적 body가 있으면 우선적으로 복사
if (staticBody != null && staticBody.isObject()) {
requestBody.setAll((ObjectNode) staticBody);
}

// input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가
if (mappingRules != null && mappingRules.isObject()) {
mappingRules
.fields()
.forEachRemaining(
entry -> {
String targetField = entry.getKey(); // 예: "keyword"
String sourcePath = entry.getValue().asText(); // 예: "키워드 검색 태스크.keyword"

String[] parts = sourcePath.split("\\.", 2);
if (parts.length == 2) {
String sourceTaskName = parts[0];
String sourceFieldPath = parts[1];

JsonNode sourceData = context.get(sourceTaskName);
if (sourceData != null) {
JsonNode valueToSet = sourceData.at("/" + sourceFieldPath.replace('.', '/'));
if (!valueToSet.isMissingNode()) {
requestBody.set(targetField, valueToSet);
}
}
}
});
}
return requestBody;
}
}
2 changes: 1 addition & 1 deletion apps/user-service/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ mybatis:
# 외부 API 연동을 위한 설정 섹션
api:
fastapi:
url: http://pre-processing-service:8000 # FastAPI 서버의 기본 URL
url: http://127.0.0.1:8000 # FastAPI 서버의 기본 URL
timeout: 10000 # API 요청 타임아웃 (밀리초 단위)
Loading
Loading