Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def get_next_group_id_offset(es_client, index_pattern="planit-edr-ai-grouping-*"
# ID 중복을 막기 위해, 조회 실패 시 파이프라인을 중지시킴
raise e

def run_inference(es_host, es_user, es_pass, target_index_date_str):
def run_inference(es_host, es_user, es_pass, target_batch_id):

# --- 1. 아티팩트(모델, 전처리기) 로드 ---
print("모델 및 전처리기 로드 중...")
Expand Down Expand Up @@ -156,10 +156,10 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str):

# df(모델용), original_docs(원본 저장용) 2개를 받음
# (src.es_client에서 함수 가져옴)
df, original_docs = fetch_malicious_events(es_client, preprocessors, target_index_date_str)
df, original_docs = fetch_malicious_events(es_client, target_batch_id)

if df is None or original_docs is None:
print(f"{target_index_date_str} 인덱스에 처리할 데이터가 없습니다.")
print(f"{target_batch_id} 인덱스에 처리할 데이터가 없습니다.")
return # 조회된 데이터가 없으면 종료

except Exception as e:
Expand Down Expand Up @@ -265,7 +265,8 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str):
# --- 6. 그룹핑 결과 Elasticsearch에 저장 ---
# 그룹핑 결과를 저장할 새 인덱스 이름
# TARGET_INDEX = "planit-edr-ai-grouping"
TARGET_INDEX = f"planit-edr-ai-grouping-{target_index_date_str}"
current_date_suffix = target_batch_id.split('_')[0]
TARGET_INDEX = f"planit-edr-ai-grouping-{current_date_suffix}"

# 저장할 그룹 ID 필드 이름
GROUP_ID_FIELD = "ai_group_id"
Expand Down Expand Up @@ -314,6 +315,10 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str):
help="The specific index date string (e.g., '2025.11.06_14') to process."
)
args = parser.parse_args()

target_batch_id = args.index_date_str # "2025.11.20_14"

print(f"Processing Batch ID: {target_batch_id}")

ssm_client = boto3.client('ssm', region_name='ap-northeast-2')

Expand All @@ -330,7 +335,7 @@ def run_inference(es_host, es_user, es_pass, target_index_date_str):
exit(1) # SSM 실패 시 종료

try:
run_inference(ES_HOST, ES_USER, ES_PASSWORD, args.index_date_str)
run_inference(ES_HOST, ES_USER, ES_PASSWORD, target_batch_id)

except Exception as e:
print(f"EC2 인스턴스에서 메인 파이프라인(run_inference) 실행 중 오류: {e}")
Expand Down
83 changes: 44 additions & 39 deletions src/es_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,68 @@
from elasticsearch import Elasticsearch, helpers as es_helpers

# Elasticsearch에서 데이터 가져오기
def fetch_malicious_events(es_client, preprocessors, target_index_date_str):
def fetch_malicious_events(es_client, target_batch_id):
"""
Elasticsearch에서 '악성' 이벤트를 조회하여
Elasticsearch에서 특정 시간대(Batch ID)의 '악성' 이벤트를 조회
1. 모델 저장장용 DataFrame
2. 저장용 원본 문서(dict) 리스트
를 반환
2. 저장용 원본 문서(dict) 리스트를 반환
"""
print(f"Elasticsearch에서 '{target_index_date_str}' 시간대 '악성' 데이터 로드 중...")
try:
# INDEX_PATTERN = "planit-edr-ai-analyzed-*"
INDEX_PATTERN = f"planit-edr-ai-classified-{target_index_date_str}"


# 1. 인덱스 이름과 시간대 분리
target_index_date = target_batch_id.split('_')[0]

INDEX_PATTERN = f"planit-edr-ai-classified-{target_index_date}"

print(f"Elasticsearch 조회 시작: 인덱스='{INDEX_PATTERN}', 배치ID='{target_batch_id}'")

try:
# 2. 쿼리 구성 (Bool Query 사용)
# 조건 1: AI 분석 결과가 'malicious' 인 것
# 조건 2: EventDate가 해당 시간(Batch ID) 인 것
query = {
"query": {
"term": {
"ai_analysis.result": "malicious"
"bool": {
"must": [
{"term": {"ai_analysis.result": "malicious"}},
{"term": {"EventDate": target_batch_id}}
]
}
},
}
}

print(f"'{INDEX_PATTERN}' 인덱스에서 '악성' 이벤트 조회...")

raw_docs = [doc for doc in es_helpers.scan(es_client, index=INDEX_PATTERN, query=query)]
# 3. 데이터 스캔 (Scan API 사용)
scan_gen = es_helpers.scan(
es_client,
index=INDEX_PATTERN,
query=query,
preserve_order=False
)

if not raw_docs:
print(f"'{INDEX_PATTERN}' 인덱스에 조회된 '악성' 이벤트가 없습니다.")
return None, None
original_docs = []

print(f"총 {len(raw_docs)}개의 '악성' 문서를 가져왔습니다.")

original_docs = [] # 원본 저장을 위한 리스트 (dict)
data_for_df = [] # Dataframe용 리스트

for doc in raw_docs:
# 4. 데이터 수집
for doc in scan_gen:
source_data = doc['_source']
doc_id = doc['_id']

# 1.저장용 원본 리스트 (원본 _source 딕셔너리 + _id)
source_data_copy = source_data.copy()
source_data_copy['_id'] = doc_id
original_docs.append(source_data_copy)
source_data['_id'] = doc['_id'] # ID를 source에 포함시킴
original_docs.append(source_data)

if not original_docs:
print(f"'{INDEX_PATTERN}' (Batch: {target_batch_id})에 '악성' 이벤트가 없습니다.")
return None, None

# 2. Dataframe용 데이터 (json_normalize를 위해 _id 추가)
source_data['_id'] = doc_id
data_for_df.append(source_data)
print(f"총 {len(original_docs)}개의 '악성' 문서를 가져왔습니다.")

# 모델용 DataFrame 생성
df = pd.json_normalize(data_for_df, sep='_')
# 5. DataFrame 생성
# json_normalize를 사용하면 중첩된 JSON(ai_analysis.result 등)을 자동으로 펼쳐줍니다.
# sep='_' 옵션으로 컬럼명이 'ai_analysis_result' 처럼 생성됩니다.
df = pd.json_normalize(original_docs, sep='_')

# DataFrame과 원본 리스트 2개 반환환
return df, original_docs

except Exception as e:
# 인덱스가 없을 때(데이터 유실이 아니라, 원래 없는 경우)를 정상 처리
if 'index_not_found_exception' in str(e):
print(f"'{INDEX_PATTERN}' 인덱스가 존재하지 않습니다 (해당 시간대 악성 이벤트 없음).")
print(f"인덱스가 존재하지 않습니다: {INDEX_PATTERN} (해당 날짜에 데이터 없음)")
return None, None

print(f"Elasticsearch 데이터 로드 중 오류: {e}")
print(f"Elasticsearch 데이터 로드 중 오류 발생: {e}")
raise e
2 changes: 0 additions & 2 deletions src/feature_engineering.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# feature engineering 함수

import pandas as pd
import numpy as np
import torch
Expand Down
2 changes: 0 additions & 2 deletions src/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# 모델 클래스 정의

import torch
import torch.nn as nn

Expand Down
Loading