bitFlyer Lightning Realtime API を受信し、とにかく保存する (3)
・マルチプロセスで、複数のチャネルを並列して保存するように書き換え。結構な量が届くので、マルチスレッドだとGILが不安だと思い、マルチプロセスを使う。ログも別れてるほうが便利かなと。
・ファイルはあまり大きくなると厄介なので1日ごとにファイル名を変えることにした。ログも同じく深夜をまたぐと日付がつくはず。
・0時で区切るかBFのメンテが入る4時で区切るかは少し迷ったが、0時にした。BF以外も扱っていったりするかもしれないので、BFを特別扱いしすぎたくない。
・日付の区切りを実時間でやるか受信したメッセージに含まれるタイムスタンプでやるかも少し迷った。本当は後者のほうが気持ち良いと思うが、チャネルによってタイムスタンプのフィールド名が違ったりと、やや実装が面倒であり、いうて大差ないだろうということで前者を採用。
・とりあえずJSONで書いているが、速度やファイルサイズに問題があればmsgpackに移行したい。
import websocket
import colorlog
import json
import logging
import logging.handlers
import sys
import os
import datetime
import argparse
import multiprocessing
URL = 'wss://ws.lightstream.bitflyer.com/json-rpc'
CHANNELS = [
'{}_{}'.format(channel, product)
for channel in [
'lightning_board_snapshot',
'lightning_board',
'lightning_ticker',
'lightning_executions',
]
for product in [
'BTC_JPY',
'FX_BTC_JPY',
]
]
class App:
def __init__(self, out_root_dir, channel):
self.channel = channel
self.out_dir = os.path.join(out_root_dir, channel)
self.logger = logging.getLogger('{}.{}'.format(__name__, self.channel))
self.out_file = None
self.out_file_date = None
def run_forever(self):
self.setup_logger()
# websocket-clientはinspect.ismethodでコールバックの型を認識して処理を変えてくる。
# (かなり軽率な仕様だと思うのでやめてほしい……)
# bound methodだと何故かwsを貰えないので、lambdaでごまかす。
ws = websocket.WebSocketApp(
URL,
header=None,
on_open=lambda w: self.on_open(w),
on_message=lambda w, m: self.on_message(w, m),
on_error=lambda w, e: self.on_error(w, e),
on_close=lambda w: self.on_close(w),
)
ws.run_forever()
def setup_logger(self):
log_path = os.path.join(self.out_dir, 'log', 'log.txt')
os.makedirs(os.path.dirname(log_path), exist_ok=True)
stderr_handler = colorlog.StreamHandler()
stderr_handler.setFormatter(colorlog.ColoredFormatter(
'%(log_color)s[%(asctime)s %(levelname)s:%(name)s] %(message)s'))
file_handler = logging.handlers.TimedRotatingFileHandler(
filename=log_path, when='MIDNIGHT')
logging.basicConfig(level=logging.INFO, handlers=[
stderr_handler, file_handler])
def check_reopen_out_file(self):
today = datetime.date.today()
if self.out_file_date == today:
return
if self.out_file:
self.out_file.close()
out_file_path = os.path.join(
self.out_dir, 'stream', '{}.txt'.format(today.isoformat()))
os.makedirs(os.path.dirname(out_file_path), exist_ok=True)
self.out_file = open(out_file_path, 'w')
self.out_file_date = today
self.logger.info('Writing to: {}'.format(out_file_path))
def on_open(self, ws):
self.logger.info('open')
msg = {
'method': 'subscribe',
'params': {
'channel': self.channel
}
}
ws.send(json.dumps(msg))
def on_message(self, ws, message):
body = json.loads(message)['params']['message']
self.check_reopen_out_file()
self.out_file.write(json.dumps(body))
self.out_file.write('\n')
def on_close(self, ws):
self.logger.warning('closed')
def on_error(self, ws, error):
self.logger.exception(error)
if isinstance(error, KeyboardInterrupt):
sys.exit(1)
def subprocess_main(out_dir, channel):
while True:
try:
app = App(out_dir, channel)
app.run_forever()
except Exception as e:
logger = logging.getLogger('{}.{}'.format(__name__, channel))
logger.exception(e)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--out-dir', required=True)
args = parser.parse_args()
processes = []
for channel in CHANNELS:
p = multiprocessing.Process(
target=subprocess_main, args=(args.out_dir, channel))
p.daemon = True
p.start()
processes.append(p)
for p in processes:
p.join()
if __name__ == '__main__':
main()
python main.py --out-dir ~/btc
今日は日付をまたいでも死なないことを確認してから寝る。
この記事が気に入ったらサポートをしてみませんか?