089 파이프라인과 전처리
키워드: 파이프라인, pipeline
개요
파이프라인(Pipeline)은 데이터 전처리부터 모델 학습까지의 과정을 하나의 객체로 묶어 관리하는 방법입니다. 코드를 깔끔하게 하고, 데이터 누출을 방지하며, 재현성을 보장합니다.
실습 환경
- Python 버전: 3.11 권장
- 필요 패키지:
pycaret[full]>=3.0
파이프라인의 필요성
파이프라인 없이:
1. 스케일링 fit → transform
2. 특성 선택 fit → transform
3. 모델 fit → predict
→ 각 단계 따로 관리, 실수 발생 쉬움
파이프라인으로:
pipeline.fit(X_train, y_train)
pipeline.predict(X_test)
→ 한 번에 처리, 데이터 누출 방지
sklearn 파이프라인 기본
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from pycaret.datasets import get_data
# 089 데이터 로드
data = get_data('diabetes')
X = data.drop('Class variable', axis=1)
y = data['Class variable']
# 089 학습/테스트 분할
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 089 파이프라인 구성
pipeline = Pipeline([
('scaler', StandardScaler()),
('selector', SelectKBest(f_classif, k=5)),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
# 089 학습
pipeline.fit(X_train, y_train)
# 089 예측
y_pred = pipeline.predict(X_test)
# 089 평가
from sklearn.metrics import accuracy_score
print(f"정확도: {accuracy_score(y_test, y_pred):.4f}")
make_pipeline
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
# 089 make_pipeline: 자동 이름 지정
pipeline = make_pipeline(
StandardScaler(),
RandomForestClassifier(n_estimators=100, random_state=42)
)
pipeline.fit(X_train, y_train)
print(f"정확도: {pipeline.score(X_test, y_test):.4f}")
# 089 단계 확인
print("파이프라인 단계:")
for name, step in pipeline.named_steps.items():
print(f" {name}: {type(step).__name__}")
전처리 단계 구성
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
import numpy as np
# 089 혼합 데이터 생성
np.random.seed(42)
data_mixed = pd.DataFrame({
'age': [25, 30, np.nan, 45, 50, 35, np.nan, 40],
'income': [50000, 60000, 70000, np.nan, 90000, 55000, 65000, np.nan],
'gender': ['M', 'F', 'M', 'F', 'M', 'F', 'M', 'F'],
'city': ['Seoul', 'Busan', 'Seoul', 'Daegu', 'Seoul', 'Busan', 'Daegu', 'Seoul'],
'target': [0, 1, 0, 1, 1, 0, 1, 0]
})
X = data_mixed.drop('target', axis=1)
y = data_mixed['target']
# 089 수치형/범주형 특성 분리
numeric_features = ['age', 'income']
categorical_features = ['gender', 'city']
# 089 전처리기 정의
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
# 089 ColumnTransformer로 결합
preprocessor = ColumnTransformer([
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
# 089 최종 파이프라인
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
# 089 학습 및 예측
full_pipeline.fit(X, y)
print(f"학습 완료")
PyCaret의 내부 파이프라인
from pycaret.classification import *
from pycaret.datasets import get_data
data = get_data('diabetes')
# 089 setup()이 자동으로 파이프라인 구성
clf = setup(
data,
target='Class variable',
normalize=True, # StandardScaler
transformation=True, # 분포 변환
remove_outliers=True, # 이상치 제거
fix_imbalance=True, # SMOTE
session_id=42,
verbose=False
)
# 089 모델 생성
rf = create_model('rf')
# 089 전체 파이프라인 확인
pipeline = get_config('pipeline')
print(f"파이프라인 단계 수: {len(pipeline.steps)}")
for name, step in pipeline.steps:
print(f" {name}: {type(step).__name__}")
파이프라인 저장 및 로드
from pycaret.classification import *
import joblib
# 089 설정 및 모델 생성
clf = setup(data, target='Class variable', normalize=True, session_id=42, verbose=False)
rf = create_model('rf')
# 089 전체 데이터로 재학습
final_model = finalize_model(rf)
# 089 PyCaret 방식 저장
save_model(final_model, 'my_pipeline')
# 089 로드
loaded = load_model('my_pipeline')
# 089 예측
predictions = predict_model(loaded, data=data.head())
print(predictions)
sklearn 파이프라인 저장
import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
# 089 파이프라인 구성 및 학습
pipeline = Pipeline([
('scaler', StandardScaler()),
('clf', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipeline.fit(X_train, y_train)
# 089 저장
joblib.dump(pipeline, 'sklearn_pipeline.pkl')
# 089 로드
loaded_pipeline = joblib.load('sklearn_pipeline.pkl')
# 089 예측
y_pred = loaded_pipeline.predict(X_test)
print(f"로드 후 정확도: {accuracy_score(y_test, y_pred):.4f}")
파이프라인 하이퍼파라미터 튜닝
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
# 089 파이프라인
pipeline = Pipeline([
('scaler', StandardScaler()),
('clf', RandomForestClassifier(random_state=42))
])
# 089 파라미터 그리드 (step__parameter 형식)
param_grid = {
'clf__n_estimators': [50, 100, 200],
'clf__max_depth': [3, 5, 10, None],
'clf__min_samples_split': [2, 5, 10]
}
# 089 GridSearchCV
grid_search = GridSearchCV(
pipeline,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1
)
grid_search.fit(X_train, y_train)
print(f"최적 파라미터: {grid_search.best_params_}")
print(f"최적 점수: {grid_search.best_score_:.4f}")
커스텀 변환기
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
class LogTransformer(BaseEstimator, TransformerMixin):
"""로그 변환 커스텀 변환기"""
def __init__(self, features=None):
self.features = features
def fit(self, X, y=None):
return self
def transform(self, X):
X = X.copy()
if self.features:
for col in self.features:
X[col] = np.log1p(X[col])
else:
X = np.log1p(X)
return X
# 089 파이프라인에서 사용
pipeline = Pipeline([
('log', LogTransformer(features=['income'])),
('scaler', StandardScaler()),
('clf', RandomForestClassifier(random_state=42))
])
FunctionTransformer
from sklearn.preprocessing import FunctionTransformer
import numpy as np
# 089 간단한 변환은 FunctionTransformer
log_transformer = FunctionTransformer(np.log1p, validate=True)
pipeline = Pipeline([
('log', log_transformer),
('scaler', StandardScaler()),
('clf', RandomForestClassifier(random_state=42))
])
데이터 누출 방지
# 089 잘못된 예 (데이터 누출)
from sklearn.preprocessing import StandardScaler
# 089 전체 데이터로 fit (테스트 데이터 정보 사용!)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # 전체 데이터!
X_train, X_test = X_scaled[:800], X_scaled[800:]
# 089 올바른 예 (파이프라인)
from sklearn.pipeline import Pipeline
pipeline = Pipeline([
('scaler', StandardScaler()),
('clf', RandomForestClassifier())
])
# 089 학습 데이터만으로 fit
pipeline.fit(X_train, y_train)
# 089 테스트는 transform만 (새 데이터 취급)
전처리 옵션 정리
from pycaret.classification import *
# 089 PyCaret setup의 주요 전처리 옵션
clf = setup(
data,
target='target',
# 결측치
imputation_type='simple',
numeric_imputation='mean',
categorical_imputation='mode',
# 스케일링
normalize=True,
normalize_method='zscore', # 'minmax', 'maxabs', 'robust'
# 변환
transformation=False,
transformation_method='yeo-johnson',
# 이상치
remove_outliers=False,
outliers_threshold=0.05,
# 불균형
fix_imbalance=False,
fix_imbalance_method='SMOTE',
# PCA
pca=False,
pca_components=None,
# 특성
ignore_features=None,
low_variance_threshold=None,
session_id=42,
verbose=False
)
정리
- 파이프라인: 전처리 + 모델을 하나로 묶음
- 데이터 누출 방지 (fit은 학습 데이터만)
- 재현성 보장
- PyCaret은 내부적으로 파이프라인 사용
- 커스텀 변환기로 확장 가능
다음 글 예고
다음 글에서는 **실험 로깅 (MLflow)**을 다룹니다.
PyCaret 머신러닝 마스터 시리즈 #089