본문으로 건너뛰기

065 클러스터링 결과 활용하기

키워드: 클러스터링, 활용

개요

클러스터링 결과를 어떻게 활용할 수 있을까요? 이 글에서는 클러스터링 결과를 다른 분석이나 모델에 활용하는 다양한 방법을 알아봅니다.

실습 환경

  • Python 버전: 3.11 권장
  • 필요 패키지: pycaret[full]>=3.0

1. 특성 엔지니어링에 활용

클러스터를 새로운 특성으로 사용:

from pycaret.clustering import *
from pycaret.datasets import get_data
import pandas as pd

# 065 클러스터링
data = get_data('jewellery')
clust = setup(data, normalize=True, session_id=42, verbose=False)

kmeans = create_model('kmeans', num_clusters=4)
clustered = assign_model(kmeans)

# 065 클러스터를 특성으로 추가
data['Cluster'] = clustered['Cluster']

# 065 원-핫 인코딩
cluster_dummies = pd.get_dummies(data['Cluster'], prefix='cluster')
data_with_clusters = pd.concat([data, cluster_dummies], axis=1)

print("클러스터 특성 추가 후 컬럼:")
print(data_with_clusters.columns.tolist())

2. 세그먼트별 예측 모델

클러스터별로 별도의 예측 모델 학습:

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

# 065 타겟 변수 생성 (예시)
np.random.seed(42)
data['target'] = data['Age'] * 100 + data['Income'] * 0.01 + np.random.normal(0, 500, len(data))

# 065 전체 데이터로 단일 모델
X = data[['Age', 'Income', 'SpendingScore', 'Savings']]
y = data['target']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

single_model = RandomForestRegressor(n_estimators=100, random_state=42)
single_model.fit(X_train, y_train)
single_pred = single_model.predict(X_test)
single_rmse = np.sqrt(mean_squared_error(y_test, single_pred))
print(f"단일 모델 RMSE: {single_rmse:.2f}")

# 065 클러스터별 모델
cluster_models = {}
cluster_predictions = np.zeros(len(X_test))

for cluster in data['Cluster'].unique():
cluster_mask_train = data.iloc[X_train.index]['Cluster'] == cluster
cluster_mask_test = data.iloc[X_test.index]['Cluster'] == cluster

if cluster_mask_train.sum() > 10: # 충분한 데이터가 있을 때만
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train[cluster_mask_train], y_train[cluster_mask_train])
cluster_models[cluster] = model

if cluster_mask_test.sum() > 0:
cluster_predictions[cluster_mask_test.values] = model.predict(X_test[cluster_mask_test])

cluster_rmse = np.sqrt(mean_squared_error(y_test, cluster_predictions))
print(f"클러스터별 모델 RMSE: {cluster_rmse:.2f}")
print(f"개선율: {(single_rmse - cluster_rmse) / single_rmse * 100:.1f}%")

3. 추천 시스템에 활용

같은 클러스터의 사용자/아이템 추천:

import numpy as np
import pandas as pd

def recommend_similar_items(item_id, data, cluster_col, n_recommendations=5):
"""같은 클러스터의 유사 아이템 추천"""

# 아이템의 클러스터
item_cluster = data.loc[item_id, cluster_col]

# 같은 클러스터의 다른 아이템
same_cluster = data[
(data[cluster_col] == item_cluster) &
(data.index != item_id)
]

# 유사도 기반 정렬 (단순히 랜덤 선택으로 예시)
recommendations = same_cluster.sample(n=min(n_recommendations, len(same_cluster)))

return recommendations.index.tolist()

# 065 예시
sample_item = data.index[0]
recommendations = recommend_similar_items(sample_item, data, 'Cluster')
print(f"아이템 {sample_item}와 유사한 아이템: {recommendations}")

4. 이상치 탐지 기반선

클러스터 중심에서 먼 데이터를 이상치로 탐지:

from scipy.spatial.distance import cdist
import numpy as np
from pycaret.clustering import *
from pycaret.datasets import get_data

data = get_data('jewellery')
clust = setup(data, normalize=True, session_id=42, verbose=False)

kmeans = create_model('kmeans', num_clusters=4)
clustered = assign_model(kmeans)

X = get_config('X').values

# 065 각 점에서 클러스터 중심까지 거리
distances = cdist(X, kmeans.cluster_centers_, metric='euclidean')
min_distances = distances.min(axis=1)

# 065 거리 임계값 (상위 5%를 이상치로)
threshold = np.percentile(min_distances, 95)
outliers = min_distances > threshold

print(f"이상치 임계값: {threshold:.4f}")
print(f"탐지된 이상치: {outliers.sum()}개 ({outliers.mean()*100:.1f}%)")

# 065 이상치 확인
data['is_outlier'] = outliers
data['distance_to_center'] = min_distances

print("\n이상치 샘플:")
print(data[data['is_outlier']].head())

5. A/B 테스트 그룹 분할

클러스터 기반 층화 샘플링:

from sklearn.model_selection import StratifiedShuffleSplit
import numpy as np

# 065 클러스터 기반 층화 분할
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.5, random_state=42)

for train_idx, test_idx in sss.split(data, data['Cluster']):
group_a = data.iloc[train_idx]
group_b = data.iloc[test_idx]

print("=== A/B 테스트 그룹 ===\n")

print("그룹 A 클러스터 분포:")
print(group_a['Cluster'].value_counts(normalize=True).round(3))

print("\n그룹 B 클러스터 분포:")
print(group_b['Cluster'].value_counts(normalize=True).round(3))

# 065 두 그룹의 클러스터 분포가 유사함

6. 세그먼트별 대시보드

클러스터별 KPI 모니터링:

import pandas as pd

def create_segment_dashboard(data, cluster_col, metrics):
"""세그먼트별 대시보드 생성"""

dashboard = data.groupby(cluster_col).agg(metrics)

# 전체 대비 비율
for col in dashboard.columns:
dashboard[f'{col}_pct'] = dashboard[col] / dashboard[col].sum() * 100

return dashboard

# 065 예시 메트릭
metrics = {
'Age': ['mean', 'std'],
'Income': ['mean', 'sum'],
'SpendingScore': ['mean'],
'Savings': ['mean', 'sum']
}

dashboard = create_segment_dashboard(data, 'Cluster', metrics)

print("=== 세그먼트 대시보드 ===\n")
print(dashboard.round(2))

7. 시계열에서 패턴 클러스터링

시계열 패턴 그룹화:

import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

# 065 시계열 데이터 시뮬레이션 (일별 판매량)
np.random.seed(42)
n_products = 100
n_days = 30

# 065 다양한 패턴 생성
sales_data = []
for i in range(n_products):
pattern_type = i % 3

if pattern_type == 0: # 상승 트렌드
sales = np.linspace(50, 150, n_days) + np.random.normal(0, 10, n_days)
elif pattern_type == 1: # 하락 트렌드
sales = np.linspace(150, 50, n_days) + np.random.normal(0, 10, n_days)
else: # 안정
sales = np.ones(n_days) * 100 + np.random.normal(0, 15, n_days)

sales_data.append(sales)

sales_df = pd.DataFrame(sales_data, columns=[f'day_{i}' for i in range(n_days)])

# 065 정규화
scaler = StandardScaler()
sales_scaled = scaler.fit_transform(sales_df)

# 065 클러스터링
kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
pattern_clusters = kmeans.fit_predict(sales_scaled)

sales_df['Pattern'] = pattern_clusters

print("패턴 클러스터 분포:")
print(pd.Series(pattern_clusters).value_counts().sort_index())

# 065 클러스터별 평균 패턴
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 4))
for cluster in range(3):
cluster_mean = sales_df[sales_df['Pattern'] == cluster].iloc[:, :n_days].mean()
plt.plot(range(n_days), cluster_mean, label=f'Pattern {cluster}', linewidth=2)

plt.xlabel('Day')
plt.ylabel('Sales')
plt.title('Sales Patterns by Cluster')
plt.legend()
plt.grid(True, alpha=0.3)
plt.savefig('sales_patterns.png', dpi=150)

8. 반자동 레이블링

클러스터 정보를 활용한 약한 레이블링:

import pandas as pd
import numpy as np

# 065 클러스터 특성 기반 규칙으로 레이블 생성
def auto_label_cluster(cluster_profile):
"""클러스터 프로파일 기반 자동 레이블링"""

if cluster_profile['Income'] > 80000 and cluster_profile['SpendingScore'] > 60:
return 'High Value Customer'
elif cluster_profile['Age'] < 30 and cluster_profile['SpendingScore'] > 50:
return 'Young Active'
elif cluster_profile['Savings'] > 50000:
return 'Conservative Saver'
else:
return 'General Customer'

# 065 클러스터별 프로파일
cluster_profiles = data.groupby('Cluster')[['Age', 'Income', 'SpendingScore', 'Savings']].mean()

# 065 자동 레이블 부여
auto_labels = {}
for cluster in cluster_profiles.index:
auto_labels[cluster] = auto_label_cluster(cluster_profiles.loc[cluster])

print("자동 생성된 레이블:")
for cluster, label in auto_labels.items():
print(f" 클러스터 {cluster}: {label}")

# 065 데이터에 레이블 추가
data['AutoLabel'] = data['Cluster'].map(auto_labels)

9. 파이프라인 통합

전처리 → 클러스터링 → 예측 파이프라인:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.base import BaseEstimator, TransformerMixin

class ClusterFeatureAdder(BaseEstimator, TransformerMixin):
"""클러스터를 특성으로 추가하는 변환기"""

def __init__(self, n_clusters=4):
self.n_clusters = n_clusters
self.kmeans = None

def fit(self, X, y=None):
self.kmeans = KMeans(n_clusters=self.n_clusters, random_state=42, n_init=10)
self.kmeans.fit(X)
return self

def transform(self, X):
clusters = self.kmeans.predict(X)
# 원-핫 인코딩
cluster_features = np.zeros((len(X), self.n_clusters))
cluster_features[np.arange(len(X)), clusters] = 1
return np.hstack([X, cluster_features])

# 065 파이프라인 구성
pipeline = Pipeline([
('scaler', StandardScaler()),
('cluster_features', ClusterFeatureAdder(n_clusters=4))
])

# 065 적용
X = data[['Age', 'Income', 'SpendingScore', 'Savings']].values
X_transformed = pipeline.fit_transform(X)

print(f"원본 특성: {X.shape[1]}")
print(f"변환 후 특성: {X_transformed.shape[1]}")

10. 클러스터 모니터링

시간에 따른 클러스터 변화 추적:

def track_cluster_drift(new_data, original_model, original_distribution):
"""클러스터 분포 변화 모니터링"""

# 새 데이터 클러스터 할당
new_labels = original_model.predict(new_data)
new_distribution = pd.Series(new_labels).value_counts(normalize=True).sort_index()

# 분포 차이 계산
drift = {}
for cluster in original_distribution.index:
orig_pct = original_distribution.get(cluster, 0)
new_pct = new_distribution.get(cluster, 0)
drift[cluster] = abs(new_pct - orig_pct)

avg_drift = np.mean(list(drift.values()))

return {
'new_distribution': new_distribution,
'drift_by_cluster': drift,
'average_drift': avg_drift,
'alert': avg_drift > 0.1 # 10% 이상 변화 시 알림
}

# 065 예시 (원본 분포 저장)
original_distribution = data['Cluster'].value_counts(normalize=True).sort_index()
print("원본 클러스터 분포:")
print(original_distribution.round(3))

정리

클러스터링 결과 활용 방법:

  1. 특성 엔지니어링: 클러스터를 새 특성으로
  2. 세그먼트별 모델: 클러스터별 전문 모델
  3. 추천 시스템: 유사 사용자/아이템 추천
  4. 이상치 탐지: 중심에서 먼 데이터
  5. A/B 테스트: 층화 샘플링
  6. 대시보드: 세그먼트별 KPI
  7. 시계열 패턴: 행동 패턴 그룹화
  8. 약한 레이블링: 규칙 기반 자동 레이블
  9. 파이프라인: 전처리와 통합
  10. 모니터링: 분포 변화 추적

다음 글 예고

다음 글에서는 이상치 탐지의 이해를 다룹니다.


PyCaret 머신러닝 마스터 시리즈 #065