diff --git a/main.py b/main.py index 85db961..36b9bb6 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ import joblib from elasticsearch import Elasticsearch, helpers as es_helpers from sklearn.cluster import AgglomerativeClustering +from collections import defaultdict from datetime import datetime from tqdm import tqdm @@ -185,45 +186,80 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str): print("임베딩 생성 완료.") - # --- 5. 클러스터링 (그룹핑) --- + # --- 5. HostName 별 클러스터링 (그룹핑) --- CLUSTERING_THRESHOLD = 0.8 - print(f"임베딩 클러스터링(그룹핑) 시작... (임계값: {CLUSTERING_THRESHOLD})") + print(f"임베딩 클러스터링(그룹핑) 시작... (HostName별 분리, 임계값: {CLUSTERING_THRESHOLD})") - num_samples = all_embeddings_np.shape[0] - - if num_samples < 2: - if num_samples == 1: - print("샘플이 1개만 존재하여, 단일 클러스터(ID: 0)로 할당합니다.") - predicted_labels = np.array([0]) - else: - print("임베딩된 샘플이 없어 클러스터링을 건너뜁니다.") - return + # 5-0. HostName 매핑을 위한 준비 + # df에는 HostName이 있지만, 정렬된 features_df_only 순서와 맞춰야 함 + # _id를 Key로 HostName을 찾는 맵 생성 + if 'HostName' in df.columns: + id_to_host_map = df.set_index('_id')['HostName'].to_dict() else: - print(f"총 {num_samples}개의 샘플로 클러스터링을 수행합니다.") - clustering = AgglomerativeClustering( - n_clusters=None, - distance_threshold=CLUSTERING_THRESHOLD, - metric='euclidean', - linkage='average' - ) - predicted_labels = clustering.fit_predict(all_embeddings_np) # predicted_labels는 정렬된 features_df_only와 순서가 일치 + # HostName이 없으면 AgentIP 등 다른 필드 사용 고려, 여기선 Unknown 처리 + print("경고: DataFrame에 'HostName' 컬럼이 없습니다. 전체를 하나로 처리합니다.") + id_to_host_map = {doc_id: "Unknown" for doc_id in ids_sorted} + + # 5-1. 데이터를 HostName 별로 분리 + # host_groups 구조: { "PC-A": { "indices": [0, 3, 5], "embeddings": [...] }, ... } + host_groups = defaultdict(lambda: {"indices": [], "embeddings": []}) + + for idx, doc_id in enumerate(ids_sorted): + host_name = id_to_host_map.get(doc_id, "Unknown") + host_groups[host_name]["indices"].append(idx) + # numpy array에서 해당 인덱스의 임베딩 추출 + host_groups[host_name]["embeddings"].append(all_embeddings_np[idx]) + + # 5-2. 결과 저장을 위한 배열 초기화 (전체 샘플 크기만큼 -1로 초기화) + final_predicted_labels = np.full(all_embeddings_np.shape[0], -1, dtype=int) - # 5.5 전역 그룹 ID 오프셋 계산 및 적용 + # 5-3. 전역 그룹 ID 오프셋 가져오기 (최초 1회) try: - # 1. ES에서 현재 최대 ID + 1 값을 가져옴 - id_offset = get_next_group_id_offset(es_client, f"planit-edr-ai-grouping-*") - - # 2. 로컬 ID(0, 1, 2...)에 오프셋 적용 - if id_offset > 0: - print(f"로컬 그룹 ID {len(set(predicted_labels))}개에 오프셋 {id_offset} 적용 중...") - # (predicted_labels는 numpy 배열이므로 단순 덧셈으로 전체 요소에 적용됨) - predicted_labels = predicted_labels + id_offset - + current_global_offset = get_next_group_id_offset(es_client, f"planit-edr-ai-grouping-*") except Exception as e: - print(f"오류: 다음 그룹 ID 오프셋을 가져올 수 없습니다. {e}") + print(f"오류: 그룹 ID 오프셋 조회 실패. {e}") return + # 5-4. Host별 반복문 돌며 클러스터링 수행 + print(f"총 {len(host_groups)}개의 Host에 대해 개별 클러스터링 수행 중...") + + for host_name, data in host_groups.items(): + indices = data["indices"] + embeddings = np.array(data["embeddings"]) + + num_host_samples = embeddings.shape[0] + + if num_host_samples == 0: + continue + + # Host 내 샘플이 1개인 경우 -> 독자 그룹 할당 + if num_host_samples == 1: + local_labels = np.array([0]) + else: + # Host 내 샘플끼리만 클러스터링 + clustering = AgglomerativeClustering( + n_clusters=None, + distance_threshold=CLUSTERING_THRESHOLD, + metric='euclidean', + linkage='average' + ) + local_labels = clustering.fit_predict(embeddings) + + # [중요] 로컬 라벨에 현재 글로벌 오프셋을 더해 전역 ID 부여 + # 예: PC-A에서 그룹 0, 1이 나오면 -> 100, 101 (오프셋이 100일 때) + global_labels = local_labels + current_global_offset + + # 결과 배열의 원래 인덱스 위치에 저장 + final_predicted_labels[indices] = global_labels + + # 다음 Host를 위해 오프셋 업데이트 + # (현재 Host에서 생성된 그룹 수만큼 증가) + current_global_offset += (len(set(local_labels))) + + # 변수명 맞춤 (기존 코드 호환성 유지) + predicted_labels = final_predicted_labels + # --- 6. 그룹핑 결과 Elasticsearch에 저장 --- # 그룹핑 결과를 저장할 새 인덱스 이름 @@ -285,8 +321,7 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str): ES_USER = ssm_client.get_parameter(Name='/planit/es-user/super')['Parameter']['Value'] ES_PASSWORD = ssm_client.get_parameter(Name='/planit/es-password/super', WithDecryption=True)['Parameter']['Value'] print("SSM Parameter Store에서 접속 정보를 성공적으로 가져왔습니다.") - - run_inference(ES_HOST, ES_USER, ES_PASSWORD, args.index_date_str) + except Exception as e: print(f"SSM 정보 조회 중 오류 발생: {e}")