Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package site.icebang.domain.workflow.runner;

import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import site.icebang.domain.execution.model.TaskRun;
import site.icebang.domain.workflow.model.Task;

@Slf4j
@Component("httpTaskRunner") // "httpTaskRunner"라는 이름의 Bean으로 등록
@RequiredArgsConstructor
public class HttpTaskRunner implements TaskRunner {

private final RestTemplate restTemplate;

// private final TaskIoDataRepository taskIoDataRepository; // TODO: 입출력 저장을 위해 주입

@Override
public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) {
JsonNode params = task.getParameters();
if (params == null) {
return TaskExecutionResult.failure("Task에 파라미터가 정의되지 않았습니다.");
}

String url = params.path("url").asText();
String method = params.path("method").asText("POST"); // 기본값 POST

if (url.isEmpty()) {
return TaskExecutionResult.failure("Task 파라미터에 'url'이 없습니다.");
}

try {
// 1. HTTP 헤더 설정
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);

// 2. HTTP 요청 엔티티 생성 (헤더 + 동적 Body)
HttpEntity<String> requestEntity = new HttpEntity<>(requestBody.toString(), headers);

log.debug("HTTP Task 요청: URL={}, Method={}, Body={}", url, method, requestBody.toString());

// 3. RestTemplate으로 API 호출
ResponseEntity<String> responseEntity =
restTemplate.exchange(
url, HttpMethod.valueOf(method.toUpperCase()), requestEntity, String.class);

String responseBody = responseEntity.getBody();
log.debug("HTTP Task 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody);

// TODO: taskIoDataRepository를 사용하여 requestBody와 responseBody를 DB에 저장

return TaskExecutionResult.success(responseBody);

} catch (RestClientException e) {
log.error("HTTP Task 실행 중 에러 발생: TaskRunId={}, Error={}", taskRun.getId(), e.getMessage());
return TaskExecutionResult.failure(e.getMessage());
}
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -27,7 +26,6 @@
import site.icebang.domain.workflow.model.Job;
import site.icebang.domain.workflow.model.Task;
import site.icebang.domain.workflow.runner.TaskRunner;
import site.icebang.domain.workflow.runner.body.TaskBodyBuilder;

@Slf4j
@Service
Expand All @@ -39,16 +37,17 @@ public class WorkflowExecutionService {
private final JobRunMapper jobRunMapper;
private final TaskRunMapper taskRunMapper;
private final Map<String, TaskRunner> taskRunners;
private final ObjectMapper objectMapper;
private final List<TaskBodyBuilder> bodyBuilders;
private final ObjectMapper objectMapper; // 📌 JSON 처리를 위해 ObjectMapper 주입

@Transactional
public void executeWorkflow(Long workflowId) {
log.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId);
WorkflowRun workflowRun = WorkflowRun.start(workflowId);
workflowRunMapper.insert(workflowRun);

// 📌 1. 워크플로우 전체 실행 동안 데이터를 공유할 컨텍스트 생성
Map<String, JsonNode> workflowContext = new HashMap<>();

List<Job> jobs = jobMapper.findJobsByWorkflowId(workflowId);
log.info("총 {}개의 Job을 순차적으로 실행합니다.", jobs.size());

Expand All @@ -58,6 +57,7 @@ public void executeWorkflow(Long workflowId) {
log.info(
"---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId());

// 📌 2. Job 내의 Task들을 실행하고, 컨텍스트를 전달하여 데이터 파이프라이닝 수행
boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext);

jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED");
Expand All @@ -77,13 +77,16 @@ public void executeWorkflow(Long workflowId) {
log.info("========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId());
}

/**
* 특정 Job에 속한 Task들을 순차적으로 실행합니다.
*
* @param jobRun 실행중인 Job의 기록 객체
* @return 모든 Task가 성공하면 true, 하나라도 실패하면 false
*/
private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflowContext) {
// 📌 Mapper로부터 TaskDto 리스트를 조회합니다.
// TaskDto를 조회하고 Task로 변환
List<TaskDto> taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId());

// 📌 convertToTask 메소드를 사용하여 Task 모델 리스트로 변환합니다.
List<Task> tasks = taskDtos.stream().map(this::convertToTask).collect(Collectors.toList());

List<Task> tasks = taskDtos.stream().map(this::convertToTask).toList();
log.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size());

for (Task task : tasks) {
Expand All @@ -101,13 +104,10 @@ private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflow
return false;
}

ObjectNode requestBody =
bodyBuilders.stream()
.filter(builder -> builder.supports(task.getName()))
.findFirst()
.map(builder -> builder.build(task, workflowContext))
.orElse(objectMapper.createObjectNode());
// 📌 3. Task 실행 전, 컨텍스트를 이용해 동적으로 Request Body를 생성
ObjectNode requestBody = prepareRequestBody(task, workflowContext);

// 📌 4. 동적으로 생성된 Request Body를 전달하여 Task 실행
TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun, requestBody);
taskRun.finish(result.status(), result.message());
taskRunMapper.update(taskRun);
Expand All @@ -117,6 +117,7 @@ private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflow
return false;
}

// 📌 5. 성공한 Task의 결과를 다음 Task가 사용할 수 있도록 컨텍스트에 저장
try {
JsonNode resultJson = objectMapper.readTree(result.message());
workflowContext.put(task.getName(), resultJson);
Expand All @@ -132,7 +133,82 @@ private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflow
return true;
}

/** TaskDto를 Task 모델로 변환합니다. 📌 주의: Reflection을 사용한 방식은 성능이 느리고 불안정하므로 권장되지 않습니다. */
/** 워크플로우 컨텍스트와 Task의 input_mapping 설정을 기반으로 API 요청에 사용할 동적인 Request Body를 생성합니다. */
private ObjectNode prepareRequestBody(Task task, Map<String, JsonNode> context) {
ObjectNode requestBody = objectMapper.createObjectNode();
JsonNode params = task.getParameters();
if (params == null) return requestBody;

JsonNode mappingRules = params.get("input_mapping");
JsonNode staticBody = params.get("body");

// 정적 body가 있으면 우선적으로 복사
if (staticBody != null && staticBody.isObject()) {
requestBody.setAll((ObjectNode) staticBody);
}

// 📌 디버깅용: 현재 컨텍스트 출력
log.debug("=== 워크플로우 컨텍스트 확인 ===");
for (Map.Entry<String, JsonNode> entry : context.entrySet()) {
log.debug("Task: {}, Data: {}", entry.getKey(), entry.getValue().toString());
}

// input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가
if (mappingRules != null && mappingRules.isObject()) {
mappingRules
.fields()
.forEachRemaining(
entry -> {
String targetField = entry.getKey(); // 예: "product_url"
String sourcePath =
entry
.getValue()
.asText(); // 예: "상품 유사도 분석 태스크.data.selected_product.product_url"

log.debug("=== input_mapping 처리 ===");
log.debug("targetField: {}, sourcePath: {}", targetField, sourcePath);

String[] parts = sourcePath.split("\\.", 2);
if (parts.length == 2) {
String sourceTaskName = parts[0];
String sourceFieldPath = parts[1];

log.debug(
"sourceTaskName: {}, sourceFieldPath: {}", sourceTaskName, sourceFieldPath);

JsonNode sourceData = context.get(sourceTaskName);
log.debug("sourceData found: {}", sourceData != null);

if (sourceData != null) {
log.debug("sourceData content: {}", sourceData.toString());

String jsonPath = "/" + sourceFieldPath.replace('.', '/');
log.debug("jsonPath: {}", jsonPath);

JsonNode valueToSet = sourceData.at(jsonPath);
log.debug(
"valueToSet found: {}, isMissing: {}",
valueToSet,
valueToSet.isMissingNode());

if (!valueToSet.isMissingNode()) {
log.debug("설정할 값: {}", valueToSet.toString());
requestBody.set(targetField, valueToSet);
} else {
log.warn("값을 찾을 수 없음: jsonPath={}", jsonPath);
}
} else {
log.warn("소스 태스크 데이터를 찾을 수 없음: {}", sourceTaskName);
}
}
});
}

log.debug("최종 requestBody: {}", requestBody.toString());
return requestBody;
}

/** TaskDto를 Task 모델로 변환합니다. 비즈니스 로직 실행에 필요한 필드만 복사합니다. */
private Task convertToTask(TaskDto taskDto) {
Task task = new Task();
try {
Expand All @@ -155,6 +231,7 @@ private Task convertToTask(TaskDto taskDto) {
} catch (Exception e) {
throw new RuntimeException("TaskDto to Task 변환 중 오류 발생", e);
}

return task;
}
}

This file was deleted.

Loading
Loading