본문으로 건너뛰기

076 Spark 연동 - 대규모 데이터 처리

키워드: Spark, 대규모 데이터, 분산 처리

개요

Apache Spark는 대규모 데이터 처리를 위한 분산 컴퓨팅 프레임워크입니다. FLAML과 Spark를 연동하면 빅데이터 환경에서도 AutoML을 활용할 수 있습니다.

실습 환경

  • Python 버전: 3.11 권장
  • 필요 패키지: flaml[automl], pyspark
pip install flaml[automl] pyspark pandas numpy

Spark 기초

from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

# 076 Spark 세션 생성
spark = SparkSession.builder \
.appName("FLAML-Spark") \
.config("spark.driver.memory", "4g") \
.config("spark.executor.memory", "4g") \
.getOrCreate()

print("Spark 세션 정보:")
print(f" 버전: {spark.version}")
print(f" 앱 이름: {spark.sparkContext.appName}")

# 076 샘플 데이터 생성
np.random.seed(42)
n_samples = 10000
data = {
'feature_1': np.random.randn(n_samples),
'feature_2': np.random.randn(n_samples),
'feature_3': np.random.randn(n_samples),
'feature_4': np.random.rand(n_samples),
'feature_5': np.random.randint(0, 10, n_samples),
'target': np.random.randint(0, 2, n_samples)
}

pandas_df = pd.DataFrame(data)
spark_df = spark.createDataFrame(pandas_df)

print(f"\nSpark DataFrame:")
spark_df.show(5)
print(f"행 수: {spark_df.count()}")

Spark ML과 FLAML

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier as SparkRF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 076 특성 벡터 생성
feature_cols = [f'feature_{i}' for i in range(1, 6)]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
spark_df_assembled = assembler.transform(spark_df)

# 076 학습/테스트 분할
train_df, test_df = spark_df_assembled.randomSplit([0.8, 0.2], seed=42)

print(f"학습 데이터: {train_df.count()}")
print(f"테스트 데이터: {test_df.count()}")

# 076 Spark ML 모델 학습
spark_rf = SparkRF(
labelCol='target',
featuresCol='features',
numTrees=100,
maxDepth=5
)

model = spark_rf.fit(train_df)
predictions = model.transform(test_df)

# 076 평가
evaluator = MulticlassClassificationEvaluator(
labelCol='target',
predictionCol='prediction',
metricName='accuracy'
)
accuracy = evaluator.evaluate(predictions)
print(f"\nSpark ML 정확도: {accuracy:.4f}")

Spark 데이터를 FLAML로 처리

from flaml import AutoML

# 076 Spark DataFrame을 Pandas로 변환 (작은 데이터)
# 076 주의: 대용량 데이터는 샘플링 권장
sample_size = 5000
pandas_train = train_df.select(feature_cols + ['target']).limit(sample_size).toPandas()

X_train = pandas_train[feature_cols].values
y_train = pandas_train['target'].values

# 076 FLAML AutoML
automl = AutoML()
automl.fit(
X_train, y_train,
task="classification",
time_budget=60,
verbose=1
)

print(f"\nFLAML 최적 모델: {automl.best_estimator}")
print(f"최적 설정: {automl.best_config}")

대용량 데이터 샘플링 전략

def stratified_sample_spark(df, label_col, sample_fraction):
"""계층적 샘플링 (Spark)"""
fractions = df.groupBy(label_col) \
.count() \
.rdd \
.map(lambda x: (x[label_col], sample_fraction)) \
.collectAsMap()

return df.sampleBy(label_col, fractions, seed=42)

# 10% 샘플링
sampled_df = stratified_sample_spark(spark_df_assembled, 'target', 0.1)
print(f"샘플링된 데이터: {sampled_df.count()}")

# 076 클래스 분포 확인
print("샘플 클래스 분포:")
sampled_df.groupBy('target').count().show()

Spark UDF로 FLAML 모델 적용

from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType
import pickle

# 076 FLAML 모델 브로드캐스트
model_broadcast = spark.sparkContext.broadcast(automl)

# 076 예측 UDF
@udf(returnType=DoubleType())
def predict_udf(features):
"""FLAML 모델 예측 UDF"""
model = model_broadcast.value
features_array = np.array(features).reshape(1, -1)
return float(model.predict(features_array)[0])

# 076 Spark에서 모든 열을 배열로 변환 후 예측
from pyspark.sql.functions import array

# 076 특성 배열 생성
test_pandas = test_df.select(feature_cols + ['target']).limit(1000).toPandas()
X_test = test_pandas[feature_cols].values
y_test = test_pandas['target'].values

# 076 FLAML 예측
y_pred = automl.predict(X_test)

from sklearn.metrics import accuracy_score
print(f"\nFLAML 테스트 정확도: {accuracy_score(y_test, y_pred):.4f}")

Spark와 FLAML 파이프라인

def spark_flaml_pipeline(spark_df, feature_cols, label_col, sample_size=10000, time_budget=60):
"""Spark + FLAML 통합 파이프라인"""

# 1. 샘플링
if spark_df.count() > sample_size:
sample_ratio = sample_size / spark_df.count()
sampled = spark_df.sample(fraction=sample_ratio, seed=42)
else:
sampled = spark_df

# 2. Pandas 변환
pandas_df = sampled.select(feature_cols + [label_col]).toPandas()

X = pandas_df[feature_cols].values
y = pandas_df[label_col].values

# 3. FLAML AutoML
automl = AutoML()
automl.fit(
X, y,
task="classification",
time_budget=time_budget,
verbose=0
)

return automl

# 076 파이프라인 실행
pipeline_model = spark_flaml_pipeline(
spark_df,
feature_cols,
'target',
sample_size=5000,
time_budget=30
)

print(f"파이프라인 결과: {pipeline_model.best_estimator}")

Spark SQL과 연동

# 076 Spark SQL로 특성 엔지니어링
spark_df.createOrReplaceTempView("data")

# 076 SQL로 특성 생성
engineered_df = spark.sql("""
SELECT
feature_1,
feature_2,
feature_3,
feature_4,
feature_5,
feature_1 * feature_2 AS interaction_1_2,
feature_1 + feature_3 AS sum_1_3,
LOG(ABS(feature_4) + 1) AS log_feature_4,
target
FROM data
""")

engineered_df.show(5)

# 076 새 특성으로 학습
new_feature_cols = [
'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5',
'interaction_1_2', 'sum_1_3', 'log_feature_4'
]

pandas_engineered = engineered_df.limit(5000).toPandas()
X_eng = pandas_engineered[new_feature_cols].values
y_eng = pandas_engineered['target'].values

automl_eng = AutoML()
automl_eng.fit(X_eng, y_eng, task="classification", time_budget=30, verbose=0)
print(f"\n특성 엔지니어링 후 모델: {automl_eng.best_estimator}")

모델 저장 및 배포

import joblib

# 076 FLAML 모델 저장
joblib.dump(automl, 'flaml_model.pkl')

# 076 저장된 모델로 Spark에서 대규모 예측
def batch_predict_spark(spark_df, feature_cols, model_path, batch_size=10000):
"""대규모 배치 예측"""
loaded_model = joblib.load(model_path)

total_count = spark_df.count()
predictions = []

# 배치 처리
for offset in range(0, total_count, batch_size):
batch = spark_df.select(feature_cols).limit(batch_size).offset(offset).toPandas()
if len(batch) == 0:
break
batch_pred = loaded_model.predict(batch.values)
predictions.extend(batch_pred)

return predictions

print("모델 저장 및 배치 예측 파이프라인 준비 완료")

성능 최적화 팁

tips = {
'상황': ['데이터 로딩', '샘플링', '모델 적용', '메모리'],
'최적화': [
'Parquet 포맷 사용',
'계층적 샘플링',
'broadcast 변수 활용',
'persist() 캐싱'
],
'이유': [
'컬럼 기반 압축',
'클래스 균형 유지',
'직렬화 오버헤드 감소',
'반복 연산 최적화'
]
}

print("\nSpark + FLAML 최적화 팁:")
print(pd.DataFrame(tips).to_string(index=False))

정리

  • Spark DataFrame: 대규모 데이터 분산 처리
  • 샘플링: 대용량 데이터는 샘플링 후 FLAML 적용
  • VectorAssembler: Spark ML 특성 벡터 생성
  • UDF: FLAML 모델을 Spark에서 사용
  • 파이프라인: Spark 전처리 + FLAML AutoML
  • 대규모 데이터에서는 샘플링이 핵심
# 076 Spark 세션 종료
spark.stop()

다음 글 예고

다음 글에서는 메모리 효율적인 학습 전략에 대해 알아보겠습니다. 제한된 메모리에서 효율적으로 학습하는 방법을 다룹니다.


FLAML AutoML 마스터 시리즈 #076