Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions apps/pre-processing-service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@
FROM python:3.11-slim AS builder
WORKDIR /app

# 필수 OS 패키지
RUN apt-get update && apt-get install -y --no-install-recommends curl \
# 필수 OS 패키지 (기존 + Chrome 설치용 패키지 추가)
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
wget \
unzip \
gnupg \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*

# Poetry 설치
RUN curl -sSL https://install.python-poetry.org | python3 -
ENV PATH="/root/.local/bin:$PATH"
RUN poetry self add "poetry-plugin-export>=1.7.0"

# 런타임 가상환경
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
Expand All @@ -23,6 +29,38 @@ RUN poetry export --without dev -f requirements.txt -o requirements.txt \
FROM python:3.11-slim AS final
WORKDIR /app

# Packages needed to install Chrome and ChromeDriver
RUN apt-get update && apt-get install -y --no-install-recommends \
wget \
unzip \
curl \
gnupg \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*

# Install Google Chrome from Google's apt repository.
# The signing key is de-armored into a dedicated keyring and bound to this one
# repository via signed-by, because apt-key is deprecated and no longer shipped
# in newer Debian releases. The repository is fetched over https.
RUN install -d -m 0755 /usr/share/keyrings && \
    wget -q -O - https://dl.google.com/linux/linux_signing_key.pub \
      | gpg --dearmor -o /usr/share/keyrings/google-chrome.gpg && \
    echo "deb [arch=amd64 signed-by=/usr/share/keyrings/google-chrome.gpg] https://dl.google.com/linux/chrome/deb/ stable main" \
      > /etc/apt/sources.list.d/google-chrome.list && \
    apt-get update && \
    apt-get install -y --no-install-recommends google-chrome-stable && \
    rm -rf /var/lib/apt/lists/*

# Install ChromeDriver: read the current stable version string, download the
# linux64 archive for that version, and move the binary into /usr/local/bin.
# NOTE(review): the version comes from the LATEST_RELEASE_STABLE feed, not from the
# google-chrome-stable package installed above — confirm the two stay compatible.
RUN LATEST_VERSION=$(curl -s "https://googlechromelabs.github.io/chrome-for-testing/LATEST_RELEASE_STABLE") && \
wget -O /tmp/chromedriver-linux64.zip "https://storage.googleapis.com/chrome-for-testing-public/${LATEST_VERSION}/linux64/chromedriver-linux64.zip" && \
unzip /tmp/chromedriver-linux64.zip -d /tmp/ && \
mv /tmp/chromedriver-linux64/chromedriver /usr/local/bin/chromedriver && \
chmod +x /usr/local/bin/chromedriver && \
rm -rf /tmp/* && \
apt-get clean

# Install MeCab and its dictionary (required by the morphological analysis code)
RUN apt-get update && apt-get install -y --no-install-recommends \
mecab \
libmecab-dev \
mecab-ipadic-utf8 \
&& rm -rf /var/lib/apt/lists/*

# /opt/venv 복사
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
Expand All @@ -31,5 +69,8 @@ ENV PATH="/opt/venv/bin:$PATH"
COPY . .


# Expose the MeCab dictionary directory through the MECAB_PATH environment variable.
# NOTE(review): confirm this directory actually exists in the image; Debian's
# mecab-ipadic-utf8 package is usually found under /var/lib/mecab/dic/ipadic-utf8
# — TODO verify, since a wrong path makes the Tagger construction fail.
ENV MECAB_PATH=/usr/lib/mecab/dic/ipadic

# (권장 대안) 코드에서 uvicorn import 안 하고 프로세스 매니저로 실행하려면:
ENTRYPOINT ["gunicorn", "-k", "uvicorn.workers.UvicornWorker", "app.main:app", "-b", "0.0.0.0:8000"]
ENTRYPOINT ["gunicorn", "-k", "uvicorn.workers.UvicornWorker", "app.main:app", "-b", "0.0.0.0:8000"]
16 changes: 7 additions & 9 deletions apps/pre-processing-service/app/utils/keyword_matcher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

from app.core.config import settings # pydantic_settings 기반
from loguru import logger

Expand All @@ -15,26 +17,22 @@


class KeywordMatcher:
"""키워드 매칭 분석기"""

def __init__(self):
self.konlpy_available = False

# MeCab 사용 가능 여부 확인
if MECAB_AVAILABLE:
try:
# 경로가 있으면 사용, 없으면 기본값
if settings.mecab_path:
self.mecab = MeCab.Tagger(f"-d {settings.mecab_path}")
# 환경변수 MECAB_PATH가 있으면 사용, 없으면 기본값
mecab_path = os.getenv("MECAB_PATH")
if mecab_path:
self.mecab = MeCab.Tagger(f"-d {mecab_path}")
else:
self.mecab = MeCab.Tagger() # 기본 경로

# 테스트 실행
test_result = self.mecab.parse("테스트")
if test_result and test_result.strip():
self.konlpy_available = True
logger.info(
f"MeCab 형태소 분석기 사용 가능 (경로: {settings.mecab_path or '기본'})"
f"MeCab 형태소 분석기 사용 가능 (경로: {mecab_path or '기본'})"
)
else:
logger.warning("MeCab 테스트 실패")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class TaskRun {
private Long id;
private Long jobRunId;
private Long taskId;
private Integer executionOrder;
private String status; // PENDING, RUNNING, SUCCESS, FAILED
private String resultMessage; // 실행 결과 메시지
private LocalDateTime startedAt;
Expand All @@ -27,8 +28,9 @@ private TaskRun(Long jobRunId, Long taskId) {
}

/** Task 실행 시작을 위한 정적 팩토리 메서드 */
public static TaskRun start(Long jobRunId, Long taskId) {
public static TaskRun start(Long jobRunId, Long taskId, Integer executionOrder) {
TaskRun taskRun = new TaskRun(jobRunId, taskId);
taskRun.executionOrder = executionOrder;
taskRun.status = "RUNNING";
taskRun.startedAt = LocalDateTime.now();
return taskRun;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package site.icebang.domain.workflow.controller;

import java.math.BigInteger;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

Expand All @@ -9,8 +11,8 @@
import site.icebang.common.dto.PageParams;
import site.icebang.common.dto.PageResult;
import site.icebang.domain.workflow.dto.WorkflowCardDto;
import site.icebang.domain.workflow.dto.WorkflowDetailCardDto;
import site.icebang.domain.workflow.service.WorkflowExecutionService;
import site.icebang.domain.workflow.service.WorkflowHistoryService;
import site.icebang.domain.workflow.service.WorkflowService;

@RestController
Expand All @@ -19,7 +21,6 @@
public class WorkflowController {
private final WorkflowService workflowService;
private final WorkflowExecutionService workflowExecutionService;
private final WorkflowHistoryService workflowHistoryService;

@GetMapping("")
public ApiResponse<PageResult<WorkflowCardDto>> getWorkflowList(
Expand All @@ -34,4 +35,10 @@ public ResponseEntity<Void> runWorkflow(@PathVariable Long workflowId) {
workflowExecutionService.executeWorkflow(workflowId);
return ResponseEntity.accepted().build();
}

/**
 * Returns the detail view of a single workflow.
 *
 * @param workflowId workflow identifier bound from the request path
 * @return the detail DTO produced by the workflow service, wrapped as a success response
 */
@GetMapping("/{workflowId}/detail")
public ApiResponse<WorkflowDetailCardDto> getWorkflowDetail(@PathVariable BigInteger workflowId) {
  return ApiResponse.success(workflowService.getWorkflowDetail(workflowId));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package site.icebang.domain.workflow.dto;

import java.time.LocalDateTime;

import lombok.Data;

/**
 * Data holder for one schedule entry of a workflow, as returned by the workflow mapper.
 *
 * <p>Lombok's {@code @Data} generates the getters, setters, equals/hashCode and toString.
 */
@Data
public class ScheduleDto {
private Long id;
private String cronExpression;
private Boolean isActive;
// NOTE(review): plain String; the set of allowed status values is not visible here.
private String lastRunStatus;
private LocalDateTime lastRunAt;
// NOTE(review): presumably a human-readable rendering of cronExpression — confirm.
private String scheduleText;
private LocalDateTime createdAt;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class TaskDto {
private Long id;
private String name;
private String type;
private Integer executionOrder;
private JsonNode parameters;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package site.icebang.domain.workflow.dto;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

import lombok.Data;
import lombok.EqualsAndHashCode;

/**
 * Detail view of a workflow: the card fields inherited from {@link WorkflowCardDto} plus the
 * default configuration, last-update information, schedules and jobs.
 *
 * <p>{@code callSuper = true} makes the generated equals/hashCode take the inherited fields into
 * account; with a bare {@code @Data} on a subclass Lombok compares only the fields declared here,
 * so two details of different workflows could compare as equal.
 */
@Data
@EqualsAndHashCode(callSuper = true)
public class WorkflowDetailCardDto extends WorkflowCardDto {
  private String defaultConfig;
  private LocalDateTime updatedAt;
  private String updatedBy;
  private List<ScheduleDto> schedules;
  // NOTE(review): untyped maps; the key set is determined by the mapper query — confirm there.
  private List<Map<String, Object>> jobs;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@
import java.util.*;

import site.icebang.common.dto.PageParams;
import site.icebang.domain.workflow.dto.ScheduleDto;
import site.icebang.domain.workflow.dto.WorkflowCardDto;
import site.icebang.domain.workflow.dto.WorkflowDetailCardDto;

/**
 * Mapper interface for workflow queries.
 *
 * <p>Only the method signatures are declared here; the backing statements live outside this file,
 * so the notes below describe the signatures rather than the SQL.
 */
public interface WorkflowMapper {
/** Selects workflow cards for the given paging parameters. */
List<WorkflowCardDto> selectWorkflowList(PageParams pageParams);

/** Counts workflows for the given parameters (presumably the total for paging — confirm). */
int selectWorkflowCount(PageParams pageParams);

/** Selects a single workflow card by its id. */
WorkflowCardDto selectWorkflowById(BigInteger id);

/** Selects the detail DTO of a single workflow by its id. */
WorkflowDetailCardDto selectWorkflowDetailById(BigInteger workflowId);

/** Selects the schedules that belong to the given workflow. */
List<ScheduleDto> selectSchedulesByWorkflowId(BigInteger workflowId);

/**
 * Selects the workflow's jobs and tasks as untyped maps.
 * NOTE(review): the map keys depend on the query's column aliases — confirm in the mapper.
 */
List<Map<String, Object>> selectWorkflowWithJobsAndTasks(BigInteger workflowId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import lombok.Getter;
import lombok.NoArgsConstructor;

import site.icebang.domain.workflow.dto.TaskDto;

@Getter
@NoArgsConstructor // MyBatis가 객체를 생성하기 위해 필요
public class Task {
Expand All @@ -17,4 +19,11 @@ public class Task {

/** Task 실행에 필요한 파라미터 (JSON) 예: {"url": "http://...", "method": "POST", "body": {...}} */
private JsonNode parameters;

/**
 * Creates a Task by copying the id, name, type and parameters of the given DTO.
 *
 * @param taskDto source DTO; must not be null
 */
public Task(TaskDto taskDto) {
  id = taskDto.getId();
  name = taskDto.getName();
  type = taskDto.getType();
  parameters = taskDto.getParameters();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package site.icebang.domain.workflow.service;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -101,58 +101,82 @@ public void executeWorkflow(Long workflowId) {

private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflowContext) {
List<TaskDto> taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId());
List<Task> tasks = taskDtos.stream().map(this::convertToTask).collect(Collectors.toList());

workflowLogger.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size());
// execution_order null 처리 및 중복 처리
taskDtos.sort(
Comparator.comparing(
TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(TaskDto::getId));

for (Task task : tasks) {
TaskRun taskRun = TaskRun.start(jobRun.getId(), task.getId());
workflowLogger.info(
"Job (JobRunId={}) 내 총 {}개의 Task를 execution_order 순으로 실행합니다.",
jobRun.getId(),
taskDtos.size());

for (TaskDto taskDto : taskDtos) {
TaskRun taskRun = TaskRun.start(jobRun.getId(), taskDto.getId(), taskDto.getExecutionOrder());
taskRunMapper.insert(taskRun);

// Task 컨텍스트로 전환
mdcManager.setTaskContext(taskRun.getId());
workflowLogger.info("Task 실행 시작: TaskId={}, TaskRunId={}", task.getId(), taskRun.getId());
workflowLogger.info(
"Task 실행 시작: TaskId={}, ExecutionOrder={}, TaskName={}, TaskRunId={}",
taskDto.getId(),
taskDto.getExecutionOrder(),
taskDto.getName(),
taskRun.getId());

String runnerBeanName = task.getType().toLowerCase() + "TaskRunner";
String runnerBeanName = taskDto.getType().toLowerCase() + "TaskRunner";
TaskRunner runner = taskRunners.get(runnerBeanName);

if (runner == null) {
taskRun.finish("FAILED", "지원하지 않는 Task 타입: " + task.getType());
taskRun.finish("FAILED", "지원하지 않는 Task 타입: " + taskDto.getType());
taskRunMapper.update(taskRun);
workflowLogger.error("Task 실행 실패 (미지원 타입): Type={}", task.getType());
workflowLogger.error(
"Task 실행 실패 (미지원 타입): Type={}, ExecutionOrder={}",
taskDto.getType(),
taskDto.getExecutionOrder());
mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원
return false;
}

// TaskDto에서 직접 Task 생성 (불필요한 변환 제거)
ObjectNode requestBody =
bodyBuilders.stream()
.filter(builder -> builder.supports(task.getName()))
.filter(builder -> builder.supports(taskDto.getName()))
.findFirst()
.map(builder -> builder.build(task, workflowContext))
.map(builder -> builder.build(createTaskFromDto(taskDto), workflowContext))
.orElse(objectMapper.createObjectNode());

TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun, requestBody);
TaskRunner.TaskExecutionResult result =
runner.execute(createTaskFromDto(taskDto), taskRun, requestBody);
taskRun.finish(result.status(), result.message());
taskRunMapper.update(taskRun);

if (result.isFailure()) {
workflowLogger.error("Task 실행 실패: Message={}", result.message());
workflowLogger.error(
"Task 실행 실패: ExecutionOrder={}, Message={}",
taskDto.getExecutionOrder(),
result.message());
mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원
return false;
}

try {
JsonNode resultJson = objectMapper.readTree(result.message());
workflowContext.put(task.getName(), resultJson);
workflowContext.put(taskDto.getName(), resultJson);
} catch (JsonProcessingException e) {
workflowLogger.error("Task 결과 JSON 파싱 실패");
workflowLogger.error("Task 결과 JSON 파싱 실패: ExecutionOrder={}", taskDto.getExecutionOrder());
taskRun.finish("FAILED", "결과 JSON 파싱 실패");
taskRunMapper.update(taskRun);
mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원
return false;
}

workflowLogger.info("Task 실행 성공: TaskRunId={}", taskRun.getId());
workflowLogger.info(
"Task 실행 성공: ExecutionOrder={}, TaskRunId={}",
taskDto.getExecutionOrder(),
taskRun.getId());

// 다시 Job 컨텍스트로 복원
mdcManager.setJobContext(jobRun.getId());
Expand All @@ -161,28 +185,7 @@ private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflow
}

/** Converts a TaskDto into a Task model by delegating to the Task(TaskDto) constructor. */
private Task convertToTask(TaskDto taskDto) {
Task task = new Task();
try {
java.lang.reflect.Field idField = Task.class.getDeclaredField("id");
idField.setAccessible(true);
idField.set(task, taskDto.getId());

java.lang.reflect.Field nameField = Task.class.getDeclaredField("name");
nameField.setAccessible(true);
nameField.set(task, taskDto.getName());

java.lang.reflect.Field typeField = Task.class.getDeclaredField("type");
typeField.setAccessible(true);
typeField.set(task, taskDto.getType());

java.lang.reflect.Field parametersField = Task.class.getDeclaredField("parameters");
parametersField.setAccessible(true);
parametersField.set(task, taskDto.getParameters());

} catch (Exception e) {
throw new RuntimeException("TaskDto to Task 변환 중 오류 발생", e);
}
return task;
private Task createTaskFromDto(TaskDto taskDto) {
return new Task(taskDto); // 생성자 사용
}
}
Loading
Loading