본문으로 건너뛰기

074 병렬 처리로 학습 가속화

키워드: 병렬 처리, parallel, n_jobs

개요

병렬 처리를 활용하면 FLAML의 AutoML 속도를 크게 향상시킬 수 있습니다. 이 글에서는 멀티코어 CPU를 활용한 병렬 학습과 설정 방법을 알아봅니다.

실습 환경

  • Python 버전: 3.11 권장
  • 필요 패키지: flaml[automl]
pip install flaml[automl] pandas numpy

병렬 처리의 종류

import numpy as np
import pandas as pd
import multiprocessing
from flaml import AutoML

print(f"시스템 CPU 코어 수: {multiprocessing.cpu_count()}")

parallel_types = {
'유형': ['모델 내 병렬', '탐색 병렬', '교차 검증 병렬'],
'설명': [
'단일 모델 학습을 병렬화',
'여러 설정을 동시 평가',
'CV fold를 병렬 실행'
],
'FLAML 파라미터': [
'n_jobs (모델별)',
'n_concurrent_trials',
'n_jobs (cv)'
]
}

print("\n병렬 처리 유형:")
print(pd.DataFrame(parallel_types).to_string(index=False))

n_jobs 설정

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
import time

# 074 데이터 준비
X, y = make_classification(n_samples=5000, n_features=30,
n_informative=15, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 074 n_jobs=-1: 모든 코어 사용
automl = AutoML()
start = time.time()
automl.fit(
X_train, y_train,
task="classification",
time_budget=30,
n_jobs=-1, # 모든 코어 사용
verbose=1
)
elapsed = time.time() - start

print(f"\nn_jobs=-1 (모든 코어) 소요 시간: {elapsed:.2f}초")
print(f"최적 모델: {automl.best_estimator}")

코어 수별 성능 비교

def benchmark_njobs(X_train, y_train, n_jobs_list, time_budget=20):
"""n_jobs별 성능 벤치마크"""
results = []

for n_jobs in n_jobs_list:
automl = AutoML()

start = time.time()
automl.fit(
X_train, y_train,
task="classification",
time_budget=time_budget,
n_jobs=n_jobs,
verbose=0
)
elapsed = time.time() - start

results.append({
'n_jobs': n_jobs,
'elapsed': elapsed,
'trials': len(automl.config_history),
'best_loss': automl.best_loss
})

print(f"n_jobs={n_jobs}: {elapsed:.2f}s, {len(automl.config_history)} trials")

return pd.DataFrame(results)

# 074 벤치마크 실행 (시간이 오래 걸릴 수 있음)
# 074 results = benchmark_njobs(X_train, y_train, [1, 2, 4, -1])

print("\nn_jobs 권장값:")
print(" - 단일 모델: -1 (모든 코어)")
print(" - 메모리 제한: 코어 수의 절반")
print(" - 공유 서버: 할당된 코어 수")

모델별 병렬 처리

# 074 각 모델의 병렬 처리 지원
model_parallel = {
'모델': ['LightGBM', 'XGBoost', 'Random Forest', 'Extra Trees', 'CatBoost'],
'n_jobs 지원': ['Yes', 'Yes', 'Yes', 'Yes', 'Yes (thread_count)'],
'효과': ['높음', '높음', '매우 높음', '매우 높음', '높음'],
'메모리 증가': ['낮음', '낮음', '높음', '높음', '낮음']
}

print("모델별 병렬 처리:")
print(pd.DataFrame(model_parallel).to_string(index=False))

커스텀 병렬 설정

# 074 모델별로 다른 n_jobs 설정
custom_hp = {
'lgbm': {
'n_estimators': {'domain': list(range(50, 300, 25))},
# LightGBM은 내부적으로 병렬화
},
'rf': {
'n_estimators': {'domain': list(range(50, 200, 25))},
'n_jobs': {'domain': [-1], 'init_value': -1}, # RF에 n_jobs 전달
}
}

automl_custom = AutoML()
automl_custom.fit(
X_train, y_train,
task="classification",
time_budget=30,
estimator_list=['lgbm', 'rf'],
custom_hp=custom_hp,
n_jobs=-1, # FLAML 레벨 병렬화
verbose=1
)

print(f"\n커스텀 병렬 설정 결과: {automl_custom.best_estimator}")

메모리 고려사항

import psutil

# 074 현재 메모리 상태
memory = psutil.virtual_memory()
print("시스템 메모리 상태:")
print(f" 총 메모리: {memory.total / 1e9:.1f} GB")
print(f" 사용 가능: {memory.available / 1e9:.1f} GB")
print(f" 사용률: {memory.percent}%")

# 074 메모리 기반 n_jobs 권장
def recommend_njobs(data_size_mb, available_gb):
"""데이터 크기와 메모리 기반 n_jobs 권장"""
# 각 워커가 데이터 복사본 필요
max_workers = int(available_gb * 1000 / (data_size_mb * 2)) # 2배 여유
cpu_count = multiprocessing.cpu_count()

recommended = min(max_workers, cpu_count)
return max(1, recommended)

# 074 예시
data_size = X_train.nbytes / 1e6 # MB
print(f"\n데이터 크기: {data_size:.1f} MB")
print(f"권장 n_jobs: {recommend_njobs(data_size, memory.available / 1e9)}")

병렬 교차 검증

from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
import time

# 074 교차 검증 병렬화 비교
model = RandomForestClassifier(n_estimators=100, random_state=42)

# 074 순차 CV
start = time.time()
scores_seq = cross_val_score(model, X_train, y_train, cv=5, n_jobs=1)
time_seq = time.time() - start

# 074 병렬 CV
start = time.time()
scores_par = cross_val_score(model, X_train, y_train, cv=5, n_jobs=-1)
time_par = time.time() - start

print("교차 검증 병렬화 효과:")
print(f" 순차 (n_jobs=1): {time_seq:.2f}s")
print(f" 병렬 (n_jobs=-1): {time_par:.2f}s")
print(f" 속도 향상: {time_seq/time_par:.2f}x")

대규모 데이터 전략

strategies = {
'데이터 크기': ['< 100MB', '100MB - 1GB', '> 1GB'],
'n_jobs': ['-1', '코어 수 / 2', '1-2'],
'추가 전략': [
'전체 병렬화',
'메모리 모니터링',
'샘플링 + 병렬화'
],
'주의사항': [
'없음',
'OOM 주의',
'데이터 로딩 병목'
]
}

print("\n대규모 데이터 병렬 처리 전략:")
print(pd.DataFrame(strategies).to_string(index=False))

하이퍼파라미터 탐색 병렬화

from flaml import tune

# 074 tune.run의 병렬 탐색
def objective(config):
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

model = RandomForestClassifier(
n_estimators=config['n_estimators'],
max_depth=config['max_depth'],
n_jobs=1, # 개별 모델은 단일 코어
random_state=42
)
scores = cross_val_score(model, X_train, y_train, cv=3)
return {'accuracy': scores.mean()}

search_space = {
'n_estimators': tune.randint(50, 200),
'max_depth': tune.randint(3, 15),
}

# 074 병렬 탐색
analysis = tune.run(
objective,
config=search_space,
metric='accuracy',
mode='max',
num_samples=20,
time_budget_s=60,
use_ray=False, # Ray 없이 로컬 병렬화
verbose=1
)

print(f"\n병렬 탐색 결과: {analysis.best_config}")

성능 최적화 팁

tips = {
'상황': [
'학습 시간 > 데이터 로딩',
'학습 시간 < 데이터 로딩',
'메모리 부족',
'많은 설정 탐색'
],
'권장': [
'n_jobs=-1',
'데이터 캐싱, 적은 n_jobs',
'n_jobs=1-2, 샘플링',
'Ray 분산 처리'
],
'이유': [
'학습 병렬화가 효과적',
'데이터 복사 오버헤드',
'메모리 공유 최소화',
'여러 노드 활용'
]
}

print("\n병렬 처리 최적화 팁:")
print(pd.DataFrame(tips).to_string(index=False))

정리

  • n_jobs=-1: 모든 코어 사용 (기본 권장)
  • 모델 내 병렬: RF, XGBoost, LightGBM 등 지원
  • CV 병렬: cross_val_score의 n_jobs
  • 메모리 고려: 데이터 크기에 따라 조정
  • 대규모 데이터: 샘플링 + 적은 n_jobs
  • 병렬화 효과는 모델과 데이터에 따라 다름

다음 글 예고

다음 글에서는 Ray 연동 - 분산 AutoML에 대해 알아보겠습니다. Ray를 활용한 대규모 분산 학습을 다룹니다.


FLAML AutoML 마스터 시리즈 #074