From 07f98fc20f3097a2bc16e71eabc3f4db565af3c3 Mon Sep 17 00:00:00 2001 From: jinsol24 Date: Tue, 11 Nov 2025 18:21:45 +0900 Subject: [PATCH] Add handling for single malicious event and prevent duplicate ai_group_id creation --- main.py | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 85 insertions(+), 12 deletions(-) diff --git a/main.py b/main.py index b1b9930..85db961 100644 --- a/main.py +++ b/main.py @@ -61,6 +61,43 @@ def save_grouped_events_to_es(es_client, docs_to_save, target_index): if hasattr(e, 'errors'): print(f"상세 오류: {e.errors}") +def get_next_group_id_offset(es_client, index_pattern="planit-edr-ai-grouping-*"): + """ + 지정된 인덱스 패턴에서 ai_group_id의 최대값을 찾아 + 다음 시작 ID(offset)를 반환합니다. + """ + print(f"'{index_pattern}'에서 현재 최대 ai_group_id 조회 중...") + + try: + response = es_client.search( + index=index_pattern, + size=0, # 문서는 가져오지 않음 + aggs={ + "max_group_id": { + "max": {"field": "ai_group_id"} # ai_group_id 필드의 최대값 집계 + } + }, + ignore_unavailable=True # 인덱스가 없는 경우(최초 실행) 오류 무시 + ) + + # 집계 결과에서 'value' 추출 + max_id = response['aggregations']['max_group_id']['value'] + + if max_id is None: + # 인덱스가 없거나, 필드에 값이 없는 경우 (최초 실행) + print("기존 그룹 ID가 없습니다. 0부터 시작합니다.") + return 0 + else: + # 기존 최대 ID가 99 였다면, 다음 시작 ID는 100 + next_id_offset = int(max_id) + 1 + print(f"현재 최대 ID: {int(max_id)}. 다음 ID(오프셋): {next_id_offset}부터 시작합니다.") + return next_id_offset + + except Exception as e: + print(f"최대 그룹 ID 조회 중 심각한 오류 발생: {e}") + print("파이프라인을 안전하게 중지하기 위해 오류를 발생시킵니다.") + # ID 중복을 막기 위해, 조회 실패 시 파이프라인을 중지시킴 + raise e def run_inference(es_host, es_user, es_pass, target_index_date_str): @@ -151,17 +188,43 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str): # --- 5. 클러스터링 (그룹핑) --- CLUSTERING_THRESHOLD = 0.8 print(f"임베딩 클러스터링(그룹핑) 시작... (임계값: {CLUSTERING_THRESHOLD})") - - clustering = AgglomerativeClustering( - n_clusters=None, - distance_threshold=CLUSTERING_THRESHOLD, - metric='euclidean', - linkage='average' - ) - # predicted_labels는 정렬된 features_df_only와 순서가 일치 - predicted_labels = clustering.fit_predict(all_embeddings_np) - + + num_samples = all_embeddings_np.shape[0] + + if num_samples < 2: + if num_samples == 1: + print("샘플이 1개만 존재하여, 단일 클러스터(ID: 0)로 할당합니다.") + predicted_labels = np.array([0]) + else: + print("임베딩된 샘플이 없어 클러스터링을 건너뜁니다.") + return + else: + print(f"총 {num_samples}개의 샘플로 클러스터링을 수행합니다.") + clustering = AgglomerativeClustering( + n_clusters=None, + distance_threshold=CLUSTERING_THRESHOLD, + metric='euclidean', + linkage='average' + ) + predicted_labels = clustering.fit_predict(all_embeddings_np) # predicted_labels는 정렬된 features_df_only와 순서가 일치 + + # 5.5 전역 그룹 ID 오프셋 계산 및 적용 + try: + # 1. ES에서 현재 최대 ID + 1 값을 가져옴 + id_offset = get_next_group_id_offset(es_client, f"planit-edr-ai-grouping-*") + + # 2. 로컬 ID(0, 1, 2...)에 오프셋 적용 + if id_offset > 0: + print(f"로컬 그룹 ID {len(set(predicted_labels))}개에 오프셋 {id_offset} 적용 중...") + # (predicted_labels는 numpy 배열이므로 단순 덧셈으로 전체 요소에 적용됨) + predicted_labels = predicted_labels + id_offset + + except Exception as e: + print(f"오류: 다음 그룹 ID 오프셋을 가져올 수 없습니다. {e}") + return + + # --- 6. 그룹핑 결과 Elasticsearch에 저장 --- # 그룹핑 결과를 저장할 새 인덱스 이름 # TARGET_INDEX = "planit-edr-ai-grouping" @@ -225,6 +288,16 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str): run_inference(ES_HOST, ES_USER, ES_PASSWORD, args.index_date_str) + except Exception as e: + print(f"SSM 정보 조회 중 오류 발생: {e}") + print("IAM 역할에 'ssm:GetParameter' 권한이 있는지 확인하세요.") + exit(1) # SSM 실패 시 종료 + + try: + run_inference(ES_HOST, ES_USER, ES_PASSWORD, args.index_date_str) + except Exception as e: - print(f"SSM 정보 조회 또는 파이프라인 실행 중 오류: {e}") - print("IAM 역할에 'ssm:GetParameter' 권한이 있는지 확인하세요.") \ No newline at end of file + print(f"EC2 인스턴스에서 메인 파이프라인(run_inference) 실행 중 오류: {e}") + import traceback + traceback.print_exc() + exit(1) # 파이프라인 실패 시 종료 \ No newline at end of file