096 모니터링과 재학습
키워드: 모니터링, 재학습, MLOps
개요
프로덕션 환경에서 모델은 시간이 지나면서 성능이 저하될 수 있습니다. 모델 드리프트를 감지하고, 적절한 시점에 재학습하는 MLOps 전략을 알아봅니다.
실습 환경
- Python 버전: 3.11 권장
- 필요 패키지:
pycaret[full]>=3.0,evidently,prometheus-client
모델 드리프트란?
모델 드리프트 유형:
1. 데이터 드리프트 (Data Drift)
- 입력 데이터 분포 변화
- 예: 고객 연령대 변화
2. 개념 드리프트 (Concept Drift)
- 입력-출력 관계 변화
- 예: 경제 상황 변화로 대출 패턴 변화
3. 예측 드리프트 (Prediction Drift)
- 모델 예측 분포 변화
- 위 두 드리프트의 결과
감지 방법:
- 통계적 검정 (KS, PSI)
- 분포 비교
- 성능 메트릭 추적
예측 로깅
# 096 prediction_logger.py
import pandas as pd
from datetime import datetime
import json
import os
class PredictionLogger:
"""예측 로그 저장"""
def __init__(self, log_dir='./prediction_logs'):
self.log_dir = log_dir
os.makedirs(log_dir, exist_ok=True)
def log(self, input_data, prediction, probability, actual=None):
"""예측 기록"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'input': input_data,
'prediction': prediction,
'probability': probability,
'actual': actual
}
# 일별 로그 파일
date_str = datetime.now().strftime('%Y-%m-%d')
log_file = os.path.join(self.log_dir, f'predictions_{date_str}.jsonl')
with open(log_file, 'a') as f:
f.write(json.dumps(log_entry) + '\n')
def get_logs(self, date_str=None):
"""로그 조회"""
if date_str is None:
date_str = datetime.now().strftime('%Y-%m-%d')
log_file = os.path.join(self.log_dir, f'predictions_{date_str}.jsonl')
if not os.path.exists(log_file):
return []
logs = []
with open(log_file, 'r') as f:
for line in f:
logs.append(json.loads(line))
return logs
# 096 사용 예
logger = PredictionLogger()
logger.log(
input_data={'Glucose': 148, 'BMI': 33.6},
prediction=1,
probability=0.85
)
성능 모니터링
# 096 performance_monitor.py
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from datetime import datetime, timedelta
class PerformanceMonitor:
"""모델 성능 모니터링"""
def __init__(self, baseline_metrics):
self.baseline = baseline_metrics
self.history = []
def evaluate(self, y_true, y_pred, y_prob=None):
"""현재 성능 평가"""
metrics = {
'timestamp': datetime.now().isoformat(),
'accuracy': accuracy_score(y_true, y_pred),
'f1': f1_score(y_true, y_pred)
}
if y_prob is not None:
metrics['auc'] = roc_auc_score(y_true, y_prob)
self.history.append(metrics)
return metrics
def check_degradation(self, threshold=0.05):
"""성능 저하 감지"""
if not self.history:
return False, {}
current = self.history[-1]
degradations = {}
for metric in ['accuracy', 'f1', 'auc']:
if metric in current and metric in self.baseline:
diff = self.baseline[metric] - current[metric]
if diff > threshold:
degradations[metric] = {
'baseline': self.baseline[metric],
'current': current[metric],
'diff': diff
}
return len(degradations) > 0, degradations
def get_trend(self, metric='accuracy', window=7):
"""성능 추세 분석"""
if len(self.history) < window:
return None
recent = [h[metric] for h in self.history[-window:] if metric in h]
if len(recent) < 2:
return None
# 단순 선형 추세
x = np.arange(len(recent))
slope = np.polyfit(x, recent, 1)[0]
return {
'metric': metric,
'slope': slope,
'trend': 'declining' if slope < -0.01 else 'stable' if abs(slope) < 0.01 else 'improving'
}
# 096 사용 예
baseline = {'accuracy': 0.78, 'f1': 0.75, 'auc': 0.82}
monitor = PerformanceMonitor(baseline)
# 096 새 데이터로 평가
y_true = [0, 1, 1, 0, 1]
y_pred = [0, 1, 0, 0, 1]
metrics = monitor.evaluate(y_true, y_pred)
# 096 성능 저하 확인
degraded, details = monitor.check_degradation()
if degraded:
print(f"성능 저하 감지: {details}")
데이터 드리프트 감지
# 096 drift_detector.py
from scipy import stats
import numpy as np
import pandas as pd
class DriftDetector:
"""데이터 드리프트 감지"""
def __init__(self, reference_data):
self.reference = reference_data
def ks_test(self, current_data, feature, threshold=0.05):
"""Kolmogorov-Smirnov 검정"""
ref_values = self.reference[feature].dropna()
cur_values = current_data[feature].dropna()
statistic, p_value = stats.ks_2samp(ref_values, cur_values)
return {
'feature': feature,
'statistic': statistic,
'p_value': p_value,
'drift_detected': p_value < threshold
}
def psi(self, current_data, feature, bins=10):
"""Population Stability Index"""
ref_values = self.reference[feature].dropna()
cur_values = current_data[feature].dropna()
# 구간 설정
breakpoints = np.percentile(ref_values, np.linspace(0, 100, bins + 1))
breakpoints[0] = -np.inf
breakpoints[-1] = np.inf
# 각 구간 비율 계산
ref_counts = np.histogram(ref_values, bins=breakpoints)[0]
cur_counts = np.histogram(cur_values, bins=breakpoints)[0]
ref_pct = ref_counts / len(ref_values)
cur_pct = cur_counts / len(cur_values)
# 0 방지
ref_pct = np.where(ref_pct == 0, 0.0001, ref_pct)
cur_pct = np.where(cur_pct == 0, 0.0001, cur_pct)
# PSI 계산
psi_value = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
return {
'feature': feature,
'psi': psi_value,
'drift_level': 'none' if psi_value < 0.1 else 'moderate' if psi_value < 0.25 else 'significant'
}
def detect_all(self, current_data, features=None):
"""모든 특성 드리프트 검사"""
if features is None:
features = self.reference.columns
results = []
for feature in features:
if feature in current_data.columns:
ks_result = self.ks_test(current_data, feature)
psi_result = self.psi(current_data, feature)
results.append({
'feature': feature,
'ks_drift': ks_result['drift_detected'],
'psi': psi_result['psi'],
'psi_level': psi_result['drift_level']
})
return pd.DataFrame(results)
# 096 사용 예
from pycaret.datasets import get_data
# 096 참조 데이터 (학습 시점)
reference = get_data('diabetes')
detector = DriftDetector(reference)
# 096 현재 데이터 (시뮬레이션: 일부 수정)
current = reference.copy()
current['Glucose'] = current['Glucose'] * 1.2 # 분포 변경
# 096 드리프트 검사
drift_report = detector.detect_all(current, features=['Glucose', 'BMI', 'Age'])
print(drift_report)
Evidently 활용
# 096 evidently_monitoring.py
try:
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, ClassificationPreset
from evidently import ColumnMapping
import pandas as pd
# 데이터 준비
from pycaret.datasets import get_data
data = get_data('diabetes')
# 참조/현재 데이터 분할
reference = data.iloc[:500]
current = data.iloc[500:]
# 컬럼 매핑
column_mapping = ColumnMapping(
target='Class variable',
numerical_features=['Glucose', 'BMI', 'Age', 'BloodPressure']
)
# 데이터 드리프트 리포트
drift_report = Report(metrics=[DataDriftPreset()])
drift_report.run(
reference_data=reference,
current_data=current,
column_mapping=column_mapping
)
# HTML 저장
drift_report.save_html('drift_report.html')
print("드리프트 리포트 생성: drift_report.html")
except ImportError:
print("evidently 패키지 필요: pip install evidently")
자동 재학습 파이프라인
# 096 retraining_pipeline.py
from pycaret.classification import *
from datetime import datetime
import pandas as pd
import json
import os
class RetrainingPipeline:
"""자동 재학습 파이프라인"""
def __init__(self, model_path, config):
self.model_path = model_path
self.config = config
self.current_model = load_model(model_path)
def should_retrain(self, performance_metrics, drift_metrics):
"""재학습 필요 여부 판단"""
# 성능 저하 체크
perf_threshold = self.config.get('performance_threshold', 0.05)
for metric, value in performance_metrics.items():
baseline = self.config.get('baseline_metrics', {}).get(metric, 0)
if baseline - value > perf_threshold:
return True, f"Performance degradation in {metric}"
# 드리프트 체크
drift_threshold = self.config.get('drift_threshold', 0.2)
for feature, psi in drift_metrics.items():
if psi > drift_threshold:
return True, f"Data drift in {feature}"
return False, "No retraining needed"
def retrain(self, new_data, target):
"""모델 재학습"""
print(f"[{datetime.now()}] 재학습 시작...")
# PyCaret 설정
clf = setup(
new_data,
target=target,
session_id=42,
verbose=False
)
# 기존 모델과 동일한 유형으로 재학습
model_type = type(self.current_model).__name__
# 모델 ID 매핑
model_map = {
'RandomForestClassifier': 'rf',
'XGBClassifier': 'xgboost',
'LGBMClassifier': 'lightgbm'
}
model_id = model_map.get(model_type, 'rf')
new_model = create_model(model_id, verbose=False)
# 튜닝
tuned_model = tune_model(new_model, n_iter=20, verbose=False)
# 최종화
final_model = finalize_model(tuned_model)
# 새 모델 저장
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
new_path = f"{self.model_path}_{timestamp}"
save_model(final_model, new_path)
# 메타데이터 저장
metrics = pull()
meta = {
'timestamp': timestamp,
'model_type': model_id,
'metrics': metrics.to_dict(),
'data_size': len(new_data)
}
with open(f"{new_path}_meta.json", 'w') as f:
json.dump(meta, f, indent=2)
print(f"[{datetime.now()}] 재학습 완료: {new_path}")
return final_model, new_path
# 096 사용 예
config = {
'baseline_metrics': {'accuracy': 0.78, 'f1': 0.75},
'performance_threshold': 0.05,
'drift_threshold': 0.2
}
pipeline = RetrainingPipeline('diabetes_model', config)
# 096 재학습 필요 여부 확인
performance = {'accuracy': 0.70, 'f1': 0.65} # 현재 성능
drift = {'Glucose': 0.3} # PSI 값
should_retrain, reason = pipeline.should_retrain(performance, drift)
if should_retrain:
print(f"재학습 필요: {reason}")
# new_model, path = pipeline.retrain(new_data, 'Class variable')
Prometheus 메트릭
# 096 prometheus_metrics.py
try:
from prometheus_client import Counter, Gauge, Histogram, start_http_server
import time
# 메트릭 정의
PREDICTIONS_TOTAL = Counter(
'predictions_total',
'Total predictions made',
['model_version', 'prediction']
)
PREDICTION_LATENCY = Histogram(
'prediction_latency_seconds',
'Prediction latency in seconds',
buckets=[0.01, 0.05, 0.1, 0.5, 1.0]
)
MODEL_ACCURACY = Gauge(
'model_accuracy',
'Current model accuracy',
['model_version']
)
DATA_DRIFT_PSI = Gauge(
'data_drift_psi',
'Data drift PSI score',
['feature']
)
def record_prediction(model_version, prediction, latency):
"""예측 기록"""
PREDICTIONS_TOTAL.labels(
model_version=model_version,
prediction=str(prediction)
).inc()
PREDICTION_LATENCY.observe(latency)
def update_accuracy(model_version, accuracy):
"""정확도 업데이트"""
MODEL_ACCURACY.labels(model_version=model_version).set(accuracy)
def update_drift(feature, psi):
"""드리프트 업데이트"""
DATA_DRIFT_PSI.labels(feature=feature).set(psi)
# 메트릭 서버 시작 (별도 포트)
# start_http_server(8001)
except ImportError:
print("prometheus-client 패키지 필요: pip install prometheus-client")
알림 시스템
# 096 alerting.py
import requests
from datetime import datetime
class AlertManager:
"""알림 관리"""
def __init__(self, webhook_url=None, email_config=None):
self.webhook_url = webhook_url
self.email_config = email_config
def send_slack(self, message, severity='warning'):
"""Slack 알림"""
if not self.webhook_url:
print(f"[{severity.upper()}] {message}")
return
color = {'info': '#36a64f', 'warning': '#ffcc00', 'critical': '#ff0000'}
payload = {
'attachments': [{
'color': color.get(severity, '#808080'),
'title': f'ML Model Alert - {severity.upper()}',
'text': message,
'ts': datetime.now().timestamp()
}]
}
requests.post(self.webhook_url, json=payload)
def alert_performance_degradation(self, metrics, threshold):
"""성능 저하 알림"""
message = f"""
모델 성능 저하 감지
현재 메트릭:
- Accuracy: {metrics.get('accuracy', 'N/A'):.4f}
- F1 Score: {metrics.get('f1', 'N/A'):.4f}
임계값: {threshold}
권장 조치: 모델 재학습 검토
"""
self.send_slack(message, severity='warning')
def alert_data_drift(self, drift_report):
"""데이터 드리프트 알림"""
message = f"""
데이터 드리프트 감지
영향받은 특성:
{drift_report}
권장 조치: 데이터 파이프라인 및 모델 재학습 검토
"""
self.send_slack(message, severity='critical')
# 096 사용 예
alert = AlertManager(webhook_url=None) # 실제 URL 설정
alert.alert_performance_degradation({'accuracy': 0.70, 'f1': 0.65}, 0.05)
정리
- 모델 드리프트: 데이터/개념/예측 드리프트
- 성능 모니터링: 메트릭 추적, 추세 분석
- 드리프트 감지: KS 검정, PSI
- 자동 재학습: 조건 기반 파이프라인
- 알림: Slack, 이메일 통합
다음 글 예고
다음 글에서는 종합 프로젝트 1 - 고객 이탈 예측을 다룹니다.
PyCaret 머신러닝 마스터 시리즈 #096