본문으로 건너뛰기

093 배치 예측 파이프라인 구축

키워드: 배치, 파이프라인, 대용량 처리

개요

대용량 데이터를 효율적으로 예측하려면 배치 처리 파이프라인이 필요합니다. 이 글에서는 FLAML 모델을 활용한 배치 예측 시스템 구축 방법을 알아봅니다.

실습 환경

  • Python 버전: 3.11 권장
  • 필요 패키지: flaml[automl]
pip install flaml[automl] pandas numpy joblib

기본 배치 예측

import numpy as np
import pandas as pd
import joblib
from datetime import datetime
import time

# 093 모델 준비
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from flaml import AutoML

np.random.seed(42)
X, y = make_classification(n_samples=2000, n_features=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

automl = AutoML()
automl.fit(X_train, y_train, task="classification", time_budget=30, verbose=0)
joblib.dump(automl, 'batch_model.pkl')
print(f"모델 저장 완료: {automl.best_estimator}")

# 093 간단한 배치 예측
def simple_batch_predict(model, data, batch_size=1000):
"""간단한 배치 예측"""
predictions = []
n_samples = len(data)

for i in range(0, n_samples, batch_size):
batch = data[i:i+batch_size]
batch_pred = model.predict(batch)
predictions.extend(batch_pred)

print(f" 배치 {i//batch_size + 1}: {min(i+batch_size, n_samples)}/{n_samples}")

return np.array(predictions)

# 093 대용량 데이터 시뮬레이션
large_data = np.random.randn(10000, 10)
print(f"\n배치 예측 시작 (데이터: {len(large_data)}행)")

start = time.time()
predictions = simple_batch_predict(automl, large_data, batch_size=2000)
elapsed = time.time() - start

print(f"\n완료: {elapsed:.2f}초")
print(f"예측 결과: {len(predictions)}개")

배치 파이프라인 클래스

import os
import gc
from typing import Iterator, Optional

class BatchPredictionPipeline:
"""배치 예측 파이프라인"""

def __init__(self, model_path: str, batch_size: int = 1000):
self.model_path = model_path
self.batch_size = batch_size
self.model = None
self.stats = {}

def load_model(self):
"""모델 로드"""
self.model = joblib.load(self.model_path)
print(f"모델 로드: {self.model.best_estimator}")

def process_csv(self, input_path: str, output_path: str,
feature_cols: Optional[list] = None):
"""CSV 파일 배치 처리"""
if self.model is None:
self.load_model()

start_time = time.time()
total_rows = 0
results = []

# 청크 단위 읽기
for chunk in pd.read_csv(input_path, chunksize=self.batch_size):
# 특성 추출
if feature_cols:
X = chunk[feature_cols].values
else:
X = chunk.values

# 예측
predictions = self.model.predict(X)
probabilities = self.model.predict_proba(X)[:, 1]

# 결과 추가
chunk['prediction'] = predictions
chunk['probability'] = probabilities
results.append(chunk)

total_rows += len(chunk)
print(f" 처리: {total_rows}행")

# 메모리 정리
gc.collect()

# 결과 저장
result_df = pd.concat(results, ignore_index=True)
result_df.to_csv(output_path, index=False)

elapsed = time.time() - start_time
self.stats = {
'total_rows': total_rows,
'elapsed_seconds': elapsed,
'rows_per_second': total_rows / elapsed
}

print(f"\n완료: {total_rows}행 / {elapsed:.2f}초")
return result_df

def get_stats(self):
"""통계 반환"""
return self.stats

# 093 파이프라인 사용
# 093 테스트 데이터 생성
test_df = pd.DataFrame(large_data, columns=[f'f{i}' for i in range(10)])
test_df.to_csv('input_data.csv', index=False)

pipeline = BatchPredictionPipeline('batch_model.pkl', batch_size=2000)
result = pipeline.process_csv(
'input_data.csv',
'output_predictions.csv',
feature_cols=[f'f{i}' for i in range(10)]
)

print(f"\n파이프라인 통계: {pipeline.get_stats()}")

병렬 배치 처리

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import multiprocessing

def predict_batch(args):
"""단일 배치 예측 (병렬 처리용)"""
model_path, batch_data, batch_id = args

model = joblib.load(model_path)
predictions = model.predict(batch_data)
probabilities = model.predict_proba(batch_data)[:, 1]

return batch_id, predictions, probabilities

class ParallelBatchPipeline:
"""병렬 배치 예측 파이프라인"""

def __init__(self, model_path: str, batch_size: int = 1000, n_workers: int = None):
self.model_path = model_path
self.batch_size = batch_size
self.n_workers = n_workers or multiprocessing.cpu_count()

def predict(self, data: np.ndarray) -> tuple:
"""병렬 예측"""
n_samples = len(data)
batches = []

# 배치 분할
for i, start in enumerate(range(0, n_samples, self.batch_size)):
end = min(start + self.batch_size, n_samples)
batches.append((self.model_path, data[start:end], i))

print(f"배치 수: {len(batches)}, 워커: {self.n_workers}")

# 병렬 처리
start_time = time.time()

with ProcessPoolExecutor(max_workers=self.n_workers) as executor:
results = list(executor.map(predict_batch, batches))

# 결과 정렬 및 결합
results.sort(key=lambda x: x[0])
predictions = np.concatenate([r[1] for r in results])
probabilities = np.concatenate([r[2] for r in results])

elapsed = time.time() - start_time
print(f"병렬 처리 완료: {elapsed:.2f}초 ({n_samples/elapsed:.0f} rows/sec)")

return predictions, probabilities

# 093 병렬 파이프라인 사용
parallel_pipeline = ParallelBatchPipeline('batch_model.pkl', batch_size=2000, n_workers=4)
pred, prob = parallel_pipeline.predict(large_data)
print(f"결과: {len(pred)}개 예측")

스트리밍 배치 처리

def streaming_predict(model_path: str, data_generator: Iterator, batch_size: int = 1000):
"""스트리밍 배치 예측 (메모리 효율적)"""
model = joblib.load(model_path)
batch = []
total_processed = 0

for item in data_generator:
batch.append(item)

if len(batch) >= batch_size:
batch_array = np.array(batch)
predictions = model.predict(batch_array)

for pred in predictions:
yield pred

total_processed += len(batch)
print(f"스트리밍 처리: {total_processed}")
batch = []

# 남은 배치 처리
if batch:
batch_array = np.array(batch)
predictions = model.predict(batch_array)
for pred in predictions:
yield pred

# 093 데이터 제너레이터 시뮬레이션
def data_generator(n_samples=10000, n_features=10):
for _ in range(n_samples):
yield np.random.randn(n_features)

# 093 스트리밍 처리
print("\n스트리밍 예측:")
streaming_predictions = list(streaming_predict(
'batch_model.pkl',
data_generator(5000, 10),
batch_size=1000
))
print(f"결과: {len(streaming_predictions)}개")

스케줄링된 배치 작업

import schedule
import threading

class ScheduledBatchJob:
"""스케줄링된 배치 작업"""

def __init__(self, model_path: str, input_dir: str, output_dir: str):
self.pipeline = BatchPredictionPipeline(model_path, batch_size=2000)
self.input_dir = input_dir
self.output_dir = output_dir
self.is_running = False

def run_job(self):
"""배치 작업 실행"""
if self.is_running:
print("이전 작업 실행 중, 건너뜀")
return

self.is_running = True
try:
print(f"\n[{datetime.now()}] 배치 작업 시작")

# 입력 파일 처리
import glob
input_files = glob.glob(f"{self.input_dir}/*.csv")

for input_file in input_files:
filename = os.path.basename(input_file)
output_file = os.path.join(self.output_dir, f"pred_{filename}")

self.pipeline.process_csv(input_file, output_file)

print(f"[{datetime.now()}] 배치 작업 완료")

finally:
self.is_running = False

def start(self, interval_minutes: int = 60):
"""스케줄러 시작"""
schedule.every(interval_minutes).minutes.do(self.run_job)

def run_scheduler():
while True:
schedule.run_pending()
time.sleep(1)

thread = threading.Thread(target=run_scheduler, daemon=True)
thread.start()
print(f"스케줄러 시작 (매 {interval_minutes}분)")

# 093 스케줄된 작업 (예시)
# 093 scheduler = ScheduledBatchJob('batch_model.pkl', 'input/', 'output/')
# 093 scheduler.start(interval_minutes=60)
print("\n스케줄 배치 작업 클래스 정의 완료")

배치 모니터링

class BatchMonitor:
"""배치 작업 모니터링"""

def __init__(self):
self.jobs = []

def log_job(self, job_id: str, status: str, stats: dict):
"""작업 로깅"""
self.jobs.append({
'job_id': job_id,
'timestamp': datetime.now().isoformat(),
'status': status,
**stats
})

def get_summary(self):
"""요약 통계"""
if not self.jobs:
return "작업 없음"

df = pd.DataFrame(self.jobs)
return {
'total_jobs': len(df),
'success_rate': (df['status'] == 'success').mean(),
'avg_rows_per_second': df['rows_per_second'].mean() if 'rows_per_second' in df.columns else 0,
'total_rows_processed': df['total_rows'].sum() if 'total_rows' in df.columns else 0
}

# 093 모니터링 사용
monitor = BatchMonitor()
monitor.log_job("job_001", "success", pipeline.get_stats())
print(f"\n배치 모니터링 요약: {monitor.get_summary()}")

배치 파이프라인 베스트 프랙티스

best_practices = {
'항목': ['배치 크기', '메모리', '병렬 처리', '에러 처리', '로깅'],
'권장': [
'1000-10000 (메모리에 따라)',
'gc.collect() 주기적 호출',
'워커 수 = CPU 코어 - 1',
'배치별 try-except',
'진행률, 처리 시간 기록'
],
'이유': [
'I/O와 계산 균형',
'메모리 누수 방지',
'시스템 자원 활용',
'부분 실패 복구',
'문제 추적'
]
}

print("\n=== 배치 파이프라인 베스트 프랙티스 ===")
print(pd.DataFrame(best_practices).to_string(index=False))

정리

  • 배치 처리: 대용량 데이터 청크 단위 처리
  • 병렬 처리: ProcessPoolExecutor로 가속화
  • 스트리밍: 메모리 효율적 처리
  • 스케줄링: 정기 배치 작업 자동화
  • 모니터링: 작업 상태 및 통계 추적

다음 글 예고

다음 글에서는 모델 모니터링과 재학습 전략을 알아봅니다. 배포된 모델의 성능을 추적하고 유지하는 방법을 다룹니다.


FLAML AutoML 마스터 시리즈 #093