본문으로 건너뛰기

052 시계열 데이터 전처리

키워드: 전처리, 결측치, 이상치

개요

시계열 데이터는 시간의 연속성 때문에 일반 데이터와 다른 전처리 방법이 필요합니다. 이 글에서는 결측치 처리, 이상치 탐지, 스케일링 등 시계열 전처리 기법을 알아봅니다.

실습 환경

  • Python 버전: 3.11 권장
  • 필요 패키지: pandas, numpy, matplotlib
pip install pandas numpy matplotlib scipy

시계열 데이터 생성

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# 052 결측치와 이상치가 있는 시계열 생성
np.random.seed(42)
dates = pd.date_range('2023-01-01', periods=365, freq='D')

# 052 기본 패턴
trend = np.linspace(100, 150, 365)
seasonality = 20 * np.sin(np.arange(365) * 2 * np.pi / 365)
noise = np.random.randn(365) * 5
values = trend + seasonality + noise

# 052 결측치 추가
missing_idx = [10, 50, 51, 52, 100, 200, 201, 202, 203, 300]
values_with_missing = values.copy()
values_with_missing[missing_idx] = np.nan

# 052 이상치 추가
outlier_idx = [30, 150, 250]
values_with_outliers = values_with_missing.copy()
values_with_outliers[outlier_idx] = [300, 10, 280]

ts = pd.Series(values_with_outliers, index=dates, name='value')

print("시계열 데이터 상태:")
print(f" 총 데이터: {len(ts)}")
print(f" 결측치: {ts.isna().sum()}")
print(f" 이상치: 3개 (수동 추가)")

결측치 처리

결측치 탐지

# 052 결측치 확인
print("결측치 위치:")
missing_dates = ts[ts.isna()].index
print(missing_dates.strftime('%Y-%m-%d').tolist())

# 052 연속 결측치 그룹 찾기
def find_missing_groups(series):
"""연속 결측치 그룹 찾기"""
is_missing = series.isna()
groups = []
in_group = False
start = None

for i, (idx, missing) in enumerate(is_missing.items()):
if missing and not in_group:
in_group = True
start = idx
elif not missing and in_group:
in_group = False
groups.append((start, series.index[i-1]))

if in_group:
groups.append((start, series.index[-1]))

return groups

missing_groups = find_missing_groups(ts)
print(f"\n연속 결측치 그룹: {len(missing_groups)}개")
for start, end in missing_groups:
print(f" {start.strftime('%Y-%m-%d')} ~ {end.strftime('%Y-%m-%d')}")

결측치 보간 방법

# 052 다양한 보간 방법
methods = {}

# 1. 전방 채움 (Forward Fill)
methods['forward'] = ts.ffill()

# 2. 후방 채움 (Backward Fill)
methods['backward'] = ts.bfill()

# 3. 선형 보간
methods['linear'] = ts.interpolate(method='linear')

# 4. 시간 기반 보간
methods['time'] = ts.interpolate(method='time')

# 5. 스플라인 보간
methods['spline'] = ts.interpolate(method='spline', order=3)

# 6. 이동 평균 (커스텀)
def fill_with_rolling_mean(series, window=7):
filled = series.copy()
rolling = series.rolling(window=window, min_periods=1, center=True).mean()
filled = filled.fillna(rolling)
return filled

methods['rolling'] = fill_with_rolling_mean(ts)

# 052 시각화
fig, axes = plt.subplots(3, 2, figsize=(14, 12))
axes = axes.flatten()

for ax, (name, filled_ts) in zip(axes, methods.items()):
ax.plot(ts.index, ts.values, 'b.', alpha=0.5, label='Original')
ax.plot(filled_ts.index, filled_ts.values, 'r-', alpha=0.7, label='Filled')
ax.set_title(f'Method: {name}')
ax.legend()

plt.tight_layout()
plt.show()

# 052 결과 비교
print("\n결측치 처리 후 통계:")
for name, filled_ts in methods.items():
print(f" {name}: 평균={filled_ts.mean():.2f}, 표준편차={filled_ts.std():.2f}")

계절성 기반 보간

def seasonal_interpolate(series, period=7):
"""계절성 기반 보간"""
filled = series.copy()

for i in range(len(filled)):
if pd.isna(filled.iloc[i]):
# 같은 요일/월의 값들로 평균
same_period = []
for j in range(i - period, -1, -period):
if j >= 0 and not pd.isna(filled.iloc[j]):
same_period.append(filled.iloc[j])
if len(same_period) >= 4: # 최대 4개
break

if same_period:
filled.iloc[i] = np.mean(same_period)

return filled.interpolate() # 남은 결측치는 선형 보간

ts_seasonal = seasonal_interpolate(ts, period=7)
print(f"계절성 보간 후 결측치: {ts_seasonal.isna().sum()}")

이상치 처리

이상치 탐지

# 052 다양한 이상치 탐지 방법
ts_filled = methods['linear'] # 결측치 처리된 데이터 사용

# 1. IQR 방법
def detect_outliers_iqr(series, factor=1.5):
Q1 = series.quantile(0.25)
Q3 = series.quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - factor * IQR
upper = Q3 + factor * IQR
return (series < lower) | (series > upper)

# 2. Z-Score 방법
def detect_outliers_zscore(series, threshold=3):
z_scores = (series - series.mean()) / series.std()
return np.abs(z_scores) > threshold

# 3. 이동 통계 기반
def detect_outliers_rolling(series, window=30, threshold=3):
rolling_mean = series.rolling(window=window, center=True).mean()
rolling_std = series.rolling(window=window, center=True).std()
z_scores = (series - rolling_mean) / rolling_std
return np.abs(z_scores) > threshold

# 052 탐지 결과
outliers_iqr = detect_outliers_iqr(ts_filled)
outliers_zscore = detect_outliers_zscore(ts_filled)
outliers_rolling = detect_outliers_rolling(ts_filled)

print("이상치 탐지 결과:")
print(f" IQR 방법: {outliers_iqr.sum()}개")
print(f" Z-Score 방법: {outliers_zscore.sum()}개")
print(f" 이동 통계 방법: {outliers_rolling.sum()}개")

# 052 시각화
plt.figure(figsize=(14, 6))
plt.plot(ts_filled.index, ts_filled.values, 'b-', alpha=0.7, label='Data')
plt.scatter(ts_filled.index[outliers_rolling], ts_filled[outliers_rolling],
color='red', s=100, label='Outliers (Rolling)')
plt.title('Outlier Detection using Rolling Statistics')
plt.legend()
plt.show()

이상치 처리 방법

# 052 이상치 처리 방법
def handle_outliers(series, method='clip', **kwargs):
"""이상치 처리"""
outliers = detect_outliers_rolling(series, window=30, threshold=3)
result = series.copy()

if method == 'remove':
# NaN으로 대체 후 보간
result[outliers] = np.nan
result = result.interpolate()

elif method == 'clip':
# 백분위수로 클리핑
lower = series.quantile(kwargs.get('lower', 0.01))
upper = series.quantile(kwargs.get('upper', 0.99))
result = result.clip(lower, upper)

elif method == 'rolling_replace':
# 이동 평균으로 대체
window = kwargs.get('window', 7)
rolling_mean = series.rolling(window=window, center=True).mean()
result[outliers] = rolling_mean[outliers]

return result

# 052 각 방법 적용
ts_no_outliers = {
'remove': handle_outliers(ts_filled, 'remove'),
'clip': handle_outliers(ts_filled, 'clip'),
'rolling': handle_outliers(ts_filled, 'rolling_replace', window=7)
}

# 052 시각화
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for ax, (name, ts_handled) in zip(axes, ts_no_outliers.items()):
ax.plot(ts_filled.index, ts_filled.values, 'b-', alpha=0.3, label='Original')
ax.plot(ts_handled.index, ts_handled.values, 'r-', alpha=0.7, label='Processed')
ax.set_title(f'Method: {name}')
ax.legend()

plt.tight_layout()
plt.show()

스케일링

시계열 스케일링

from sklearn.preprocessing import StandardScaler, MinMaxScaler

# 052 시계열용 스케일링 (과거 데이터만 사용)
class TimeSeriesScaler:
"""시계열용 스케일러 (데이터 누출 방지)"""

def __init__(self, scaler_type='standard'):
if scaler_type == 'standard':
self.scaler = StandardScaler()
else:
self.scaler = MinMaxScaler()

def fit_transform(self, series):
values = series.values.reshape(-1, 1)
scaled = self.scaler.fit_transform(values)
return pd.Series(scaled.flatten(), index=series.index)

def transform(self, series):
values = series.values.reshape(-1, 1)
scaled = self.scaler.transform(values)
return pd.Series(scaled.flatten(), index=series.index)

def inverse_transform(self, series):
values = series.values.reshape(-1, 1)
original = self.scaler.inverse_transform(values)
return pd.Series(original.flatten(), index=series.index)

# 052 사용 예
ts_clean = ts_no_outliers['rolling']

# 052 학습/테스트 분할 (시간 기반)
train_size = int(len(ts_clean) * 0.8)
train = ts_clean.iloc[:train_size]
test = ts_clean.iloc[train_size:]

# 052 스케일링 (학습 데이터로만 fit)
scaler = TimeSeriesScaler('standard')
train_scaled = scaler.fit_transform(train)
test_scaled = scaler.transform(test)

print("스케일링 결과:")
print(f" 학습 데이터 범위: {train_scaled.min():.2f} ~ {train_scaled.max():.2f}")
print(f" 테스트 데이터 범위: {test_scaled.min():.2f} ~ {test_scaled.max():.2f}")

차분 (Differencing)

# 052 차분으로 트렌드 제거
def difference(series, periods=1):
"""차분"""
return series.diff(periods).dropna()

def inverse_difference(differenced, first_value, periods=1):
"""차분 역변환"""
result = [first_value]
for i in range(len(differenced)):
result.append(result[-1] + differenced.iloc[i])
return pd.Series(result[1:], index=differenced.index)

# 1차 차분
ts_diff1 = difference(ts_clean, 1)

# 052 계절 차분 (7일)
ts_diff7 = difference(ts_clean, 7)

# 052 시각화
fig, axes = plt.subplots(3, 1, figsize=(14, 10))

axes[0].plot(ts_clean)
axes[0].set_title('Original')

axes[1].plot(ts_diff1)
axes[1].set_title('First Difference')

axes[2].plot(ts_diff7)
axes[2].set_title('Seasonal Difference (7 days)')

plt.tight_layout()
plt.show()

print("차분 후 통계:")
print(f" 원본 평균: {ts_clean.mean():.2f}")
print(f" 1차 차분 평균: {ts_diff1.mean():.4f} (0에 가까워야 함)")

전처리 파이프라인

class TimeSeriesPreprocessor:
"""시계열 전처리 파이프라인"""

def __init__(self):
self.scaler = None
self.first_value = None

def fit_transform(self, series):
# 1. 결측치 처리
series = series.interpolate(method='time')

# 2. 이상치 처리
series = handle_outliers(series, 'rolling_replace', window=7)

# 3. 스케일링
self.scaler = TimeSeriesScaler('standard')
series = self.scaler.fit_transform(series)

# 4. 차분 (선택적)
self.first_value = series.iloc[0]
series = difference(series, 1)

return series

def transform(self, series):
series = series.interpolate(method='time')
series = handle_outliers(series, 'rolling_replace', window=7)
series = self.scaler.transform(series)
first_val = series.iloc[0]
series = difference(series, 1)
return series, first_val

def inverse_transform(self, series, first_value):
# 역차분
series = inverse_difference(series, first_value, 1)
# 역스케일링
series = self.scaler.inverse_transform(series)
return series

# 052 사용
preprocessor = TimeSeriesPreprocessor()
ts_processed = preprocessor.fit_transform(ts)

print("\n전처리 완료:")
print(f" 처리 전: {len(ts)} 포인트, 결측치 {ts.isna().sum()}개")
print(f" 처리 후: {len(ts_processed)} 포인트, 결측치 {ts_processed.isna().sum()}개")

정리

  • 결측치 처리: 선형 보간, 시간 기반 보간, 계절성 보간
  • 이상치 탐지: IQR, Z-Score, 이동 통계 기반
  • 이상치 처리: 제거, 클리핑, 이동 평균 대체
  • 스케일링: 학습 데이터로만 fit (데이터 누출 방지)
  • 차분: 트렌드 제거, 정상성 확보
  • 항상 시간 순서를 유지하며 전처리

다음 글 예고

다음 글에서는 시계열 분해에 대해 알아보겠습니다. 트렌드, 계절성, 잔차를 분리하는 방법을 다룹니다.


FLAML AutoML 마스터 시리즈 #052