071 이상치 탐지 실전 - 사기 거래 탐지
키워드: 사기 탐지, fraud detection
개요
신용카드 사기 탐지는 이상치 탐지의 대표적인 활용 사례입니다. 정상 거래 패턴에서 벗어난 사기 거래를 자동으로 탐지하는 시스템을 구축합니다.
실습 환경
- Python 버전: 3.11 권장
- 필요 패키지:
pycaret[full]>=3.0
문제 정의
사기 탐지의 특징:
- 극심한 클래스 불균형 (사기 < 1%)
- 실시간 탐지 필요
- 높은 재현율 중요 (사기 놓치면 안 됨)
- 오탐 비용도 고려 (정상을 사기로 판단)
비지도 학습 접근법:
- 레이블 없이 이상 패턴 학습
- 새로운 사기 유형에도 대응 가능
데이터 생성
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
# 071 정상 거래 생성
n_normal = 10000
normal_transactions = pd.DataFrame({
'amount': np.random.lognormal(4, 1, n_normal), # 대부분 소액
'hour': np.random.choice(range(8, 23), n_normal), # 주간 거래
'day_of_week': np.random.choice(range(7), n_normal),
'merchant_category': np.random.choice(['grocery', 'restaurant', 'gas', 'retail', 'online'], n_normal),
'distance_from_home': np.random.exponential(5, n_normal), # 집 근처
'time_since_last_transaction': np.random.exponential(24, n_normal), # 시간 단위
'transaction_count_24h': np.random.poisson(3, n_normal),
'is_fraud': 0
})
# 071 사기 거래 생성 (다양한 패턴)
n_fraud = 200
# 071 패턴 1: 고액 거래
fraud1 = pd.DataFrame({
'amount': np.random.uniform(5000, 20000, n_fraud // 4),
'hour': np.random.choice(range(0, 6), n_fraud // 4), # 새벽
'day_of_week': np.random.choice(range(7), n_fraud // 4),
'merchant_category': np.random.choice(['online', 'retail'], n_fraud // 4),
'distance_from_home': np.random.uniform(100, 500, n_fraud // 4), # 먼 거리
'time_since_last_transaction': np.random.uniform(0.1, 1, n_fraud // 4), # 연속 거래
'transaction_count_24h': np.random.randint(10, 30, n_fraud // 4),
'is_fraud': 1
})
# 071 패턴 2: 해외 사용
fraud2 = pd.DataFrame({
'amount': np.random.uniform(1000, 5000, n_fraud // 4),
'hour': np.random.choice(range(24), n_fraud // 4),
'day_of_week': np.random.choice(range(7), n_fraud // 4),
'merchant_category': np.random.choice(['online'], n_fraud // 4),
'distance_from_home': np.random.uniform(500, 10000, n_fraud // 4), # 매우 먼 거리
'time_since_last_transaction': np.random.exponential(2, n_fraud // 4),
'transaction_count_24h': np.random.randint(5, 15, n_fraud // 4),
'is_fraud': 1
})
# 071 패턴 3: 연속 소액 거래
fraud3 = pd.DataFrame({
'amount': np.random.uniform(10, 100, n_fraud // 4),
'hour': np.random.choice(range(24), n_fraud // 4),
'day_of_week': np.random.choice(range(7), n_fraud // 4),
'merchant_category': np.random.choice(['online'], n_fraud // 4),
'distance_from_home': np.random.exponential(10, n_fraud // 4),
'time_since_last_transaction': np.random.uniform(0.01, 0.1, n_fraud // 4), # 매우 짧은 간격
'transaction_count_24h': np.random.randint(20, 50, n_fraud // 4), # 많은 거래
'is_fraud': 1
})
# 071 패턴 4: 비정상 카테고리
fraud4 = pd.DataFrame({
'amount': np.random.uniform(500, 3000, n_fraud // 4),
'hour': np.random.choice(range(24), n_fraud // 4),
'day_of_week': np.random.choice(range(7), n_fraud // 4),
'merchant_category': np.random.choice(['jewelry', 'electronics', 'luxury'], n_fraud // 4),
'distance_from_home': np.random.uniform(50, 200, n_fraud // 4),
'time_since_last_transaction': np.random.exponential(5, n_fraud // 4),
'transaction_count_24h': np.random.randint(5, 10, n_fraud // 4),
'is_fraud': 1
})
# 071 데이터 결합
data = pd.concat([normal_transactions, fraud1, fraud2, fraud3, fraud4], ignore_index=True)
data = data.sample(frac=1, random_state=42).reset_index(drop=True)
print(f"전체 거래: {len(data)}")
print(f"정상 거래: {(data['is_fraud'] == 0).sum()}")
print(f"사기 거래: {(data['is_fraud'] == 1).sum()}")
print(f"사기 비율: {data['is_fraud'].mean()*100:.2f}%")
특성 엔지니어링
import pandas as pd
import numpy as np
# 071 범주형 인코딩
data_encoded = pd.get_dummies(data, columns=['merchant_category'])
# 071 파생 특성
data_encoded['amount_log'] = np.log1p(data_encoded['amount'])
data_encoded['is_night'] = data_encoded['hour'].apply(lambda x: 1 if x < 6 or x > 22 else 0)
data_encoded['is_weekend'] = data_encoded['day_of_week'].apply(lambda x: 1 if x >= 5 else 0)
data_encoded['high_frequency'] = (data_encoded['transaction_count_24h'] > 10).astype(int)
data_encoded['far_from_home'] = (data_encoded['distance_from_home'] > 50).astype(int)
# 071 이상치 탐지용 데이터 (레이블 제외)
y_true = data_encoded['is_fraud'].values
X = data_encoded.drop(['is_fraud'], axis=1)
print(f"특성 수: {X.shape[1]}")
print(X.columns.tolist())
PyCaret 이상치 탐지
from pycaret.anomaly import *
# 071 환경 설정
anomaly = setup(X, normalize=True, session_id=42, verbose=False)
# 071 여러 알고리즘 비교
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score
algorithms = ['iforest', 'lof', 'knn', 'svm']
results = []
for algo in algorithms:
model = create_model(algo, fraction=0.02) # 2% 이상치
result = assign_model(model)
y_pred = result['Anomaly'].values
results.append({
'Algorithm': algo,
'Precision': precision_score(y_true, y_pred),
'Recall': recall_score(y_true, y_pred),
'F1': f1_score(y_true, y_pred)
})
import pandas as pd
df_results = pd.DataFrame(results).sort_values('F1', ascending=False)
print("알고리즘 비교:")
print(df_results.round(4))
Isolation Forest 상세 튜닝
from pycaret.anomaly import *
from sklearn.ensemble import IsolationForest
from sklearn.metrics import f1_score, precision_score, recall_score
import matplotlib.pyplot as plt
# 071 fraction 튜닝
fractions = [0.01, 0.02, 0.03, 0.05, 0.07, 0.1]
results = []
for frac in fractions:
anomaly = setup(X, normalize=True, session_id=42, verbose=False)
model = create_model('iforest', fraction=frac)
result = assign_model(model)
y_pred = result['Anomaly'].values
results.append({
'Fraction': frac,
'Detected': y_pred.sum(),
'Precision': precision_score(y_true, y_pred),
'Recall': recall_score(y_true, y_pred),
'F1': f1_score(y_true, y_pred)
})
df = pd.DataFrame(results)
print("Fraction 튜닝:")
print(df.round(4))
# 071 시각화
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
axes[0].plot(df['Fraction'], df['Precision'], 'bo-', label='Precision')
axes[0].plot(df['Fraction'], df['Recall'], 'rs-', label='Recall')
axes[0].set_xlabel('Fraction')
axes[0].set_ylabel('Score')
axes[0].set_title('Precision vs Recall')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[1].plot(df['Fraction'], df['F1'], 'go-')
axes[1].set_xlabel('Fraction')
axes[1].set_ylabel('F1 Score')
axes[1].set_title('F1 Score')
axes[1].grid(True, alpha=0.3)
axes[2].plot(df['Fraction'], df['Detected'], 'mo-')
axes[2].axhline(y=y_true.sum(), color='red', linestyle='--', label=f'Actual Fraud ({y_true.sum()})')
axes[2].set_xlabel('Fraction')
axes[2].set_ylabel('Detected Count')
axes[2].set_title('Detected Anomalies')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('fraud_detection_tuning.png', dpi=150)
앙상블 접근법
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
import numpy as np
# 071 스케일링
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 071 여러 모델
models = {
'iforest': IsolationForest(contamination=0.02, random_state=42),
'lof': LocalOutlierFactor(n_neighbors=20, contamination=0.02, novelty=False)
}
# 071 각 모델 예측
predictions = {}
for name, model in models.items():
if name == 'lof':
pred = model.fit_predict(X_scaled)
else:
pred = model.fit_predict(X_scaled)
predictions[name] = (pred == -1).astype(int)
# 071 앙상블 (다수결)
ensemble_pred = (predictions['iforest'] + predictions['lof']) >= 1 # OR 방식
# 071 평가
print("\n=== 앙상블 결과 ===")
print(f"Precision: {precision_score(y_true, ensemble_pred):.4f}")
print(f"Recall: {recall_score(y_true, ensemble_pred):.4f}")
print(f"F1: {f1_score(y_true, ensemble_pred):.4f}")
임계값 조정 (비용 기반)
from sklearn.ensemble import IsolationForest
import numpy as np
import matplotlib.pyplot as plt
# 071 모델 학습
iforest = IsolationForest(random_state=42)
iforest.fit(X_scaled)
# 071 이상 점수
scores = -iforest.score_samples(X_scaled) # 높을수록 이상
# 071 다양한 임계값에서 성능
thresholds = np.percentile(scores, range(90, 100))
results = []
for thresh in thresholds:
y_pred = (scores > thresh).astype(int)
results.append({
'Threshold': thresh,
'Detected': y_pred.sum(),
'Precision': precision_score(y_true, y_pred),
'Recall': recall_score(y_true, y_pred),
'F1': f1_score(y_true, y_pred)
})
df = pd.DataFrame(results)
print("\n임계값 조정:")
print(df.round(4))
# 071 비용 기반 최적화
# 071 사기 놓침 비용: $1000, 오탐 비용: $10
cost_fn = 1000 # False Negative cost
cost_fp = 10 # False Positive cost
def calculate_cost(y_true, y_pred, cost_fn, cost_fp):
fn = ((y_true == 1) & (y_pred == 0)).sum()
fp = ((y_true == 0) & (y_pred == 1)).sum()
return fn * cost_fn + fp * cost_fp
costs = []
for thresh in thresholds:
y_pred = (scores > thresh).astype(int)
cost = calculate_cost(y_true, y_pred, cost_fn, cost_fp)
costs.append(cost)
best_idx = np.argmin(costs)
best_thresh = thresholds[best_idx]
print(f"\n최적 임계값 (비용 기준): {best_thresh:.4f}")
print(f"최소 비용: ${costs[best_idx]:,.0f}")
실시간 탐지 파이프라인
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
import joblib
class FraudDetector:
def __init__(self, contamination=0.02):
self.scaler = StandardScaler()
self.model = IsolationForest(
contamination=contamination,
n_estimators=100,
random_state=42
)
self.threshold = None
def fit(self, X):
"""학습"""
X_scaled = self.scaler.fit_transform(X)
self.model.fit(X_scaled)
# 이상 점수 임계값 설정
scores = -self.model.score_samples(X_scaled)
self.threshold = np.percentile(scores, 98) # 상위 2%
return self
def predict(self, X):
"""예측"""
X_scaled = self.scaler.transform(X)
scores = -self.model.score_samples(X_scaled)
return (scores > self.threshold).astype(int)
def predict_proba(self, X):
"""이상 점수 반환"""
X_scaled = self.scaler.transform(X)
scores = -self.model.score_samples(X_scaled)
# 0~1 범위로 정규화
min_score = scores.min()
max_score = scores.max()
return (scores - min_score) / (max_score - min_score + 1e-10)
def save(self, path):
"""모델 저장"""
joblib.dump({
'scaler': self.scaler,
'model': self.model,
'threshold': self.threshold
}, path)
@classmethod
def load(cls, path):
"""모델 로드"""
data = joblib.load(path)
detector = cls()
detector.scaler = data['scaler']
detector.model = data['model']
detector.threshold = data['threshold']
return detector
# 071 사용 예
detector = FraudDetector(contamination=0.02)
detector.fit(X)
# 071 새 거래 예측
new_transaction = X.iloc[[0]] # 예시
is_fraud = detector.predict(new_transaction)
fraud_score = detector.predict_proba(new_transaction)
print(f"사기 여부: {'사기' if is_fraud[0] else '정상'}")
print(f"사기 점수: {fraud_score[0]:.4f}")
# 071 모델 저장
detector.save('fraud_detector.pkl')
모니터링 대시보드
import matplotlib.pyplot as plt
import numpy as np
# 071 예측 결과
y_pred = detector.predict(X)
scores = detector.predict_proba(X)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 1. 점수 분포
axes[0, 0].hist(scores[y_true == 0], bins=50, alpha=0.7, label='Normal', density=True)
axes[0, 0].hist(scores[y_true == 1], bins=50, alpha=0.7, label='Fraud', density=True)
axes[0, 0].axvline(x=np.percentile(scores, 98), color='red', linestyle='--', label='Threshold')
axes[0, 0].set_xlabel('Anomaly Score')
axes[0, 0].set_ylabel('Density')
axes[0, 0].set_title('Score Distribution')
axes[0, 0].legend()
# 2. 혼동 행렬
from sklearn.metrics import confusion_matrix
import seaborn as sns
cm = confusion_matrix(y_true, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[0, 1])
axes[0, 1].set_xlabel('Predicted')
axes[0, 1].set_ylabel('Actual')
axes[0, 1].set_title('Confusion Matrix')
# 3. 금액별 탐지율
amount_bins = [0, 100, 500, 1000, 5000, float('inf')]
amount_labels = ['<100', '100-500', '500-1K', '1K-5K', '>5K']
X['amount_bin'] = pd.cut(X['amount'], bins=amount_bins, labels=amount_labels)
detection_by_amount = []
for label in amount_labels:
mask = X['amount_bin'] == label
if mask.sum() > 0 and y_true[mask].sum() > 0:
recall = recall_score(y_true[mask], y_pred[mask])
detection_by_amount.append(recall)
else:
detection_by_amount.append(0)
axes[1, 0].bar(amount_labels, detection_by_amount, color='steelblue')
axes[1, 0].set_xlabel('Transaction Amount')
axes[1, 0].set_ylabel('Recall')
axes[1, 0].set_title('Detection Rate by Amount')
# 4. 시간대별 탐지
detection_by_hour = []
for h in range(24):
mask = X['hour'] == h
if mask.sum() > 0 and y_true[mask].sum() > 0:
recall = recall_score(y_true[mask], y_pred[mask])
detection_by_hour.append(recall)
else:
detection_by_hour.append(0)
axes[1, 1].bar(range(24), detection_by_hour, color='coral')
axes[1, 1].set_xlabel('Hour')
axes[1, 1].set_ylabel('Recall')
axes[1, 1].set_title('Detection Rate by Hour')
plt.tight_layout()
plt.savefig('fraud_dashboard.png', dpi=150)
장단점
장점:
- 레이블 없이 학습 가능
- 새로운 사기 유형 탐지 가능
- 실시간 처리 가능
단점:
- 임계값 설정 필요
- 오탐율 관리 필요
- 컨셉 드리프트 대응 필요
정리
- 사기 탐지는 이상치 탐지의 대표 활용 사례
- 극심한 클래스 불균형 상황
- Isolation Forest가 효과적
- 비용 기반 임계값 조정 중요
- 앙상블로 성능 향상 가능
다음 글 예고
다음 글에서는 이상치 탐지 실전 - 설비 이상 감지를 다룹니다.
PyCaret 머신러닝 마스터 시리즈 #071