見出し画像

仮想通貨bot 勉強記録㉚

~バックテストまとめ~

◆前回までのあらすじ

図1

フィルターを実装しました。色々なフィルターを作って試してみてね!

◆今回やること

図1

・バックテストコードの修正

図2

これまで作ったコードを修正します。

色々ミスってるところもあったし、不要なコードもあったので、これを機にお直しです。
(Twitterでミスってる箇所を教えて下さる方々、ありがとうございます!)

綺麗にするだけなので、ほとんど解説はないです。

# -*- coding: utf-8 -*-
"""
Created on Thu May  6 00:02:36 2021

@author: Mamu
"""

import pybybit
from datetime import datetime
import time
import matplotlib.pyplot as plt
import pandas as pd


#--------------------設定項目--------------------
apis = [
'API',
'シークレット'
]
bybit = pybybit.API(*apis, testnet = True)

#====================bot設定====================
symbol           = "BTCUSD"

leverage         = 3     # レバレッジ
wait             = 0     # 待機時間
Buy_term         = 20    # 買いブレイク判断期間
Sell_term        = 40    # 売りブレイク判断期間
volatility_term  = 28    # 平均ボラティリティの計算期間
chart_min        = 240   # 時間軸#(1 3 5 15 30 60 120 240 360 720 "D" "W" "M" )

judge_price={
 "Buy" : "close",       # ブレイク判断 高値(high)か終値(close)を使用
 "Sell": "close"        # ブレイク判断 安値(low)か終値(close)を使用
}

stop_range       = 2     # 何レンジ幅にストップを入れるか
trade_risk       = 0.05  # 1トレードあたり口座の何%まで損失を許容するか
slippage         = 0.002 # 考慮する手数料

entry_times      = 5     # 何回に分けて追加ポジションを取るか
entry_range      = 0.4   # 何レンジごとに追加ポジションを取るか

stop_config      = "ON"  # ON/OFF/Trailing
stop_AF          = 0.03  # 加速係数
stop_AF_add      = 0.03  # 加速係数を増やす度合
stop_AF_max      = 0.3   # 加速係数の上限

filter_VER       = "D"   # フィルター設定/OFFで無効

#====================バックテスト用====================
start_funds      = 1000               # シミュレーション時の初期資金
test_start       = '2019/06/01 09:00' # ローソク足取得開始時刻
LOT_MODE         = "adjustable"       # fixedなら$1000固定、adjustableなら可変ロット


#--------------------価格API---------------------
#====================価格データ取得===================
def get_price(chart_min,start):
   price = []
   get_start = int(datetime.strptime(start,'%Y/%m/%d %H:%M').timestamp()) # タイムスタンプ変換

   #200*n本のローソク足を取得して、price[]に入れる
   for o in range(30): # ()内の回数だけデータを取得

       #pybybitでローソク足取得
       data = bybit.rest.inverse.public_kline_list(
             symbol   = symbol,
             interval = chart_min,
             from_    = get_start
             ).json()

       #priceに取得したデータを入れる
       for i in data["result"]:
           price.append({
               "open_time" :i["open_time"],
               "open"      :float(i["open"]),
               "high"      :float(i["high"]),
               "low"       :float(i["low"]),
               "close"     :float(i["close"]),
               "volume"    :float(i["volume"])
               })

       #200本x足の長さ分だけタイムスタンプを進める
       if chart_min == "D" :
           get_start += 200*60*1440

       elif chart_min == "W" :
           get_start += 200*60*10080

       else:
           get_start += 200*60*chart_min

   return price


#--------------------補助ツール--------------------
#====================時間と高値・安値をログに記録====================
def log_price( data,flag ):
   flag["records"]["log"].append("時間: " + datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M') + " 始値" + str(data["open"]) + " 高値: " + str(data["high"]) + " 安値: " + str(data["low"]) + " 終値: " + str(data["close"]) + "\n")
   flag["records"]["close_price"].append(data["close"])
   flag["records"]["open_time"].append(datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M'))

   return flag


#--------------------フィルター--------------------
#====================単純移動平均を計算====================
def calculate_SMA( value,before=None ):
   if before is not None:
       MA = sum(i["close"] for i in last_data[-1*value + before: before]) / value
   else:
       MA = sum(i["close"] for i in last_data[-1*value:]) / value
   return round(MA)


#====================指数移動平均を計算====================
def calculate_EMA( value,before=None ):

   # 指定期間だけ前の移動平均を計算
   if before is not None:
       MA = sum((i["close"] for i in last_data[-2*value + before : -1*value + before]) / value)
       EMA = (last_data[-1*value + before]["close"] * 2 / (value+1)) + (MA * (value-1) / (value+1))
       for i in range(value-1):
           EMA = (last_data[-1*value+before+1 + i]["close"] * 2 /(value+1)) + (EMA * (value-1) / (value+1))

   # 最新の移動平均を計算
   else:
       MA = sum(i["close"] for i in last_data[-2*value: -1*value]) / value
       EMA = (last_data[-1*value]["close"] * 2 / (value+1)) + (MA * (value-1) / (value+1))
       for i in range(value-1):
           EMA = (last_data[-1*value+1 + i]["close"] * 2 /(value+1)) + (EMA * (value-1) / (value+1))
   return round(EMA)


#====================エントリーフィルター====================
def filter( signal ):
   # フィルターがOFFなら何もしない
   if filter_VER == "OFF":
       return True

   # 最新の終値が200期間前の終値より(大きい/小さい)を確認
   if filter_VER == "A":
       if len(last_data) < 200:
           return True
       if data["close"] > float(last_data[-200]["close"]) and signal["side"] == "Buy":
           return True
       if data["close"] < float(last_data[-200]["close"]) and signal["side"] == "Sell":
           return True

   # 最新の終値が最新の200単純移動平均より(大きい/小さい)を確認
   if filter_VER == "B":
       if len(last_data) < 200:
           return True
       if data["close"] > calculate_SMA(200) and signal["side"] == "Buy":
           return True
       if data["close"] < calculate_SMA(200) and signal["side"] == "Sell":
           return True

   # 最新の20単純移動平均が1つ前の20単純移動平均より(大きい/小さい)を確認
   if filter_VER == "C":
       if len(last_data) < 20:
           return True
       if calculate_SMA(20) > calculate_SMA(20,-1) and signal["side"] == "Buy":
           return True
       if calculate_SMA(20) < calculate_SMA(20,-1) and signal["side"] == "Sell":
           return True

   # 最新の25EMAが最新の350EMAより(大きい/小さい)を確認
   if filter_VER == "D":
       if len(last_data) < 350*2:
           return True
       if calculate_EMA(350) < calculate_EMA(25) and signal["side"] == "Buy":
           return True
       if calculate_EMA(350) > calculate_EMA(25) and signal["side"] == "Sell":
           return True

   return False


#--------------------資金管理関数---------------------
#====================平均ボラティリティを計算====================
def calculate_volatility( last_data ):

   high_sum = sum(i["high"] for i in last_data[-1 * volatility_term :])
   low_sum  = sum(i["low"]  for i in last_data[-1 * volatility_term :])
   volatility = round((high_sum - low_sum) / volatility_term)
   flag["records"]["log"].append("現在の{0}期間の平均ボラティリティは{1}$です\n".format( volatility_term, volatility ))
   return volatility


#====================注文ロットを計算====================
def calculate_lot(last_data,data,flag ):
   # 固定ロットでのテスト時
   if LOT_MODE == "fixed":
       flag["records"]["log"].append("固定ロットでテスト中のため、$1000を注文します\n")
       lot = 1000
       volatility = calculate_volatility( last_data,flag )
       stop = stop_range * volatility
       flag["position"]["ATR"] = round( volatility )
       return lot,stop,flag

   balance = flag["records"]["funds"]                                                    # 残高を取得

   # 初回エントリー時
   if flag["add-position"]["count"] == 0:

       volatility = calculate_volatility( last_data )                                    # ボラティリティを計算
       stop       = stop_range * volatility                                              # 損切り値幅を計算
       calc_lot   = int(( balance * trade_risk / (stop / float(data["close"]) )))        # 許容リスクから逆算したロット

       flag["add-position"]["unit-size"]  = int( calc_lot / entry_times )                # 1回ごとのポジションサイズ
       flag["add-position"]["unit-range"] = round( volatility * entry_range )            # ポジションを分割する値幅
       flag["add-position"]["stop"]       = stop                                         # 損切り価格
       flag["position"]["ATR"] = round( volatility )                                     # ATRの設定

       flag["records"]["log"].append("現在のアカウント残高は{}$です\n".format( round( balance,2 ) ))
       flag["records"]["log"].append("許容リスクから購入できる枚数は最大{}$までです\n".format( calc_lot ))
       flag["records"]["log"].append("{0}回に分けて{1}$ずつ注文します\n".format( entry_times, flag["add-position"]["unit-size"] ))

   # 2回目以降のエントリー
   else:
       balance = (balance * leverage - flag["position"]["lot"])                           # 証拠金から1回目のロットを引く

   stop     = flag["add-position"]["stop"]                                                # 初回エントリー時の損切り値幅を設定
   able_lot = int( balance * leverage )                                                   # 設定可能な最大ロット
   lot      = min(able_lot,flag["add-position"]["unit-size"])                             # 実際に設定するロットは小さい方

   if able_lot > flag["add-position"]["unit-size"]:
       flag["records"]["log"].append("ロットを{}$にします\n".format(flag["add-position"]["unit-size"]))
   else:
       flag["records"]["log"].append("ロットを{}$にします\n".format(able_lot))

   return lot,stop,flag


#====================増し玉を行う====================
def add_position(data,flag):
   # ポジションが無かったら実行しない
   if flag["position"]["exist"] == False:
       return flag

   # 固定ロット(1BTC)でのテスト時は何もしない
   if LOT_MODE == "fixed":
       return flag

   # 最初(1回目)のエントリー価格を記録
   if flag["add-position"]["count"] == 0:                                    # ポジションの追加が0回目の時
       flag["add-position"]["first-entry-price"] = flag["position"]["price"] # 初回エントリー価格
       flag["add-position"]["last-entry-price"]  = flag["position"]["price"] # 前回(初回)のエントリー価格
       flag["add-position"]["count"] += 1                                    # ポジションの追加回数を+1

   # 以下の場合は、追加ポジションを取らない
   if flag["add-position"]["count"] >= entry_times:                          # ポジションを追加した回数が設定値(entry_times)以上の場合
       return flag

   # この関数の中で使う変数を用意
   first_entry_price = flag["add-position"]["first-entry-price"]             # 初回エントリー価格
   last_entry_price  = flag["add-position"]["last-entry-price"]              # 前回のエントリー価格
   current_price     = float(data["close"])                                  # 現在の価格
   unit_range        = flag["add-position"]["unit-range"]                    # ポジションの分割値幅
   should_add_position = False                                               # 増し玉の指示変数(初期化)

   # 増し玉の指示を出す
   if flag["position"]["side"] == "Buy" and (current_price - last_entry_price) > unit_range:
       should_add_position = True
   elif flag["position"]["side"] == "Sell" and (last_entry_price - current_price) > unit_range:
       should_add_position = True


   # 基準レンジ分進んでいれば追加注文を出す
   if should_add_position == True:
       flag["records"]["log"].append("前回のエントリー価格{0}$からブレイクアウトの方向に{1}ATR({2}$)以上動きました\n".format( last_entry_price, entry_range, round( unit_range ) ))
       flag["records"]["log"].append("{0}/{1}回目の追加注文を出します\n".format(flag["add-position"]["count"] + 1, entry_times))

       # 注文サイズを計算
       lot,stop,flag = calculate_lot( last_data,data,flag )

       # 追加注文を出す
       if flag["position"]["side"] == "Buy":

           # ここに買い注文のコードを入れる

           entry_price = first_entry_price + (flag["add-position"]["count"] * unit_range)     # バックテスト用
           entry_price = round((1 + slippage) * entry_price)                                  # スリッページを考慮

           flag["records"]["log"].append("現在のポジションに追加して、{0}$で{1}$の買い注文を出します\n".format(entry_price,lot))


       if flag["position"]["side"] == "Sell":

           # ここに売り注文のコードを入れる

           entry_price = first_entry_price - (flag["add-position"]["count"] * unit_range)     # バックテスト用
           entry_price = round((1 - slippage) * entry_price)                                  # スリッページを考慮
           flag["records"]["log"].append("現在のポジションに追加して、{0}$で{1}$の売り注文を出します\n".format(entry_price,lot))

       # ポジション全体の情報を更新する
       flag["position"]["stop"]  = stop                                                                                                                        #損切り値幅は初回エントリー時から変えない
       flag["position"]["price"] = int(round(( flag["position"]["price"] * flag["position"]["lot"] + entry_price * lot ) / ( flag["position"]["lot"] + lot ))) #平均ポジションを算出
       flag["position"]["lot"]   = (flag["position"]["lot"] + lot)                                                                                             #合計ロットを算出

       if flag["position"]["side"] == "Buy":
           flag["records"]["log"].append("{0}$の位置にストップを更新します\n".format(flag["position"]["price"] - stop))
       elif flag["position"]["side"] == "Sell":
           flag["records"]["log"].append("{0}$の位置にストップを更新します\n".format(flag["position"]["price"] + stop))

       flag["records"]["log"].append("現在のポジションの取得単価は{}$です\n".format(flag["position"]["price"]))
       flag["records"]["log"].append("現在のポジションサイズは{}$です\n".format(flag["position"]["lot"]))

       flag["add-position"]["count"] += 1                                                     #ポジションの追加回数をカウント
       flag["add-position"]["last-entry-price"] = entry_price                                 #前回のエントリー価格に、今回のエントリー価格を上書き

   return flag


# トレイリングストップの関数
def trail_stop( data,flag ):

   # まだ追加ポジションの取得中であれば何もしない
   if flag["add-position"]["count"] < entry_times and LOT_MODE != "fixed":
       return flag

   # 高値/安値がエントリー価格からいくら離れたか計算
   if flag["position"]["side"] == "Buy":
       moved_range = round( data["high"] - flag["position"]["price"] )
   if flag["position"]["side"] == "Sell":
       moved_range = round( flag["position"]["price"] - data["low_price"] )

   # 最高値・最安値を更新したか調べる
   if moved_range < 0 or flag["position"]["stop-EP"] >= moved_range:
       return flag
   else:
       flag["position"]["stop-EP"] = moved_range

   # 加速係数に応じて損切りラインを動かす
   flag["position"]["stop"] = round(flag["position"]["stop"] - ( moved_range + flag["position"]["stop"] ) * flag["position"]["stop-AF"])


   # 加速係数を更新
   flag["position"]["stop-AF"] = round( flag["position"]["stop-AF"] + stop_AF_add ,2 )
   if flag["position"]["stop-AF"] >= stop_AF_max:
       flag["position"]["stop-AF"] = stop_AF_max

   # ログ出力
   if flag["position"]["side"] == "Buy":
       flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}$に動かして、加速係数を{}に更新します\n".format( round(flag["position"]["price"] - flag["position"]["stop"]) , flag["position"]["stop-AF"] ))
   else:
       flag["records"]["log"].append("トレイリングストップの発動:ストップ位置を{}$に動かして、加速係数を{}に更新します\n".format( round(flag["position"]["price"] + flag["position"]["stop"]) , flag["position"]["stop-AF"] ))

   return flag

#--------------------売買ロジック--------------------
#====================ロジック判定====================
def donchian( data,last_data ):

   highest = float(max(i["high"] for i in last_data[(-1*Buy_term) :]))
   lowest  = float(min(i["low"]  for i in last_data[(-1*Sell_term):]))

   # data["close"]がhighestを上回ったら買いサイン
   if   data[judge_price["Buy"]]  > highest:
       return {"side":"Buy","price" :highest}

   # data["close"]がlowestを下回ったら売りサイン
   elif data[judge_price["Sell"]] < lowest :
       return {"side":"Sell","price":lowest }
   else:
       return {"side" : None , "price":0}


#====================エントリー====================
def entry_signal(data,last_data,flag ):

   signal = donchian( data,last_data )

   if signal["side"] == "Buy":
       flag["records"]["log"].append("過去{0}足の最高値{1}$を、直近の価格が{2}$でブレイクしました\n".format(Buy_term,signal["price"],data[judge_price["Buy"]]))

       # フィルター条件を確認
       if filter( signal ) == False:
           flag["records"]["log"].append("フィルターのエントリー条件を満たさなかったため、エントリーしません\n")
           return flag

       lot,stop,flag = calculate_lot( last_data,data,flag )

       flag["records"]["log"].append("{0}$あたりに{1}$で買いの成行注文を出します\n".format(data["close"],lot))

       #買い注文コード

       flag["records"]["log"].append("{0}$にストップを入れます\n".format(data["close"] - stop))
       flag["position"]["lot"] = lot
       flag["position"]["stop"] = stop
       flag["position"]["exist"] = True
       flag["position"]["side"] = "Buy"
       flag["position"]["price"] = data["close"]

   if signal["side"] == "Sell":
       lot,stop,flag = calculate_lot( last_data,data,flag )

       flag["records"]["log"].append("過去{0}足の最安値{1}$を、直近の価格が{2}$でブレイクしました\n".format(Sell_term,signal["price"],data[judge_price["Sell"]]))

       # フィルター条件を確認
       if filter( signal ) == False:
           flag["records"]["log"].append("フィルターのエントリー条件を満たさなかったため、エントリーしません\n")
           return flag

       lot,stop,flag = calculate_lot( last_data,data,flag )

       flag["records"]["log"].append("{0}$あたりに{1}$で売りの成行注文を出します\n".format(data["close"],lot))

       #売り注文コード

       flag["records"]["log"].append("{0}$にストップを入れます\n".format(data["close"] + stop))
       flag["position"]["lot"] = lot
       flag["position"]["stop"] = stop
       flag["position"]["exist"] = True
       flag["position"]["side"] = "Sell"
       flag["position"]["price"] = data["close"]

   return flag


#====================成行決済&ドテンエ注文====================
def close_position( data,last_data,flag ):
   # ポジションが無ければ何もしない
   if flag["position"]["exist"] == False:
       return flag

   flag["position"]["count"] += 1
   signal = donchian( data,last_data )

   if flag["position"]["side"] == "Buy" and  signal["side"] == "Sell": # 買いポジションかつ売りサインの場合

           flag["records"]["log"].append("過去{0}足の最安値{1}$を、直近の価格が{2}$でブレイクしました\n".format(Sell_term,signal["price"],data[judge_price["Sell"]]))
           flag["records"]["log"].append("{}$あたりで成行注文を出してポジションを決済します\n".format(str(data["close"])))

           # 成行決済注文コードを入れる

           records( flag,data,data["close"] )
           flag["position"]["exist"] = False
           flag["position"]["count"] = 0
           flag["position"]["stop-AF"] = stop_AF
           flag["position"]["stop-EP"] = 0
           flag["add-position"]["count"] = 0

           # フィルター条件を確認
           if filter( signal ) == False:
               flag["records"]["log"].append("フィルターのエントリー条件を満たさなかったため、エントリーしません\n")
               return flag

           lot,stop,flag = calculate_lot( last_data,data,flag )
           flag["records"]["log"].append("さらに{0}$あたりに{1}$の売りの成行注文を入れてドテン注文します\n".format(data["close"],lot))

           # 売り指値注文のコードを入れる

           flag["records"]["log"].append("{0}$にストップを入れます".format(data["close"] + stop))
           flag["order"]["lot"]   = lot
           flag["order"]["stop"]  = stop
           flag["order"]["exist"] = True
           flag["order"]["side"]  = "Sell"
           flag["order"]["price"] = data["close"]

   if flag["position"]["side"] == "Sell" and  signal["side"] == "Buy": # 売りポジション且つ買いサインの場合

           flag["records"]["log"].append("過去{0}足の最高値{1}$を、直近の価格が{2}$でブレイクしました\n".format(Buy_term,signal["price"],data[judge_price["Buy"]]))
           flag["records"]["log"].append("{}$あたりで成行注文を出してポジションを決済します\n".format(str(data["close"])))

           # 成行決済注文コードを入れる

           records( flag,data,data["close"] )
           flag["position"]["exist"] = False
           flag["position"]["count"] = 0
           flag["position"]["stop-AF"] = stop_AF
           flag["position"]["stop-EP"] = 0
           flag["add-position"]["count"] = 0

           # フィルター条件を確認
           if filter( signal ) == False:
               flag["records"]["log"].append("フィルターのエントリー条件を満たさなかったため、エントリーしません\n")
               return flag

           lot,stop,flag = calculate_lot( last_data,data,flag )
           flag["records"]["log"].append("さらに{0}$あたりに{1}$の買いの成行注文を入れてドテン注文します\n".format(data["close"],lot))

           # 買い指値注文のコードを入れる

           flag["records"]["log"].append("{0}$にストップを入れます\n".format(data["close"] - stop))
           flag["order"]["lot"]   = lot
           flag["order"]["stop"]  = stop
           flag["order"]["exist"] = True
           flag["order"]["side"]  = "Buy"
           flag["order"]["price"] = data["close"]

   return flag


#====================損切確認====================
def stop_position( data,flag ):

   # stop_config == "TRAILING"ならトレイリングストップ実行
   if stop_config == "TRAILING":
       flag = trail_stop( data,flag )

   # 買いポジションの時
   if flag["position"]["side"] == "Buy":

       stop_price = flag["position"]["price"] - flag["position"]["stop"] # 損切り価格を設定

       # 損切り価格に引っかかった場合
       if data["low"] < stop_price:
           flag["records"]["log"].append("{0}$の損切ラインに引っかかりました\n".format( stop_price ))
           flag["records"]["log"].append(str(data["low"]) + "$あたりで成行注文を出してポジションを決済します\n")

           # 決済の成行注文コード

           records( flag,data,stop_price,"STOP" )
           flag["position"]["exist"] = False
           flag["position"]["count"] = 0
           flag["position"]["stop-AF"] = stop_AF
           flag["position"]["stop-EP"] = 0
           flag["add-position"]["count"] = 0

   # 売りポジションの時
   if flag["position"]["side"] == "Sell":

       stop_price = flag["position"]["price"] + flag["position"]["stop"] # 損切り価格を設定

       # 損切り価格に引っかかった場合
       if data["high"] > stop_price:
           flag["records"]["log"].append("{0}$の損切ラインに引っかかりました\n".format( stop_price ))
           flag["records"]["log"].append(str(data["high"]) + "$あたりで成行注文を出してポジションを決済します\n")

           # 決済の成行注文コードを入れる

           records( flag,data,stop_price,"STOP" )
           flag["position"]["exist"] = False
           flag["position"]["count"] = 0
           flag["position"]["stop-AF"] = stop_AF
           flag["position"]["stop-EP"] = 0
           flag["add-position"]["count"] = 0

   return flag


#--------------------バックテスト関数--------------------
#====================トレードパフォーマンス確認====================
def records(flag,data,exit_price,close_type = None):

   entry_price = flag["position"]["price"]
   trade_cost  = flag["position"]["lot"] * slippage

   flag["records"]["slippage"].append(trade_cost)
   flag["records"]["log"].append("スリッページ・手数料として " + str(round(trade_cost,1)) + "$を考慮します\n")

   # 決済日時,ポジションの保有期間を記録
   flag["records"]["date"].append(datetime.fromtimestamp(data["open_time"]).strftime('%Y/%m/%d %H:%M'))
   flag["records"]["holding-periods"].append(flag["position"]["count"])

   # 損切りにかかった回数をカウント
   if close_type == "STOP":
       flag["records"]["stop-count"].append(1)
   else:
       flag["records"]["stop-count"].append(0)

   # 値幅の計算
   Buy_Price_range  = exit_price - entry_price                                                          # 買いエントリー時の獲得値幅
   Sell_Price_range = entry_price - exit_price                                                          # 売りエントリーン時の獲得値幅

   # 利益率の計算
   Buy_return = Buy_Price_range/entry_price                                                             # 買いエントリー時の獲得リターン
   Sell_return = Sell_Price_range/entry_price                                                           # 売りエントリー時の獲得リターン

   # 買いエントリーの時
   if flag["position"]["side"] == "Buy":
       flag["records"]["return"].append( Buy_return )                                                   # 獲得リターンを記録
       flag["records"]["side"].append( flag["position"]["side"] )                                       # 買いか売りかを記録
       flag["records"]["profit"].append((Buy_return-slippage)*flag["position"]["lot"])                  # 獲得利益を記録
       flag["records"]["funds"] += (Buy_return-slippage)*flag["position"]["lot"]                        # 証拠金に獲得利益を加算

       if Buy_return > 0:
           flag["records"]["log"].append(str(round((Buy_return-slippage)*flag["position"]["lot"],1)) + "$の利益です\n\n")
       else:
           flag["records"]["log"].append(str(round((Buy_return-slippage)*flag["position"]["lot"],1)) + "$の損失です\n\n")

   # 売りエントリーの時
   if flag["position"]["side"] == "Sell":
       flag["records"]["return"].append( Sell_return )                                                  # 獲得リターンを記録
       flag["records"]["side"].append( flag["position"]["side"] )                                       # 買いか売りかを記録
       flag["records"]["profit"].append((Sell_return-slippage)*flag["position"]["lot"])                 # 獲得利益を記録
       flag["records"]["funds"] += (Sell_return-slippage)*flag["position"]["lot"]                       # 証拠金に獲得利益を加算

       if Sell_return > 0:
           flag["records"]["log"].append(str(round((Sell_return-slippage)*flag["position"]["lot"],1)) + "$の利益です\n")
       else:
           flag["records"]["log"].append(str(round((Sell_return-slippage)*flag["position"]["lot"],1)) + "$の損失です\n")

   return flag


#====================バックテストの集計====================
def backtest( flag ):

   #チャート表示用df
   #//////////////////////////////////////////////////////////////////
   chart= pd.DataFrame({
       "close_price"   :  flag["records"]["close_price"],
       "open_time"     :  pd.to_datetime(flag["records"]["open_time"])
       })
   #//////////////////////////////////////////////////////////////////

   # 成績をdfに記録
   records = pd.DataFrame({
       "Date"          :  pd.to_datetime(flag["records"]["date"]), # 決済日時
       "Side"          :  flag["records"]["side"],                 # ポジションの側
       "Stop"          :  flag["records"]["stop-count"],           # 損切りを行った回数
       "Rate"          :  flag["records"]["return"],               # 獲得レート
       "Periods"       :  flag["records"]["holding-periods"],      # ポジション保有期間
       "Slippage"      :  flag["records"]["slippage"],             # 手数料等
       "Profit"        :  flag["records"]["profit"],               # 獲得損益
   })

   # recordsに総利益の列を追加
   records["Gross"] = records.Profit.cumsum()                      # その行までのrecords.Profitの総和
   # recordsに資産推移の列を追加する
   records["Funds"] = records.Gross + start_funds                 # 初期資金+records.Gross

   # recordsにドローダウンの列を追加
   records["Drawdown"]     = records.Funds.cummax().subtract( records.Funds )
   records["DrawdownRate"] = records.Drawdown / records.Funds.cummax() * 100

   # 連敗回数をカウントする
   consecutive_defeats = []                                        # 連敗回数を記録する配列
   defeats = 0                                                     # 初期化
   for r in flag["records"]["return"]:                             # リターンがマイナスなら連敗回数を+1
       if r < 0:
           defeats += 1
       else:                                                       # リターンがプラスなら連敗回数をリセット
           consecutive_defeats.append( defeats )
           defeats = 0

   # recordsから買いエントリーと売りエントリーだけをそれぞれ抽出する
   Buy_records = records[records.Side.isin(["Buy"])]
   Sell_records = records[records.Side.isin(["Sell"])]

   # # 月別のデータを集計する
   # records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
   # grouped = records.groupby("月別集計")

   # monthly_records = pd.DataFrame({
   #     "Number"   :  grouped.Profit.count(),
   #     "Gross"    :  grouped.Profit.sum(),
   #     "Funds"    :  grouped.Funds.last(),
   #     "Rate"     :  round(grouped.Rate.mean(),2),
   #     "Drawdown" :  grouped.Drawdown.max(),
   #     "Periods"  :  grouped.Periods.mean()
   #     })


   print("\nバックテスト結果")
   print("==============================")
   print("--------買いエントリ成績--------")
   print("トレード回数       :  {}回".format(len(Buy_records) ))
   print("勝率            :  {}%".format(round(len(Buy_records[Buy_records.Profit>0]) / len(Buy_records) * 100,1)))
   print("平均リターン       :  {}%".format(round(Buy_records.Rate.mean()*100,2)))
   print("総損益          :  {}$".format(round( Buy_records.Profit.sum() ,2)))
   print("平均保有期間     :  {}足".format(round(Buy_records.Periods.mean(),1) ))
   print("損切りの回数      :  {}回".format( Buy_records.Stop.sum() ))

   print("\n--------売りエントリ成績--------")
   print("トレード回数       :  {}回".format( len(Sell_records) ))
   print("勝率            :  {}%".format(round(len(Sell_records[Sell_records.Profit>0]) / len(Sell_records) * 100,1)))
   print("平均リターン       :  {}%".format(round(Sell_records.Rate.mean()*100,2)))
   print("総損益          :  {}$".format(round( Sell_records.Profit.sum() ,2)))
   print("平均保有期間     :  {}足".format(round(Sell_records.Periods.mean(),1) ))
   print("損切りの回数      :  {}回".format( Sell_records.Stop.sum() ))

   print("\n------------総合成績--------------")
   print("全トレード数      :  {}回".format(len(records) ))
   print("勝率            :  {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
   print("平均リターン       :  {}%".format(round(records.Rate.mean()*100,2)))
   print("標準偏差        :  {}%".format(round(records.Rate.std()*100,2)))
   print("平均利益率      :  {}%".format(round(records[records.Profit>0].Rate.mean()*100,2) ))
   print("平均損失率      :  {}%".format(round(records[records.Profit<0].Rate.mean()*100,2) ))

   print("平均保有期間    :  {}足".format(round(records.Periods.mean(),1) ))
   print("損切りの回数     :  {}回".format( records.Stop.sum() ))
   print("最大連敗回数    :  {}回".format( max(consecutive_defeats) ))
   print("最大勝ちトレード   :  {}$".format((round(records.Profit.max(),2))))
   print("最大負けトレード   :  {}$".format((round(records.Profit.min(),2))))
   print("最大ドローダウン    :  {0}$ / {1}%".format(round(-1 * records.Drawdown.max()), round( records.DrawdownRate.loc[records.Drawdown.idxmax()] )))
   print("利益合計        :  {}$".format((round(records[records.Profit>0].Profit.sum(),2))))
   print("損失合計        :  {}$".format(round(records[records.Profit<0].Profit.sum(),2),))
   print("手数料合計      :  {}$".format(round(-1 * records.Slippage.sum(),1)))
   print("最終損益        :  {}$\n".format((round(records.Profit.sum()-(records.Slippage.sum()) ,2))))
   print("初期資金        :  {}$".format( start_funds ))
   print("最終資金        :  {}$".format( round(records.Funds.iloc[-1] ,2)))
   print("運用成績        :  {}%".format( round(records.Funds.iloc[-1] / start_funds * 100 ,2) ))

   print("-----------------------------------")
   print("各成績指標")
   print("-----------------------------------")
   print("MARレシオ         :  {}".format(round( (records.Funds.iloc[-1] / start_funds -1)*100 / records.DrawdownRate.max(),2 )))
   print("シャープレシオ      :  {}".format( round(records.Rate.mean()/records.Rate.std(),2) ))
   print("プロフィットファクター  :  {}".format( round(records[records.Profit>0].Profit.sum()/abs(records[records.Profit<0].Profit.sum()),2) ))
   print("損益レシオ        :  {}".format(round( records[records.Profit>0].Rate.mean()/abs(records[records.Profit<0].Rate.mean()) ,2)))

   # print("\n--------------月別成績------------")
   # for index , row in monthly_records.iterrows():
   #     print("===================================")
   #     print( "{0}年{1}月".format( index.year, index.month ) )
   #     print("-----------------------------------")
   #     print("トレード数          :  {}回".format( row.Number.astype(int) ))
   #     print("月間損益          :  {}$".format( row.Gross.astype(int) ))
   #     print("平均リターン        :  {}%".format( round(row.Rate*100 ,2)))
   #     print("月間最大ドローダウン  :  {}$".format( -1 * row.Drawdown.astype(int) ))
   #     print("平均保有期間      :  {}足".format( round(row.Periods.astype(float),1) ))

   #際立った損益を表示
   n = 10
   print("-----------------------------------")
   print("+{}%を超えるトレードの回数  :  {}回".format(n,len(records[records.Rate>(n/100)]) ))
   print("-----------------------------------")
   for index,row in records[records.Rate>(n/100)].iterrows():
       print( "{0}  |  {1}%  |  {2}".format(row.Date,round(row.Rate*100,1),row.Side ))
   print("-----------------------------------")
   print("-{}%を下回るトレードの回数  :  {}回".format(n,len(records[records.Rate< (n/-100)]) ))
   print("-----------------------------------")
   for index,row in records[records.Rate < (n/-100)].iterrows():
       print( "{0}  |  {1}%  |  {2}".format(row.Date,round(row.Rate*100,1),row.Side  ))

   print("==============================")

   plot(records,chart)                                                                  # グラフを表示
   output_file(records,flag)                                                            # ファイルを出力

#====================損益曲線をプロット====================
def plot(records,chart):

   #損益グラフ
   plt.subplot(2,1,1)
   plt.plot( records.Date, records.Funds )         # X軸、Y軸の値を指定
   plt.xlabel("Date")                              # X軸のラベル名
   plt.ylabel("Balance")                           # Y軸のラベル名
   plt.xticks(rotation=50)                         # X軸の目盛りを50度回転

   #チャート
   plt.subplot(2,1,2)
   plt.plot( chart.open_time, chart.close_price ) # X軸、Y軸の値を指定
   plt.xlabel("Date")                             # X軸のラベル名
   plt.ylabel("{}".format(symbol))                # Y軸のラベル名
   plt.xticks(rotation=50)                        # X軸の目盛りを50度回転

   # リターン分布の相対度数表を作る
   # plt.subplot(2,1,2)                           # X軸、Y軸の値を指定
   # plt.hist( records.Rate,50,rwidth=0.9)        # ヒストグラムで表示
   # plt.axvline( x=0,linestyle="dashed",label="Return = 0" )
   # plt.axvline( records.Rate.mean(), color="orange", label="AverageReturn" )
   # plt.legend() # 凡例

   plt.show()                                    #グラフの表示

#====================ファイルを出力====================
def output_file(records,flag):

   file =  open("C:\Pydoc\log\donchian-{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
   file.writelines(flag["records"]["log"])

   #pandasのdfをcsvで出力
   records.to_csv("C:\Pydoc\log\donchian-{0}-records.csv".format(datetime.now().strftime("%Y-%m-%d-%H-%M")))


#--------------------メイン処理--------------------

# 価格チャートを取得
price = get_price( chart_min,test_start )

flag = {
   "order":{
       "exist" : False,
       "side"  : "",
       "price" : 0,
       "count" : 0,
       "ATR"   : 0,
       "lot"   : 0,
       "stop"  : 0
   },
   "position":{
       "exist" : False,
       "side" : "",
       "price": 0,
       "stop":0,
       "stop-AF": stop_AF,
       "stop-EP":0,
       "ATR":0,
       "lot":0,
       "count":0
   },
   "add-position":{
       "count":0,
       "first-entry-price":0,
       "last-entry-price":0,
       "unit-range":0,
       "unit-size":0,
       "stop":0
   },
   "records":{
       "date":[],
       "profit":[],
       "return":[],
       "side":[],
       "stop-count":[],
       "funds" : start_funds,
       "holding-periods":[],
       "slippage":[],
       "log":[],
       #////////////////////
       #チャート表示用
       "close_price":[],
       "open_time":[]
       #///////////////////
   }
}


last_data = []
need_term = max(Buy_term,Sell_term,volatility_term)
i = 0
while i < len(price):

   # ドンチャンの判定に使う期間分の安値・高値データを準備する
   if len(last_data) < need_term:
       last_data.append(price[i])
       flag = log_price(price[i],flag)
       time.sleep(wait)
       i += 1
       continue

   data = price[i]
   flag = log_price(data,flag)

   # ポジションがある場合
   if flag["position"]["exist"]:
       if stop_config != "OFF":
           flag = stop_position( data,flag )
       flag = close_position( data,last_data,flag )
       flag = add_position( data,flag )

   # ポジションがない場合
   else:
       flag = entry_signal( data,last_data,flag )

   last_data.append( data )
   i += 1
   time.sleep(wait)


print("--------------------------")
print("テスト期間:")
print("開始時点  : " + str(datetime.fromtimestamp(float(price[0]["open_time"]))))
print("終了時点  : " + str(datetime.fromtimestamp(float(price[-1]["open_time"]))))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

backtest(flag)

・解説

図2

①総当たりコード削除

パラメータ探索ができる関数のままにしていたんですが、コードが見にくくなるのでやめました。探索したいときは総当たりコードにする。

②トレイリングストップ関数を変更

加速係数を使う奴にしました。
固定比率でトレイリングするより、だんだんトレイリング比率を上げていく方がリターンが大きくなります。
↓こちらを見てください(安定の丸投げ)

③メイン関数削除

関数ではなく、メイン処理として直接記述しました。
これで、無駄に引数が多くなることが無くなりました。

④チャートをプロット可能に

使いどころがあるか分からないけど、チャートも表示できるようにしました。plot関数のコメントアウトの部分をいじれば表示できます。
subplotの行列数はお好みで!plotに関しては↓を参考にするといいです。

他にも細かいところを修正しています。
ファイル出力関数内のパスを相対パス⇒絶対パスにしたり、
価格データを取得する関数で週足にも対応したり。

以上!

◆実行

図1

画像6

--------------------------
テスト期間:
開始時点  : 2019-06-01 09:00:00
終了時点  : 2021-05-06 21:00:00
4234件のローソク足データで検証
--------------------------

バックテスト結果
==============================
--------買いエントリ成績--------
トレード回数       :  29回
勝率            :  31.0%
平均リターン       :  5.37%
総損益          :  22241.08$
平均保有期間     :  65.5足
損切りの回数      :  17回

--------売りエントリ成績--------
トレード回数       :  19回
勝率            :  15.8%
平均リターン       :  0.16%
総損益          :  1172.68$
平均保有期間     :  17.3足
損切りの回数      :  14回

------------総合成績--------------
全トレード数      :  48回
勝率            :  25.0%
平均リターン       :  3.31%
標準偏差        :  15.44%
平均利益率      :  23.42%
平均損失率      :  -3.39%
平均保有期間    :  46.4足
損切りの回数     :  31回
最大連敗回数    :  7回
最大勝ちトレード   :  12875.95$
最大負けトレード   :  -1397.39$
最大ドローダウン    :  -4961$ / 17%
利益合計        :  31079.1$
損失合計        :  -7665.33$
手数料合計      :  -607.1$
最終損益        :  22806.63$

初期資金        :  1000$
最終資金        :  24413.76$
運用成績        :  2441.38%
-----------------------------------
各成績指標
-----------------------------------
MARレシオ         :  135.22
シャープレシオ      :  0.21
プロフィットファクター  :  4.05
損益レシオ        :  6.9
-----------------------------------
+10%を超えるトレードの回数  :  9回
-----------------------------------
2019-07-14 17:00:00  |  28.2%  |  Buy
2019-10-01 09:00:00  |  10.3%  |  Sell
2020-02-17 17:00:00  |  11.1%  |  Buy
2020-03-19 17:00:00  |  32.1%  |  Sell
2020-05-21 21:00:00  |  12.8%  |  Buy
2020-08-22 05:00:00  |  24.6%  |  Buy
2020-11-26 17:00:00  |  53.7%  |  Buy
2021-01-21 17:00:00  |  68.5%  |  Buy
2021-02-23 17:00:00  |  25.5%  |  Buy
-----------------------------------
-10%を下回るトレードの回数  :  0回
-----------------------------------
==============================

plotにチャートを表示してみました。
実行結果自体は前回と同じです。


中身スッカスカになってしまったけど、許してください!
今回はここまで。

次回は便利すぎるエディタ:Spyderの紹介か、AWSの登録~使い方の記事を書こうかな~と思ってます。

この記事が気に入ったらサポートをしてみませんか?