075 Ray 연동 - 분산 AutoML
키워드: Ray, 분산 학습, distributed
개요
Ray는 대규모 분산 컴퓨팅을 위한 프레임워크입니다. FLAML과 Ray를 연동하면 클러스터 환경에서 AutoML을 확장할 수 있습니다. 이 글에서는 Ray를 활용한 분산 AutoML 구현 방법을 알아봅니다.
실습 환경
- Python 버전: 3.11 권장
- 필요 패키지:
flaml[automl], ray
pip install flaml[automl] ray pandas numpy
Ray 기초
import ray
import numpy as np
import pandas as pd
# 075 Ray 초기화
ray.init(ignore_reinit_error=True)
print(f"Ray 클러스터 정보:")
print(f" 노드 수: {len(ray.nodes())}")
print(f" 사용 가능 CPU: {ray.cluster_resources().get('CPU', 0)}")
print(f" 사용 가능 메모리: {ray.cluster_resources().get('memory', 0) / 1e9:.1f} GB")
# 075 간단한 Ray 예제
@ray.remote
def train_model(config, data):
"""원격 실행될 학습 함수"""
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
X, y = data
model = RandomForestClassifier(**config, random_state=42)
scores = cross_val_score(model, X, y, cv=3)
return scores.mean()
# 075 데이터 준비
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=2000, n_features=20, random_state=42)
# 075 병렬 실행
configs = [
{'n_estimators': 50, 'max_depth': 5},
{'n_estimators': 100, 'max_depth': 10},
{'n_estimators': 150, 'max_depth': 15},
]
futures = [train_model.remote(c, (X, y)) for c in configs]
results = ray.get(futures)
print("\nRay 병렬 실행 결과:")
for config, score in zip(configs, results):
print(f" {config}: {score:.4f}")
FLAML + Ray tune
from flaml import tune
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
def objective(config):
"""Ray tune 목적 함수"""
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
model = GradientBoostingClassifier(
n_estimators=config['n_estimators'],
max_depth=config['max_depth'],
learning_rate=config['learning_rate'],
random_state=42
)
scores = cross_val_score(model, X_train, y_train, cv=3)
return {'accuracy': scores.mean()}
# 075 탐색 공간
search_space = {
'n_estimators': tune.randint(50, 300),
'max_depth': tune.randint(3, 15),
'learning_rate': tune.loguniform(0.01, 0.3),
}
# 075 Ray를 사용한 분산 탐색
analysis = tune.run(
objective,
config=search_space,
metric='accuracy',
mode='max',
num_samples=20,
use_ray=True, # Ray 사용
verbose=1
)
print(f"\nRay 분산 탐색 결과:")
print(f" 최적 설정: {analysis.best_config}")
print(f" 최적 정확도: {analysis.best_result['accuracy']:.4f}")
Ray 클러스터 설정
# 075 로컬 클러스터 설정
ray.shutdown()
ray.init(
num_cpus=4, # 사용할 CPU 수
num_gpus=0, # 사용할 GPU 수
include_dashboard=False,
ignore_reinit_error=True
)
print("로컬 Ray 클러스터 설정:")
print(f" CPU: {ray.cluster_resources()['CPU']}")
# 075 원격 클러스터 연결 (예시)
# 075 ray.init(address="ray://192.168.1.100:10001")
Ray + FLAML AutoML
from flaml import AutoML
# 075 Ray를 사용한 FLAML AutoML
automl = AutoML()
automl.fit(
X_train, y_train,
task="classification",
time_budget=60,
n_concurrent_trials=4, # 동시 탐색 수
use_ray=True, # Ray 사용
verbose=1
)
print(f"\nRay + FLAML AutoML 결과:")
print(f" 최적 모델: {automl.best_estimator}")
print(f" 최적 설정: {automl.best_config}")
대규모 데이터 분산 처리
# 075 대규모 데이터용 분산 학습
@ray.remote
def distributed_cv_fold(model_class, model_params, X_train, y_train, X_val, y_val):
"""분산 교차 검증 fold"""
from sklearn.metrics import accuracy_score
model = model_class(**model_params)
model.fit(X_train, y_train)
y_pred = model.predict(X_val)
return accuracy_score(y_val, y_pred)
def distributed_cross_validation(model_class, params, X, y, n_splits=5):
"""분산 교차 검증"""
from sklearn.model_selection import KFold
kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
futures = []
for train_idx, val_idx in kf.split(X):
X_train_fold = X[train_idx]
y_train_fold = y[train_idx]
X_val_fold = X[val_idx]
y_val_fold = y[val_idx]
future = distributed_cv_fold.remote(
model_class, params,
X_train_fold, y_train_fold,
X_val_fold, y_val_fold
)
futures.append(future)
scores = ray.get(futures)
return np.mean(scores), np.std(scores)
# 075 분산 CV 실행
from sklearn.ensemble import RandomForestClassifier
mean_score, std_score = distributed_cross_validation(
RandomForestClassifier,
{'n_estimators': 100, 'max_depth': 10, 'random_state': 42},
X, y, n_splits=5
)
print(f"\n분산 교차 검증 결과:")
print(f" 평균 정확도: {mean_score:.4f} (±{std_score:.4f})")
Ray Data로 대용량 데이터 처리
# 075 Ray Dataset 사용 (대용량 데이터)
try:
import ray.data as rd
# DataFrame을 Ray Dataset으로 변환
df = pd.DataFrame(X, columns=[f'f{i}' for i in range(X.shape[1])])
df['target'] = y
# Ray Dataset 생성
ds = rd.from_pandas(df)
print("Ray Dataset 정보:")
print(f" 행 수: {ds.count()}")
print(f" 스키마: {ds.schema()}")
except ImportError:
print("ray.data를 사용하려면 ray[data]를 설치하세요")
리소스 할당
# 075 작업별 리소스 할당
@ray.remote(num_cpus=2, num_gpus=0)
def resource_intensive_training(config, data):
"""리소스 집약적 학습"""
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
X, y = data
model = RandomForestClassifier(**config, n_jobs=2)
scores = cross_val_score(model, X, y, cv=3)
return scores.mean()
# 075 리소스 제한 실행
future = resource_intensive_training.remote(
{'n_estimators': 100, 'max_depth': 10},
(X_train, y_train)
)
result = ray.get(future)
print(f"\n리소스 할당 학습 결과: {result:.4f}")
분산 하이퍼파라미터 탐색
from ray import tune as ray_tune
def trainable(config):
"""Ray Tune trainable 함수"""
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
from ray import train
model = GradientBoostingClassifier(
n_estimators=config['n_estimators'],
max_depth=config['max_depth'],
learning_rate=config['learning_rate'],
random_state=42
)
scores = cross_val_score(model, X_train, y_train, cv=3)
# Ray Tune에 결과 보고
train.report({'accuracy': scores.mean()})
# 075 Ray Tune 실행
try:
tuner = ray_tune.Tuner(
trainable,
param_space={
'n_estimators': ray_tune.randint(50, 200),
'max_depth': ray_tune.randint(3, 12),
'learning_rate': ray_tune.loguniform(0.01, 0.2),
},
tune_config=ray_tune.TuneConfig(
num_samples=10,
metric='accuracy',
mode='max',
),
)
results = tuner.fit()
print(f"\nRay Tune 최적 설정: {results.get_best_result().config}")
except Exception as e:
print(f"Ray Tune 실행 중 오류: {e}")
Ray 모니터링
# 075 클러스터 상태 확인
print("\nRay 클러스터 상태:")
print(f" 활성 노드: {len(ray.nodes())}")
print(f" 사용 가능 리소스: {ray.available_resources()}")
print(f" 사용 중 리소스: {ray.cluster_resources()}")
# 075 Ray 종료
ray.shutdown()
print("\nRay 클러스터 종료됨")
Ray 사용 가이드
guide = {
'상황': ['단일 머신', '소규모 클러스터', '대규모 클러스터'],
'권장 설정': [
'n_concurrent_trials=코어수',
'노드당 워커 균등 배분',
'데이터 파티셔닝 + 분산 저장'
],
'주의사항': [
'오버헤드 고려',
'네트워크 대역폭',
'데이터 이동 최소화'
]
}
print("\nRay 사용 가이드:")
print(pd.DataFrame(guide).to_string(index=False))
정리
- Ray 초기화: ray.init()으로 클러스터 연결
- @ray.remote: 함수를 원격 실행 가능하게 변환
- FLAML + Ray: use_ray=True로 활성화
- n_concurrent_trials: 동시 탐색 수 설정
- 리소스 할당: num_cpus, num_gpus로 제어
- 대규모 데이터와 클러스터 환경에 효과적
다음 글 예고
다음 글에서는 Spark 연동 - 대규모 데이터 처리에 대해 알아보겠습니다. Apache Spark와 FLAML을 함께 사용하는 방법을 다룹니다.
FLAML AutoML 마스터 시리즈 #075