본문으로 건너뛰기

094 모델 모니터링과 재학습 전략

키워드: 모니터링, 재학습, 모델 드리프트, MLOps

개요

프로덕션에 배포된 모델은 시간이 지남에 따라 성능이 저하될 수 있습니다. 이 글에서는 모델 성능 모니터링과 재학습 시점을 결정하는 전략을 알아봅니다.

실습 환경

  • Python 버전: 3.11 권장
  • 필요 패키지: flaml[automl]
pip install flaml[automl] pandas numpy scipy

모델 드리프트 유형

import numpy as np
import pandas as pd

drift_types = {
'유형': ['데이터 드리프트', '개념 드리프트', '모델 성능 저하'],
'설명': [
'입력 데이터 분포 변화',
'입출력 관계 변화',
'예측 정확도 하락'
],
'원인': [
'새로운 사용자, 계절성',
'비즈니스 환경 변화',
'데이터/개념 드리프트'
],
'감지 방법': [
'KS 검정, PSI',
'성능 지표 추적',
'A/B 테스트, 정확도 모니터링'
]
}

print("모델 드리프트 유형:")
print(pd.DataFrame(drift_types).to_string(index=False))

기본 모델 준비

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from flaml import AutoML
import joblib

np.random.seed(42)
X, y = make_classification(n_samples=3000, n_features=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

automl = AutoML()
automl.fit(X_train, y_train, task="classification", time_budget=30, verbose=0)
joblib.dump(automl, 'monitored_model.pkl')
print(f"기준 모델: {automl.best_estimator}")

데이터 드리프트 감지

from scipy import stats

class DataDriftDetector:
"""데이터 드리프트 감지"""

def __init__(self, reference_data: np.ndarray):
self.reference_data = reference_data
self.feature_stats = self._compute_stats(reference_data)

def _compute_stats(self, data: np.ndarray) -> dict:
"""통계량 계산"""
return {
'mean': np.mean(data, axis=0),
'std': np.std(data, axis=0),
'min': np.min(data, axis=0),
'max': np.max(data, axis=0)
}

def detect_drift_ks(self, new_data: np.ndarray, threshold: float = 0.05) -> dict:
"""KS 검정으로 드리프트 감지"""
n_features = self.reference_data.shape[1]
drift_detected = []
p_values = []

for i in range(n_features):
stat, p_value = stats.ks_2samp(self.reference_data[:, i], new_data[:, i])
p_values.append(p_value)
drift_detected.append(p_value < threshold)

return {
'drift_detected': any(drift_detected),
'drifted_features': sum(drift_detected),
'p_values': p_values,
'feature_drift': drift_detected
}

def compute_psi(self, new_data: np.ndarray, bins: int = 10) -> list:
"""PSI (Population Stability Index) 계산"""
psi_values = []

for i in range(self.reference_data.shape[1]):
# 히스토그램 계산
ref_hist, edges = np.histogram(self.reference_data[:, i], bins=bins)
new_hist, _ = np.histogram(new_data[:, i], bins=edges)

# 정규화
ref_pct = (ref_hist + 1) / (sum(ref_hist) + bins)
new_pct = (new_hist + 1) / (sum(new_hist) + bins)

# PSI 계산
psi = sum((new_pct - ref_pct) * np.log(new_pct / ref_pct))
psi_values.append(psi)

return psi_values

# 094 드리프트 감지 테스트
drift_detector = DataDriftDetector(X_train)

# 094 정상 데이터
normal_data = np.random.randn(500, 10)
drift_result = drift_detector.detect_drift_ks(normal_data)
print(f"\n정상 데이터 드리프트: {drift_result['drift_detected']}")

# 094 드리프트된 데이터 (평균 이동)
drifted_data = np.random.randn(500, 10) + 2
drift_result = drift_detector.detect_drift_ks(drifted_data)
print(f"드리프트 데이터: {drift_result['drift_detected']} (변화 특성: {drift_result['drifted_features']}개)")

성능 모니터링

from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score
from datetime import datetime, timedelta

class PerformanceMonitor:
"""모델 성능 모니터링"""

def __init__(self, model, baseline_metrics: dict = None):
self.model = model
self.baseline_metrics = baseline_metrics or {}
self.history = []

def set_baseline(self, X, y):
"""기준 성능 설정"""
y_pred = self.model.predict(X)
y_proba = self.model.predict_proba(X)[:, 1]

self.baseline_metrics = {
'accuracy': accuracy_score(y, y_pred),
'roc_auc': roc_auc_score(y, y_proba),
'precision': precision_score(y, y_pred),
'recall': recall_score(y, y_pred)
}
print(f"기준 성능 설정: {self.baseline_metrics}")

def evaluate(self, X, y, timestamp: datetime = None) -> dict:
"""성능 평가"""
y_pred = self.model.predict(X)
y_proba = self.model.predict_proba(X)[:, 1]

metrics = {
'timestamp': (timestamp or datetime.now()).isoformat(),
'accuracy': accuracy_score(y, y_pred),
'roc_auc': roc_auc_score(y, y_proba),
'precision': precision_score(y, y_pred),
'recall': recall_score(y, y_pred),
'n_samples': len(y)
}

# 기준 대비 변화율
if self.baseline_metrics:
for key in ['accuracy', 'roc_auc', 'precision', 'recall']:
baseline = self.baseline_metrics.get(key, 0)
if baseline > 0:
metrics[f'{key}_change'] = (metrics[key] - baseline) / baseline * 100

self.history.append(metrics)
return metrics

def check_degradation(self, threshold: float = 0.05) -> dict:
"""성능 저하 확인"""
if not self.history or not self.baseline_metrics:
return {'degraded': False}

latest = self.history[-1]
degraded_metrics = []

for key in ['accuracy', 'roc_auc', 'precision', 'recall']:
baseline = self.baseline_metrics.get(key, 0)
current = latest.get(key, 0)

if baseline > 0 and (baseline - current) / baseline > threshold:
degraded_metrics.append(key)

return {
'degraded': len(degraded_metrics) > 0,
'degraded_metrics': degraded_metrics,
'threshold': threshold
}

def get_history_df(self) -> pd.DataFrame:
"""이력 DataFrame"""
return pd.DataFrame(self.history)

# 094 성능 모니터 사용
monitor = PerformanceMonitor(automl)
monitor.set_baseline(X_test, y_test)

# 094 시간에 따른 평가 시뮬레이션
print("\n시간별 성능 모니터링:")
for i in range(5):
# 시간이 지남에 따라 데이터가 약간 변화
X_new = X_test + np.random.randn(*X_test.shape) * 0.1 * i
y_new = y_test

timestamp = datetime.now() + timedelta(days=i*7)
result = monitor.evaluate(X_new, y_new, timestamp)
print(f" 주 {i}: accuracy={result['accuracy']:.4f}, change={result.get('accuracy_change', 0):.1f}%")

# 094 성능 저하 확인
degradation = monitor.check_degradation(threshold=0.05)
print(f"\n성능 저하 감지: {degradation}")

재학습 트리거

class RetrainTrigger:
"""재학습 트리거 시스템"""

def __init__(self, performance_threshold: float = 0.05,
drift_threshold: float = 0.1,
min_samples: int = 1000):
self.performance_threshold = performance_threshold
self.drift_threshold = drift_threshold
self.min_samples = min_samples
self.trigger_history = []

def check_triggers(self, performance_monitor: PerformanceMonitor,
drift_detector: DataDriftDetector,
new_data: np.ndarray) -> dict:
"""재학습 필요 여부 확인"""
triggers = []

# 1. 성능 저하 체크
degradation = performance_monitor.check_degradation(self.performance_threshold)
if degradation['degraded']:
triggers.append(f"성능 저하: {degradation['degraded_metrics']}")

# 2. 데이터 드리프트 체크
drift = drift_detector.detect_drift_ks(new_data)
if drift['drift_detected']:
triggers.append(f"데이터 드리프트: {drift['drifted_features']}개 특성")

# 3. PSI 체크
psi_values = drift_detector.compute_psi(new_data)
high_psi = sum(1 for p in psi_values if p > self.drift_threshold)
if high_psi > 0:
triggers.append(f"높은 PSI: {high_psi}개 특성")

result = {
'should_retrain': len(triggers) > 0,
'triggers': triggers,
'timestamp': datetime.now().isoformat()
}

self.trigger_history.append(result)
return result

# 094 재학습 트리거 테스트
retrain_trigger = RetrainTrigger(performance_threshold=0.05, drift_threshold=0.1)

# 094 트리거 체크
trigger_result = retrain_trigger.check_triggers(monitor, drift_detector, X_test + 0.5)
print(f"\n재학습 필요: {trigger_result['should_retrain']}")
print(f"트리거: {trigger_result['triggers']}")

자동 재학습 파이프라인

class AutoRetrainPipeline:
"""자동 재학습 파이프라인"""

def __init__(self, model_path: str, time_budget: int = 60):
self.model_path = model_path
self.time_budget = time_budget
self.model = joblib.load(model_path)
self.retrain_history = []

def retrain(self, X_train: np.ndarray, y_train: np.ndarray,
X_val: np.ndarray = None, y_val: np.ndarray = None) -> dict:
"""모델 재학습"""
print(f"\n=== 재학습 시작 ({datetime.now()}) ===")

new_model = AutoML()
new_model.fit(
X_train, y_train,
task="classification",
time_budget=self.time_budget,
verbose=1
)

# 검증 (있으면)
if X_val is not None and y_val is not None:
old_acc = accuracy_score(y_val, self.model.predict(X_val))
new_acc = accuracy_score(y_val, new_model.predict(X_val))

improvement = (new_acc - old_acc) / old_acc * 100

result = {
'timestamp': datetime.now().isoformat(),
'old_accuracy': old_acc,
'new_accuracy': new_acc,
'improvement': improvement,
'new_estimator': new_model.best_estimator
}
else:
result = {
'timestamp': datetime.now().isoformat(),
'new_estimator': new_model.best_estimator
}

# 모델 업데이트
self.model = new_model
joblib.dump(new_model, self.model_path)

self.retrain_history.append(result)
print(f"재학습 완료: {new_model.best_estimator}")

return result

# 094 재학습 파이프라인 사용
retrain_pipeline = AutoRetrainPipeline('monitored_model.pkl', time_budget=30)

# 094 재학습 실행
X_new_train, X_new_val, y_new_train, y_new_val = train_test_split(X, y, test_size=0.2, random_state=123)
retrain_result = retrain_pipeline.retrain(X_new_train, y_new_train, X_new_val, y_new_val)
print(f"\n재학습 결과: {retrain_result}")

모니터링 대시보드 데이터

def generate_monitoring_report(monitor: PerformanceMonitor,
detector: DataDriftDetector) -> dict:
"""모니터링 리포트 생성"""
history = monitor.get_history_df()

report = {
'summary': {
'total_evaluations': len(history),
'latest_accuracy': history['accuracy'].iloc[-1] if len(history) > 0 else None,
'accuracy_trend': history['accuracy'].diff().mean() if len(history) > 1 else None,
},
'baseline': monitor.baseline_metrics,
'history': history.to_dict('records') if len(history) > 0 else [],
'generated_at': datetime.now().isoformat()
}

return report

# 094 리포트 생성
report = generate_monitoring_report(monitor, drift_detector)
print("\n=== 모니터링 리포트 ===")
print(f"총 평가 횟수: {report['summary']['total_evaluations']}")
print(f"최신 정확도: {report['summary']['latest_accuracy']:.4f}")
print(f"정확도 추세: {report['summary']['accuracy_trend']:.4f}")

정리

  • 데이터 드리프트: KS 검정, PSI로 감지
  • 성능 모니터링: 기준 대비 변화 추적
  • 재학습 트리거: 성능 저하, 드리프트 시 자동 트리거
  • 자동 재학습: FLAML로 새 모델 학습 및 교체
  • 정기적인 모니터링이 MLOps 핵심

다음 글 예고

다음 글에서는 FLAML vs PyCaret 심층 비교를 진행합니다. 두 AutoML 프레임워크의 차이점을 분석합니다.


FLAML AutoML 마스터 시리즈 #094