Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 67 additions & 32 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import joblib
from elasticsearch import Elasticsearch, helpers as es_helpers
from sklearn.cluster import AgglomerativeClustering
from collections import defaultdict
from datetime import datetime
from tqdm import tqdm

Expand Down Expand Up @@ -185,45 +186,80 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str):

print("임베딩 생성 완료.")

# --- 5. 클러스터링 (그룹핑) ---
# --- 5. HostName 별 클러스터링 (그룹핑) ---
CLUSTERING_THRESHOLD = 0.8
print(f"임베딩 클러스터링(그룹핑) 시작... (임계값: {CLUSTERING_THRESHOLD})")
print(f"임베딩 클러스터링(그룹핑) 시작... (HostName별 분리, 임계값: {CLUSTERING_THRESHOLD})")


num_samples = all_embeddings_np.shape[0]

if num_samples < 2:
if num_samples == 1:
print("샘플이 1개만 존재하여, 단일 클러스터(ID: 0)로 할당합니다.")
predicted_labels = np.array([0])
else:
print("임베딩된 샘플이 없어 클러스터링을 건너뜁니다.")
return
# 5-0. HostName 매핑을 위한 준비
# df에는 HostName이 있지만, 정렬된 features_df_only 순서와 맞춰야 함
# _id를 Key로 HostName을 찾는 맵 생성
if 'HostName' in df.columns:
id_to_host_map = df.set_index('_id')['HostName'].to_dict()
else:
print(f"총 {num_samples}개의 샘플로 클러스터링을 수행합니다.")
clustering = AgglomerativeClustering(
n_clusters=None,
distance_threshold=CLUSTERING_THRESHOLD,
metric='euclidean',
linkage='average'
)
predicted_labels = clustering.fit_predict(all_embeddings_np) # predicted_labels는 정렬된 features_df_only와 순서가 일치
# HostName이 없으면 AgentIP 등 다른 필드 사용 고려, 여기선 Unknown 처리
print("경고: DataFrame에 'HostName' 컬럼이 없습니다. 전체를 하나로 처리합니다.")
id_to_host_map = {doc_id: "Unknown" for doc_id in ids_sorted}

# 5-1. 데이터를 HostName 별로 분리
# host_groups 구조: { "PC-A": { "indices": [0, 3, 5], "embeddings": [...] }, ... }
host_groups = defaultdict(lambda: {"indices": [], "embeddings": []})

for idx, doc_id in enumerate(ids_sorted):
host_name = id_to_host_map.get(doc_id, "Unknown")
host_groups[host_name]["indices"].append(idx)
# numpy array에서 해당 인덱스의 임베딩 추출
host_groups[host_name]["embeddings"].append(all_embeddings_np[idx])

# 5-2. 결과 저장을 위한 배열 초기화 (전체 샘플 크기만큼 -1로 초기화)
final_predicted_labels = np.full(all_embeddings_np.shape[0], -1, dtype=int)

# 5.5 전역 그룹 ID 오프셋 계산 및 적용
# 5-3. 전역 그룹 ID 오프셋 가져오기 (최초 1회)
try:
# 1. ES에서 현재 최대 ID + 1 값을 가져옴
id_offset = get_next_group_id_offset(es_client, f"planit-edr-ai-grouping-*")

# 2. 로컬 ID(0, 1, 2...)에 오프셋 적용
if id_offset > 0:
print(f"로컬 그룹 ID {len(set(predicted_labels))}개에 오프셋 {id_offset} 적용 중...")
# (predicted_labels는 numpy 배열이므로 단순 덧셈으로 전체 요소에 적용됨)
predicted_labels = predicted_labels + id_offset

current_global_offset = get_next_group_id_offset(es_client, f"planit-edr-ai-grouping-*")
except Exception as e:
print(f"오류: 다음 그룹 ID 오프셋을 가져올 수 없습니다. {e}")
print(f"오류: 그룹 ID 오프셋 조회 실패. {e}")
return

# 5-4. Host별 반복문 돌며 클러스터링 수행
print(f"총 {len(host_groups)}개의 Host에 대해 개별 클러스터링 수행 중...")

for host_name, data in host_groups.items():
indices = data["indices"]
embeddings = np.array(data["embeddings"])

num_host_samples = embeddings.shape[0]

if num_host_samples == 0:
continue

# Host 내 샘플이 1개인 경우 -> 독자 그룹 할당
if num_host_samples == 1:
local_labels = np.array([0])
else:
# Host 내 샘플끼리만 클러스터링
clustering = AgglomerativeClustering(
n_clusters=None,
distance_threshold=CLUSTERING_THRESHOLD,
metric='euclidean',
linkage='average'
)
local_labels = clustering.fit_predict(embeddings)

# [중요] 로컬 라벨에 현재 글로벌 오프셋을 더해 전역 ID 부여
# 예: PC-A에서 그룹 0, 1이 나오면 -> 100, 101 (오프셋이 100일 때)
global_labels = local_labels + current_global_offset

# 결과 배열의 원래 인덱스 위치에 저장
final_predicted_labels[indices] = global_labels

# 다음 Host를 위해 오프셋 업데이트
# (현재 Host에서 생성된 그룹 수만큼 증가)
current_global_offset += (len(set(local_labels)))

# 변수명 맞춤 (기존 코드 호환성 유지)
predicted_labels = final_predicted_labels


# --- 6. 그룹핑 결과 Elasticsearch에 저장 ---
# 그룹핑 결과를 저장할 새 인덱스 이름
Expand Down Expand Up @@ -285,8 +321,7 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str):
ES_USER = ssm_client.get_parameter(Name='/planit/es-user/super')['Parameter']['Value']
ES_PASSWORD = ssm_client.get_parameter(Name='/planit/es-password/super', WithDecryption=True)['Parameter']['Value']
print("SSM Parameter Store에서 접속 정보를 성공적으로 가져왔습니다.")

run_inference(ES_HOST, ES_USER, ES_PASSWORD, args.index_date_str)


except Exception as e:
print(f"SSM 정보 조회 중 오류 발생: {e}")
Expand Down
Loading