Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 85 additions & 12 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,43 @@ def save_grouped_events_to_es(es_client, docs_to_save, target_index):
if hasattr(e, 'errors'):
print(f"상세 오류: {e.errors}")

def get_next_group_id_offset(es_client, index_pattern="planit-edr-ai-grouping-*"):
"""
지정된 인덱스 패턴에서 ai_group_id의 최대값을 찾아
다음 시작 ID(offset)를 반환합니다.
"""
print(f"'{index_pattern}'에서 현재 최대 ai_group_id 조회 중...")

try:
response = es_client.search(
index=index_pattern,
size=0, # 문서는 가져오지 않음
aggs={
"max_group_id": {
"max": {"field": "ai_group_id"} # ai_group_id 필드의 최대값 집계
}
},
ignore_unavailable=True # 인덱스가 없는 경우(최초 실행) 오류 무시
)

# 집계 결과에서 'value' 추출
max_id = response['aggregations']['max_group_id']['value']

if max_id is None:
# 인덱스가 없거나, 필드에 값이 없는 경우 (최초 실행)
print("기존 그룹 ID가 없습니다. 0부터 시작합니다.")
return 0
else:
# 기존 최대 ID가 99 였다면, 다음 시작 ID는 100
next_id_offset = int(max_id) + 1
print(f"현재 최대 ID: {int(max_id)}. 다음 ID(오프셋): {next_id_offset}부터 시작합니다.")
return next_id_offset

except Exception as e:
print(f"최대 그룹 ID 조회 중 심각한 오류 발생: {e}")
print("파이프라인을 안전하게 중지하기 위해 오류를 발생시킵니다.")
# ID 중복을 막기 위해, 조회 실패 시 파이프라인을 중지시킴
raise e

def run_inference(es_host, es_user, es_pass, target_index_date_str):

Expand Down Expand Up @@ -151,17 +188,43 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str):
# --- 5. 클러스터링 (그룹핑) ---
CLUSTERING_THRESHOLD = 0.8
print(f"임베딩 클러스터링(그룹핑) 시작... (임계값: {CLUSTERING_THRESHOLD})")

clustering = AgglomerativeClustering(
n_clusters=None,
distance_threshold=CLUSTERING_THRESHOLD,
metric='euclidean',
linkage='average'
)

# predicted_labels는 정렬된 features_df_only와 순서가 일치
predicted_labels = clustering.fit_predict(all_embeddings_np)


num_samples = all_embeddings_np.shape[0]

if num_samples < 2:
if num_samples == 1:
print("샘플이 1개만 존재하여, 단일 클러스터(ID: 0)로 할당합니다.")
predicted_labels = np.array([0])
else:
print("임베딩된 샘플이 없어 클러스터링을 건너뜁니다.")
return
else:
print(f"총 {num_samples}개의 샘플로 클러스터링을 수행합니다.")
clustering = AgglomerativeClustering(
n_clusters=None,
distance_threshold=CLUSTERING_THRESHOLD,
metric='euclidean',
linkage='average'
)
predicted_labels = clustering.fit_predict(all_embeddings_np) # predicted_labels는 정렬된 features_df_only와 순서가 일치

# 5.5 전역 그룹 ID 오프셋 계산 및 적용
try:
# 1. ES에서 현재 최대 ID + 1 값을 가져옴
id_offset = get_next_group_id_offset(es_client, f"planit-edr-ai-grouping-*")

# 2. 로컬 ID(0, 1, 2...)에 오프셋 적용
if id_offset > 0:
print(f"로컬 그룹 ID {len(set(predicted_labels))}개에 오프셋 {id_offset} 적용 중...")
# (predicted_labels는 numpy 배열이므로 단순 덧셈으로 전체 요소에 적용됨)
predicted_labels = predicted_labels + id_offset

except Exception as e:
print(f"오류: 다음 그룹 ID 오프셋을 가져올 수 없습니다. {e}")
return


# --- 6. 그룹핑 결과 Elasticsearch에 저장 ---
# 그룹핑 결과를 저장할 새 인덱스 이름
# TARGET_INDEX = "planit-edr-ai-grouping"
Expand Down Expand Up @@ -225,6 +288,16 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str):

run_inference(ES_HOST, ES_USER, ES_PASSWORD, args.index_date_str)

except Exception as e:
print(f"SSM 정보 조회 중 오류 발생: {e}")
print("IAM 역할에 'ssm:GetParameter' 권한이 있는지 확인하세요.")
exit(1) # SSM 실패 시 종료

try:
run_inference(ES_HOST, ES_USER, ES_PASSWORD, args.index_date_str)

except Exception as e:
print(f"SSM 정보 조회 또는 파이프라인 실행 중 오류: {e}")
print("IAM 역할에 'ssm:GetParameter' 권한이 있는지 확인하세요.")
print(f"EC2 인스턴스에서 메인 파이프라인(run_inference) 실행 중 오류: {e}")
import traceback
traceback.print_exc()
exit(1) # 파이프라인 실패 시 종료
Loading