Kerasを用いたディープラーニング(LSTM)によるS&P500種指数の将来予想

   資産運用を行う場合、その投資スタイルは千差万別ですが、多くの投資家がどの商品を、どの割合で、どのくらいの期間運用しようかと悩むことと思います。また、その選択により将来どのくらい収益が上がるのかが気になるところです。その予測の方法も様々かと思いますが、今回は機械学習手法の一つである、Kerasを用いたディープラーニング(LSTM)により、その収益予想をしてみたいと思います。インデックスとして非常にポピュラーであるS&P500種指数に連動する商品に、ある一定期間運用したと仮定し、その投資収益率の範囲を予測したいと思います。実際に投資をしたときにどのくらい儲かるかを知りたいため、指数そのものではなく、収益率を採用しています。また、収益率は期間ごとにある一定の範囲(レンジ)を定め、その範囲をラベル化します。期間は幅を持たせるため、当日内から1年後までとしています。

 各期間後におけるS&P500種指数の投資収益率の予想レンジ一覧表

画像1

 期間は1day(当日内の市場のオープンからクローズまで)から1年後まで7つの期間で予想をしています。また、各期間ごとの予想レンジにラベル(正解ラベル)をつけています。1day、next_day、5daysでは騰落率ごとに5つのラベルを用意し、以降の期間は7つのラベルを用意しています。

ライブラリなどのimport 

 S&P500種指数はyahoo financeのヒストリカルデータを使用します。

!pip install yfinance
!pip install GPy
!pip install gpyopt
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import math
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.callbacks import EarlyStopping
import datetime as dt
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score 
import GPy
import GPyOpt

入力データ・正解ラベルを作成する関数を定義

def create_dataset(dataset, predict_range,look_back):
   data_X, data_Y = [], []
   if predict_range == "1day_open_to_close": 
       i_start = look_back
   elif predict_range =="next_day_colse":
       i_start = look_back+1
   elif predict_range == "5days":
       i_start = look_back+4
   elif predict_range == "1month":
       i_start = look_back+19
   elif predict_range == "3months":
       i_start = look_back+59
   elif predict_range == "6months":
       i_start = look_back+119
   elif predict_range == "1year":
       i_start = look_back+239
   for i in range(i_start, len(dataset)):   
       if predict_range == "1day_open_to_close":
           temp_X = (dataset[i-look_back:i, 3]/dataset[i-look_back:i, 0])-1  #3 ,close price  ,  0,open price
           temp_Y = (dataset[i, 3]/dataset[i, 0])-1
           if temp_Y < -0.02:
               temp_Y_arr = np.array([1,0,0,0,0])
           elif (temp_Y >= -0.02)&(temp_Y < -0.005):
               temp_Y_arr = np.array([0,1,0,0,0])
           elif (temp_Y >= -0.005)&(temp_Y <= 0.005):
               temp_Y_arr = np.array([0,0,1,0,0])
           elif (temp_Y > 0.005)&(temp_Y <= 0.02):
               temp_Y_arr = np.array([0,0,0,1,0])
           elif temp_Y > 0.02:
               temp_Y_arr = np.array([0,0,0,0,1])
       elif predict_range == "next_day_colse":
           temp_X = (dataset[i-look_back:i, 3]/dataset[i-look_back-1:i-1, 3])-1
           temp_Y = (dataset[i, 3]/dataset[i-1, 0])-1
           if temp_Y < -0.03:
               temp_Y_arr = np.array([1,0,0,0,0])
           elif (temp_Y >= -0.03)&(temp_Y < -0.01):
               temp_Y_arr = np.array([0,1,0,0,0])
           elif (temp_Y >= -0.01)&(temp_Y <= 0.01):
               temp_Y_arr = np.array([0,0,1,0,0])
           elif (temp_Y > 0.01)&(temp_Y <= 0.03):
               temp_Y_arr = np.array([0,0,0,1,0])
           elif temp_Y > 0.03:
               temp_Y_arr = np.array([0,0,0,0,1])
       elif predict_range == "5days":
           temp_X = (dataset[i-look_back:i, 3]/dataset[i-look_back-4:i-4, 3])-1
           temp_Y = (dataset[i, 3]/dataset[i-4, 3])-1
           if temp_Y < -0.05:
               temp_Y_arr = np.array([1,0,0,0,0])
           elif (temp_Y >= -0.05)&(temp_Y < -0.03):
               temp_Y_arr = np.array([0,1,0,0,0])
           elif (temp_Y >= -0.03)&(temp_Y <= 0.03):
               temp_Y_arr = np.array([0,0,1,0,0])
           elif (temp_Y > 0.03)&(temp_Y <= 0.05):
               temp_Y_arr = np.array([0,0,0,1,0])
           elif temp_Y > 0.05:
               temp_Y_arr = np.array([0,0,0,0,1])
       elif predict_range == "1month":
           temp_X = (dataset[i-look_back:i, 3]/dataset[i-look_back-19:i-19, 3])-1
           temp_Y = (dataset[i, 3]/dataset[i-19, 3])-1
           if temp_Y < -0.10:
               temp_Y_arr = np.array([1,0,0,0,0,0,0])
           elif (temp_Y >= -0.10)&(temp_Y < -0.05):
               temp_Y_arr = np.array([0,1,0,0,0,0,0])
           elif (temp_Y >= -0.05)&(temp_Y < -0.03):
               temp_Y_arr = np.array([0,0,1,0,0,0,0])
           elif (temp_Y >= -0.03)&(temp_Y <= 0.03):
               temp_Y_arr = np.array([0,0,0,1,0,0,0])
           elif (temp_Y > 0.03)&(temp_Y <= 0.05):
               temp_Y_arr = np.array([0,0,0,0,1,0,0])
           elif (temp_Y > 0.05)&(temp_Y <= 0.10):
               temp_Y_arr = np.array([0,0,0,0,0,1,0])
           elif temp_Y > 0.10:
               temp_Y_arr = np.array([0,0,0,0,0,0,1])
       elif predict_range == "3months":
           temp_X = (dataset[i-look_back:i, 3]/dataset[i-look_back-59:i-59, 3])-1
           temp_Y = (dataset[i, 3]/dataset[i-59, 3])-1
           if temp_Y < -0.15:
               temp_Y_arr = np.array([1,0,0,0,0,0,0])
           elif (temp_Y >= -0.15)&(temp_Y < -0.10):
               temp_Y_arr = np.array([0,1,0,0,0,0,0])
           elif (temp_Y >= -0.10)&(temp_Y < -0.05):
               temp_Y_arr = np.array([0,0,1,0,0,0,0])
           elif (temp_Y >= -0.05)&(temp_Y <= 0.05):
               temp_Y_arr = np.array([0,0,0,1,0,0,0])
           elif (temp_Y > 0.05)&(temp_Y <= 0.10):
               temp_Y_arr = np.array([0,0,0,0,1,0,0])
           elif (temp_Y > 0.10)&(temp_Y <= 0.15):
               temp_Y_arr = np.array([0,0,0,0,0,1,0])
           elif temp_Y > 0.15:
               temp_Y_arr = np.array([0,0,0,0,0,0,1])
       elif predict_range == "6months":
           temp_X = (dataset[i-look_back:i, 3]/dataset[i-look_back-119:i-119, 3])-1
           temp_Y = (dataset[i, 3]/dataset[i-119, 3])-1
           if temp_Y < -0.15:
               temp_Y_arr = np.array([1,0,0,0,0,0,0])
           elif (temp_Y >= -0.15)&(temp_Y < -0.10):
               temp_Y_arr = np.array([0,1,0,0,0,0,0])
           elif (temp_Y >= -0.10)&(temp_Y < -0.05):
               temp_Y_arr = np.array([0,0,1,0,0,0,0])
           elif (temp_Y >= -0.05)&(temp_Y <= 0.05):
               temp_Y_arr = np.array([0,0,0,1,0,0,0])
           elif (temp_Y > 0.05)&(temp_Y <= 0.10):
               temp_Y_arr = np.array([0,0,0,0,1,0,0])
           elif (temp_Y > 0.10)&(temp_Y <= 0.15):
               temp_Y_arr = np.array([0,0,0,0,0,1,0])
           elif temp_Y > 0.15:
               temp_Y_arr = np.array([0,0,0,0,0,0,1])
       elif predict_range == "1year":
           temp_X = (dataset[i-look_back:i, 3]/dataset[i-look_back-239:i-239, 3])-1
           temp_Y = (dataset[i, 3]/dataset[i-239, 3])-1
           if temp_Y < -0.15:
               temp_Y_arr = np.array([1,0,0,0,0,0,0])
           elif (temp_Y >= -0.15)&(temp_Y < -0.10):
               temp_Y_arr = np.array([0,1,0,0,0,0,0])
           elif (temp_Y >= -0.10)&(temp_Y < -0.05):
               temp_Y_arr = np.array([0,0,1,0,0,0,0])
           elif (temp_Y >= -0.05)&(temp_Y <= 0.05):
               temp_Y_arr = np.array([0,0,0,1,0,0,0])
           elif (temp_Y > 0.05)&(temp_Y <= 0.10):
               temp_Y_arr = np.array([0,0,0,0,1,0,0])
           elif (temp_Y > 0.10)&(temp_Y <= 0.15):
               temp_Y_arr = np.array([0,0,0,0,0,1,0])
           elif temp_Y > 0.15:
               temp_Y_arr = np.array([0,0,0,0,0,0,1])
       temp_X[temp_X == np.inf] = 0  
       data_X.append(temp_X)
       data_Y.append(temp_Y_arr)
   return np.array(data_X), np.array(data_Y)

KerasによるLSTMモデルの構築

 LSTMモデルの構築においては、層数とノード数を変数化しました。これにより、後のベイズ最適化により、よりよい数値を求められる設計になっています。


def train(X_train,Y_train,X_test,Y_test,lstm_hidden_ratio,lstm_add_layers,dense_hidden_ratio,dense_add_layers,verbose=2):
   # データの整形
   X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
   X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
   # LSTMモデルの作成と訓練
   model = keras.Sequential()
   model.add(layers.LSTM(int(32*lstm_hidden_ratio), return_sequences=True, input_shape=(X_train.shape[1], 1)))
   model.add(keras.layers.Dropout(0.5))
   for _ in range(lstm_add_layers):
       model.add(layers.LSTM(int(32*lstm_hidden_ratio), return_sequences=True))
       model.add(keras.layers.Dropout(0.5))
   model.add(layers.LSTM(int(16*lstm_hidden_ratio)))
   model.add(keras.layers.Dropout(0.5))
   model.add(layers.Dense(int(64*dense_hidden_ratio), activation='relu'))
   model.add(keras.layers.Dropout(0.5))
   for _ in range(dense_add_layers):
       model.add(layers.Dense(int(64*dense_hidden_ratio), activation='relu'))
       model.add(keras.layers.Dropout(0.5))
   model.add(layers.Dense(int(32*dense_hidden_ratio), activation='relu'))
   model.add(keras.layers.Dropout(0.5))
   model.add(layers.Dense(Y_train.shape[1], activation='softmax')) 
   model.compile(loss='categorical_crossentropy', optimizer='adam')
   train_sample_dict={}
   test_sample_dict={}
   Y_train_1d,Y_test_1d= np.argmax(Y_train,axis=1),np.argmax(Y_test,axis=1)
   labels_unique = np.unique(np.concatenate([Y_train_1d,Y_test_1d]))
   for label in labels_unique:
       train_uniques = np.count_nonzero(Y_train_1d==label)
       if train_uniques == 0:
           train_sample_dict[label]=1
       else:
           train_sample_dict[label]=1/train_uniques
       test_uniques = np.count_nonzero(Y_test_1d==label)
       if test_uniques == 0:
           test_sample_dict[label]=1
       else:
           test_sample_dict[label]=1/test_uniques
   train_weight_list,test_weight_list = [],[]
   for label in Y_train_1d:
       train_weight_list.append(train_sample_dict[label])
   for label in Y_test_1d:
       test_weight_list.append(train_sample_dict[label])
   callbacks = [keras.callbacks.EarlyStopping(monitor='val_loss',  min_delta=0,patience=100, verbose=0, mode='auto')]
   history = model.fit(X_train, Y_train, validation_data=(X_test, Y_test,np.array(test_weight_list)), sample_weight=np.array(train_weight_list), epochs=1000, batch_size=512, callbacks=callbacks,verbose=verbose)  
   train_predict = np.argmax(model.predict(X_train),axis=1)    
   test_predict = np.argmax(model.predict(X_test),axis=1)      
   return train_predict, test_predict,history,model
 

ベイズ最適化

 ベイズ最適化では、ガウス過程という回帰モデルを利用して、より良いハイパーパラメータを探索します。まずは、過去のS&P500種指数のヒストリカルデータを抽出します。

SP500 = yf.Ticker("^GSPC")
df_history = SP500.history(period="max")
df_history=df_history[df_history.index>=dt.datetime(1995,1,1)]
print(df_history)

画像4

上記のデータを元にして、ベイズ最適化により、5つ値(look_back , lstm_hidden_ratio, lstm_add_layers, dense_hidden_ratio, dense_add_layers)を各期間ごと求めます。以下のコードでは、next_day_colse における値を求めています。

# 期間別のインデックス(指数)の収益率を目的変数とする
#  市場収益率 = 当期末指数/前期末指数 - 1  
#  期間は 当日内(終値 - 始値), 1日(終値比較) , 5日, 1ヶ月, 3ヶ月, 6ヶ月,1年
predict_range = ["1day_open_to_close","next_day_colse","5days","1month","3months","6months","1year"][1]
look_back_list = [25,50,100,150,200]
# `df_history`をトレーニングデータ、テストデータに分割 #トレーニングデータが後 、テストデータが前(後ろのデータをトレンド重視とするため)
test_size = int(len(df_history) * 0.33)
df_train, df_test = df_history.iloc[test_size:, :].values, df_history.iloc[0:test_size, :].values
# データのスケーリング(正規化)
scaler = MinMaxScaler(feature_range=(0, 1))
scaler_train = scaler.fit(df_train)
arr_train = scaler_train.transform(df_train)
arr_test = scaler_train.transform(df_test)

def opt_func(X):
   look_back,lstm_hidden_ratio,lstm_add_layers,dense_hidden_ratio,dense_add_layers = int(X[:,0]),int(X[:,1]),int(X[:,2]),int(X[:,3]),int(X[:,4])
   # データセットの前処理
   X_train,Y_train = create_dataset(arr_train, predict_range, look_back_list[look_back])
   X_test,Y_test = create_dataset(arr_test, predict_range, look_back_list[look_back])
   train_predict, test_predict, history, model = train(X_train,Y_train,X_test,Y_test,lstm_hidden_ratio,lstm_add_layers,dense_hidden_ratio,dense_add_layers,verbose=0)
   Y_test_1d = np.argmax(Y_test,axis=1)
   #混同行列       
   confmat_test = confusion_matrix(Y_test_1d, test_predict) 
   #F値 
   return np.array([f1_score(Y_test_1d, test_predict, average="macro")]) 

bounds = [{"name":"look_back","type":"discrete","domain":(0,len(look_back_list)-1)},
         {"name":"lstm_hidden_ratio","type":"discrete","domain":(1,3)},
         {"name":"lstm_add_layers","type":"discrete","domain":(0,2)},
         {"name":"dense_hidden_ratio","type":"discrete","domain":(1,3)},
         {"name":"dense_add_layers","type":"discrete","domain":(0,2)}]
myBopt = GPyOpt.methods.BayesianOptimization(f=opt_func, domain=bounds,initial_design_numdata=10,acquisition_type='LCB',normalize_Y=False,maximize=True)
myBopt.run_optimization(max_iter=90) 
print(myBopt.x_opt) #各パラメータの最適値 
print(myBopt.fx_opt) #F値 

画像5

上記の出力結果より、next_day_colseにおける5つの値は、 "look_back":0, 'lstm_hidden_ratio':1, 'lstm_add_layers':0, 'dense_hidden_ratio':1, 'dense_add_layers':0, と求まりました。

全予測期間の推論

SP500 = yf.Ticker("^GSPC")
df_history = SP500.history(period="max")
df_history=df_history[df_history.index>=dt.datetime(1995,1,1)]
print(df_history)
df_history.iloc[-100:,:]["Close"].plot()

S&P500種指数の100営業日分の推移を参考として表示します。

画像3

predict_range = ["1day_open_to_close","next_day_colse","5days","1month","3months","6months","1year"]
pram_dict = {
"1day_open_to_close":{
   "look_back":4,
   'lstm_hidden_ratio':1,
   'lstm_add_layers':0,
   'dense_hidden_ratio':3,
   'dense_add_layers':0,
   "predict_text_list":["2%以上の下落","2%~0.5%の下落","0.5%未満の変動","0.5%~2%の上昇","2%以上の上昇"]
   },
"next_day_colse":{
   "look_back":0,
   'lstm_hidden_ratio':1,
   'lstm_add_layers':0,
   'dense_hidden_ratio':1,
   'dense_add_layers':0,
   "predict_text_list":["3%以上の下落","3%~1%の下落","1%未満の変動","1%~3%の上昇","3%以上の上昇"]
   },
"5days":{
   "look_back":0,
   'lstm_hidden_ratio':3,
   'lstm_add_layers':0,
   'dense_hidden_ratio':3,
   'dense_add_layers':2,
   "predict_text_list":["5%以上の下落","5%~0.3%の下落","3%未満の変動","3%~5%の上昇","5%以上の上昇"]
   },
"1month":{
   "look_back":4,
   'lstm_hidden_ratio':3,
   'lstm_add_layers':0,
   'dense_hidden_ratio':3,
   'dense_add_layers':2,
   "predict_text_list":["10%以上の下落","10%~5%の下落","5%~3%の下落","3%未満の変動","3%~5%の上昇","5%~10%の上昇","10%以上の上昇"]
   },
"3months":{
   "look_back":4,
   'lstm_hidden_ratio':3,
   'lstm_add_layers':0,
   'dense_hidden_ratio':3,
   'dense_add_layers':2,
   "predict_text_list":["15%以上の下落","15%~10%の下落","10%~5%の下落","5%未満の変動","5%~10%の上昇","10%~15%の上昇","15%以上の上昇"]
   },
"6months":{
   "look_back":4,
   'lstm_hidden_ratio':3,
   'lstm_add_layers':0,
   'dense_hidden_ratio':3,
   'dense_add_layers':2,
   "predict_text_list":["15%以上の下落","15%~10%の下落","10%~5%の下落","5%未満の変動","5%~10%の上昇","10%~15%の上昇","15%以上の上昇"]
   },
"1year":{
   "look_back":4,
   'lstm_hidden_ratio':3,
   'lstm_add_layers':0,
   'dense_hidden_ratio':3,
   'dense_add_layers':2,
   "predict_text_list":["15%以上の下落","15%~10%の下落","10%~5%の下落","5%未満の変動","5%~10%の上昇","10%~15%の上昇","15%以上の上昇"]
   }
}
# 期間別のインデックス(指数)の収益率を目的変数とする
#  ①市場収益率 = (当期末指数 - 前期末指数)/ 前期末指数  ②市場収益率 = 当期末指数/前期末指数 - 1  ②の方が計算は簡単
#  期間は 当日内(終値 - 始値), 1日(終値比較) , 5日, 1ヶ月, 3ヶ月, 6ヶ月,1年
predict_range_list = ["1day_open_to_close","next_day_colse","5days","1month","3months","6months","1year"]
look_back_list = [25,50,100,150,200] #リーケジ対策 
# `df_history`をトレーニングデータ、テストデータに分割
test_size = int(len(df_history) * 0.33)
df_train, df_test = df_history.iloc[test_size:, :].values, df_history.iloc[0:test_size, :].values
# データのスケーリング(正規化)
scaler = MinMaxScaler(feature_range=(0, 1))
scaler_train = scaler.fit(df_train)
arr_train = scaler_train.transform(df_train)
arr_test = scaler_train.transform(df_test)
for predict_range in predict_range_list:
   # データセットの前処理
   X_train,Y_train = create_dataset(arr_train, predict_range, look_back_list[pram_dict[predict_range]["look_back"]])
   X_test,Y_test = create_dataset(arr_test, predict_range, look_back_list[pram_dict[predict_range]["look_back"]])
   train_predict, test_predict, history, model = train(X_train,
                                                       Y_train,
                                                       X_test,
                                                       Y_test,
                                                       pram_dict[predict_range]["lstm_hidden_ratio"],
                                                       pram_dict[predict_range]["lstm_add_layers"],
                                                       pram_dict[predict_range]["dense_hidden_ratio"],
                                                       pram_dict[predict_range]["dense_add_layers"],
                                                       verbose=0)
   Y_test_1d = np.argmax(Y_test,axis=1)
   #混同行列       
   confmat_test = confusion_matrix(Y_test_1d, test_predict) #対角線上での数値を上げること 
   #F値 
   fscore = np.array([f1_score(Y_test_1d, test_predict, average="macro")]) #F値をもっとも重視すること 
   #結果の表示 
   print(predict_range+": "+pram_dict[predict_range]["predict_text_list"][test_predict[-1]]+", F値: "+str(fscore[0]))

予想結果

画像6

 投資収益率の予想結果は、当日内(オープンからクローズ)で0.5%~2%の上昇、翌営業日で引け値で1%~3%の上昇、5営業日後で3%~5%の上昇、1ヶ月後で3%~5%の上昇、3ヶ月後で15%以上の上昇、6ヶ月後で15%以上の上昇、1年後で15%以上の上昇となりました。F値は検出精度と検出率の調和平均で求められ、0~1の間の数値で出力され、1に近いほど良い評価となります。

考察

 予想結果は期間を問わず収益率は上昇となっています。また、期間が長いほど収益率は高くなっていると考えられます。これはヒストリカルデータの傾向が上昇基調にあることが影響していると思われます。トレーニングデータは直近までのものを使っているため、もしも直近で大きく下落基調であれば、違った結果が得られる可能性もあると考えられます。

今後の展望

 今回はS&P500種指数に連動する商品に一定期間投資したと仮定した場合の収益率について、LSTMの単系列分析によって計算しました。今後は、為替、金利、コモディティなど複数系列にし、予想を立ててみたいと思います。こうした商品同士の相関性なども考慮し、予想したモデルを構築してみたいと思いました。また、twitterなどの感情分析なども追加できると面白いと思っています。最終的には様々な商品を様々な期間で運用し、相関性なども考慮した、再現性の高い予想ポートフォリオをつくっていきたいと思っています。

この記事が気に入ったらサポートをしてみませんか?