본문으로 건너뛰기

033 sklearn 파이프라인과 FLAML 연동

키워드: 파이프라인, Pipeline, 전처리

개요

sklearn 파이프라인을 사용하면 전처리와 모델링을 하나의 워크플로우로 통합할 수 있습니다. 이 글에서는 FLAML을 파이프라인에 통합하여 재현 가능하고 배포하기 쉬운 ML 시스템을 구축하는 방법을 알아봅니다.

실습 환경

  • Python 버전: 3.11 권장
  • 필요 패키지: flaml[automl], scikit-learn, pandas
pip install flaml[automl] scikit-learn pandas

파이프라인이란?

기본 개념

여러 데이터 처리 단계를 순차적으로 연결합니다.

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

# 033 기본 파이프라인 예시
simple_pipeline = Pipeline([
('scaler', StandardScaler()), # 1단계: 스케일링
('classifier', LogisticRegression()) # 2단계: 분류
])

# 033 파이프라인 구조 확인
print("파이프라인 단계:")
for name, step in simple_pipeline.steps:
print(f" {name}: {step.__class__.__name__}")

파이프라인의 장점

advantages = {
'코드 간결성': '전처리 + 모델을 하나로 통합',
'데이터 누수 방지': 'fit/transform이 적절하게 분리됨',
'재현성': '동일한 파이프라인으로 일관된 결과',
'배포 용이': '전체 워크플로우를 하나의 객체로 저장',
'교차 검증': '각 폴드에서 전처리도 함께 수행'
}

for advantage, description in advantages.items():
print(f"✓ {advantage}: {description}")

FLAML과 파이프라인 통합

방법 1: 파이프라인 내 FLAML

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from flaml import AutoML

# 033 샘플 데이터 생성
np.random.seed(42)
n_samples = 1000

# 033 결측치가 있는 데이터
X = pd.DataFrame({
'feature_1': np.random.randn(n_samples),
'feature_2': np.random.randn(n_samples) * 10 + 50,
'feature_3': np.random.randn(n_samples),
})
# 033 결측치 추가
X.loc[np.random.choice(n_samples, 50), 'feature_1'] = np.nan
X.loc[np.random.choice(n_samples, 30), 'feature_2'] = np.nan

y = (X['feature_1'].fillna(0) + X['feature_2'].fillna(50) / 10 + np.random.randn(n_samples) > 5).astype(int)

# 033 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

print(f"학습 데이터 결측치:")
print(X_train.isnull().sum())

# 033 FLAML AutoML 객체 생성
automl = AutoML()

# 033 파이프라인 구성
pipeline = Pipeline([
('imputer', SimpleImputer(strategy='mean')), # 결측치 처리
('scaler', StandardScaler()), # 스케일링
('classifier', automl) # FLAML
])

# 033 파이프라인 학습
# 033 주의: FLAML은 fit() 시 추가 파라미터가 필요
# 033 파이프라인에서는 set_params 또는 직접 설정 필요

방법 2: 전처리 후 FLAML (권장)

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

# 033 더 복잡한 데이터 생성
np.random.seed(42)
n_samples = 1000

data = pd.DataFrame({
'age': np.random.randint(18, 80, n_samples),
'income': np.random.uniform(20000, 150000, n_samples),
'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n_samples),
'gender': np.random.choice(['M', 'F'], n_samples),
'score': np.random.randn(n_samples) * 10 + 50,
})

# 033 결측치 추가
data.loc[np.random.choice(n_samples, 50), 'age'] = np.nan
data.loc[np.random.choice(n_samples, 30), 'income'] = np.nan

# 033 타겟 생성
y = ((data['age'].fillna(40) > 40) & (data['income'].fillna(70000) > 60000)).astype(int)

print("데이터 샘플:")
print(data.head())
print(f"\n데이터 타입:\n{data.dtypes}")

# 033 수치형/범주형 컬럼 분리
numeric_features = ['age', 'income', 'score']
categorical_features = ['education', 'gender']

# 033 전처리 파이프라인 정의
preprocessor = ColumnTransformer(
transformers=[
('num', Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
]), numeric_features),
('cat', Pipeline([
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('encoder', OneHotEncoder(drop='first', sparse_output=False))
]), categorical_features)
]
)

# 033 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(
data, y, test_size=0.2, random_state=42
)

# 033 전처리 적용
X_train_processed = preprocessor.fit_transform(X_train)
X_test_processed = preprocessor.transform(X_test)

print(f"\n전처리 후 shape: {X_train_processed.shape}")

# 033 FLAML 학습
automl = AutoML()
automl.fit(
X_train_processed, y_train,
task="classification",
time_budget=60,
metric="accuracy",
verbose=0
)

print(f"최적 모델: {automl.best_estimator}")
print(f"테스트 정확도: {automl.score(X_test_processed, y_test):.4f}")

완전한 파이프라인 클래스

class FLAMLPipeline:
"""전처리와 FLAML을 통합한 파이프라인"""

def __init__(self, preprocessor, task="classification", time_budget=60, metric="accuracy"):
self.preprocessor = preprocessor
self.task = task
self.time_budget = time_budget
self.metric = metric
self.automl = None

def fit(self, X, y, **flaml_kwargs):
"""전처리 + FLAML 학습"""
# 전처리 fit_transform
X_processed = self.preprocessor.fit_transform(X)

# FLAML 학습
self.automl = AutoML()
self.automl.fit(
X_processed, y,
task=self.task,
time_budget=self.time_budget,
metric=self.metric,
**flaml_kwargs
)
return self

def predict(self, X):
"""전처리 + 예측"""
X_processed = self.preprocessor.transform(X)
return self.automl.predict(X_processed)

def predict_proba(self, X):
"""전처리 + 확률 예측"""
X_processed = self.preprocessor.transform(X)
return self.automl.predict_proba(X_processed)

def score(self, X, y):
"""전처리 + 평가"""
X_processed = self.preprocessor.transform(X)
return self.automl.score(X_processed, y)

# 033 사용 예
flaml_pipeline = FLAMLPipeline(
preprocessor=preprocessor,
task="classification",
time_budget=60,
metric="accuracy"
)

flaml_pipeline.fit(X_train, y_train, verbose=0)
print(f"파이프라인 테스트 정확도: {flaml_pipeline.score(X_test, y_test):.4f}")

파이프라인 저장 및 로드

import pickle

# 033 파이프라인 저장
with open('flaml_pipeline.pkl', 'wb') as f:
pickle.dump(flaml_pipeline, f)

print("파이프라인 저장 완료: flaml_pipeline.pkl")

# 033 파이프라인 로드
with open('flaml_pipeline.pkl', 'rb') as f:
loaded_pipeline = pickle.load(f)

# 033 새 데이터로 예측
new_data = pd.DataFrame({
'age': [35, 55],
'income': [75000, 120000],
'education': ['Bachelor', 'PhD'],
'gender': ['M', 'F'],
'score': [55, 65]
})

predictions = loaded_pipeline.predict(new_data)
probabilities = loaded_pipeline.predict_proba(new_data)

print("\n새 데이터 예측:")
for i in range(len(new_data)):
print(f" 샘플 {i+1}: 예측={predictions[i]}, 확률={probabilities[i][1]:.4f}")

교차 검증과 파이프라인

from sklearn.model_selection import cross_val_score

# 033 sklearn 호환 래퍼 (간단한 버전)
class FLAMLEstimator:
"""sklearn 호환 FLAML 래퍼"""

def __init__(self, task="classification", time_budget=30, metric="accuracy"):
self.task = task
self.time_budget = time_budget
self.metric = metric
self.model = None

def fit(self, X, y):
self.model = AutoML()
self.model.fit(
X, y,
task=self.task,
time_budget=self.time_budget,
metric=self.metric,
verbose=0
)
return self

def predict(self, X):
return self.model.predict(X)

def predict_proba(self, X):
return self.model.predict_proba(X)

def score(self, X, y):
return self.model.score(X, y)

def get_params(self, deep=True):
return {
'task': self.task,
'time_budget': self.time_budget,
'metric': self.metric
}

def set_params(self, **params):
for key, value in params.items():
setattr(self, key, value)
return self

# 033 파이프라인에서 교차 검증
from sklearn.pipeline import make_pipeline

# 033 전체 파이프라인 (간단한 버전)
full_pipeline = make_pipeline(
SimpleImputer(strategy='mean'),
StandardScaler(),
FLAMLEstimator(time_budget=20)
)

# 033 교차 검증 (시간이 오래 걸릴 수 있음)
# 033 cv_scores = cross_val_score(full_pipeline, X_train_processed, y_train, cv=3)
# 033 print(f"교차 검증 점수: {cv_scores.mean():.4f} (+/- {cv_scores.std()*2:.4f})")

고급: 특성 엔지니어링 포함

from sklearn.base import BaseEstimator, TransformerMixin

class FeatureEngineer(BaseEstimator, TransformerMixin):
"""커스텀 특성 엔지니어링"""

def __init__(self, create_interactions=True):
self.create_interactions = create_interactions

def fit(self, X, y=None):
return self

def transform(self, X):
X_new = X.copy()

if self.create_interactions:
# 수치형 특성 간 상호작용
numeric_cols = X_new.select_dtypes(include=[np.number]).columns
for i, col1 in enumerate(numeric_cols):
for col2 in numeric_cols[i+1:]:
X_new[f'{col1}_x_{col2}'] = X_new[col1] * X_new[col2]

return X_new

# 033 특성 엔지니어링 포함 파이프라인
advanced_preprocessor = Pipeline([
('feature_eng', FeatureEngineer(create_interactions=True)),
('column_transform', ColumnTransformer([
('num', Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
]), numeric_features + [c for c in data.columns if '_x_' in c]),
('cat', Pipeline([
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('encoder', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'))
]), categorical_features)
], remainder='passthrough'))
])

print("고급 전처리 파이프라인 구성 완료")

정리

  • 파이프라인은 전처리와 모델링을 통합합니다.
  • 데이터 누수를 방지하고 재현성을 보장합니다.
  • FLAML은 파이프라인의 마지막 단계로 사용합니다.
  • ColumnTransformer로 수치형/범주형을 다르게 처리합니다.
  • 커스텀 클래스로 FLAML 파이프라인을 만들 수 있습니다.
  • 전체 파이프라인을 pickle로 저장하여 배포합니다.
  • 교차 검증에도 파이프라인을 사용할 수 있습니다.

다음 글 예고

다음 글에서는 범주형 특성 자동 처리에 대해 알아보겠습니다. FLAML이 범주형 변수를 어떻게 처리하는지 자세히 다룹니다.


FLAML AutoML 마스터 시리즈 #033