다변량 시계열 예측을 할 때 시퀀스 투 시퀀스(sequence to sequence, seq2seq) 형태를 사용하면 더 정교한 시계열 예측이 가능해진다. Seq2 Seq 모델은 주로 두 개의 LSTM으로 구성되며, 첫 번째 LSTM(인코더)은 입력 시퀀스를 고정 길이의 콘텍스트 벡터로 인코딩하고, 두 번째 LSTM(디코더)은 이 벡터를 기반으로 출력 시퀀스를 생성한다. 이는 주로 번역, 대화 생성 및 시계열 예측에 사용된다.
Seq2 Seq을 이용한 다중시점 시계열 예측의 가장 큰 장점은, known future input 값을 사용할 수 있다는 점이다. Seq2 Seq의 Encoder 가 과거의 정보를 받아서 콘텍스트 벡터로 인코딩을 하고, Decoder는 인코딩 된 콘텍스트 벡터에 known future input을 결합하여 예측을 수행하게 된다.
예를 들어, 미래 τ 구간의 주식의 가격을 예측한다고 할 때, 우리는 예측 모델의 학습을 위해 입력값으로 ① 과거의 주식 가격 (y_t-k:t) ② 과거의 정보만 알 수 있는 설명변수들. 예를 들어 거래량, 다른 주식의 가격정보 등 (x_t-k:t) 뿐만 아니라 ③ 미래에도 알 수 있는 정보 (known future input). 예를들어 요일정보, 휴장일 정보 등(u_t-k:t+ τ)을 학습과 예측에 사용할 수 있는 것이다.
※ 이때 known future input 의 과적정보 (u_t-k:t)는 S2S의 Encoder의 입력값으로 사용되며, 미래정보 (u_t+1: t+ τ) 는 Decoder의 입력값으로 사용된다.
아래부터는 Seq2Seq 를 이용한 다변량 다중시점 시계열 예측 예제 코드이다.
시계열 데이터셋 생성
- 필요 라이브러리 import
! pip install keras-hypetune # 케라스 그리드 서치
! pip install fitter # GEV 분포 생성
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras.callbacks import *
from tensorflow.keras.optimizers import *
from kerashypetune import KerasGridSearch
- 예제에 사용된 데이터의 형태는 아래와 같다. 타깃은 일자별 범죄건수 (n_crimes)이며, 3개의 변수 (month, wday, day)는 모두 미래에도 알 수 있는 정보 (known future input)이다.
- 시계열 데이터를 슬라이딩 윈도우 형태로 변환해 주는 사용자 정의함수.
- ① start_id 부터 end_id 인덱스 구간 내에서 ② window_shape을 윈도 사이즈로 ③ step 간격으로 추출
- 결과물은 3D 데이터 : (윈도우 개수, 윈도 사이즈, 피쳐 개수)
def create_windows(data, window_shape, step = 1, start_id = None, end_id = None):
data = np.asarray(data)
# np.prod 는 데이터의 shape 원소를 모두 곱함. ==max(data.shape) 이라는건 1차원인 경우 => 1차원 데이터만 2차원으로 바꿔주겠다..
data = data.reshape(-1,1) if np.prod(data.shape) == max(data.shape) else data
start_id = 0 if start_id is None else start_id
end_id = data.shape[0] if end_id is None else end_id
data = data[int(start_id):int(end_id),:]
window_shape = (int(window_shape), data.shape[-1]) # (window_shape, feature개수)
step = (int(step),) * data.ndim # 데이터가 2차원이면 (1,1)
slices = tuple(slice(None, None, st) for st in step) # slice 는 [::] 와 동일, slice(first:end,step) = first 부터 end 까지 step 간격으로 추출하겠다.
indexing_strides = data[slices].strides # step이 1이기 때문에 data[slices] = data와 동일함.
win_indices_shape = ((np.array(data.shape) - window_shape) // step) + 1 # 전체 윈도우 인덱스의 shape (data 전체길이 - window size +1, 1)
new_shape = tuple(list(win_indices_shape) + list(window_shape))
strides = tuple(list(indexing_strides) + list(data.strides))
window_data = np.lib.stride_tricks.as_strided(data, shape=new_shape, strides=strides)
return np.squeeze(window_data, 1)
- 3D 시계열 데이터의 정규화 클래스 (mean 과 std 인스턴스 와 transform ↔ inverse_trainsform 매써드 제공)
# 3D 데이터의 정규화와 원복시키는 클래스 생성
class Scaler3D:
def fit(self, X):
self.mean = X.reshape(-1,X.shape[-1]).mean(0).reshape(1,1,-1) # 피쳐 기준으로 평균구한 뒤 (1,1,n) 형태로 저장
self.std = X.reshape(-1,X.shape[-1]).std(0).reshape(1,1,-1) # 피쳐 기준으로 분산구한 뒤 (1,1,n) 형태로 저장
return self
def transform(self, X):
return (X - self.mean) / self.std
def inverse_transform(self, X):
return (X * self.std) + self.mean
- ① 숫자형 타겟변수 , ② 혼합형 변수 (과거시점의 윈도 사이즈로 구분), ③ 혼합형 변수 (미래시점의 예측 구간으로 구분)을 딕셔너리 형태로 구분하여 저장
# 학습용 데이터셋 만들기
def prepare_seq(num_X, cat1_X, cat2_X, scaler=None):
if scaler is not None:
num_X = scaler.transform(num_X) # X 입력데이터 중 타겟값은 정규화
inp_dict = {}
inp_dict['inp_num'] = num_X
for i,c in enumerate(cat_var):
inp_dict[f"inp_{c}_cat1"] = cat1_X[:,:,i]
inp_dict[f"inp_{c}_cat2"] = cat2_X[:,:,i]
return inp_dict
- 앞선 3개의 사용자 지정함수 및 클래스를 이용하여, 최종적으로 Train, Valid 데이터셋을 생성한다.
- 혼합형 변수인지, 숫자형 변수인지에 따라. 그리고 과거시점의 값인지 (Encoder의 인풋인지) 미래시점의 값인지 (Decoder의 인풋인지)를 구분하여 만든다.
### Split train & test, apply scaler
time_h = 7 # 예측하고자 하는 시점과의 시차
window_shape = 21 # 윈도우 사이즈
target = ['n_crimes']
cat_var = ['month','wday','day']
# 시계열 데이터이기 때문에 shuffle 없이 앞에서부터 자름
_id_train, _id_valid = train_test_split(np.arange((len(df) - window_shape - time_h + 1)), test_size=0.3, shuffle=False)
# _num 과 _target 은 각각 숫자형 타겟 데이터를 학습용과 타겟용으로 분리한것
# _cat1 과 _cat2 는 각각 범주형 변수들을 윈도우사이즈, 예측구간으로 분리한것
_num_X = create_windows(df[target], step =1, window_shape=window_shape, end_id=-time_h) # 원본데이터 타겟값(범죄수)를 윈도우 데이터로 생성
_cat1_X = create_windows(df[cat_var], window_shape=window_shape, end_id=-time_h) # 원본데이터 피쳐값(날짜정보를)를 앞에서부터 윈도우 데이터로 생성
_cat2_X = create_windows(df[cat_var], window_shape=time_h, start_id=window_shape) # 원본데이터 피쳐값(날짜정보를)를 윈도우사이즈+1 인덱스부터 예측시차 사이즈의 윈도우 데이터로 생성
_target = create_windows(df[target], window_shape=time_h, start_id=window_shape) # 원본데이터 타겟값(범죄수)를 윈도우사이즈+1 인덱스부터 예측시차 사이즈의 윈도우 데이터로 생성
print(_num_X.shape , _cat1_X.shape, _cat2_X.shape, _target.shape)
_num_X_train = _num_X[_id_train]; _num_X_valid = _num_X[_id_valid]
_cat1_X_train = _cat1_X[_id_train]; _cat1_X_valid = _cat1_X[_id_valid]
_cat2_X_train = _cat2_X[_id_train]; _cat2_X_valid = _cat2_X[_id_valid]
_target_train = _target[_id_train]; _target_valid = _target[_id_valid]
print(_num_X_train.shape, _cat1_X_train.shape, _cat2_X_train.shape, _target_train.shape)
X_scaler = Scaler3D().fit(_num_X_train)
y_scaler = Scaler3D().fit(_target_train)
print(X_scaler.mean, X_scaler.std)
X_train = prepare_seq(_num_X_train, _cat1_X_train, _cat2_X_train, scaler=X_scaler)
y_train = y_scaler.transform(_target_train)
X_valid = prepare_seq(_num_X_valid, _cat1_X_valid, _cat2_X_valid, scaler=X_scaler)
y_valid = y_scaler.transform(_target_valid)
print(X_train['inp_num'].shape, X_train['inp_month_cat1'].shape, X_train['inp_month_cat2'].shape)
print(X_train['inp_wday_cat1'].shape, X_train['inp_wday_cat2'].shape)
print(X_train['inp_day_cat1'].shape, X_train['inp_day_cat2'].shape)
Seq2 Seq 모델 생성 및 학습 (교차검증)
- LSTM 기반의 Encoder와 Decoder로 구성된 Seq2 Seq 모델 생성
### Define Model
def set_seed(seed):
tf.random.set_seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
random.seed(seed)
def get_model(param): # 여기서 param 은 KerasGridSearch 에서 param_grid 딕셔너리로 입력됨
set_seed(33)
# Input 레이어는 모델의 입력을 정의. shape 에는 배치크기를 제외한 입력 데이터의 형태, name 에는 입력 레이어의 이름
# name 에 입력된 값은 실제 입력 데이터 이름 (X_train 딕셔너리의 키값)과 일치해야함
inp_num = Input((window_shape,1), name='inp_num')
# month, wday, day와 같은 카테고리형 변수들에 대한 임베딩 레이어 정의
# Embedding (input_dim, output_dim) 레이어는 카테코리형 데이터를 고차원 데이터로 변환해준다.
embeddings = {
'month': Embedding(12+1, param['emb_dim']),
'wday': Embedding(6+1, param['emb_dim']),
'day': Embedding(31+1, param['emb_dim'])
}
inp_cat1, emb_cat = [], []
for c in cat_var:
_inp_c1 = Input((window_shape,), name=f"inp_{c}_cat1") # (window_shape,) 형태의 입력 레이어 생성
emb = embeddings[c](_inp_c1) # c 에 해당하는 변수의 임베딩 레이어를 _inp_c1 적용
inp_cat1.append(_inp_c1) # inp_cat1 에는 임베딩하기전의 카테고리형 변수를 넣음
emb_cat.append(emb) # emb_cat 에는 임베딩 이후 카테고리형 변수를 넣음
enc = Concatenate()([inp_num] + emb_cat)
# 숫자형 입력과 임베딩된 카테코리형 입력을 연결. 이때 input_num 은 (batch_size, window_shape, 1) // emb_cat 은 (batch_size, window_shape, emb_dim)
# 두개를 연결하고난 뒤 enc 의 shape 은 (batch_size, window_shape, total_features), ※ total_features = 1 + len(cat_var) * emb_dim
enc = LSTM(param['lstm_unit'], return_sequences=False)(enc)
# LSTM 레이어를 concat된 입력 (batch_size, window_shape, total_features)에 적용
# LSTM 레이어를 적용한뒤 출력된 enc 의 형태는 (batch_size, lstm_units)
inp_cat2, emb_future = [], [] # 위에 for 문과 동일하게, (time_h,) 형태의 카테코리형 변수의 임베딩 전/후 상태를 리스트에 각각 저장
for c in cat_var:
_inp_c2 = Input((time_h,), name=f"inp_{c}_cat2")
emb = embeddings[c](_inp_c2)
inp_cat2.append(_inp_c2)
emb_future.append(emb) # emb_future 의 shape 은 (batch_size, time_h, emb_dim)
x = RepeatVector(time_h)(enc) # x 는 enc 를 time_h 만큼 반복하라는 의미 => x 의 shape 은 (batch_size, time_h, lstm_units)
dec = Concatenate()([x] + emb_future) # dec 의 shape 은 (batch_size, time_h, lstm_units + len(cat_var) * emb_dim)
dec = LSTM(param['lstm_unit'], return_sequences=True)(dec)
# return_sequences=True 이기 때문에 각 타임스텝에 대한 출력을 반환 => 출력 후 dec 의 shape 은 (batch_size, time_h, lstm_units)
# TimeDistributed 레이어는 ()안에 주어진 레이어를 입력의 각 타임 스텝에 독립적으로 적용
# 디코더 출력 dec 의 각 타임스텝에 Dense 레이어를 적용 => output 의 shape 은 (batch_size, time_h, 1)
# Dense layer 에 activation 파라미터 추가 가능, 디폴트는 activation=None
output = TimeDistributed(Dense(1))(dec)
model = Model([inp_num] + inp_cat1 + inp_cat2, output)
model.compile(Adam(lr=param['lr']), 'mse')
return model
- GridSearch 방식을 이용한 교차검증 및 파라미터 튜닝
## Modeling Fitting
param_grid = {
'lstm_unit': [128,64,32],
'emb_dim': [8,16,32],
'lr': [1e-3,1e-2], ## learning rate
'epochs': 10,
'batch_size': 64
}
es = EarlyStopping(patience=7, verbose=0, min_delta=0.001, monitor='val_loss', mode='auto', restore_best_weights=True)
hypermodel = get_model
kgs = KerasGridSearch(hypermodel, param_grid, monitor='val_loss', greater_is_better=False, tuner_verbose=1)
kgs.search(X_train, y_train, validation_data=(X_valid, y_valid), callbacks=[es])
print('BEST PARAM:', kgs.best_params)
- 학습된 모델로 예측 및 평가
## Get predicted values
pred = np.squeeze(
y_scaler.inverse_transform(
kgs.best_model.predict(X_valid)
), -1)
y_valid = np.squeeze(y_scaler.inverse_transform(y_valid), -1)
print(kgs.best_model.predict(X_valid).shape, pred.shape, y_valid.shape)
## Visualize Test vs Predicted
## Test vs prediction values are on different time horizons
plt.figure(figsize=(12,5))
for t_h in range(time_h):
plt.plot(pred[(time_h - t_h):-(t_h+1),t_h],
c='blue', alpha=1-1/(time_h+1)*(t_h+1),
label=f"Pred day + {t_h+1}")
# t_h 가 0 이면, 각 예측 윈도우 중에서 첫번째 값들을 사용 // 2 이면 각 예측 윈도우 중에서 3번째 값들을 사용
plt.plot(y_valid[time_h:,0], c='red', alpha=0.4, label='Actual')
plt.ylabel('Daily # of Crimes: Actual vs Predicted')
plt.xlabel('Time')
plt.legend()
plt.show()
'Study > 시계열' 카테고리의 다른 글
7. (Transformer-1편) RNN 과 Attention (1) | 2024.07.24 |
---|---|
6. 시계열 데이터 클러스터링 (DTW, TimeSeriesKMeans) (0) | 2024.07.09 |
4. Prophet 패키지를 이용한 간단 시계열 예측 (0) | 2024.07.08 |
3. 딥러닝을 이용한 시계열 예측 (TensorFlow, LSTM) (1) | 2024.07.08 |
2. 시계열 분석을 위한 확률모형 (0) | 2024.06.26 |
댓글