見出し画像

websocketでbitFlyerの約定情報を取得し、ローソク足を作成する

くもすけさんが無料で公開されているコードが2022年時点のPandasで動かなかった為、少し修正してみました。主な変更点は下記のとおりです。

・df.ix -> df.ilocに変更。NaNの処理方法も変更
・ファイルのヘッダにopen, high, low, close等を記載
・csvを開いていたりしてプログラムからcsv書き込みができないときに新規ファイルを作成
・データ数50000件以上で新規ファイル作成

# coding: utf-8
import threading
from collections import deque
import websocket
import warnings
import time
import json
import pandas
import dateutil.parser
import numpy as np
from datetime import datetime,timedelta

product = "FX_BTC_JPY"
timescale = 60
sleeptime = timescale *2
write_counter_max = 50000

def get_logfile_name(dt_now):
   _log_file_name = 'bit_FX_'+ str(timescale) +'s_'+ dt_now.strftime('%Y-%m-%d-%H-%M-%S')  + '.csv'
   return _log_file_name

class Websocketexecutions:
   def __init__(self, product, timescale):
       self.product = product
       self.timescale = str(timescale)+"s"
       self.executions = deque(maxlen=timescale*500)
       self.executionsWebsocket()
       warnings.simplefilter(action="ignore", category=FutureWarning)

   def updatecandle(self):
       tmpExecutions = list(self.executions)
       self.raw = pandas.DataFrame([[dateutil.parser.parse(tick["exec_date"].replace('T',' ')[:-1])+timedelta(hours=9),tick["price"],tick["size"],tick["size"] if tick["side"]=='BUY' else 0,tick["size"] if tick["side"]=='SELL' else 0] for tick in tmpExecutions],columns=["date","price","volume","buy","sell"])
       self.raw = self.raw.set_index('date')
       self.candle = self.raw['price'].resample(self.timescale, label = 'left', closed = 'left').ohlc()
       self.candle = self.candle.assign( volume = self.raw['volume'].resample(self.timescale).sum().values)
       self.candle = self.candle.assign( buy = self.raw['buy'].resample(self.timescale).sum().values)
       self.candle = self.candle.assign( sell = self.raw['sell'].resample(self.timescale).sum().values)

       #NaNがある場合の処理 一旦open値で判断
       nan_index = self.candle.index[(self.candle['open'].isna())]
       if len(nan_index) > 0:
           self.candle['close'] = self.candle['close'].fillna(method='ffill')
           self.candle['volume'] = self.candle['volume'].fillna(0)
           for c in nan_index:
               self.candle.at[c, 'open'] = self.candle.at[c, 'close']
               self.candle.at[c, 'high'] = self.candle.at[c, 'close']
               self.candle.at[c, 'low'] = self.candle.at[c, 'close']

   def executionsWebsocket(self):
       def on_message(ws, message):
           messages = json.loads(message)
           recept_data = messages["params"]["message"]
           for i in recept_data:
               self.executions.append(i)
       def on_open(ws):
           ws.send(json.dumps({"method": "subscribe", "params": {"channel": "lightning_executions_{}".format(product)}}))
       def run(ws):
           while True:
               ws.run_forever()
               time.sleep(3)
       ws = websocket.WebSocketApp( "wss://ws.lightstream.bitflyer.com/json-rpc", on_message=on_message )
       ws.on_open = on_open
       websocketThread = threading.Thread(target=run, args=(ws, ))
       websocketThread.start()

if __name__ == "__main__":
   print('start bitflyer websocket')
   dt_now = datetime.now()
   log_file_name = get_logfile_name(dt_now)

   websocket = Websocketexecutions( product, timescale )
   while not websocket.executions :
       time.sleep(1)
   lastdate = ""

   write_counter = 0
   while True:
       time.sleep(sleeptime)
       dt_now_for_candle = datetime.now()
       websocket.updatecandle()
       lastpos = 0 if lastdate == "" else websocket.candle.index.get_loc(lastdate)
       latestCandle = websocket.candle[lastpos:len(websocket.candle)-1]
       if len(latestCandle)>0  :
           print(latestCandle)

           if write_counter == 0:
               latestCandle.to_csv( log_file_name, header=True)
           else:
               try:
                   latestCandle.to_csv( log_file_name, header=False, mode='a')
               except:
                   dt_now = datetime.now()
                   log_file_name = get_logfile_name(dt_now)
                   latestCandle.to_csv( log_file_name, header=True)
                   write_counter = 0

           write_counter = write_counter + 1
           if write_counter >= write_counter_max:
               dt_now = datetime.now()
               log_file_name = get_logfile_name(dt_now)
               write_counter = 0

       lastdate = websocket.candle[-1:].index[0]
       

必要なライブラリはpandas 、websocket-client、numpyくらいだったかと思います。

ローソク足の時間は'timescale'にて設定できます。秒単位なので60と指定すると1分足が出力されます。

ファイル名は「bit_FX_{秒足}s_{ファイル取得開始時間}」で作成されます。
ファイル名を変更されたい方は'_log_file_name'を変更してください。

こちらが元々のくもすけさんのコードです。

有料の部分に記載はありませんが、この記事が役に立った方はコーヒー代として、有料部分を購入していただけると非常に喜びます。

お役に立てば幸いです。


ここから先は

56字

¥ 100

この記事が気に入ったらサポートをしてみませんか?