見出し画像

richmanbtcさんのmlbot_tutorialを、超初心者向けに読み解く

MLbot人気の火付け役である、richmanbtcさんのチュートリアル。

この記事を書いている2024年6月現在でも、このチュートリアルをベースにしたMLbotで少額ながら利益が出ています。
私がbot開発をはじめたのは2年前の2022年1月頃で、プログラミングと言えばWEBデザインで使う簡単なJavaScriptくらいしか使ったことがない状態からスタートしました。
当時の自分が戸惑ったことなどを思い出しながら、プログラミングが本職ではない方やプログラミング初心者を対象に読み解いてみます。

この記事は、私個人の解釈によるものです。
内容の正確さについては保証できませんので、あらかじめご了承ください。

環境構築(Windows)

1.Dockerのインストール
まず、Docker Desktop for windowsをインストールします。
詳しいインストール方法については解説サイトがたくさんあるのでここでは割愛します。

2.チュートリアルのダウンロードと、コンテナのビルド。
次に、Githubからmlbot_tutorialをZIPでダウンロードします。


Gitが使える人は普通にクローンすればOKですが、よく分からない人はZIPでダウンロードしましょう。右の方にある「Code」と書かれた緑色のボタンをクリックして、「Download ZIP」からダウンロードできます。

どこでも良いので、ダウンロードした「mlbot_tutorial-master」を解凍します。解凍したフォルダを開いて、docker-compose.ymlがある場所でフォルダパスの入力欄に「cmd」と入力してEnterを押します。

コマンドプロントが開くので、そのまま下記コードをコピペしてEnterを押します。Docker Composeという機能を使ってコンテナをビルドし、バックグラウンドで起動するコマンドです。

docker-compose up -d

コピペするとこんな感じ。

Dockerのコンテナビルドが始まります。
数GBのデータをダウンロードしてくるので、それなりに時間が掛かります。完了するまで気長に待ちましょう。

Dockerについて
Dockerは、コンテナ型の仮想環境を作るアプリです。
このチュートリアルは、「jupyter/datascience-notebook」というJupyter環境のコンテナイメージをベースに、bot開発で必要なpythoの追加パッケージがいくつか定義されています。
Dockerファイルの中身まで読める必要は無く、bot開発用のJupyter環境を作るためにDockerが使われていると理解しておけば大丈夫だと思います。
Dockerを使う事で、VPSなどの本番環境へデプロイしやすくなります。

Jupyter Labについて
Jupyterは、pythonの実行環境です。
コードを書いて実行し、結果を確認しながら開発していくことができる対話型のpython環境です。
Jupyter環境を作るには、直接インストールしたりAnacondaを使ったりと色々な方法がありますが、このチュートリアルでは本番環境へデプロイしやすいDockerが使われています。

ライブラリのインポート

ccxtはチュートリアル内で使われていませんが、様々な取引所のAPI操作を便利にするライブラリです。(※2024年5月現在、GMOコインには対応していません)
GmoFetcherは過去データを取得するライブラリで、richmanbtcさんが作成されたものです。
talibはテクニカル指標を計算するライブラリで、自動売買システムの開発ではよく使われます。
その他は機械学習では割とよく使われるものが多いので個別の説明は割愛します。独自のbotを作りたくなった場合でも、これらのライブラリはほぼ必須と言って良いのでこの部分は丸ごとコピペして使えそうです。

import math

import ccxt
from crypto_data_fetcher.gmo import GmoFetcher
import joblib
import lightgbm as lgb
import matplotlib.pyplot as plt
import numba
import numpy as np
import pandas as pd
from scipy.stats import ttest_1samp
import seaborn as sns
import talib

from sklearn.ensemble import BaggingRegressor
from sklearn.linear_model import RidgeCV
from sklearn.model_selection import cross_val_score, KFold, TimeSeriesSplit


データの準備

GmoFetcherの中では、GMOコインのヒストリカルデータをダウンロードして、15分足のOHLCVデータに変換しています。このヒストリカルデータの中身は約定履歴ですので、GMOコインの全取引が記録されています。これをinterval_secで指定した期間でグルーピングする事でOHLCVの形式に変換しています。
他の時間足やGMOコイン以外でこのチュートリアルを実行したくなったらここを書き換えます。他の取引所だと、OHLCVデータはK-Lineという名前でダウンロードできたりします。K-Lineはローソク足の事です。

memory = joblib.Memory('/tmp/gmo_fetcher_cache', verbose=0)
fetcher = GmoFetcher(memory=memory)

# GMOコインのBTC/JPYレバレッジ取引 ( https://api.coin.z.com/data/trades/BTC_JPY/ )を取得
# 初回ダウンロードは時間がかかる
df = fetcher.fetch_ohlcv(
    market='BTC_JPY', # 市場のシンボルを指定
    interval_sec=15 * 60, # 足の間隔を秒単位で指定。この場合は15分足
)

# 実験に使うデータ期間を限定する
df = df[df.index < pd.to_datetime('2021-04-01 00:00:00Z')]

display(df)
df.to_pickle('df_ohlcv.pkl')

df = df[df.index < pd.to_datetime('2021-04-01 00:00:00Z')]でデータを絞り込んでいますが、このままだと21年4月以前のデータしか使えないので、2021を2024に書き換えるか、まるごとコメントアウトしてしまっても構わないと思います。データの絞り込みを行うタイミングは、モデルを作成する前後でも良いと思います。

pandasのフィルタリング機能
pandasでは、DataFrameの[]内に条件式を書くと、その条件に合うデータのみを取り出すことができます。df.indexにはdatetimeが入っているので、pd.to_datetimeで指定した日時よりも古いデータのみを取り出しています。timestampのフォーマットは取引所によって異なります。引数で指定しているフォーマットと、フィルタリング対象データのフォーマットが一致しないとエラーになるので注意しましょう。

手数料カラムの追加

GMOコインの過去の手数料情報を追加しています。
手数料の推移はOHLCVデータからは読み取ることができないので、いつどの手数料に変わったかを自分で調べて直接追加しなければいけません。特にGMOコインはマイナス手数料(指値取引をすると、手数料を支払うのではなく貰える仕組み)を導入していた時期があり、価格変動に大きな影響を与えていた可能性があるので要注意です。

maker_fee_history = [
    {
        # https://coin.z.com/jp/news/2020/08/6482/
        # 変更時刻が記載されていないが、定期メンテナンス後と仮定
        'changed_at': '2020/08/05 06:00:00Z',
        'maker_fee': -0.00035
    },
    {
        # https://coin.z.com/jp/news/2020/08/6541/
        'changed_at': '2020/09/09 06:00:00Z',
        'maker_fee': -0.00025
    },
    {
        # https://coin.z.com/jp/news/2020/10/6686/
        'changed_at': '2020/11/04 06:00:00Z',
        'maker_fee': 0.0
    },
]

df = pd.read_pickle('df_ohlcv.pkl')

# 初期の手数料
# https://web.archive.org/web/20180930223704/https://coin.z.com/jp/corp/guide/fees/
df['fee'] = 0.0

for config in maker_fee_history:
    df.loc[pd.to_datetime(config['changed_at']) <= df.index, 'fee'] = config['maker_fee']

df['fee'].plot()
plt.title('maker手数料の推移')
plt.show()
    
display(df)
df.to_pickle('df_ohlcv_with_fee.pkl')

matplotlibによるデータのグラフ表示
matplotlibを使うと、簡単にDataFrameのデータをグラフ化して表示することができます。
任意のカラムのデータをplot()でプロットし、plt.show()で表示するというのが基本的な使い方で、プロットしたデータは上書きされるので、plot()とshow()を交互に繰り返す事によって複数のデータをグラフ表示できます。

特徴量エンジニアリング

TA-Libというライブラリを使ってテクニカル指標を作成し、それを特徴量としています。
裁量トレードをしている方にとって馴染みのある指標もあれば、そうじゃないものもあると思います。ひとつひとつの指標の意味は特に考慮せず、作れそうな指標は一通り作成して特徴量としているようです。

オリジナル特徴量の追加
独自の指標を作りたい場合は、ここに追記していきます。
たとえば、曜日によって値動きに傾向がある取引所や通貨ペアを見つけた場合、df['weekday'] = df.index.weekdayの1行を追加して曜日を特徴量とする事で予測精度が上がる可能性があります。df.index.weekdayは、月~日曜日までを表すコード値として0~6の数字が生成されます。このように、整数で分類する値を「カテゴリ特徴量」と言います。
他にも、時間をずらした値を特徴量として使う「ラグ特徴量」などもよく使われます。ラグ特徴量を作るには、shift(n)などを使って前の行の値を取得します。

def calc_features(df):
    open = df['op']
    high = df['hi']
    low = df['lo']
    close = df['cl']
    volume = df['volume']
    
    orig_columns = df.columns

    hilo = (df['hi'] + df['lo']) / 2
    # 価格(hilo または close)を引いた後、価格(close)で割ることで標準化
    df['BBANDS_upperband'], df['BBANDS_middleband'], df['BBANDS_lowerband'] = talib.BBANDS(close, timeperiod=5, nbdevup=2, nbdevdn=2, matype=0)
    df['BBANDS_upperband'] = (df['BBANDS_upperband'] - hilo) / close
    df['BBANDS_middleband'] = (df['BBANDS_middleband'] - hilo) / close
    df['BBANDS_lowerband'] = (df['BBANDS_lowerband'] - hilo) / close
    df['DEMA'] = (talib.DEMA(close, timeperiod=30) - hilo) / close
    df['EMA'] = (talib.EMA(close, timeperiod=30) - hilo) / close
    df['HT_TRENDLINE'] = (talib.HT_TRENDLINE(close) - hilo) / close
    df['KAMA'] = (talib.KAMA(close, timeperiod=30) - hilo) / close
    df['MA'] = (talib.MA(close, timeperiod=30, matype=0) - hilo) / close
    df['MIDPOINT'] = (talib.MIDPOINT(close, timeperiod=14) - hilo) / close
    df['SMA'] = (talib.SMA(close, timeperiod=30) - hilo) / close
    df['T3'] = (talib.T3(close, timeperiod=5, vfactor=0) - hilo) / close
    df['TEMA'] = (talib.TEMA(close, timeperiod=30) - hilo) / close
    df['TRIMA'] = (talib.TRIMA(close, timeperiod=30) - hilo) / close
    df['WMA'] = (talib.WMA(close, timeperiod=30) - hilo) / close
    df['LINEARREG'] = (talib.LINEARREG(close, timeperiod=14) - close) / close
    df['LINEARREG_INTERCEPT'] = (talib.LINEARREG_INTERCEPT(close, timeperiod=14) - close) / close


    # 価格(close)で割ることで標準化
    df['AD'] = talib.AD(high, low, close, volume) / close
    df['ADOSC'] = talib.ADOSC(high, low, close, volume, fastperiod=3, slowperiod=10) / close
    df['APO'] = talib.APO(close, fastperiod=12, slowperiod=26, matype=0) / close
    df['HT_PHASOR_inphase'], df['HT_PHASOR_quadrature'] = talib.HT_PHASOR(close)
    df['HT_PHASOR_inphase'] /= close
    df['HT_PHASOR_quadrature'] /= close
    df['LINEARREG_SLOPE'] = talib.LINEARREG_SLOPE(close, timeperiod=14) / close
    df['MACD_macd'], df['MACD_macdsignal'], df['MACD_macdhist'] = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
    df['MACD_macd'] /= close
    df['MACD_macdsignal'] /= close
    df['MACD_macdhist'] /= close
    df['MINUS_DM'] = talib.MINUS_DM(high, low, timeperiod=14) / close
    df['MOM'] = talib.MOM(close, timeperiod=10) / close
    df['OBV'] = talib.OBV(close, volume) / close
    df['PLUS_DM'] = talib.PLUS_DM(high, low, timeperiod=14) / close
    df['STDDEV'] = talib.STDDEV(close, timeperiod=5, nbdev=1) / close
    df['TRANGE'] = talib.TRANGE(high, low, close) / close


    df['ADX'] = talib.ADX(high, low, close, timeperiod=14)
    df['ADXR'] = talib.ADXR(high, low, close, timeperiod=14)
    df['AROON_aroondown'], df['AROON_aroonup'] = talib.AROON(high, low, timeperiod=14)
    df['AROONOSC'] = talib.AROONOSC(high, low, timeperiod=14)
    df['BOP'] = talib.BOP(open, high, low, close)
    df['CCI'] = talib.CCI(high, low, close, timeperiod=14)
    df['DX'] = talib.DX(high, low, close, timeperiod=14)
    # skip MACDEXT MACDFIX たぶん同じなので
    df['MFI'] = talib.MFI(high, low, close, volume, timeperiod=14)
    df['MINUS_DI'] = talib.MINUS_DI(high, low, close, timeperiod=14)
    df['PLUS_DI'] = talib.PLUS_DI(high, low, close, timeperiod=14)
    df['RSI'] = talib.RSI(close, timeperiod=14)
    df['STOCH_slowk'], df['STOCH_slowd'] = talib.STOCH(high, low, close, fastk_period=5, slowk_period=3, slowk_matype=0, slowd_period=3, slowd_matype=0)
    df['STOCHF_fastk'], df['STOCHF_fastd'] = talib.STOCHF(high, low, close, fastk_period=5, fastd_period=3, fastd_matype=0)
    df['STOCHRSI_fastk'], df['STOCHRSI_fastd'] = talib.STOCHRSI(close, timeperiod=14, fastk_period=5, fastd_period=3, fastd_matype=0)
    df['TRIX'] = talib.TRIX(close, timeperiod=30)
    df['ULTOSC'] = talib.ULTOSC(high, low, close, timeperiod1=7, timeperiod2=14, timeperiod3=28)
    df['WILLR'] = talib.WILLR(high, low, close, timeperiod=14)

    df['ATR'] = talib.ATR(high, low, close, timeperiod=14)
    df['NATR'] = talib.NATR(high, low, close, timeperiod=14)

    df['HT_DCPERIOD'] = talib.HT_DCPERIOD(close)
    df['HT_DCPHASE'] = talib.HT_DCPHASE(close)
    df['HT_SINE_sine'], df['HT_SINE_leadsine'] = talib.HT_SINE(close)
    df['HT_TRENDMODE'] = talib.HT_TRENDMODE(close)

    df['BETA'] = talib.BETA(high, low, timeperiod=5)
    df['CORREL'] = talib.CORREL(high, low, timeperiod=30)

    df['LINEARREG_ANGLE'] = talib.LINEARREG_ANGLE(close, timeperiod=14)
    return df

df = pd.read_pickle('df_ohlcv_with_fee.pkl')
df = df.dropna()
df = calc_features(df)
display(df)
df.to_pickle('df_features.pkl')

指標の標準化
closeで割っているのは、BTCの価格変動による学習データへの影響を吸収して標準化するためです。
例えば、BTCが100万円の時期と1000万円の時期とでは、ボリンジャーバンド1σまでの距離に10倍の開きが生じてしまいます。BTCは価格変動が大きいため、学習データに使う指標の値が時期によって大きく変わるのはよくありません。
指標の数値をそのまま扱うとモデル性能の低下につながる可能性があるので、現在の価格(close)に対する割合に置き換える事で学習精度を高めるのが狙いです。

特徴量選択

使用する特徴量のカラム名を定義しています。
ここでは、featuresという名前のリストを作成しています。

features = sorted([
    'ADX',
    'ADXR',
    'APO',
    'AROON_aroondown',
    'AROON_aroonup',
    'AROONOSC',
    'CCI',
    'DX',
    'MACD_macd',
    'MACD_macdsignal',
    'MACD_macdhist',
    'MFI',
#     'MINUS_DI',
#     'MINUS_DM',
    'MOM',
#     'PLUS_DI',
#     'PLUS_DM',
    'RSI',
    'STOCH_slowk',
    'STOCH_slowd',
    'STOCHF_fastk',
#     'STOCHRSI_fastd',
    'ULTOSC',
    'WILLR',
#     'ADOSC',
#     'NATR',
    'HT_DCPERIOD',
    'HT_DCPHASE',
    'HT_PHASOR_inphase',
    'HT_PHASOR_quadrature',
    'HT_TRENDMODE',
    'BETA',
    'LINEARREG',
    'LINEARREG_ANGLE',
    'LINEARREG_INTERCEPT',
    'LINEARREG_SLOPE',
    'STDDEV',
    'BBANDS_upperband',
    'BBANDS_middleband',
    'BBANDS_lowerband',
    'DEMA',
    'EMA',
    'HT_TRENDLINE',
    'KAMA',
    'MA',
    'MIDPOINT',
    'T3',
    'TEMA',
    'TRIMA',
    'WMA',
])

print(features)


目的変数の計算

このタイミングで未来の情報が含まれたデータになりますので、以降の取り扱いには注意が必要です。ここからは、コードを分割して細かく見ていきます。

@numba.njit
def calc_force_entry_price(entry_price=None, lo=None, pips=None):
    y = entry_price.copy()
    y[:] = np.nan
    force_entry_time = entry_price.copy()
    force_entry_time[:] = np.nan
    for i in range(entry_price.size):
        for j in range(i + 1, entry_price.size):
            if round(lo[j] / pips) < round(entry_price[j - 1] / pips):
                y[i] = entry_price[j - 1]
                force_entry_time[i] = j - i
                break
    return y, force_entry_time

@numba.njitは計算を高速化するためのおまじないです。

ここで定義しているcalc_force_entry_priceという関数は、目的変数yを計算するためのもので、未来の情報を取得します。
その時間にエントリーした場合、イグジットの指値が「いつ」「いくら」で約定するのか?を捜索しています。ここで取引の開始から終了までの情報を一気に取得しています。この関数は、目的変数yを取得するのに使われますが、yを取得しているのはここではありません。返り値にyという名前がありますが、この中身はイグジット価格です。
この関数で取得した情報は予測対象であり、未来の情報ですので、絶対に特徴量にしてはいけません。


df = pd.read_pickle('df_features.pkl')

# 呼び値 (取引所、取引ペアごとに異なるので、適切に設定してください)
pips = 1

# ATRで指値距離を計算します
limit_price_dist = df['ATR'] * 0.5
limit_price_dist = np.maximum(1, (limit_price_dist / pips).round().fillna(1)) * pips

# 終値から両側にlimit_price_distだけ離れたところに、買い指値と売り指値を出します
df['buy_price'] = df['cl'] - limit_price_dist
df['sell_price'] = df['cl'] + limit_price_dist

ATRというテクニカル指標を使って、指値を作成しています。
ATRはボラティリティ(価格変動)を図る指標で、どれだけ価格が上下に動くかを予測するためのものです。
上下の振れ幅ですので、0.5を掛けることで現在の価格からの距離を計算し、上下対称の指値を作成しています。
この戦略では、価格変動の上限と下限付近に指値を置いてその間の変動分を取りに行く戦略ですので、「指標が予測した変動幅いっぱいまで動いたら反転する確率が高いだろう」という考えを根拠にしたカウンタートレード戦略です。

カウンタートレードとは
「本来あるべき価格」から一定量乖離したときに、戻ろうとする「反発力」を利益に変える戦略です。下がるのを待ち構えて買うか、上がるのを待ち構えて売る戦略となります。

このチュートリアルでは指値作成(反発力が発生する場所の判定)にATRという指標を使っていますが、他にも指値作成に使える指標はあります。この指値作成の精度を上げることが最も大きな改良ポイントになると思います。
ATRを使うエッジはすでに消失していますが、2024年6月現在でもプラス収支にできる指値作成の方法はまだあります。
詳しくは今後別の記事で書くかもしれません。


# Force Entry Priceの計算
df['buy_fep'], df['buy_fet'] = calc_force_entry_price(
    entry_price=df['buy_price'].values,
    lo=df['lo'].values,
    pips=pips,
)

# calc_force_entry_priceは入力と出力をマイナスにすれば売りに使えます
df['sell_fep'], df['sell_fet'] = calc_force_entry_price(
    entry_price=-df['sell_price'].values,
    lo=-df['hi'].values, # 売りのときは高値
    pips=pips,
)
df['sell_fep'] *= -1

calc_force_entry_priceを使って、そのトレード結果のリターン(目的変数y)を取得しています。
sell_fepの計算時、sell_priceとhiがマイナスになっていますが、これはbuy_priceとloの約定判定のコードを転用するためです。比較対象を両方ともマイナスにする事で、比較演算子を書き換えずに逆の比較ができます。

<buy_fepの計算時>
round(lo[j] / pips) < round(entry_price[j - 1] / pips)
⇒ loがentry_priceを下回ったら約定判定。

<sell_fepの計算時>
round(-hi[j] / pips) < round(-entry_price[j - 1] / pips)
⇒ -hiが-entry_priceを下回ったら約定判定(言い換えると、hiがentry_priceを上回ったら約定判定)

ただし、そのままだとsell_fepがマイナスになってしまうので、最後にsell_fepに-1を掛けて元に戻しています。fepはBTC価格ですので、マイナスになる事はありません。

horizon = 1 # エントリーしてからエグジットを始めるまでの待ち時間 (1以上である必要がある)
fee = df['fee'] # maker手数料

# 指値が約定したかどうか (0, 1)
df['buy_executed'] = ((df['buy_price'] / pips).round() > (df['lo'].shift(-1) / pips).round()).astype('float64')
df['sell_executed'] = ((df['sell_price'] / pips).round() < (df['hi'].shift(-1) / pips).round()).astype('float64')

horizonは、どの時点からイグジット価格を取得するかを設定するための変数です。fepを取得する際のshiftの値に使われているので、例えばhorizon=2にすると、shift(-2)となり、15分×2=30分待ってからイグジット価格を捜索します。この値は通常1のままで良いと思います。

df['buy_executed']df['sell_executed']には、エントリーが約定したかどうかのbool値が入ります。最後にastype('float64')がついていますので、Falseなら0.0、Trueなら1.0が入ります。


# yを計算
df['y_buy'] = np.where(
    df['buy_executed'],
    df['sell_fep'].shift(-horizon) / df['buy_price'] - 1 - 2 * fee,
    0
)
df['y_sell'] = np.where(
    df['sell_executed'],
    -(df['buy_fep'].shift(-horizon) / df['sell_price'] - 1) - 2 * fee,
    0
)

ここで、目的変数yを計算しています。
このyには、手数料を考慮したリターンが金額ではなく割合で入ります。
金額ではなく割合を使う理由は、指標の標準化と同じくBTCの価格変動による影響を吸収するためです。yを金額にしてしまうと、時期によって予測する数値に大きな差が生じてしまうので、モデルの予測精度が落ちる可能性があります。

np.whereは、条件によって値を設定できます。
第一引数が条件式で、第二引数がTrueの場合の値、第三引数がFalseの時の値なので、エントリーの指値が約定した場合はそのトレード結果のリターンを返し、約定しなかった場合はエントリーしていない(発注しても約定していない)ので0が入ります。

このチュートリアルの戦略をそのままbot化する場合、エントリーの指値を出すかどうかは機械学習モデルで判断しますが、ポジションを持っている場合イグジットの指値は毎回出す動作になります。

2 * feeを引いているのは、エントリーとイグジットの手数料を計算しています。往復分の手数料が掛かるため、feeを2倍しています。


# バックテストで利用する取引コストを計算
df['buy_cost'] = np.where(
    df['buy_executed'],
    df['buy_price'] / df['cl'] - 1 + fee,
    0
)
df['sell_cost'] = np.where(
    df['sell_executed'],
    -(df['sell_price'] / df['cl'] - 1) + fee,
    0
)

「取引コスト」を計算しています。
df['buy_price']に入っているのは、df['cl'] - limit_price_distにて算出した指値です。そして、df['cl']は、執行処理が実行されるタイミングの基準価格です。
df['buy_price'] / df['cl'] - 1で両者の比を取ってfeeを加えています。
これが意味するのは、指値を出すタイミングで、現在の価格からどれだけ離れた位置での売買が約定するかを数値にしたものです。
指値の特性上、必ず現在の価格より有利な価格での取引になるため、指値幅がfeeを超えない限りは常に「マイナスの取引コスト」になります。

np.whereを使い、指値が約定したところにだけ取引コストの値が入るようになっていて、バックテストではこの値があるかどうかで約定判定しています。


print('約定確率を可視化。時期によって約定確率が大きく変わると良くない。')
df['buy_executed'].rolling(1000).mean().plot(label='買い')
df['sell_executed'].rolling(1000).mean().plot(label='売り')
plt.title('約定確率の推移')
plt.legend(bbox_to_anchor=(1.05, 1))
plt.show()

print('エグジットまでの時間分布を可視化。長すぎるとロングしているだけとかショートしているだけになるので良くない。')
df['buy_fet'].rolling(1000).mean().plot(label='買い')
df['sell_fet'].rolling(1000).mean().plot(label='売り')
plt.title('エグジットまでの平均時間推移')
plt.legend(bbox_to_anchor=(1.2, 1))
plt.show()

df['buy_fet'].hist(alpha=0.3, label='買い')
df['sell_fet'].hist(alpha=0.3, label='売り')
plt.title('エグジットまでの時間分布')
plt.legend(bbox_to_anchor=(1.2, 1))
plt.show()

print('毎時刻、この執行方法でトレードした場合の累積リターン')
df['y_buy'].cumsum().plot(label='買い')
df['y_sell'].cumsum().plot(label='売り')
plt.title('累積リターン')
plt.legend(bbox_to_anchor=(1.05, 1))
plt.show()

df.to_pickle('df_y.pkl')

最後に、結果を可視化しています。

rolling(1000).mean()は、そのデータの過去1000行分の平均を取っています。
cumsum()は、そのデータの累積和です。その時点までの合計利益の推移をグラフ化したい時はこのように累積和をプロットします。
plt.legendで、凡例を位置指定して表示させています。
histはヒストグラムを作成します。

ここまでのコードは、戦略そのものを評価するためのものです。
その戦略で利益が出るかどうか、機械学習の効果を出せる内容になっているかどうかを評価します。この時点で綺麗な右肩上がりのプラス収支になっている事が理想ですが、そうでなくても機械学習モデルを使って執行判断の精度を上げることにより、利益を出せる場合があります。

ここまでの結果は「バックテスト」ではありません。
機械学習で学習するためのデータを得るのが目的ですので、運用資金や最大ポジション数はまだ考慮されておらず、ポジションを無制限に持てる状態での結果なので、実際にこの通りの取引を行うことは不可能です。


モデルの学習

説明変数xのリストから、目的変数yを予測するためのモデルを学習しています。説明変数xは、TA-Libを使って作られた指標などの特徴量で、目的変数yはリターンです。
コードを分割して見ていきます。

df = pd.read_pickle('df_y.pkl')
df = df.dropna()

# モデル (コメントアウトで他モデルも試してみてください)
# model = RidgeCV(alphas=np.logspace(-7, 7, num=20))
model = lgb.LGBMRegressor(n_jobs=-1, random_state=1)

# アンサンブル (コメントアウトを外して性能を比較してみてください)
# model = BaggingRegressor(model, random_state=1, n_jobs=1)

まず、df.dropna()で、欠損値がある行を除外しています。

欠損値(NaN)
何らかの理由で部分的にデータが無い状態を欠損値と言います。
TA-Libでデータ不足により計算できなかったところは欠損値となります。たとえば、30移動平均を計算するには過去30行文の過去データが必要ですが、先頭から30行目までは過去データが足りないため結果計算できず、欠損値となります。モデル作成では欠損値があるとエラーになるので、最初に除外しておく必要があります。

modelという名前で、回帰モデルを作成しています。
RidgeCVはScikit-learnの回帰モデルですが、コメントアウトされていて、ここでは使われていません。
LGBMの学習方法は「Training API」と「Scikit-learn API」の2つあるのですが、ここではScikit-learn APIが使われています。Scikit-learnと同じ書き方でLGBMを使えるので、modelの中身を入れ替えるだけで同じコードで学習できます。


# 本番用モデルの学習 (このチュートリアルでは使わない)
# 実稼働する用のモデルはデータ全体で学習させると良い
model.fit(df[features], df['y_buy'])
joblib.dump(model, 'model_y_buy.xz', compress=True) 
model.fit(df[features], df['y_sell'])
joblib.dump(model, 'model_y_sell.xz', compress=True)

model.fit()で学習を実行しています。
第一引数は説明変数xのリスト(特徴量リスト)、第二引数が目的変数yです。
joblib.dump()で学習したモデルを保存しています。引数のcompressは、圧縮するかどうかを指定しています。保存するモデルの拡張子はxzになっていますが、この拡張子は何でも構いません。

y_buyを予測するためのモデルと、y_sellを予測するためのモデルの2つを作成しています。

モデルを2つに分ける理由
成行(Take)と指値(Make)では、選択できる行動パターンが異なるため、指値の執行判断を行うために2つのモデルが必要になります。

成行の行動パターンは以下3択です。
①買う
②売る
③なにもしない

成行の場合は、1つのモデルでリターン予測を行い、◯以上なら買い、◯以下なら売り、その中間ならなにもしない…といった感じでしきい値を決めて執行判断することができます。

指値の行動パターンは以下4択です。
①買い指値を出す
②売り指値を出す
③買い指値と売り指値を出す
④なにもしない


成行とは違い、買いと売りを同時に実行しても利益が出るパターンが存在します。そこで、下記のように2つのモデルを用いて、2×2の組み合わせで合計4択の執行判断を行います。

buyモデルで以下2択を判断します。
①買い指値を出す
②買い指値を出さない(なにもしない)

sellモデルで以下2択を判断します。
③売り指値を出す
④売り指値を出さない(なにもしない)


# 通常のCV
cv_indicies = list(KFold().split(df))
# ウォークフォワード法
# cv_indicies = list(TimeSeriesSplit().split(df))

CV(クロスバリデーション)は、学習データと検証データを分割してローテーションさせて汎化性能を測る方法です。
たとえば、期間をA、B、C、D・Eに分割し、A+C+D+Eで学習してBで検証、A+B+C+Eで学習してDで検証といった感じでデータをローテーションさせ、どの期間で学習・検証しても一定の性能が出るか確認します。

cv_indicies = list(KFold().split(df))のところで、KFold.splitを使ってデータを分割しています。このcv_indiciesには、学習用と検証用それぞれのインデックスのリストが入ります。

# OOS予測値を計算
def my_cross_val_predict(estimator, X, y=None, cv=None):
    y_pred = y.copy()
    y_pred[:] = np.nan
    for train_idx, val_idx in cv:
        estimator.fit(X[train_idx], y[train_idx])
        y_pred[val_idx] = estimator.predict(X[val_idx])
    return y_pred

df['y_pred_buy'] = my_cross_val_predict(model, df[features].values, df['y_buy'].values, cv=cv_indicies)
df['y_pred_sell'] = my_cross_val_predict(model, df[features].values, df['y_sell'].values, cv=cv_indicies)

my_cross_val_predictという名前の関数を定義しています。
関数の中身を見ていくと、まず予測結果を格納するy_predというリストを用意し、値をnanで初期化しています。yのリストをコピーして初期化することで、yと同じ要素数の箱を用意してます。

for文で、cvの中身を取り出しながらループします。
cvの中身は、クロスバリデーション用に分割した学習用と検証用のインデックスのリストです。
estimator.fit(X[train_idx], y[train_idx])で、train_idxを対象に学習したモデルを作成しています。
y_pred[val_idx] = estimator.predict(X[val_idx])でval_idxを対象に予測した結果をy_predに入れています。
このように、train_idxとval_idxはCVで分割した学習用と検証用のインデックスのリストがペアで入っているため、これをループする事で分割した期間で学習と検証を繰り返すことができます。

チュートリアルではOOSの改良ポイントとしてパージに触れています。
「KFoldやTimeSeriesSplitで分割すると、 テストデータに学習データの情報が混入する可能性」について考えてみます。
たとえば期間をA・B・C・D・Eに分割し、A・C・D・Eを学習データにして、Bで検証を行うとします。df['MA']で30移動平均を特徴量としている場合は、TA-Libはrolling(30).mean()と同じような方法で移動平均を算出しています。
これは直前30行分の情報を取得して計算しているわけですが、学習データCの1行目の30移動平均は、検証データであるBの末行から30行分のデータを元に計算された値になってしまいます。
間接的ではありますが、検証用データの一部が学習データに含まれてしまっている状態は実運用ではあり得ないため、データの前後を切り取ることで未来のデータで学習してしまうのを防ぐことができます。

例として移動平均をあげましたが、オリジナルの特徴量としてラグ特徴量を使っていた場合は直接的に未来の情報が含まれてしまう可能性があるので要注意です。

簡単にパージする方法
検証データが混入する可能性のある期間だけ学習データの前後を切り捨てれば良いので、下記のようなコードを追加します。
train_idx = train_idx[59:-60]

# OOS予測値を計算(パージあり例)
def my_cross_val_predict(estimator, X, y=None, cv=None):
    y_pred = y.copy()
    y_pred[:] = np.nan
    for train_idx, val_idx in cv:
        
        train_idx = train_idx[59:-60]
        
        estimator.fit(X[train_idx], y[train_idx])
        y_pred[val_idx] = estimator.predict(X[val_idx])
    return y_pred

リストのスライス
リストの[]内に開始インデックスとコロンと終了インデックスを書くと、指定した範囲のリストを切り出すことができ、この機能をスライスと言います。
[59:-60]は、先頭から59番目~末尾から60番目のデータを抜き出しています。データの前後60件を削除しています。

開始インデックスと終了インデックスは省略することができるので、スライスは「コロンがある場所を抜き出す機能」と覚えておくと便利です。
たとえば、[5:]だと5番目以降を抜き出せますし、[:5]なら5番目以前を抜き出せます。マイナス表記は末尾から数えます。[:-5]にすると、末尾から5番目以前のデータを抜き出せます。

バックテスト

バックテストでは、モデル作成のために計算したyは用いません。yはエントリーからイグジットまでの過程を飛ばして結果だけを記録しているため、バックテストには不向きです。損切や利確など、取引の途中過程で仕込みたい執行判断を追加することができないからです。

成行きの場合は行ごとの価格差×ポジションを計算すればリターンを計算できますが、指値の場合は指値距離を考慮しなければいけません。そこで、このチュートリアルではリターンを3つに分解して計算しています。
・イグジット指値分のリターン(終値から指値まで)
・エントリー指値分のリターン(終値から指値まで)
・終値の価格変動分のリターン

@numba.njit
def backtest(cl=None, hi=None, lo=None, pips=None,
              buy_entry=None, sell_entry=None,
              buy_cost=None, sell_cost=None
            ):
    n = cl.size
    y = cl.copy() * 0.0
    poss = cl.copy() * 0.0
    ret = 0.0
    pos = 0.0
    for i in range(n):
        prev_pos = pos
        
        # exit
        if buy_cost[i]:
            vol = np.maximum(0, -prev_pos)
            ret -= buy_cost[i] * vol
            pos += vol

        if sell_cost[i]:
            vol = np.maximum(0, prev_pos)
            ret -= sell_cost[i] * vol
            pos -= vol

        # entry
        if buy_entry[i] and buy_cost[i]:
            vol = np.minimum(1.0, 1 - prev_pos) * buy_entry[i]
            ret -= buy_cost[i] * vol
            pos += vol

        if sell_entry[i] and sell_cost[i]:
            vol = np.minimum(1.0, prev_pos + 1) * sell_entry[i]
            ret -= sell_cost[i] * vol
            pos -= vol
        
        if i + 1 < n:
            ret += pos * (cl[i + 1] / cl[i] - 1)
            
        y[i] = ret
        poss[i] = pos
        
    return y, poss

@numba.njitは高速化のおまじないです。

backtestという名前の関数を定義し、その中で実際の取引を再現しています。引数の意味は次の通りです。
cl=終値
hi=高値
lo=安値
pips=呼び値
buy_entry=買い指値のエントリー判定用
sell_entry=売り指値のエントリー判定用
buy_cost=買い指値の取引コスト(エントリー&イグジット判定用)
sell_cost=売り指値の取引コスト(エントリー&イグジット判定用)

関数の中身を細かく分割して見ていきます。

n = cl.size
y = cl.copy() * 0.0
poss = cl.copy() * 0.0
ret = 0.0
pos = 0.0

まず前準備として、n = cl.sizeでclの数を取得することでループするデータ数を取得しています。yとpossは、clの値をコピーして0.0を掛けることで初期化しています。ここで定義している変数は、ループ内で結果と状態の記録に使われます。

以降は、取引内容を再現しているforループの中身です。


prev_pos = pos

prev_pos = posで執行前のポジションを記録しておきます。


# exit
if buy_cost[i]:
    vol = np.maximum(0, -prev_pos)
    ret -= buy_cost[i] * vol
    pos += vol

if sell_cost[i]:
    vol = np.maximum(0, prev_pos)
    ret -= sell_cost[i] * vol
    pos -= vol

buy_cost[i]の値があるか(指値が約定したかどうか)チェックし、イグジット結果を取得します。
vol = np.maximum(0, -prev_pos)のところでprev_posがマイナスかどうか、売りポジションを持っているかを判定しています。売りポジションを持っている場合はポジション数がイグジット数としてvolに入り、買いポジションを持っていたりノーポジの場合は0が入るので、後者の場合は次に説明するリターンも0になります。

ret -= buy_cost[i] * volで、保有ポジションがイグジットされた場合の指値距離分のリターンを取得しています。指値は現在の価格より有利な値になりますので、取引コストはマイナス値になり利益として加算されます。

pos += volで、イグジット後のポジション数をposに反映しています。

売り指値でイグジットする場合も基本同じです。

if文で条件式を省略する
if文で条件式を値だけにした場合、その値が0、False、未定義以外の場合にTrueとみなされます。


# entry
if buy_entry[i] and buy_cost[i]:
    vol = np.minimum(1.0, 1 - prev_pos) * buy_entry[i]
    ret -= buy_cost[i] * vol
    pos += vol

if sell_entry[i] and sell_cost[i]:
    vol = np.minimum(1.0, prev_pos + 1) * sell_entry[i]
    ret -= sell_cost[i] * vol
    pos -= vol

エントリー結果の取得です。
buy_entry[i]の値があるか(発注サイズが0以上かどうか)と、イグジットと同様にbuy_cost[i]の値があるか(指値が約定したかどうか)チェックし、エントリーが実行されます。

volの計算では、np.minimum(1.0, 1 - prev_pos)によって保有可能なポジションサイズとの差を計算しています。最大ポジション数に達している場合は発注サイズが0になります。
最後にbuy_entry[i]を掛けて最終的な発注サイズを決定しています。

あとはイグジットと同じです。
ここでも、指値分のリターンが計算されます。

売り指値でエントリーする場合も基本同じです。

最大ポジション数を設定できるようにするには
複数ポジションを持てるようにして、最大ポジション数を設定したい場合、下記のように書き換えます。

maxpos = 5
if buy_entry[i] and buy_cost[i]:
    vol = np.minimum(1.0, maxpos - prev_pos) * buy_entry[i]
    ret -= buy_cost[i] * vol
    pos += vol
if sell_entry[i] and sell_cost[i]:
    vol = np.minimum(1.0, prev_pos + maxpos ) * sell_entry[i]
    ret -= sell_cost[i] * vol
    pos -= vol


df = pd.read_pickle('df_fit.pkl')

# バックテストで累積リターンと、ポジションを計算
df['cum_ret'], df['poss'] = backtest(
    cl=df['cl'].values,
    buy_entry=df['y_pred_buy'].values > 0,
    sell_entry=df['y_pred_sell'].values > 0,
    buy_cost=df['buy_cost'].values,
    sell_cost=df['sell_cost'].values,
)

引数のbuy_entryとsell_entryにはbool値が入るようになっているため、False=0か True=1が入ります。

if i + 1 < n:
    ret += pos * (cl[i + 1] / cl[i] - 1)
            
y[i] = ret
poss[i] = pos

イグジット&エントリーを終えた状態から、次の執行判定までの価格変動分のリターンを追加しています。if文でi + 1 < nとしているのは、リターン計算ができない最終行を除外するためです。
yとpossに結果を記録しています。


# バックテストで累積リターンと、ポジションを計算
df['cum_ret'], df['poss'] = backtest(
    cl=df['cl'].values,
    buy_entry=df['y_pred_buy'].values > 0,
    sell_entry=df['y_pred_sell'].values > 0,
    buy_cost=df['buy_cost'].values,
    sell_cost=df['sell_cost'].values,
)

バックテストを実行し、結果をdf['cum_ret']df['poss']に入れています。


df['cum_ret'].plot()
plt.title('累積リターン')
plt.show()

print('ポジション推移です。変動が細かすぎて青色一色になっていると思います。')
print('ちゃんと全ての期間でトレードが発生しているので、正常です。')
df['poss'].plot()
plt.title('ポジション推移')
plt.show()

print('ポジションの平均の推移です。どちらかに偏りすぎていないかなどを確認できます。')
df['poss'].rolling(1000).mean().plot()
plt.title('ポジション平均の推移')
plt.show()

print('取引量(ポジション差分の絶対値)の累積です。')
print('期間によらず傾きがだいたい同じなので、全ての期間でちゃんとトレードが行われていることがわかります。')
df['poss'].diff(1).abs().dropna().cumsum().plot()
plt.title('取引量の累積')
plt.show()

最後に、結果を可視化しています。

df['poss'].diff(1).abs().dropna().cumsum().plot()は、ポジションが変化しているところの変化量の累積和を計算し、取引量としています。


検定(p平均法)


richmanbtcさんが指摘されているように、t検定では、その戦略が直近勝てなくなってきている場合でもその事を知り得ないという問題があります。

個別のバックテスト結果を人が見て評価する場合は大きな問題にはならないかもしれませんが、パラメータをプログラムで最適化したい時などは意図した結果を得られない可能性があります。

p平均法は、期間を分割してt検定を行い、p値の平均を取ることでこれを解決しようとしています。関数の中身を見ると「p平均」は、分割した期間でマイナスになっている期間が1つでもあると、検定結果が大きく悪化する性質を持っています。つまり、ドローダウンが少なく長期的に安定して勝てているかを測るための検定方法になっています。

print('t検定')
x = df['cum_ret'].diff(1).dropna()
t, p = ttest_1samp(x, 0)
print('t値 {}'.format(t))
print('p値 {}'.format(p))

# p平均法 https://note.com/btcml/n/n0d9575882640
def calc_p_mean(x, n):
    ps = []
    for i in range(n):
        x2 = x[i * x.size // n:(i + 1) * x.size // n]
        if np.std(x2) == 0:
            ps.append(1)
        else:
            t, p = ttest_1samp(x2, 0)
            if t > 0:
                ps.append(p)
            else:
                ps.append(1)
    return np.mean(ps)

def calc_p_mean_type1_error_rate(p_mean, n):
    return (p_mean * n) ** n / math.factorial(n)

x = df['cum_ret'].diff(1).dropna()
p_mean_n = 5
p_mean = calc_p_mean(x, p_mean_n)
print('p平均法 n = {}'.format(p_mean_n))
print('p平均 {}'.format(p_mean))
print('エラー率 {}'.format(calc_p_mean_type1_error_rate(p_mean, p_mean_n)))


ここもコードを分割して見ていきます。

print('t検定')
x = df['cum_ret'].diff(1).dropna()
t, p = ttest_1samp(x, 0)

まず、通常のt検定で全期間のt値とp値を計算しています。
x = df['cum_ret'].diff(1).dropna()で、累積リターンから行ごとのリターンに戻し、欠損値を除外しています。
t,p = ttest_1samp(x, 0)で、仮説平均値を0としてt値とp値を計算しています。第二引数の仮説平均値を0にすることで、利益と損失の偏りが無い状態=偶然の結果なのかどうかを評価します。

t検定と、t値と、p値
t検定をbotに当てはめるとしたら、t値は有意性の度合い≒その戦略で狙える市場の歪みの大きさと捉えることができると思います。そして、p値はそのt値が「偶然である確率」を示します。
つまり、t値が大きいほどその戦略は優秀であり、p値が低いほどその結果には再現性があると考えることができます。

「リターンの平均が0である」という帰無仮説を棄却することで、そのリターンには意味のある偏りが生じている=バックテスト結果には再現性があると評価することができるというわけです。

# p平均法 https://note.com/btcml/n/n0d9575882640
def calc_p_mean(x, n):
    ps = []
    for i in range(n):
        x2 = x[i * x.size // n:(i + 1) * x.size // n]
        if np.std(x2) == 0:
            ps.append(1)
        else:
            t, p = ttest_1samp(x2, 0)
            if t > 0:
                ps.append(p)
            else:
                ps.append(1)
    return np.mean(ps)

calc_p_meanという名前の関数を定義し、その中で分割したp値の平均を使って「p平均」を計算しています。

ps=[]に分割した各期間のt検定を元に取得したp値を記録していきます。

x2 = x[i * x.size // n:(i + 1) * x.size // n]は、xをスライスしています。i*x.sizeをnで割る事で、n分割した配列の開始インデックスを取得しています。終了インデックスは次の開始インデックスと同じなので、x.sizeに(i+1)を掛けて取得しています。

if np.std(x2) == 0:のところは、エラー処理かと思います。
標準偏差が0になるのは、リスト内全てが同じ値の場合です。リターンにエラーがある場合は、有意性が全く無いのと同じなのでp=1を記録しています。

t, p = ttest_1samp(x2, 0)で、スライスした範囲のt値とp値を計算します。
この時、t値がプラスの場合はt検定のp値を、マイナスの場合はその戦略の有意性が全く無い状態を意味しているため確率の最大値であるp=1を記録しています。

return np.mean(ps)で、psに記録したp値の平均を返しています。
分割した期間の中で1つでもマイナスがあるとp平均は一気に跳ね上がるため、安定して勝てる戦略なのかを評価できるようになっています。


def calc_p_mean_type1_error_rate(p_mean, n):
    return (p_mean * n) ** n / math.factorial(n)

このエラー率のところは、まだ理解できていません…。
Irwin–Hall分布の確率密度関数が元になっているようです。

https://en.wikipedia.org/wiki/Irwin%E2%80%93Hall_distribution

こちらの式を元に確率密度関数をpythonで書くと下記のようになります。

import math

def irwin_hall_pdf(x,n):
    if x < 0 or x > n:
        return 0.0
    else:
        sum = 0.0
        for k in range(n + 1):
            if x >= k:
                sum += (-1)**k * math.comb(n,k) * (x-k)**(n-1)
        return sum / math.factorial(n-1)

Irwin–Hall分布は独立した一様分布の確率変数の和を表しますが、xが0~1の範囲に収まる条件下ではk=0だけを考えれば良くなります。kが1以上になると(x-k)が必ず負の値となり、確率密度として意味をなさなくなるため、forループ内の式はk=0でしか実行されないからです。k=0の場合、ループ内で実行される式は下記のようになります。

sum += 1 * 1 * (x)**(n-1)

すると、returnは下記のようにまとめることができます。

return (x)**(n-1)/ math.factorial(n-1)

分割数nは1以上の整数になるため、この式の計算結果は必ず0~1の範囲となります。nの最小値では1となり、nが増えるほど限りなく0に近づいていきます。
チュートリアルでは、(x)は(p_mean * n)になっていて、(n-1)はnになっていて、ここがどうしてそうなっているのかはわかりませんでした。
チュートリアルの式に変えることで、p_meanが一般的な有意水準(0.05や0.03など)以下に収まっていてn>3の場合に限り、上記式よりも高い確率(厳しく評価される)が出てきます。そしてnが増えるほど非常に小さな数字になっていきます。

チュートリアルには「エラー率は100000分の1以下が良い」と書かれていますが、エラー率は分割数nによって大きく変わるためnは既定値の5から変えない方が良いように思います。

この検定とエラー率については色々調べてもなかなか理解することができず…、もうちょっと勉強してみようと思います。


最後に

冒頭にも書きましたが、2024年6月現在でもこのチュートリアルをベースにしたbotで利益が出ています。
すでにエッジが消失していると思われている戦略部分は、数行書き換えるだけでエッジを復活させ、プラス収支にすることができます。
この素晴らしいチュートリアルは、アイデア次第でまだ収益機会を作り出すことができるのです。

振り返ってみると、本当に素晴らしい初心者向けチュートリアルだと改めて思います。このチュートリアルがなかったら、botにどうやって機械学習を取り入れれば良いのか全然検討もつかなかったです。
記事を書くためにコードをよく読んでみて、私自身の理解も深まったような気がします。素晴らしいMLbotチュートリアルを書いてくれたrichmanbtcさんに感謝です。

それでは、皆様に良きbotライフを。


繰り返しになりますが、この記事は、私個人の解釈によるものです。
内容の正確さについては保証できませんので、あらかじめご了承ください。

この記事が気に入ったらサポートをしてみませんか?