076 Spark 연동 - 대규모 데이터 처리
키워드: Spark, 대규모 데이터, 분산 처리
개요
Apache Spark는 대규모 데이터 처리를 위한 분산 컴퓨팅 프레임워크입니다. FLAML과 Spark를 연동하면 빅데이터 환경에서도 AutoML을 활용할 수 있습니다.
실습 환경
- Python 버전: 3.11 권장
- 필요 패키지:
flaml[automl], pyspark
pip install flaml[automl] pyspark pandas numpy
Spark 기초
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
# 076 Spark 세션 생성
spark = SparkSession.builder \
.appName("FLAML-Spark") \
.config("spark.driver.memory", "4g") \
.config("spark.executor.memory", "4g") \
.getOrCreate()
print("Spark 세션 정보:")
print(f" 버전: {spark.version}")
print(f" 앱 이름: {spark.sparkContext.appName}")
# 076 샘플 데이터 생성
np.random.seed(42)
n_samples = 10000
data = {
'feature_1': np.random.randn(n_samples),
'feature_2': np.random.randn(n_samples),
'feature_3': np.random.randn(n_samples),
'feature_4': np.random.rand(n_samples),
'feature_5': np.random.randint(0, 10, n_samples),
'target': np.random.randint(0, 2, n_samples)
}
pandas_df = pd.DataFrame(data)
spark_df = spark.createDataFrame(pandas_df)
print(f"\nSpark DataFrame:")
spark_df.show(5)
print(f"행 수: {spark_df.count()}")
Spark ML과 FLAML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier as SparkRF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# 076 특성 벡터 생성
feature_cols = [f'feature_{i}' for i in range(1, 6)]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
spark_df_assembled = assembler.transform(spark_df)
# 076 학습/테스트 분할
train_df, test_df = spark_df_assembled.randomSplit([0.8, 0.2], seed=42)
print(f"학습 데이터: {train_df.count()}")
print(f"테스트 데이터: {test_df.count()}")
# 076 Spark ML 모델 학습
spark_rf = SparkRF(
labelCol='target',
featuresCol='features',
numTrees=100,
maxDepth=5
)
model = spark_rf.fit(train_df)
predictions = model.transform(test_df)
# 076 평가
evaluator = MulticlassClassificationEvaluator(
labelCol='target',
predictionCol='prediction',
metricName='accuracy'
)
accuracy = evaluator.evaluate(predictions)
print(f"\nSpark ML 정확도: {accuracy:.4f}")
Spark 데이터를 FLAML로 처리
from flaml import AutoML
# 076 Spark DataFrame을 Pandas로 변환 (작은 데이터)
# 076 주의: 대용량 데이터는 샘플링 권장
sample_size = 5000
pandas_train = train_df.select(feature_cols + ['target']).limit(sample_size).toPandas()
X_train = pandas_train[feature_cols].values
y_train = pandas_train['target'].values
# 076 FLAML AutoML
automl = AutoML()
automl.fit(
X_train, y_train,
task="classification",
time_budget=60,
verbose=1
)
print(f"\nFLAML 최적 모델: {automl.best_estimator}")
print(f"최적 설정: {automl.best_config}")
대용량 데이터 샘플링 전략
def stratified_sample_spark(df, label_col, sample_fraction):
"""계층적 샘플링 (Spark)"""
fractions = df.groupBy(label_col) \
.count() \
.rdd \
.map(lambda x: (x[label_col], sample_fraction)) \
.collectAsMap()
return df.sampleBy(label_col, fractions, seed=42)
# 10% 샘플링
sampled_df = stratified_sample_spark(spark_df_assembled, 'target', 0.1)
print(f"샘플링된 데이터: {sampled_df.count()}")
# 076 클래스 분포 확인
print("샘플 클래스 분포:")
sampled_df.groupBy('target').count().show()
Spark UDF로 FLAML 모델 적용
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType
import pickle
# 076 FLAML 모델 브로드캐스트
model_broadcast = spark.sparkContext.broadcast(automl)
# 076 예측 UDF
@udf(returnType=DoubleType())
def predict_udf(features):
"""FLAML 모델 예측 UDF"""
model = model_broadcast.value
features_array = np.array(features).reshape(1, -1)
return float(model.predict(features_array)[0])
# 076 Spark에서 모든 열을 배열로 변환 후 예측
from pyspark.sql.functions import array
# 076 특성 배열 생성
test_pandas = test_df.select(feature_cols + ['target']).limit(1000).toPandas()
X_test = test_pandas[feature_cols].values
y_test = test_pandas['target'].values
# 076 FLAML 예측
y_pred = automl.predict(X_test)
from sklearn.metrics import accuracy_score
print(f"\nFLAML 테스트 정확도: {accuracy_score(y_test, y_pred):.4f}")
Spark와 FLAML 파이프라인
def spark_flaml_pipeline(spark_df, feature_cols, label_col, sample_size=10000, time_budget=60):
"""Spark + FLAML 통합 파이프라인"""
# 1. 샘플링
if spark_df.count() > sample_size:
sample_ratio = sample_size / spark_df.count()
sampled = spark_df.sample(fraction=sample_ratio, seed=42)
else:
sampled = spark_df
# 2. Pandas 변환
pandas_df = sampled.select(feature_cols + [label_col]).toPandas()
X = pandas_df[feature_cols].values
y = pandas_df[label_col].values
# 3. FLAML AutoML
automl = AutoML()
automl.fit(
X, y,
task="classification",
time_budget=time_budget,
verbose=0
)
return automl
# 076 파이프라인 실행
pipeline_model = spark_flaml_pipeline(
spark_df,
feature_cols,
'target',
sample_size=5000,
time_budget=30
)
print(f"파이프라인 결과: {pipeline_model.best_estimator}")
Spark SQL과 연동
# 076 Spark SQL로 특성 엔지니어링
spark_df.createOrReplaceTempView("data")
# 076 SQL로 특성 생성
engineered_df = spark.sql("""
SELECT
feature_1,
feature_2,
feature_3,
feature_4,
feature_5,
feature_1 * feature_2 AS interaction_1_2,
feature_1 + feature_3 AS sum_1_3,
LOG(ABS(feature_4) + 1) AS log_feature_4,
target
FROM data
""")
engineered_df.show(5)
# 076 새 특성으로 학습
new_feature_cols = [
'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5',
'interaction_1_2', 'sum_1_3', 'log_feature_4'
]
pandas_engineered = engineered_df.limit(5000).toPandas()
X_eng = pandas_engineered[new_feature_cols].values
y_eng = pandas_engineered['target'].values
automl_eng = AutoML()
automl_eng.fit(X_eng, y_eng, task="classification", time_budget=30, verbose=0)
print(f"\n특성 엔지니어링 후 모델: {automl_eng.best_estimator}")
모델 저장 및 배포
import joblib
# 076 FLAML 모델 저장
joblib.dump(automl, 'flaml_model.pkl')
# 076 저장된 모델로 Spark에서 대규모 예측
def batch_predict_spark(spark_df, feature_cols, model_path, batch_size=10000):
"""대규모 배치 예측"""
loaded_model = joblib.load(model_path)
total_count = spark_df.count()
predictions = []
# 배치 처리
for offset in range(0, total_count, batch_size):
batch = spark_df.select(feature_cols).limit(batch_size).offset(offset).toPandas()
if len(batch) == 0:
break
batch_pred = loaded_model.predict(batch.values)
predictions.extend(batch_pred)
return predictions
print("모델 저장 및 배치 예측 파이프라인 준비 완료")
성능 최적화 팁
tips = {
'상황': ['데이터 로딩', '샘플링', '모델 적용', '메모리'],
'최적화': [
'Parquet 포맷 사용',
'계층적 샘플링',
'broadcast 변수 활용',
'persist() 캐싱'
],
'이유': [
'컬럼 기반 압축',
'클래스 균형 유지',
'직렬화 오버헤드 감소',
'반복 연산 최적화'
]
}
print("\nSpark + FLAML 최적화 팁:")
print(pd.DataFrame(tips).to_string(index=False))
정리
- Spark DataFrame: 대규모 데이터 분산 처리
- 샘플링: 대용량 데이터는 샘플링 후 FLAML 적용
- VectorAssembler: Spark ML 특성 벡터 생성
- UDF: FLAML 모델을 Spark에서 사용
- 파이프라인: Spark 전처리 + FLAML AutoML
- 대규모 데이터에서는 샘플링이 핵심
# 076 Spark 세션 종료
spark.stop()
다음 글 예고
다음 글에서는 메모리 효율적인 학습 전략에 대해 알아보겠습니다. 제한된 메모리에서 효율적으로 학습하는 방법을 다룹니다.
FLAML AutoML 마스터 시리즈 #076