diff --git a/main.py b/main.py index 5920810..e66c17c 100644 --- a/main.py +++ b/main.py @@ -101,7 +101,7 @@ def get_next_group_id_offset(es_client, index_pattern="planit-edr-ai-grouping-*" # ID 중복을 막기 위해, 조회 실패 시 파이프라인을 중지시킴 raise e -def run_inference(es_host, es_user, es_pass, target_index_date_str): +def run_inference(es_host, es_user, es_pass, target_batch_id): # --- 1. 아티팩트(모델, 전처리기) 로드 --- print("모델 및 전처리기 로드 중...") @@ -156,10 +156,10 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str): # df(모델용), original_docs(원본 저장용) 2개를 받음 # (src.es_client에서 함수 가져옴) - df, original_docs = fetch_malicious_events(es_client, preprocessors, target_index_date_str) + df, original_docs = fetch_malicious_events(es_client, target_batch_id) if df is None or original_docs is None: - print(f"{target_index_date_str} 인덱스에 처리할 데이터가 없습니다.") + print(f"{target_batch_id} 인덱스에 처리할 데이터가 없습니다.") return # 조회된 데이터가 없으면 종료 except Exception as e: @@ -265,7 +265,8 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str): # --- 6. 그룹핑 결과 Elasticsearch에 저장 --- # 그룹핑 결과를 저장할 새 인덱스 이름 # TARGET_INDEX = "planit-edr-ai-grouping" - TARGET_INDEX = f"planit-edr-ai-grouping-{target_index_date_str}" + current_date_suffix = target_batch_id.split('_')[0] + TARGET_INDEX = f"planit-edr-ai-grouping-{current_date_suffix}" # 저장할 그룹 ID 필드 이름 GROUP_ID_FIELD = "ai_group_id" @@ -314,6 +315,10 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str): help="The specific index date string (e.g., '2025.11.06_14') to process." ) args = parser.parse_args() + + target_batch_id = args.index_date_str # "2025.11.20_14" + + print(f"Processing Batch ID: {target_batch_id}") ssm_client = boto3.client('ssm', region_name='ap-northeast-2') @@ -330,7 +335,7 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str): exit(1) # SSM 실패 시 종료 try: - run_inference(ES_HOST, ES_USER, ES_PASSWORD, args.index_date_str) + run_inference(ES_HOST, ES_USER, ES_PASSWORD, target_batch_id) except Exception as e: print(f"EC2 인스턴스에서 메인 파이프라인(run_inference) 실행 중 오류: {e}") diff --git a/src/es_client.py b/src/es_client.py index 85b5f29..2197392 100644 --- a/src/es_client.py +++ b/src/es_client.py @@ -2,63 +2,68 @@ from elasticsearch import Elasticsearch, helpers as es_helpers # Elasticsearch에서 데이터 가져오기 -def fetch_malicious_events(es_client, preprocessors, target_index_date_str): +def fetch_malicious_events(es_client, target_batch_id): """ - Elasticsearch에서 '악성' 이벤트를 조회하여 + Elasticsearch에서 특정 시간대(Batch ID)의 '악성' 이벤트를 조회 1. 모델 저장장용 DataFrame - 2. 저장용 원본 문서(dict) 리스트 - 를 반환 + 2. 저장용 원본 문서(dict) 리스트를 반환 """ - print(f"Elasticsearch에서 '{target_index_date_str}' 시간대 '악성' 데이터 로드 중...") - try: - # INDEX_PATTERN = "planit-edr-ai-analyzed-*" - INDEX_PATTERN = f"planit-edr-ai-classified-{target_index_date_str}" - + + # 1. 인덱스 이름과 시간대 분리 + target_index_date = target_batch_id.split('_')[0] + + INDEX_PATTERN = f"planit-edr-ai-classified-{target_index_date}" + + print(f"Elasticsearch 조회 시작: 인덱스='{INDEX_PATTERN}', 배치ID='{target_batch_id}'") + + try: + # 2. 쿼리 구성 (Bool Query 사용) + # 조건 1: AI 분석 결과가 'malicious' 인 것 + # 조건 2: EventDate가 해당 시간(Batch ID) 인 것 query = { "query": { - "term": { - "ai_analysis.result": "malicious" + "bool": { + "must": [ + {"term": {"ai_analysis.result": "malicious"}}, + {"term": {"EventDate": target_batch_id}} + ] } - }, + } } - print(f"'{INDEX_PATTERN}' 인덱스에서 '악성' 이벤트 조회...") - - raw_docs = [doc for doc in es_helpers.scan(es_client, index=INDEX_PATTERN, query=query)] + # 3. 데이터 스캔 (Scan API 사용) + scan_gen = es_helpers.scan( + es_client, + index=INDEX_PATTERN, + query=query, + preserve_order=False + ) - if not raw_docs: - print(f"'{INDEX_PATTERN}' 인덱스에 조회된 '악성' 이벤트가 없습니다.") - return None, None + original_docs = [] - print(f"총 {len(raw_docs)}개의 '악성' 문서를 가져왔습니다.") - - original_docs = [] # 원본 저장을 위한 리스트 (dict) - data_for_df = [] # Dataframe용 리스트 - - for doc in raw_docs: + # 4. 데이터 수집 + for doc in scan_gen: source_data = doc['_source'] - doc_id = doc['_id'] - - # 1.저장용 원본 리스트 (원본 _source 딕셔너리 + _id) - source_data_copy = source_data.copy() - source_data_copy['_id'] = doc_id - original_docs.append(source_data_copy) + source_data['_id'] = doc['_id'] # ID를 source에 포함시킴 + original_docs.append(source_data) + + if not original_docs: + print(f"'{INDEX_PATTERN}' (Batch: {target_batch_id})에 '악성' 이벤트가 없습니다.") + return None, None - # 2. Dataframe용 데이터 (json_normalize를 위해 _id 추가) - source_data['_id'] = doc_id - data_for_df.append(source_data) + print(f"총 {len(original_docs)}개의 '악성' 문서를 가져왔습니다.") - # 모델용 DataFrame 생성 - df = pd.json_normalize(data_for_df, sep='_') + # 5. DataFrame 생성 + # json_normalize를 사용하면 중첩된 JSON(ai_analysis.result 등)을 자동으로 펼쳐줍니다. + # sep='_' 옵션으로 컬럼명이 'ai_analysis_result' 처럼 생성됩니다. + df = pd.json_normalize(original_docs, sep='_') - # DataFrame과 원본 리스트 2개 반환환 return df, original_docs except Exception as e: - # 인덱스가 없을 때(데이터 유실이 아니라, 원래 없는 경우)를 정상 처리 if 'index_not_found_exception' in str(e): - print(f"'{INDEX_PATTERN}' 인덱스가 존재하지 않습니다 (해당 시간대 악성 이벤트 없음).") + print(f"인덱스가 존재하지 않습니다: {INDEX_PATTERN} (해당 날짜에 데이터 없음)") return None, None - print(f"Elasticsearch 데이터 로드 중 오류: {e}") + print(f"Elasticsearch 데이터 로드 중 오류 발생: {e}") raise e \ No newline at end of file diff --git a/src/feature_engineering.py b/src/feature_engineering.py index 0fe3659..f48310d 100644 --- a/src/feature_engineering.py +++ b/src/feature_engineering.py @@ -1,5 +1,3 @@ -# feature engineering 함수 - import pandas as pd import numpy as np import torch diff --git a/src/models.py b/src/models.py index f8ad615..49bf673 100644 --- a/src/models.py +++ b/src/models.py @@ -1,5 +1,3 @@ -# 모델 클래스 정의 - import torch import torch.nn as nn