
Chapter 9: Volatility Forecasting and Statistical Arbitrage, Section 6: Pair Trading with Cointegration Tests and Kalman Filters

In this article, as the title suggests, the goal is to find assets suitable for pair trading using cointegration tests and Kalman filters.

With cryptocurrencies, pair trading one altcoin against another could be quite interesting, too.

Imports and Settings

import warnings
warnings.filterwarnings('ignore')
from collections import Counter

from time import time
from pathlib import Path

import numpy as np
import pandas as pd

from pykalman import KalmanFilter
from statsmodels.tsa.stattools import coint
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.api import VAR

import matplotlib.pyplot as plt
import seaborn as sns
idx = pd.IndexSlice
sns.set_style('whitegrid')
def format_time(t):
    """Format a number of elapsed seconds as HH:MM:SS."""
    m_, s = divmod(t, 60)
    h, m = divmod(m_, 60)
    return f'{h:>02.0f}:{m:>02.0f}:{s:>02.0f}'
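
As a quick reference, format_time just turns elapsed seconds into an HH:MM:SS string:

print(format_time(3725))  # 01:02:05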

Johansen Test: Critical Values

critical_values = {0: {.9: 13.4294, .95: 15.4943, .99: 19.9349},
                  1: {.9: 2.7055, .95: 3.8415, .99: 6.6349}}
trace0_cv = critical_values[0][.95] # critical value for 0 cointegration relationships
trace1_cv = critical_values[1][.95] # critical value for 1 cointegration relationship
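
These are the 90%/95%/99% critical values of the Johansen trace statistic for the hypotheses of at most zero and at most one cointegrating relationship. To make the decision rule explicit, here is a small hypothetical helper (the same rule is applied in vectorized form in the Johansen-test section further below):

def is_johansen_significant(trace0, trace1):
    # Hypothetical helper: flag a pair when both trace statistics
    # exceed their 95% critical values
    return (trace0 > trace0_cv) and (trace1 > trace1_cv)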

Loading the Data

DATA_PATH = Path('..', 'data') 
STORE = DATA_PATH / 'assets.h5'

Fetching Prices for the Backtest

Combine the OHLCV data for the relevant stocks and ETFs.

def get_backtest_prices():
    with pd.HDFStore('data.h5') as store:
        tickers = store['tickers']

    with pd.HDFStore(STORE) as store:
        prices = (pd.concat([
            store['stooq/us/nyse/stocks/prices'],
            store['stooq/us/nyse/etfs/prices'],
            store['stooq/us/nasdaq/etfs/prices'],
            store['stooq/us/nasdaq/stocks/prices']])
                  .sort_index()
                  .loc[idx[tickers.index, '2016':'2019'], :])
    # note: null_counts was renamed show_counts in newer pandas versions
    print(prices.info(null_counts=True))
    prices.to_hdf('backtest.h5', 'prices')
    tickers.to_hdf('backtest.h5', 'tickers')

get_backtest_prices()
'''
<class 'pandas.core.frame.DataFrame'>
MultiIndex: 313869 entries, ('AA.US', Timestamp('2016-01-04 00:00:00')) to ('YUM.US', Timestamp('2019-12-31 00:00:00'))
Data columns (total 5 columns):
#   Column  Non-Null Count   Dtype  
---  ------  --------------   -----  
0   open    313869 non-null  float64
1   high    313869 non-null  float64
2   low     313869 non-null  float64
3   close   313869 non-null  float64
4   volume  313869 non-null  int64  
dtypes: float64(4), int64(1)
memory usage: 13.3+ MB
None
'''

Loading Stock Prices

stocks = pd.read_hdf('data.h5', 'stocks/close').loc['2015':]
stocks.info()
'''
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1258 entries, 2015-01-02 to 2019-12-31
Columns: 173 entries, AA.US to YUM.US
dtypes: float64(173)
memory usage: 1.7 MB
'''

Loading ETF Data

etfs = pd.read_hdf('data.h5', 'etfs/close').loc['2015':]
etfs.info()
'''
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1258 entries, 2015-01-02 to 2019-12-31
Columns: 139 entries, AAXJ.US to YCS.US
dtypes: float64(139)
memory usage: 1.3 MB
'''

Loading the Ticker Dictionary

names = pd.read_hdf('data.h5', 'tickers').to_dict()
pd.Series(names).count()
'''
312
'''

Preparing the Cointegration Tests

def test_cointegration(etfs, stocks, test_end, lookback=2):
    start = time()
    results = []
    # Use a `lookback`-year formation window ending at test_end
    test_start = test_end - pd.DateOffset(years=lookback) + pd.DateOffset(days=1)
    etf_tickers = etfs.columns.tolist()
    etf_data = etfs.loc[str(test_start):str(test_end)]

    stock_tickers = stocks.columns.tolist()
    stock_data = stocks.loc[str(test_start):str(test_end)]
    n = len(etf_tickers) * len(stock_tickers)
    j = 0
    for i, s1 in enumerate(etf_tickers, 1):
        for s2 in stock_tickers:
            j += 1
            if j % 1000 == 0:
                print(f'\t{j:5,.0f} ({j/n:3.1%}) | {time() - start:.2f}')
            df = etf_data.loc[:, [s1]].dropna().join(stock_data.loc[:, [s2]].dropna(), how='inner')
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                # Select the VAR lag order by AIC for the Johansen test
                var = VAR(df)
                lags = var.select_order()
                result = [test_end, s1, s2]
                order = lags.selected_orders['aic']
                # Engle-Granger test in both directions (p-values)
                result += [coint(df[s1], df[s2], trend='c')[1], coint(df[s2], df[s1], trend='c')[1]]

            # Johansen test: trace statistics, max-eigenvalue statistics,
            # and the eigenvector of the dominant cointegration relationship
            cj = coint_johansen(df, det_order=0, k_ar_diff=order)
            result += (list(cj.lr1) + list(cj.lr2) + list(cj.evec[:, cj.ind[0]]))
            results.append(result)
    return results
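
To see what one iteration of this loop produces, here is a minimal single-pair sketch. The two tickers are just the first columns of each frame (any pair works), and it uses a fixed Johansen lag of 1 instead of the AIC-selected order:

# Single-pair sketch: Engle-Granger p-value plus Johansen trace statistics
df = etfs[['AAXJ.US']].join(stocks[['AA.US']], how='inner').dropna()
eg_pvalue = coint(df['AAXJ.US'], df['AA.US'], trend='c')[1]
cj = coint_johansen(df, det_order=0, k_ar_diff=1)
print(eg_pvalue, cj.lr1)  # cj.lr1 holds the trace statistics for r=0 and r<=1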

Setting the Test Periods

dates = stocks.loc['2016-12':'2019-6'].resample('Q').last().index  # 11 quarter-end test dates

Running the Tests

This step takes an extremely long time, so I recommend trying it with a small number of tickers: with 139 ETFs and 173 stocks, each quarter runs 139 × 173 = 24,047 pair tests, or 264,517 across the 11 quarterly test dates.
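
For a quick dry run, a minimal sketch (the universe sizes are arbitrary) that shrinks the inputs first:

# Quick dry run on a reduced universe before committing to the full loop
sample_results = test_cointegration(etfs.iloc[:, :5], stocks.iloc[:, :10],
                                    test_end=dates[0])

The full run over every quarter: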

test_results = []
columns = ['test_end', 's1', 's2', 'eg1', 'eg2',
          'trace0', 'trace1', 'eig0', 'eig1', 'w1', 'w2']

for test_end in dates:
    print(test_end)
    result = test_cointegration(etfs, stocks, test_end=test_end)
    test_results.append(pd.DataFrame(result, columns=columns))

pd.concat(test_results).to_hdf('backtest.h5', 'cointegration_test')

Loading the Test Results

test_results = pd.read_hdf('backtest.h5', 'cointegration_test')
test_results.info()
'''
<class 'pandas.core.frame.DataFrame'>
Int64Index: 264517 entries, 0 to 24046
Data columns (total 11 columns):
#   Column    Non-Null Count   Dtype         
---  ------    --------------   -----         
0   test_end  264517 non-null  datetime64[ns]
1   s1        264517 non-null  object        
2   s2        264517 non-null  object        
3   eg1       264517 non-null  float64       
4   eg2       264517 non-null  float64       
5   trace0    264517 non-null  float64       
6   trace1    264517 non-null  float64       
7   eig0      264517 non-null  float64       
8   eig1      264517 non-null  float64       
9   w1        264517 non-null  float64       
10  w2        264517 non-null  float64       
dtypes: datetime64[ns](1), float64(8), object(2)
memory usage: 24.2+ MB
'''

Finding Cointegrated Pairs

Johansen Test

test_results['joh_sig'] = ((test_results.trace0 > trace0_cv) &
                          (test_results.trace1 > trace1_cv))
test_results.joh_sig.value_counts(normalize=True)
'''
False    0.947406
True     0.052594
Name: joh_sig, dtype: float64
'''

Roughly 5% of the pairs are flagged.

Engle-Granger Test

test_results['eg'] = test_results[['eg1', 'eg2']].min(axis=1)
test_results['s1_dep'] = test_results.eg1 < test_results.eg2
test_results['eg_sig'] = (test_results.eg < .05)
test_results.eg_sig.value_counts(normalize=True)
'''
False    0.91128
True     0.08872
Name: eg_sig, dtype: float64
'''

Engle-Granger vs. Johansen Comparison

Here we keep only the pairs that are flagged by both tests.

test_results['coint'] = (test_results.eg_sig & test_results.joh_sig)
test_results.coint.value_counts(normalize=True)
'''
False    0.986829
True     0.013171
Name: coint, dtype: float64
'''
test_results = test_results.drop(['eg1', 'eg2', 'trace0', 'trace1', 'eig0', 'eig1'], axis=1)
test_results.info()
'''
<class 'pandas.core.frame.DataFrame'>
Int64Index: 264517 entries, 0 to 24046
Data columns (total 10 columns):
#   Column    Non-Null Count   Dtype         
---  ------    --------------   -----         
0   test_end  264517 non-null  datetime64[ns]
1   s1        264517 non-null  object        
2   s2        264517 non-null  object        
3   w1        264517 non-null  float64       
4   w2        264517 non-null  float64       
5   joh_sig   264517 non-null  bool          
6   eg        264517 non-null  float64       
7   s1_dep    264517 non-null  bool          
8   eg_sig    264517 non-null  bool          
9   coint     264517 non-null  bool          
dtypes: bool(4), datetime64[ns](1), float64(3), object(2)
memory usage: 15.1+ MB
'''

Number of Pairs over Time

ax = test_results.groupby('test_end').coint.mean().to_frame('# Pairs').plot()
ax.axhline(.05, lw=1, ls='--', c='k');

Image 1: share of cointegrated pairs per test date, with a 5% reference line

Selecting Candidate Pairs

def select_candidate_pairs(data):
    # copy() avoids a SettingWithCopyWarning when adding columns below
    candidates = data[data.joh_sig | data.eg_sig].copy()
    # Keep the direction with the smaller Engle-Granger p-value:
    # y is the dependent series, x the hedge
    candidates['y'] = candidates.apply(lambda x: x.s1 if x.s1_dep else x.s2, axis=1)
    candidates['x'] = candidates.apply(lambda x: x.s2 if x.s1_dep else x.s1, axis=1)
    return candidates.drop(['s1_dep', 's1', 's2'], axis=1)
candidates = select_candidate_pairs(test_results)
candidates.to_hdf('backtest.h5', 'candidates')
candidates = pd.read_hdf('backtest.h5', 'candidates')
candidates.info()
'''
<class 'pandas.core.frame.DataFrame'>
Int64Index: 33896 entries, 7 to 24045
Data columns (total 9 columns):
#   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
0   test_end  33896 non-null  datetime64[ns]
1   w1        33896 non-null  float64       
2   w2        33896 non-null  float64       
3   joh_sig   33896 non-null  bool          
4   eg        33896 non-null  float64       
5   eg_sig    33896 non-null  bool          
6   coint     33896 non-null  bool          
7   y         33896 non-null  object        
8   x         33896 non-null  object        
dtypes: bool(3), datetime64[ns](1), float64(3), object(2)
memory usage: 1.9+ MB
'''

Number of Candidates over Time

candidates.groupby('test_end').size().plot(figsize=(8, 5))

Image 2: number of candidate pairs per test date

The Most Common Pairs

The pairs flagged as cointegrated differ from one test date to the next, so here we simply count how often each pair passes both tests and output the pairs with the highest counts.

with pd.HDFStore('data.h5') as store:
    print(store.info())
    tickers = store['tickers']
'''
<class 'pandas.io.pytables.HDFStore'>
File path: data.h5
/etfs/close              frame        (shape->[2516,139])
/stocks/close            frame        (shape->[2516,173])
/tickers                 series       (shape->[1])       
'''
with pd.HDFStore('backtest.h5') as store:
    print(store.info())
'''
<class 'pandas.io.pytables.HDFStore'>
File path: backtest.h5
/candidates                    frame        (shape->[33896,9])  
/cointegration_test            frame        (shape->[264517,11])
/half_lives                    frame        (shape->[33896,4])  
/pair_data                     frame        (shape->[4259004,7])
/pair_trades                   frame        (shape->[1,6])      
/prices                        frame        (shape->[313869,5]) 
/tickers                       series       (shape->[1])        
'''
counter = Counter()
for s1, s2 in zip(candidates[candidates.joh_sig & candidates.eg_sig].y,
                  candidates[candidates.joh_sig & candidates.eg_sig].x):
    # Sort the tickers so (A, B) and (B, A) count as the same pair
    if s1 > s2:
        counter[(s2, s1)] += 1
    else:
        counter[(s1, s2)] += 1
most_common_pairs = pd.DataFrame(counter.most_common(10))
most_common_pairs = pd.DataFrame(most_common_pairs[0].values.tolist(), columns=['s1', 's2'])
most_common_pairs

Image 3: the ten most frequently flagged pairs (columns s1, s2)

Now we have our pair-trading candidates (*'ω'*)

with pd.HDFStore('backtest.h5') as store:
    prices = store['prices'].close.unstack('ticker').ffill(limit=5)
    tickers = store['tickers'].to_dict()
cnt = pd.Series(counter).reset_index()
cnt.columns = ['s1', 's2', 'n']
cnt['name1'] = cnt.s1.map(tickers)
cnt['name2'] = cnt.s2.map(tickers)
cnt.nlargest(10, columns='n')


Image 4: pair counts with ticker names (top 10 by n)

A Look at the Similar Pairs We Uncovered

fig, axes = plt.subplots(ncols=2, figsize=(14, 5))
for i in [0, 1]:
    s1, s2 = most_common_pairs.at[i, 's1'], most_common_pairs.at[i, 's2']
    prices.loc[:, [s1, s2]].rename(columns=tickers).plot(secondary_y=tickers[s2],
                                                         ax=axes[i],
                                                         rot=0)
    axes[i].grid(False)
    axes[i].set_xlabel('')

sns.despine()
fig.tight_layout()

Image 5: price series for the two most common pairs

Now that we have found our pairs, it is finally time to move on to the trading strategy.

Kalman Filter Smoothing

def KFSmoother(prices):
    """Estimate rolling mean"""
    # Local-level model: the latent mean follows a random walk and
    # each observed price is that mean plus noise
    kf = KalmanFilter(transition_matrices=np.eye(1),
                      observation_matrices=np.eye(1),
                      initial_state_mean=0,
                      initial_state_covariance=1,
                      observation_covariance=1,
                      transition_covariance=.05)

    state_means, _ = kf.filter(prices.values)
    return pd.Series(state_means.flatten(),
                     index=prices.index)
smoothed_prices = prices.apply(KFSmoother)
smoothed_prices.to_hdf('tmp.h5', 'smoothed')
smoothed_prices = pd.read_hdf('tmp.h5', 'smoothed')
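
As an optional sanity check, you can overlay the raw and filtered series for a single ticker (any column works):

# Compare a raw price series with its Kalman-filtered counterpart
ticker = prices.columns[0]
ax = prices[ticker].plot(label='raw', figsize=(10, 4), alpha=.5)
smoothed_prices[ticker].plot(ax=ax, label='filtered')
ax.legend()
sns.despine()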

Computing Rolling Hedge Ratios with a Kalman Filter

def KFHedgeRatio(x, y):
    """Estimate Hedge Ratio"""
    # State = [slope, intercept] of a time-varying regression of y on x
    delta = 1e-3
    trans_cov = delta / (1 - delta) * np.eye(2)
    obs_mat = np.expand_dims(np.vstack([[x], [np.ones(len(x))]]).T, axis=1)

    kf = KalmanFilter(n_dim_obs=1, n_dim_state=2,
                      initial_state_mean=[0, 0],
                      initial_state_covariance=np.ones((2, 2)),
                      transition_matrices=np.eye(2),
                      observation_matrices=obs_mat,
                      observation_covariance=2,
                      transition_covariance=trans_cov)

    state_means, _ = kf.filter(y.values)
    # Negate so the ratio can simply be added: spread = y + hedge_ratio * x
    return -state_means
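
A usage sketch on two of the smoothed series from above (the column choice is arbitrary); column 0 of the returned array holds the negated slope, i.e. the hedge ratio:

# Rolling hedge ratio between the first two smoothed price series
s1, s2 = smoothed_prices.columns[:2]
hedge_ratio = KFHedgeRatio(x=smoothed_prices[s1], y=smoothed_prices[s2])[:, 0]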

Estimating the Mean-Reversion Half-Life

def estimate_half_life(spread):
    # Regress the spread's changes on its lagged level:
    # delta_s_t = beta * s_{t-1} + const + eps
    X = spread.shift().iloc[1:].to_frame().assign(const=1)
    y = spread.diff().iloc[1:]
    beta = (np.linalg.inv(X.T @ X) @ X.T @ y).iloc[0]
    # Half-life of mean reversion: -ln(2) / beta
    halflife = int(round(-np.log(2) / beta, 0))
    return max(halflife, 1)
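
The regression fits delta_s_t = beta * s_{t-1} + c + eps and converts beta into a half-life via -ln(2)/beta. A synthetic sanity check with a known mean-reversion speed (the AR(1) coefficient is arbitrary):

# AR(1) spread with phi = 0.9: beta is roughly phi - 1 = -0.1,
# so the estimated half-life should be about -log(2) / -0.1, i.e. ~7
rng = np.random.default_rng(42)
s = np.zeros(1000)
for t in range(1, len(s)):
    s[t] = .9 * s[t - 1] + rng.normal()
print(estimate_half_life(pd.Series(s)))  # ~7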

Computing the Spread and Bollinger Bands

The z-score below measures the spread's deviation from its rolling mean in units of the rolling standard deviation, using a window of twice the estimated half-life (capped at the length of the formation period).

def get_spread(candidates, prices):
    pairs = []
    half_lives = []

    periods = pd.DatetimeIndex(sorted(candidates.test_end.unique()))
    start = time()
    for p, test_end in enumerate(periods, 1):
        start_iteration = time()

        period_candidates = candidates.loc[candidates.test_end == test_end, ['y', 'x']]
        # Trade the 6 months following the 2-year formation period
        trading_start = test_end + pd.DateOffset(days=1)
        t = trading_start - pd.DateOffset(years=2)
        T = trading_start + pd.DateOffset(months=6) - pd.DateOffset(days=1)
        max_window = len(prices.loc[t: test_end].index)
        print(test_end.date(), len(period_candidates))
        for i, (y, x) in enumerate(zip(period_candidates.y, period_candidates.x), 1):
            if i % 1000 == 0:
                msg = f'{i:5.0f} | {time() - start_iteration:7.1f} | {time() - start:10.1f}'
                print(msg)
            pair = prices.loc[t: T, [y, x]]
            pair['hedge_ratio'] = KFHedgeRatio(y=KFSmoother(prices.loc[t: T, y]),
                                               x=KFSmoother(prices.loc[t: T, x]))[:, 0]
            pair['spread'] = pair[y].add(pair[x].mul(pair.hedge_ratio))
            # Estimate the half-life on the formation period only
            half_life = estimate_half_life(pair.spread.loc[t: test_end])

            # z-score relative to Bollinger Bands with a 2x-half-life window
            spread = pair.spread.rolling(window=min(2 * half_life, max_window))
            pair['z_score'] = pair.spread.sub(spread.mean()).div(spread.std())
            pairs.append(pair.loc[trading_start: T].assign(s1=y, s2=x, period=p, pair=i).drop([x, y], axis=1))

            half_lives.append([test_end, y, x, half_life])
    return pairs, half_lives
candidates = pd.read_hdf('backtest.h5', 'candidates')
candidates.info()
'''
<class 'pandas.core.frame.DataFrame'>
Int64Index: 33896 entries, 7 to 24045
Data columns (total 9 columns):
#   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
0   test_end  33896 non-null  datetime64[ns]
1   w1        33896 non-null  float64       
2   w2        33896 non-null  float64       
3   joh_sig   33896 non-null  bool          
4   eg        33896 non-null  float64       
5   eg_sig    33896 non-null  bool          
6   coint     33896 non-null  bool          
7   y         33896 non-null  object        
8   x         33896 non-null  object        
dtypes: bool(3), datetime64[ns](1), float64(3), object(2)
memory usage: 1.9+ MB
'''
pairs, half_lives = get_spread(candidates, smoothed_prices)

Aggregating the Results

Half-Lives

hl = pd.DataFrame(half_lives, columns=['test_end', 's1', 's2', 'half_life'])
hl.info()
'''
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 33896 entries, 0 to 33895
Data columns (total 4 columns):
#   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
0   test_end   33896 non-null  datetime64[ns]
1   s1         33896 non-null  object        
2   s2         33896 non-null  object        
3   half_life  33896 non-null  int64         
dtypes: datetime64[ns](1), int64(1), object(2)
memory usage: 1.0+ MB
'''
hl.half_life.describe()
'''
count    33896.000000
mean        25.167837
std         10.235579
min          1.000000
25%         20.000000
50%         24.000000
75%         28.000000
max       1057.000000
Name: half_life, dtype: float64
'''
hl.to_hdf('backtest.h5', 'half_lives')

Pair Data

pair_data = pd.concat(pairs)
pair_data.info(null_counts=True)
'''
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 4259004 entries, 2017-01-03 to 2019-12-31
Data columns (total 7 columns):
#   Column       Non-Null Count    Dtype  
---  ------       --------------    -----  
0   hedge_ratio  4259004 non-null  float64
1   spread       4259004 non-null  float64
2   z_score      4259004 non-null  float64
3   s1           4259004 non-null  object 
4   s2           4259004 non-null  object 
5   period       4259004 non-null  int64  
6   pair         4259004 non-null  int64  
dtypes: float64(3), int64(2), object(2)
memory usage: 259.9+ MB
'''
pair_data.to_hdf('backtest.h5', 'pair_data')
pair_data = pd.read_hdf('backtest.h5', 'pair_data')

Finding Long/Short Entry and Exit Dates

Positions are entered when the absolute z-score exceeds 2 and closed when the z-score crosses zero.

def get_trades(data):
    pair_trades = []
    for i, ((period, s1, s2), pair) in enumerate(data.groupby(['period', 's1', 's2']), 1):
        if i % 100 == 0:
            print(i)

        # First and last three months of the trading window
        first3m = pair.first('3M').index
        last3m = pair.last('3M').index

        # Potential entries: dates where the |z| > 2 indicator changes
        # state, encoded as sign(z) - 2 (so -2 means no change)
        entry = pair.z_score.abs() > 2
        entry = ((entry.shift() != entry)
                 .mul(np.sign(pair.z_score))
                 .fillna(0)
                 .astype(int)
                 .sub(2))

        # Exits: dates where the z-score changes sign (coded 0, else -1)
        exit = (np.sign(pair.z_score.shift().bfill())
                != np.sign(pair.z_score)).astype(int) - 1

        # Series.append was removed in pandas 2.0; pd.concat works everywhere
        trades = (pd.concat([entry[entry != -2], exit[exit == 0]])
                  .to_frame('side')
                  .sort_values(['date', 'side'])
                  .squeeze())
        if not isinstance(trades, pd.Series):
            continue
        try:
            # Map the entry codes -1/-3 to sides 1/-1; exits stay 0
            trades.loc[trades < 0] += 2
        except Exception:
            print(type(trades))
            print(trades)
            print(pair.z_score.describe())
            break

        # Drop repeated signals of the same kind
        trades = trades[trades.abs().shift() != trades.abs()]
        window = trades.loc[first3m.min():first3m.max()]
        extra = trades.loc[last3m.min():last3m.max()]
        n = len(trades)

        # Drop a leading exit that has no matching entry
        if window.iloc[0] == 0:
            if n > 1:
                print('shift')
                window = window.iloc[1:]
        # If the last entry is still open, close it with the first exit
        # from the final three months, or skip the pair entirely
        if window.iloc[-1] != 0:
            extra_exits = extra[extra == 0].head(1)
            if extra_exits.empty:
                continue
            else:
                window = pd.concat([window, extra_exits])

        trades = pair[['s1', 's2', 'hedge_ratio', 'period', 'pair']].join(window.to_frame('side'), how='right')
        trades.loc[trades.side == 0, 'hedge_ratio'] = np.nan
        trades.hedge_ratio = trades.hedge_ratio.ffill()
        pair_trades.append(trades)
    return pair_trades
pair_trades = get_trades(pair_data)
pair_trade_data = pd.concat(pair_trades)
pair_trade_data.info()
'''
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 135490 entries, 2017-01-03 to 2019-10-04
Data columns (total 6 columns):
#   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
0   s1           135490 non-null  object 
1   s2           135490 non-null  object 
2   hedge_ratio  135490 non-null  float64
3   period       135490 non-null  int64  
4   pair         135490 non-null  int64  
5   side         135490 non-null  int64  
dtypes: float64(1), int64(3), object(2)
memory usage: 7.2+ MB
'''
pair_trade_data.head()

Image 6: head of pair_trade_data

# Entries count as +1, exits as -1: the cumulative sum tracks the
# number of open pair positions over time
trades = pair_trade_data['side'].copy()
trades.loc[trades != 0] = 1
trades.loc[trades == 0] = -1
trades.sort_index().cumsum().plot(figsize=(14, 4))
sns.despine()

Image 7: cumulative number of open pair positions over time

pair_trade_data.to_hdf('backtest.h5', 'pair_trades')