-
Notifications
You must be signed in to change notification settings - Fork 0
Develop Guide
LogProcessor는 Streaming Server에서 로그를 단계별로 처리하기 위한 표준 인터페이스입니다.
모든 로그 처리 과정(AI 분석, 저장, 필터링 등)은 이 인터페이스를 구현한 Processor 형태로 등록되며,
order 값에 따라 순차적 또는 병렬적으로 실행됩니다.
이를 통해 로그 처리 파이프라인을 유연하게 확장할 수 있습니다.
예를 들어:
-
AiAnalysisProcessor: AI 이상 탐지 수행 -
DatabaseSaveProcessor: 로그를 DB에 저장 -
WebhookProcessor: 특정 이벤트 발생 시 외부 전송
package com.logmate.streaming.processor;
import com.logmate.streaming.global.log.LogEnvelope;
import reactor.core.publisher.Mono;
/**
- LogProcessor
-
- 로그 처리기의 표준 인터페이스.
*/
public interface LogProcessor {
int getOrder();
boolean supports(LogEnvelope env);
Mono<LogEnvelope> process(LogEnvelope env);
}
| 메서드 | 반환 타입 | 설명 |
|---|---|---|
| getOrder() | int | 프로세서의 실행 순서를 지정합니다. 낮을수록 먼저 실행됩니다. |
| supports(LogEnvelope env) | boolean | 해당 로그가 이 Processor의 처리 대상인지 여부를 판단합니다. |
| process(LogEnvelope env) | Mono | 실제 로그를 처리하고 결과를 반환합니다. Reactive Stream 기반으로 비동기 처리를 지원합니다. |
-
Processor 클래스 생성
- 예:
AiLogProcessor,DatabaseLogProcessor,WebhookProcessor
- 예:
-
LogProcessor 인터페이스 구현
@Component public class AiLogProcessor implements LogProcessor { @Override public int getOrder() { return 100; // 낮을수록 먼저 실행됨 }@Override public boolean supports(LogEnvelope env) { return env.getType().equals("AI_ANALYSIS"); } @Override public Mono<LogEnvelope> process(LogEnvelope env) { // AI 분석 로직 수행 return Mono.just(env); }}
-
LogProcessorRegistry에 Bean 등록- Spring Bean 으로 등록되면 자동으로 파이프라인에서 인식됩니다.
- Processor 간의 실행 순서는
getOrder()값으로 제어합니다.
-
테스트 및 검증
-
다양한 로그 타입(
LogEnvelope)으로 테스트하여supports()조건이 정상적으로 분기되는지 확인하세요. -
Reactive Stream 체인 내에서 예외가 발생하지 않도록
onErrorResume()등의 에러 처리 전략을 권장합니다.
-
com.logmate.streaming.processor.impl
-
병렬 실행 제어: 동일한
order값을 가진 Processor들은 병렬로 실행됩니다. -
예외 처리:
Mono체인 내부에서는 반드시 예외를 캐치하거나.onErrorResume()으로 복구하세요. - 상태 변경 금지: Processor는 무상태(stateless)로 설계하는 것이 권장됩니다.
- 유연한 확장성: 신규 Processor 추가만으로 파이프라인 로직을 수정하지 않고 기능을 확장할 수 있습니다.
LogStorageService는 Streaming Server의 로그 영속화 계층(Storage Layer) 을 담당하는 표준 인터페이스입니다.
이 인터페이스를 구현함으로써, LogMate 시스템은 저장소 타입에 독립적인 확장성을 확보합니다.
즉, OpenSearch, RDBMS, NoSQL, S3, GCS 등 다양한 저장소로 로그를 손쉽게 저장할 수 있습니다.
모든 로그는 LogEnvelope 객체 형태로 전달되며,
이 객체를 적절한 저장소 모델로 변환하여 안정적이고 내결함적인 로그 영속화를 수행합니다.
package com.logmate.streaming.storage; import com.logmate.streaming.global.log.LogEnvelope;/**
public interface LogStorageService { void storeLogEnvelope(LogEnvelope env); }
- LogStorageService
- 로그 저장소 연동을 위한 표준 인터페이스. */
-
새로운 저장소 서비스 구현
-
예:
-
OpenSearchStorageService -
RdbStorageService -
S3StorageService -
LocalFileStorageService
-
-
-
LogStorageService 인터페이스 구현
@Service public class OpenSearchStorageService implements LogStorageService {</span><span><span class="hljs-keyword">private</span></span><span> </span><span><span class="hljs-keyword">final</span></span><span> OpenSearchClient client; </span><span><span class="hljs-keyword">public</span></span><span> </span><span><span class="hljs-title function_">OpenSearchStorageService</span></span><span><span class="hljs-params">(OpenSearchClient client)</span></span><span> { </span><span><span class="hljs-built_in">this</span></span><span>.client = client; } </span><span><span class="hljs-meta">@Override</span></span><span> </span><span><span class="hljs-keyword">public</span></span><span> </span><span><span class="hljs-keyword">void</span></span><span> </span><span><span class="hljs-title function_">storeLogEnvelope</span></span><span><span class="hljs-params">(LogEnvelope env)</span></span><span> { </span><span><span class="hljs-keyword">try</span></span><span> { client.index(</span><span><span class="hljs-string">"logmate-index"</span></span><span>, env); } </span><span><span class="hljs-keyword">catch</span></span><span> (Exception e) { </span><span><span class="hljs-keyword">throw</span></span><span> </span><span><span class="hljs-keyword">new</span></span><span> </span><span><span class="hljs-title class_">LogStorageException</span></span><span>(</span><span><span class="hljs-string">"Failed to store log in OpenSearch"</span></span><span>, e); } }}
-
LogStorageProcessor에 주입@Bean public LogProcessor storageProcessor(LogStorageService storage) { return new LogStorageProcessor(storage, 1); } -
에러 처리 설계
-
저장 실패 시
RuntimeException또는 커스텀 예외(LogStorageException)을 발생시킵니다. -
상위 Processor(
LogStorageProcessor)는 이를 감지하고onErrorResume()또는 DLQ(Dead Letter Queue) 전송 로직으로 복구를 수행합니다.
-
com.logmate.streaming.storage.impl
(서비스 인터페이스는 com.logmate.streaming.storage,
구현체는 impl 하위 패키지에 위치시키는 것을 권장합니다.)
대상 인터페이스:
LogConsumer,LogProducer,DlqProducer
패키지:com.logmate.streaming.message
LogConsumer는 외부 메시지 브로커나 파일 스트림 등 다양한 입력 소스에서 로그를 수신하고 처리하는 역할을 담당합니다.
Kafka, RabbitMQ, 파일 기반 스트림 등 다양한 구현체를 통해 로그 메시지를 수신 → 파싱 → 파이프라인 전달하는 구조로 동작합니다.
실패한 메시지는 DLQ(Dead Letter Queue)로 전송되거나 알림 시스템을 통해 통보될 수 있습니다.
public interface LogConsumer { void consume(String message); }
-
DLQ 브로커 설정
-
Kafka 사용 시 별도 DLQ 토픽(
logmate-dlq) 생성 -
메시지 포맷에 예외 요약 포함 (
cause.getMessage())
-
-
DlqProducer 구현
@Component public class KafkaDlqProducer implements DlqProducer { private final KafkaTemplate<String, String> kafkaTemplate;</span><span><span class="hljs-meta">@Override</span></span><span> </span><span><span class="hljs-keyword">public</span></span><span> </span><span><span class="hljs-keyword">void</span></span><span> </span><span><span class="hljs-title function_">send</span></span><span><span class="hljs-params">(String key, String message, Throwable cause)</span></span><span> { </span><span><span class="hljs-type">String</span></span><span> </span><span><span class="hljs-variable">payload</span></span><span> </span><span><span class="hljs-operator">=</span></span><span> </span><span><span class="hljs-string">"[DLQ] "</span></span><span> + message + </span><span><span class="hljs-string">" :: cause="</span></span><span> + cause.getMessage(); kafkaTemplate.send(</span><span><span class="hljs-string">"logmate-dlq"</span></span><span>, key, payload); }}
-
통합
-
LogConsumer또는LogProcessor내부에서 에러 발생 시 호출 -
DLQ 전송 실패 자체도 별도 로깅 처리
-
com.logmate.streaming.message.dlq.impl
-
DLQ 메시지 포맷 통일
-
timestamp,originalTopic,errorMessage,stackTrace등 포함
-
-
모니터링
-
DLQ 메시지 개수를 지표로 수집하여 장애 감지 트리거로 활용
-
-
운영 시나리오
-
DLQ 소비 전용 Processor(
DlqReprocessor)를 별도로 두어 재처리 가능
-
LogProducer는 Streaming Server 내부에서 파싱된 로그 데이터를 외부 메시지 브로커(Kafka 등) 로 전송하기 위한 표준 인터페이스입니다.
Agent, AI Server, 혹은 외부 통합 시스템으로 로그를 전송할 때 사용됩니다.
public interface LogProducer { Mono<Void> sendLog(ParsedLogData logData, LogType logType, String agentId, String thNum); }
-
DLQ 브로커 설정
-
Kafka 사용 시 별도 DLQ 토픽(
logmate-dlq) 생성 -
메시지 포맷에 예외 요약 포함 (
cause.getMessage())
-
-
DlqProducer 구현
@Component public class KafkaDlqProducer implements DlqProducer { private final KafkaTemplate<String, String> kafkaTemplate;</span><span><span class="hljs-meta">@Override</span></span><span> </span><span><span class="hljs-keyword">public</span></span><span> </span><span><span class="hljs-keyword">void</span></span><span> </span><span><span class="hljs-title function_">send</span></span><span><span class="hljs-params">(String key, String message, Throwable cause)</span></span><span> { </span><span><span class="hljs-type">String</span></span><span> </span><span><span class="hljs-variable">payload</span></span><span> </span><span><span class="hljs-operator">=</span></span><span> </span><span><span class="hljs-string">"[DLQ] "</span></span><span> + message + </span><span><span class="hljs-string">" :: cause="</span></span><span> + cause.getMessage(); kafkaTemplate.send(</span><span><span class="hljs-string">"logmate-dlq"</span></span><span>, key, payload); }}
-
통합
-
LogConsumer또는LogProcessor내부에서 에러 발생 시 호출 -
DLQ 전송 실패 자체도 별도 로깅 처리
-
com.logmate.streaming.message.dlq.impl
-
DLQ 메시지 포맷 통일
-
timestamp,originalTopic,errorMessage,stackTrace등 포함
-
-
모니터링
-
DLQ 메시지 개수를 지표로 수집하여 장애 감지 트리거로 활용
-
-
운영 시나리오
-
DLQ 소비 전용 Processor(
DlqReprocessor)를 별도로 두어 재처리 가능
-
DlqProducer는 Dead Letter Queue(DLQ) 로 실패한 로그 메시지를 전송하여 유실을 방지하기 위한 인터페이스입니다.
주로 LogConsumer나 LogPipeline에서 파싱/처리 실패가 발생했을 때 호출됩니다.
public interface DlqProducer { void send(String key, String message, Throwable cause); }
-
DLQ 브로커 설정
-
Kafka 사용 시 별도 DLQ 토픽(
logmate-dlq) 생성 -
메시지 포맷에 예외 요약 포함 (
cause.getMessage())
-
-
DlqProducer 구현
@Component public class KafkaDlqProducer implements DlqProducer { private final KafkaTemplate<String, String> kafkaTemplate;</span><span><span class="hljs-meta">@Override</span></span><span> </span><span><span class="hljs-keyword">public</span></span><span> </span><span><span class="hljs-keyword">void</span></span><span> </span><span><span class="hljs-title function_">send</span></span><span><span class="hljs-params">(String key, String message, Throwable cause)</span></span><span> { </span><span><span class="hljs-type">String</span></span><span> </span><span><span class="hljs-variable">payload</span></span><span> </span><span><span class="hljs-operator">=</span></span><span> </span><span><span class="hljs-string">"[DLQ] "</span></span><span> + message + </span><span><span class="hljs-string">" :: cause="</span></span><span> + cause.getMessage(); kafkaTemplate.send(</span><span><span class="hljs-string">"logmate-dlq"</span></span><span>, key, payload); }}
-
통합
-
LogConsumer또는LogProcessor내부에서 에러 발생 시 호출 -
DLQ 전송 실패 자체도 별도 로깅 처리
-
com.logmate.streaming.message.dlq.impl
-
DLQ 메시지 포맷 통일
-
timestamp,originalTopic,errorMessage,stackTrace등 포함
-
-
모니터링
-
DLQ 메시지 개수를 지표로 수집하여 장애 감지 트리거로 활용
-
-
운영 시나리오
-
DLQ 소비 전용 Processor(
DlqReprocessor)를 별도로 두어 재처리 가능
-