【Python】bitFlyerのRealtimeAPIのTicker情報からOHLCデータを作成する

やること

pythonでbitFlyerのRealtimeAPIのTicker情報を取得してOHLCデータを作成する

目次
● 全体のコード  #無料パート
● コードの説明(OHLCデータ生成) #有料パート

本コードはMacOS上のターミナルでpythonのバージョンが3.7.1で実装したものです。またパッケージ等のインストールに関しては各自で行ってください。

●全体のコード

RealtimeAPIのTicker情報を取得するコード

"""
==========================================================================

    Packages

==========================================================================
"""
import json
import websocket
import threading


"""
==========================================================================

   Global variables

==========================================================================
"""
SYMBOL_BTC_FX = 'FX_BTC_JPY'
WS_URL = 'wss://ws.lightstream.bitflyer.com/json-rpc'
CHANNEL_NAME = 'lightning_ticker_FX_BTC_JPY'


"""
==========================================================================

   Functions

==========================================================================
"""
def run_websocket():
    ws.keep_running = True
    ws.run_forever()

"""
Below are callback functions of websocket.
"""
# when get message
def on_message(self, message):
    acquired_data = json.loads(message)
    # ここでOHLCデータ生成関数にデータを渡す

# when error occurs
def on_error(self, error):
    print(error)

# when websocket closed.
def on_close(self):
    print('Disconnected websocket')

# when websocket opened.
def on_open(self):
    print('Connected websocket')
    ws.send(json.dumps({ 'method': 'subscribe', 'params': { 'channel': CHANNEL_NAME } }))

if __name__ == '__main__':
    ws = websocket.WebSocketApp(WS_URL, on_open = on_open, on_message = on_message, on_error = on_error, on_close = on_close)
    t1 = threading.Thread(target = run_websocket)

    t1.start()

OHLCデータの生成コード

"""
==========================================================================

    Packages

==========================================================================
"""
import json
import datetime
from time import sleep
import dateutil.parser

"""
==========================================================================

   Global variables

==========================================================================
"""
class GenerateOHLC:
    def __init__(self, period):
        self.ohlc = { 'open': '', 'high': '', 'low': '', 'close': '' }
        self.ohlc_list = []
        self.next_time = 0
        self.first_time = datetime.datetime.now()
        self.period = period

    def begin_generate(self, acquired_data):
        exec_date = self.__toDatetime(acquired_data['timestamp']) + datetime.timedelta(hours=9) # change to JTC
        price = float(acquired_data['ltp'])

        if (self.__get_date_seconds_excluded(self.first_time) == self.__get_date_seconds_excluded(exec_date)):
            return

        if not self.next_time:
            self.next_time = self.__get_date_seconds_excluded(self.first_time) + 60 + self.period
            self.__initialize_ohlc(price)
        elif self.__is_next_candlestick(exec_date):
            self.ohlc_list.insert(0, { 'open': self.ohlc['open'],
                                        'high': self.ohlc['high'],
                                        'low': self.ohlc['low'],
                                        'close': self.ohlc['close'] })
            self.__initialize_ohlc(price)
            self.next_time += self.period
        else:
            self.__update_ohlc(price)

    # Use to skip the bad timing
    def __get_date_seconds_excluded(self, datetime):
        base_date = datetime.replace(second=0, microsecond=0)
        return base_date.timestamp()

    # Use to judge next candlestick
    def __get_date_millseconds_eccluded(self, datetime):
        base_date = datetime.replace(microsecond=0)
        return base_date.timestamp()

    def __is_next_candlestick(self, exec_date):
        return self.next_time <= self.__get_date_millseconds_eccluded(exec_date)

    def __toDatetime(self, date):
        datetime = date.replace('T', ' ')[:-1]
        return dateutil.parser.parse(datetime)

    def __initialize_ohlc(self, price):
        self.ohlc['open'] = price
        self.ohlc['high'] = price
        self.ohlc['low'] = price
        self.ohlc['close'] = price

    def __update_ohlc(self, price):
        self.ohlc['high'] = max(self.ohlc['high'], price)
        self.ohlc['low'] = min(self.ohlc['low'], price)
        self.ohlc['close'] = price

コードの実行

from generate_ohlc import GenerateOHLC
generate_ohlc = GenerateOHLC(60) # 60 => 1分(生成したい時間を入れる)

generate_ohlc.begin_generate(APIで取得したデータ(json型))

ここまでが無料パート。

有料パートはコードの説明になります。上記のコードを見ても全くわからな人はnoteの購入をお勧めします。また、500円くらいならいいかと投げ銭してくれると嬉しいです。

BTCで送金してくれてもありがたいです。36N9ACPRyHU5dHCqRUugkgJtUnJh5SGE57

ここから先は

7,880字

¥ 500

この記事が気に入ったらサポートをしてみませんか?