Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions apps/pre-processing-service/app/core/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import sys
from contextvars import ContextVar

# trace_id context 변수 import
try:
from app.middleware.ServiceLoggerMiddleware import trace_id_context
except ImportError:
# 모듈이 아직 로드되지 않은 경우를 위한 기본값
trace_id_context: ContextVar[str] = ContextVar("trace_id", default="")


Expand All @@ -18,12 +20,13 @@ def setup_file_logging():

log_dir = "/logs"

os.makedirs(log_dir, exist_ok=True)
# 로그 디렉토리가 없으면 생성

log_file_path = os.path.join(log_dir, "app.log")
error_log_file_path = os.path.join(log_dir, "error.log")
# 로그 파일 경로 설정
log_file_path = log_dir + "/pre-processing-app.log"
error_log_file_path = log_dir + "/pre-processing-app-error.log"

# trace_id를 포함한 structured 로그 포맷
# trace_id를 포함한 간단한 포맷 문자열 사용
def add_trace_id_filter(record):
try:
current_trace_id = trace_id_context.get()
Expand All @@ -42,11 +45,10 @@ def exclude_logging_middleware_filter(record):
return False
return add_trace_id_filter(record)

structured_format = "{time:YYYY-MM-DD HH:mm:ss.SSS} {level: <8} {thread.name: <15} {name}:{function}:{line} [{extra[trace_id]}] {message}"

# 파일 로깅 핸들러 추가 - trace_id 포함, LoggingMiddleware 제외
logger.add(
log_file_path,
format=structured_format,
format="[{extra[trace_id]}] {time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} | {message}",
level="DEBUG",
rotation="100 MB", # 100MB마다 로테이션
retention="7 days", # 7일간 보관
Expand All @@ -58,10 +60,10 @@ def exclude_logging_middleware_filter(record):
filter=exclude_logging_middleware_filter,
)

# 에러 레벨 이상은 별도 파일에도 기록
# 에러 레벨 이상은 별도 파일에도 기록 - trace_id 포함, LoggingMiddleware 제외
logger.add(
error_log_file_path,
format=structured_format,
format="[{extra[trace_id]}] {time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} | {message}",
level="ERROR",
rotation="50 MB",
retention="30 days",
Expand All @@ -79,7 +81,7 @@ def exclude_logging_middleware_filter(record):
sys.stdout,
format="[{extra[trace_id]}] {time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message}",
level="DEBUG",
colorize=False,
colorize=False, # colorize 비활성화하여 태그 충돌 방지
filter=add_trace_id_filter,
)

Expand Down
42 changes: 41 additions & 1 deletion apps/pre-processing-service/app/middleware/rds_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,43 @@ class RDSLogger:

def __init__(self):
self.db_manager = MariadbManager()
self.max_log_message_length = 450

def _truncate_log_message(self, log_message: str) -> str:
"""
log_message를 VARCHAR(500) 크기에 맞게 자르기

Args:
log_message: 원본 로그 메시지

Returns:
str: 잘린 로그 메시지
"""
if not log_message:
return log_message or ""

# UTF-8 인코딩 기준으로 바이트 길이 체크
encoded_message = log_message.encode("utf-8")
original_length = len(encoded_message)

if original_length <= self.max_log_message_length:
return log_message

# 메시지가 너무 길면 자르기
truncate_suffix = "... [TRUNCATED]"
available_length = self.max_log_message_length - len(
truncate_suffix.encode("utf-8")
)

truncated_message = encoded_message[:available_length].decode(
"utf-8", errors="ignore"
)
truncated_message += truncate_suffix

logger.warning(
f"로그 메시지 잘림: {original_length} bytes -> {len(truncated_message.encode('utf-8'))} bytes"
)
return truncated_message

async def log_execution(
self,
Expand Down Expand Up @@ -45,12 +82,15 @@ async def log_execution(
bool: 저장 성공 여부
"""
try:
# TODO: Issue #XXX - log_message VARCHAR(500) 제한으로 인한 임시 truncation
truncated_log_message = self._truncate_log_message(log_message)
# 향후 TEXT 타입으로 변경하거나 별도 로그 저장소 검토 필요
execution_log = ExecutionLog(
execution_type=execution_type,
source_id=source_id,
log_level=log_level,
executed_at=datetime.now(),
log_message=log_message,
log_message=truncated_log_message,
trace_id=trace_id,
run_id=run_id,
status=status,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@ public class QuartzScheduleService {
private final Scheduler scheduler;

public void addOrUpdateSchedule(Schedule schedule) {
JobKey jobKey = JobKey.jobKey("workflow-" + schedule.getWorkflowId());
JobDetail jobDetail = JobBuilder.newJob(WorkflowTriggerJob.class)
.withIdentity(jobKey)
.withDescription("Workflow " + schedule.getWorkflowId() + " Trigger Job")
.usingJobData("workflowId", schedule.getWorkflowId())
.storeDurably()
.build();

TriggerKey triggerKey = TriggerKey.triggerKey("trigger-for-workflow-" + schedule.getWorkflowId());
Trigger trigger = TriggerBuilder.newTrigger()
.forJob(jobDetail)
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression()))
.build();
try {
// 기존 스케줄 삭제 (있다면)
deleteSchedule(schedule.getWorkflowId());

JobKey jobKey = JobKey.jobKey("workflow-" + schedule.getWorkflowId());
JobDetail jobDetail = JobBuilder.newJob(WorkflowTriggerJob.class)
.withIdentity(jobKey)
.withDescription("Workflow " + schedule.getWorkflowId() + " Trigger Job")
.usingJobData("workflowId", schedule.getWorkflowId())
.storeDurably()
.build();

TriggerKey triggerKey = TriggerKey.triggerKey("trigger-for-workflow-" + schedule.getWorkflowId());
Trigger trigger = TriggerBuilder.newTrigger()
.forJob(jobDetail)
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(schedule.getCronExpression()))
.build();

scheduler.scheduleJob(jobDetail, trigger);
log.info("Quartz 스케줄 등록/업데이트 완료: Workflow ID {}", schedule.getWorkflowId());
} catch (SchedulerException e) {
Expand All @@ -38,6 +42,15 @@ public void addOrUpdateSchedule(Schedule schedule) {
}

/**
 * Removes the Quartz job (and its triggers) registered for the given workflow.
 *
 * <p>SchedulerException is caught and logged rather than rethrown, so callers
 * such as {@code addOrUpdateSchedule} can invoke this unconditionally even
 * when no schedule exists yet.
 *
 * @param workflowId ID of the workflow whose schedule should be removed
 */
public void deleteSchedule(Long workflowId) {
    try {
        // Key naming must stay in sync with addOrUpdateSchedule ("workflow-<id>").
        JobKey jobKey = JobKey.jobKey("workflow-" + workflowId);

        // Scheduler.deleteJob() also removes all triggers associated with the
        // job, so a separate unscheduleJob() call is redundant.
        scheduler.deleteJob(jobKey);
        log.info("Quartz 스케줄 삭제 완료: Workflow ID {}", workflowId);
    } catch (SchedulerException e) {
        log.error("Quartz 스케줄 삭제 실패: Workflow ID {}", workflowId, e);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package site.icebang.domain.workflow.dto;

import java.time.LocalDateTime;

import com.fasterxml.jackson.databind.JsonNode;

import lombok.Data;

/**
 * Data-transfer object for one row of the {@code task} table.
 *
 * <p>Carries the task's identity, type, raw JSON parameter payload and audit
 * timestamps between the MyBatis mapper layer and the workflow service.
 * Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 */
@Data
public class TaskDto {
    // Primary key of the task row.
    private Long id;
    private String name;
    private String type;
    // Raw JSON parameters column, deserialized to a Jackson tree
    // (presumably via the mapper's JsonNode type handler — confirm in the XML mapper).
    private JsonNode parameters;
    // Audit timestamps copied straight from created_at / updated_at columns.
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

import org.apache.ibatis.annotations.Mapper;

import site.icebang.domain.workflow.dto.TaskDto;
import site.icebang.domain.workflow.model.Job;
import site.icebang.domain.workflow.model.Task;

/**
 * MyBatis mapper for reading workflow composition: the jobs of a workflow
 * and the tasks of a job.
 */
@Mapper
public interface JobMapper {
    /** Returns all jobs belonging to the given workflow. */
    List<Job> findJobsByWorkflowId(Long workflowId);

    /** Returns the tasks linked to the given job (via the job_task join table). */
    List<TaskDto> findTasksByJobId(Long jobId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import site.icebang.domain.execution.model.JobRun;
import site.icebang.domain.execution.model.TaskRun;
import site.icebang.domain.execution.model.WorkflowRun;
import site.icebang.domain.workflow.dto.TaskDto;
import site.icebang.domain.workflow.mapper.JobMapper;
import site.icebang.domain.workflow.model.Job;
import site.icebang.domain.workflow.model.Task;
Expand Down Expand Up @@ -76,8 +77,16 @@ public void executeWorkflow(Long workflowId) {
log.info("========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId());
}

/**
* 특정 Job에 속한 Task들을 순차적으로 실행합니다.
*
* @param jobRun 실행중인 Job의 기록 객체
* @return 모든 Task가 성공하면 true, 하나라도 실패하면 false
*/
private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflowContext) {
List<Task> tasks = jobMapper.findTasksByJobId(jobRun.getJobId());
// TaskDto를 조회하고 Task로 변환
List<TaskDto> taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId());
List<Task> tasks = taskDtos.stream().map(this::convertToTask).toList();
log.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size());

for (Task task : tasks) {
Expand Down Expand Up @@ -138,30 +147,91 @@ private ObjectNode prepareRequestBody(Task task, Map<String, JsonNode> context)
requestBody.setAll((ObjectNode) staticBody);
}

// 📌 디버깅용: 현재 컨텍스트 출력
log.debug("=== 워크플로우 컨텍스트 확인 ===");
for (Map.Entry<String, JsonNode> entry : context.entrySet()) {
log.debug("Task: {}, Data: {}", entry.getKey(), entry.getValue().toString());
}

// input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가
if (mappingRules != null && mappingRules.isObject()) {
mappingRules
.fields()
.forEachRemaining(
entry -> {
String targetField = entry.getKey(); // 예: "keyword"
String sourcePath = entry.getValue().asText(); // 예: "키워드 검색 태스크.keyword"
String targetField = entry.getKey(); // 예: "product_url"
String sourcePath =
entry
.getValue()
.asText(); // 예: "상품 유사도 분석 태스크.data.selected_product.product_url"

log.debug("=== input_mapping 처리 ===");
log.debug("targetField: {}, sourcePath: {}", targetField, sourcePath);

String[] parts = sourcePath.split("\\.", 2);
if (parts.length == 2) {
String sourceTaskName = parts[0];
String sourceFieldPath = parts[1];

log.debug(
"sourceTaskName: {}, sourceFieldPath: {}", sourceTaskName, sourceFieldPath);

JsonNode sourceData = context.get(sourceTaskName);
log.debug("sourceData found: {}", sourceData != null);

if (sourceData != null) {
JsonNode valueToSet = sourceData.at("/" + sourceFieldPath.replace('.', '/'));
log.debug("sourceData content: {}", sourceData.toString());

String jsonPath = "/" + sourceFieldPath.replace('.', '/');
log.debug("jsonPath: {}", jsonPath);

JsonNode valueToSet = sourceData.at(jsonPath);
log.debug(
"valueToSet found: {}, isMissing: {}",
valueToSet,
valueToSet.isMissingNode());

if (!valueToSet.isMissingNode()) {
log.debug("설정할 값: {}", valueToSet.toString());
requestBody.set(targetField, valueToSet);
} else {
log.warn("값을 찾을 수 없음: jsonPath={}", jsonPath);
}
} else {
log.warn("소스 태스크 데이터를 찾을 수 없음: {}", sourceTaskName);
}
}
});
}

log.debug("최종 requestBody: {}", requestBody.toString());
return requestBody;
}

/**
 * Converts a {@link TaskDto} into a {@link Task} model, copying only the
 * fields needed for execution (id, name, type, parameters).
 *
 * <p>Values are injected via reflection — presumably because {@code Task}
 * exposes no public setters (TODO: confirm; if setters or a builder exist,
 * prefer them over reflection).
 *
 * @param taskDto source DTO loaded by the mapper layer
 * @return a new Task populated from the DTO
 * @throws RuntimeException if a field is missing or cannot be set
 */
private Task convertToTask(TaskDto taskDto) {
    Task task = new Task();
    setTaskField(task, "id", taskDto.getId());
    setTaskField(task, "name", taskDto.getName());
    setTaskField(task, "type", taskDto.getType());
    setTaskField(task, "parameters", taskDto.getParameters());
    return task;
}

/** Sets one Task field by reflection; wraps any failure in an unchecked exception. */
private void setTaskField(Task task, String fieldName, Object value) {
    try {
        java.lang.reflect.Field field = Task.class.getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(task, value);
    } catch (Exception e) {
        // Keep the original contract: conversion failures surface as RuntimeException.
        throw new RuntimeException("TaskDto to Task 변환 중 오류 발생", e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ public RestTemplate restTemplate(RestTemplateBuilder builder) {
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();

// 2. 타임아웃 설정 (이 메서드들은 deprecated 아님)
requestFactory.setConnectTimeout(Duration.ofSeconds(5));
requestFactory.setReadTimeout(Duration.ofSeconds(5));
requestFactory.setConnectTimeout(Duration.ofSeconds(30000));
requestFactory.setReadTimeout(Duration.ofSeconds(30000));

// 3. 빌더에 직접 생성한 requestFactory를 설정
return builder.requestFactory(() -> requestFactory).build();
Expand Down
2 changes: 1 addition & 1 deletion apps/user-service/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ management:
export:
enabled: true
server:
address: 127.0.0.1 # localhost에서만 접근
address: 0.0.0.0
port: 8081
security:
enabled: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
<result property="name" column="name"/>
<result property="type" column="type"/>
<result property="parameters" column="parameters" javaType="com.fasterxml.jackson.databind.JsonNode" jdbcType="VARCHAR" typeHandler="site.icebang.global.config.mybatis.typehandler.JsonNodeTypeHandler"/>
</resultMap>

<!-- Maps task rows to TaskDto. The JSON `parameters` column is stored as
     VARCHAR and converted to a Jackson JsonNode by JsonNodeTypeHandler. -->
<resultMap id="TaskDtoResultMap" type="site.icebang.domain.workflow.dto.TaskDto">
    <id property="id" column="id"/>
    <result property="name" column="name"/>
    <result property="type" column="type"/>
    <result property="parameters" column="parameters" javaType="com.fasterxml.jackson.databind.JsonNode" jdbcType="VARCHAR" typeHandler="site.icebang.global.config.mybatis.typehandler.JsonNodeTypeHandler"/>
    <!-- Audit columns exposed on the DTO (absent from the model's TaskResultMap). -->
    <result property="createdAt" column="created_at"/>
    <result property="updatedAt" column="updated_at"/>
</resultMap>
Expand All @@ -28,7 +35,7 @@
ORDER BY wj.execution_order ASC
</select>

<select id="findTasksByJobId" resultMap="TaskResultMap">
<select id="findTasksByJobId" resultMap="TaskDtoResultMap">
SELECT t.* FROM task t
JOIN job_task jt ON t.id = jt.task_id
WHERE jt.job_id = #{jobId}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
<update id="update">
UPDATE task_run
SET status = #{status},
finished_at = #{finishedAt},
result_message = #{resultMessage}
finished_at = #{finishedAt}
WHERE id = #{id}
</update>
</mapper>
Loading
Loading