072 이상치 탐지 실전 - 설비 이상 감지
키워드: 설비 이상, predictive maintenance
개요
설비 이상 감지(Predictive Maintenance)는 센서 데이터를 분석하여 설비 고장을 사전에 예측하는 기술입니다. 이상치 탐지를 통해 정상 운전 패턴에서 벗어난 이상 징후를 조기에 감지합니다.
실습 환경
- Python 버전: 3.11 권장
- 필요 패키지:
pycaret[full]>=3.0
문제 정의
설비 이상 감지의 특징:
- 다변량 센서 데이터
- 시간적 패턴 존재
- 이상 = 고장 전조 or 이미 고장
- 조기 감지가 핵심
비용 구조:
- 미탐지 → 설비 고장 → 큰 손실
- 오탐지 → 불필요한 점검 → 작은 손실
- 조기 경고 → 예방 정비 → 최소 비용
데이터 생성 (시뮬레이션)
import pandas as pd
import numpy as np
np.random.seed(42)
# 072 시간 인덱스
n_samples = 10000
timestamps = pd.date_range('2024-01-01', periods=n_samples, freq='10min')
# 072 정상 운전 데이터
def generate_normal_data(n):
"""정상 센서 데이터 생성"""
return pd.DataFrame({
'temperature': np.random.normal(70, 3, n), # 온도: 70±3
'vibration': np.random.normal(2.5, 0.3, n), # 진동: 2.5±0.3
'pressure': np.random.normal(100, 5, n), # 압력: 100±5
'current': np.random.normal(50, 2, n), # 전류: 50±2
'rpm': np.random.normal(1500, 30, n), # RPM: 1500±30
'oil_level': np.random.normal(80, 2, n), # 오일 레벨: 80±2
'is_anomaly': 0
})
# 072 이상 패턴 데이터
def generate_anomaly_bearing(n):
"""베어링 마모 (진동 증가)"""
return pd.DataFrame({
'temperature': np.random.normal(75, 3, n), # 약간 상승
'vibration': np.random.normal(5.0, 0.5, n), # 크게 증가
'pressure': np.random.normal(100, 5, n),
'current': np.random.normal(55, 3, n), # 약간 증가
'rpm': np.random.normal(1500, 50, n),
'oil_level': np.random.normal(78, 3, n),
'is_anomaly': 1
})
def generate_anomaly_overheating(n):
"""과열 (온도 급상승)"""
return pd.DataFrame({
'temperature': np.random.normal(95, 5, n), # 크게 상승
'vibration': np.random.normal(3.0, 0.4, n), # 약간 증가
'pressure': np.random.normal(110, 8, n), # 상승
'current': np.random.normal(60, 4, n), # 증가
'rpm': np.random.normal(1450, 40, n), # 약간 감소
'oil_level': np.random.normal(75, 3, n), # 감소
'is_anomaly': 1
})
def generate_anomaly_pressure(n):
"""압력 이상 (누출)"""
return pd.DataFrame({
'temperature': np.random.normal(70, 3, n),
'vibration': np.random.normal(2.5, 0.3, n),
'pressure': np.random.normal(70, 10, n), # 크게 감소
'current': np.random.normal(45, 3, n), # 감소
'rpm': np.random.normal(1400, 50, n), # 감소
'oil_level': np.random.normal(70, 5, n), # 감소
'is_anomaly': 1
})
# 072 데이터 생성
normal = generate_normal_data(9400)
anomaly1 = generate_anomaly_bearing(200)
anomaly2 = generate_anomaly_overheating(200)
anomaly3 = generate_anomaly_pressure(200)
data = pd.concat([normal, anomaly1, anomaly2, anomaly3], ignore_index=True)
data = data.sample(frac=1, random_state=42).reset_index(drop=True)
data['timestamp'] = timestamps
print(f"전체 데이터: {len(data)}")
print(f"정상: {(data['is_anomaly'] == 0).sum()}")
print(f"이상: {(data['is_anomaly'] == 1).sum()}")
print(f"이상 비율: {data['is_anomaly'].mean()*100:.1f}%")
탐색적 데이터 분석
import matplotlib.pyplot as plt
import seaborn as sns
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
sensors = ['temperature', 'vibration', 'pressure', 'current', 'rpm', 'oil_level']
for ax, sensor in zip(axes.flatten(), sensors):
ax.hist(data[data['is_anomaly'] == 0][sensor], bins=50, alpha=0.7, label='Normal', density=True)
ax.hist(data[data['is_anomaly'] == 1][sensor], bins=50, alpha=0.7, label='Anomaly', density=True)
ax.set_xlabel(sensor)
ax.set_ylabel('Density')
ax.set_title(f'{sensor} Distribution')
ax.legend()
plt.tight_layout()
plt.savefig('sensor_distributions.png', dpi=150)
# 072 상관관계
plt.figure(figsize=(10, 8))
corr = data[sensors].corr()
sns.heatmap(corr, annot=True, cmap='coolwarm', center=0)
plt.title('Sensor Correlation')
plt.savefig('sensor_correlation.png', dpi=150)
특성 엔지니어링
import pandas as pd
import numpy as np
# 072 원본 특성
feature_cols = ['temperature', 'vibration', 'pressure', 'current', 'rpm', 'oil_level']
X = data[feature_cols].copy()
y_true = data['is_anomaly'].values
# 072 파생 특성
# 072 이동 통계량 (실제로는 시계열 순서대로)
window = 6 # 1시간 윈도우
for col in feature_cols:
X[f'{col}_rolling_mean'] = X[col].rolling(window, min_periods=1).mean()
X[f'{col}_rolling_std'] = X[col].rolling(window, min_periods=1).std().fillna(0)
# 072 비율 특성
X['temp_pressure_ratio'] = X['temperature'] / (X['pressure'] + 1)
X['current_rpm_ratio'] = X['current'] / (X['rpm'] + 1)
X['power_factor'] = X['current'] * X['rpm'] / 100000
# 072 편차 특성 (정상 범위에서 얼마나 벗어났는지)
normal_means = {
'temperature': 70, 'vibration': 2.5, 'pressure': 100,
'current': 50, 'rpm': 1500, 'oil_level': 80
}
normal_stds = {
'temperature': 3, 'vibration': 0.3, 'pressure': 5,
'current': 2, 'rpm': 30, 'oil_level': 2
}
for col in feature_cols:
X[f'{col}_zscore'] = (X[col] - normal_means[col]) / normal_stds[col]
# 072 NaN 처리
X = X.fillna(0)
print(f"특성 수: {X.shape[1]}")
print(X.columns.tolist()[:10], '...')
PyCaret 이상치 탐지
from pycaret.anomaly import *
from sklearn.metrics import f1_score, precision_score, recall_score, classification_report
# 072 환경 설정
anomaly = setup(X, normalize=True, session_id=42, verbose=False)
# 072 알고리즘 비교
algorithms = ['iforest', 'lof', 'knn']
results = []
for algo in algorithms:
model = create_model(algo, fraction=0.05)
result = assign_model(model)
y_pred = result['Anomaly'].values
results.append({
'Algorithm': algo,
'Precision': precision_score(y_true, y_pred),
'Recall': recall_score(y_true, y_pred),
'F1': f1_score(y_true, y_pred)
})
df_results = pd.DataFrame(results).sort_values('F1', ascending=False)
print("알고리즘 비교:")
print(df_results.round(4))
다중 모델 앙상블
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
import numpy as np
# 072 스케일링
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 072 여러 파라미터로 앙상블
models = []
predictions_list = []
# 072 Isolation Forest 앙상블
for contamination in [0.03, 0.05, 0.07]:
for n_estimators in [50, 100]:
iforest = IsolationForest(
contamination=contamination,
n_estimators=n_estimators,
random_state=42
)
pred = iforest.fit_predict(X_scaled)
predictions_list.append((pred == -1).astype(int))
# 072 LOF 앙상블
for n_neighbors in [10, 20, 30]:
lof = LocalOutlierFactor(
n_neighbors=n_neighbors,
contamination=0.05,
novelty=False
)
pred = lof.fit_predict(X_scaled)
predictions_list.append((pred == -1).astype(int))
# 072 앙상블 (투표)
predictions_array = np.array(predictions_list)
ensemble_votes = predictions_array.sum(axis=0)
# 072 다양한 투표 임계값
print("\n투표 임계값별 성능:")
for threshold in range(1, len(predictions_list) + 1):
y_pred = (ensemble_votes >= threshold).astype(int)
if y_pred.sum() > 0:
print(f"임계값 {threshold}: P={precision_score(y_true, y_pred):.3f}, "
f"R={recall_score(y_true, y_pred):.3f}, "
f"F1={f1_score(y_true, y_pred):.3f}, "
f"탐지={y_pred.sum()}")
실시간 모니터링 시스템
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
class EquipmentMonitor:
def __init__(self, contamination=0.05, alert_threshold=0.7):
self.scaler = StandardScaler()
self.model = IsolationForest(
contamination=contamination,
n_estimators=100,
random_state=42
)
self.alert_threshold = alert_threshold
self.feature_means = None
self.feature_stds = None
def fit(self, X):
"""정상 운전 데이터로 학습"""
X_scaled = self.scaler.fit_transform(X)
self.model.fit(X_scaled)
self.feature_means = X.mean()
self.feature_stds = X.std()
return self
def monitor(self, X):
"""실시간 모니터링"""
X_scaled = self.scaler.transform(X)
scores = -self.model.score_samples(X_scaled)
# 0~1 범위로 정규화
min_score = 0.4 # 일반적인 정상 점수
max_score = 0.6 # 일반적인 이상 점수
normalized_scores = np.clip((scores - min_score) / (max_score - min_score), 0, 1)
# 상태 판단
statuses = []
for score in normalized_scores:
if score < 0.3:
statuses.append('NORMAL')
elif score < self.alert_threshold:
statuses.append('WARNING')
else:
statuses.append('CRITICAL')
return pd.DataFrame({
'anomaly_score': normalized_scores,
'status': statuses
})
def diagnose(self, X):
"""이상 원인 진단"""
deviations = {}
for col in X.columns:
if col in self.feature_means.index:
z_score = abs((X[col].values[0] - self.feature_means[col]) / (self.feature_stds[col] + 1e-10))
deviations[col] = z_score
# 상위 기여 특성
sorted_devs = sorted(deviations.items(), key=lambda x: x[1], reverse=True)
return sorted_devs[:5]
# 072 사용 예
monitor = EquipmentMonitor(contamination=0.05)
monitor.fit(X)
# 072 테스트 데이터
test_sample = X.iloc[[100]]
result = monitor.monitor(test_sample)
print(f"상태: {result['status'].values[0]}")
print(f"이상 점수: {result['anomaly_score'].values[0]:.3f}")
# 072 이상 진단
if result['status'].values[0] != 'NORMAL':
diagnosis = monitor.diagnose(test_sample)
print("\n이상 원인 (상위 기여 특성):")
for feature, deviation in diagnosis:
print(f" - {feature}: {deviation:.2f} σ")
알림 시스템
from datetime import datetime
class AlertSystem:
def __init__(self, monitor):
self.monitor = monitor
self.alert_history = []
def check_and_alert(self, X, equipment_id):
"""상태 확인 및 알림"""
result = self.monitor.monitor(X)
alert = {
'timestamp': datetime.now(),
'equipment_id': equipment_id,
'status': result['status'].values[0],
'score': result['anomaly_score'].values[0]
}
# 이상 발생 시 진단 추가
if alert['status'] != 'NORMAL':
diagnosis = self.monitor.diagnose(X)
alert['diagnosis'] = diagnosis
alert['message'] = self._generate_message(alert)
self.alert_history.append(alert)
return alert
def _generate_message(self, alert):
"""알림 메시지 생성"""
status_emoji = {'WARNING': '⚠️', 'CRITICAL': '🚨'}
emoji = status_emoji.get(alert['status'], '')
msg = f"{emoji} [{alert['status']}] Equipment {alert['equipment_id']}\n"
msg += f"Anomaly Score: {alert['score']:.3f}\n"
msg += "Suspected Issues:\n"
for feature, deviation in alert['diagnosis'][:3]:
msg += f" - {feature}: {deviation:.1f}σ deviation\n"
return msg
# 072 사용 예
alert_system = AlertSystem(monitor)
# 072 이상 데이터 테스트
anomaly_sample = X[y_true == 1].iloc[[0]]
alert = alert_system.check_and_alert(anomaly_sample, 'PUMP-001')
if alert['status'] != 'NORMAL':
print(alert['message'])
대시보드 시각화
import matplotlib.pyplot as plt
import numpy as np
# 072 전체 데이터 모니터링 결과
results = monitor.monitor(X)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 1. 시계열 이상 점수
axes[0, 0].plot(results['anomaly_score'], alpha=0.7)
axes[0, 0].axhline(y=0.7, color='red', linestyle='--', label='Critical Threshold')
axes[0, 0].axhline(y=0.3, color='orange', linestyle='--', label='Warning Threshold')
axes[0, 0].set_xlabel('Time Index')
axes[0, 0].set_ylabel('Anomaly Score')
axes[0, 0].set_title('Real-time Anomaly Score')
axes[0, 0].legend()
# 2. 상태 분포
status_counts = results['status'].value_counts()
colors = {'NORMAL': 'green', 'WARNING': 'orange', 'CRITICAL': 'red'}
axes[0, 1].pie(status_counts.values, labels=status_counts.index,
colors=[colors[s] for s in status_counts.index],
autopct='%1.1f%%')
axes[0, 1].set_title('Status Distribution')
# 3. 센서별 기여도 (이상 데이터)
anomaly_indices = np.where(y_true == 1)[0][:100]
feature_contributions = {}
for col in feature_cols:
if col in X.columns:
z_scores = abs((X.iloc[anomaly_indices][col] - monitor.feature_means[col]) /
(monitor.feature_stds[col] + 1e-10))
feature_contributions[col] = z_scores.mean()
axes[1, 0].barh(list(feature_contributions.keys()), list(feature_contributions.values()),
color='steelblue')
axes[1, 0].set_xlabel('Average Z-Score')
axes[1, 0].set_title('Feature Contribution to Anomalies')
# 4. 이상 점수 분포
axes[1, 1].hist(results.loc[y_true == 0, 'anomaly_score'], bins=50, alpha=0.7,
label='Normal', density=True)
axes[1, 1].hist(results.loc[y_true == 1, 'anomaly_score'], bins=50, alpha=0.7,
label='Anomaly', density=True)
axes[1, 1].axvline(x=0.7, color='red', linestyle='--')
axes[1, 1].set_xlabel('Anomaly Score')
axes[1, 1].set_ylabel('Density')
axes[1, 1].set_title('Score Distribution by Class')
axes[1, 1].legend()
plt.tight_layout()
plt.savefig('equipment_dashboard.png', dpi=150)
모델 업데이트 전략
class AdaptiveMonitor:
"""적응형 모니터링 (모델 주기적 업데이트)"""
def __init__(self, update_interval=1000):
self.update_interval = update_interval
self.sample_count = 0
self.recent_normal_data = []
self.monitor = None
def initialize(self, X):
"""초기 모델 학습"""
self.monitor = EquipmentMonitor()
self.monitor.fit(X)
return self
def process(self, X):
"""데이터 처리 및 필요시 업데이트"""
result = self.monitor.monitor(X)
# 정상 데이터 수집
if result['status'].values[0] == 'NORMAL':
self.recent_normal_data.append(X.values[0])
self.sample_count += 1
# 주기적 업데이트
if self.sample_count >= self.update_interval and len(self.recent_normal_data) > 100:
self._update_model()
self.sample_count = 0
return result
def _update_model(self):
"""모델 업데이트"""
recent_df = pd.DataFrame(self.recent_normal_data, columns=X.columns)
self.monitor.fit(recent_df)
self.recent_normal_data = self.recent_normal_data[-500:] # 최근 500개만 유지
print(f"모델 업데이트 완료 (학습 데이터: {len(recent_df)})")
장단점
장점:
- 레이블 없이 학습 가능
- 다양한 이상 유형 탐지
- 원인 진단 가능
- 실시간 처리
단점:
- 컨셉 드리프트 대응 필요
- 임계값 설정 중요
- 센서 노이즈 영향
정리
- 설비 이상 감지는 제조업 핵심 응용
- 다변량 센서 데이터 분석
- 특성 엔지니어링이 중요
- 앙상블로 안정성 향상
- 실시간 모니터링 시스템 구축
다음 글 예고
다음 글에서는 시계열 예측의 이해를 다룹니다.
PyCaret 머신러닝 마스터 시리즈 #072