見出し画像

【Bitflyer】簡易リアルタイムフォワードテスト(ソースコード付き)

前に作ったリアルタイムで簡易的なフォワードテストができる仕組み。

結局使う機会がなくて、ずっと眠ってるのももったいないので公開することにした。

なんとなく可視化したくなっちゃっただけです。(あと普通にマルチスレッドプログラミングとか学べたし純粋に楽しかった)

高頻度botのフォワードテストとかに使えるかもしれないです(やってない)

実行すると疑似取引所のGUIが表示されて価格推移・指値・損益が確認できます。
疑似取引所のAPIサーバーがlocalhostで起動されるので、勝手にAPIを叩いて注文出したりキャンセルしたりできます。

板情報は使ってないから特に成行注文は精度悪めです。

getリクエスト叩けば注文できます。

requests.get(f"http://localhost/marketorder?side={side}&size={size}")

どんなリクエストが叩けるかはソースコードのExchangeAPIHandlerクラスの中を見ていただければわかるかもしれません。

ソースコード

from http.server import HTTPServer, BaseHTTPRequestHandler
from PyQt5 import QtCore, QtGui, QtWidgets
from urllib.parse import urlparse, parse_qs
import pyqtgraph as pg
import websocket
import sys
import json

CHANNEL = "lightning_executions_FX_BTC_JPY"

# 疑似取引所クラス
class Exchange:
	def __init__(self, min_ordersize, max_position, maker_fee, taker_fee):
		self.min_ordersize = min_ordersize
		self.max_position = max_position
		self.maker_fee = maker_fee
		self.taker_fee = taker_fee
		self.position = 0
		self.balance = 0
		self.prices = []
		self.pnl = []
		self.orders = []
		self.order_id = 0
	
	def limit_order(self, side, price, size):
		side = side.lower()
		if size < self.min_ordersize:
			print('the ordersize is small.')
			return
		if side == 'sell' and self.position - sum([order['size'] for order in self.orders if order['side'] == 'sell']) - size < -self.max_position:
			print('The maximum position cannot be exceeded.')
			return
		if side == 'buy' and self.max_position < self.position + sum([order['size'] for order in self.orders if order['side'] == 'buy']) + size:
			print('The maximum position cannot be exceeded.')
			return
		self.orders.append({'order_id': self.order_id, 'side': side, 'price': price, 'size': size})
		self.order_id += 1

	def market_order(self, side, size):
		side = side.lower()
		if size < self.min_ordersize:
			print('the ordersize is small.')
			return
		if side == 'sell' and self.position - sum([order['size'] for order in self.orders if order['side'] == 'sell']) - size < -self.max_position:
			print('The maximum position cannot be exceeded.')
			return
		if side == 'buy' and self.max_position < self.position + sum([order['size'] for order in self.orders if order['side'] == 'buy']) + size:
			print('The maximum position cannot be exceeded.')
			return
		if side == 'buy':
			self.position += size * (1 - self.taker_fee)
			self.balance -= size * self.prices[-1]
		elif side == 'sell':
			self.position -= size * (1 - self.taker_fee)
			self.balance += size * self.prices[-1]
	
	def cancel_order(self, order_id):
		del_idx = [idx for idx in range(len(self.orders)) if self.orders[idx]['order_id'] == order_id]
		if len(del_idx) == 1:
			del_idx = del_idx[0]
			del self.orders[del_idx]

	def cancel_all_orders(self):
		self.orders *= 0
	
	def update(self, side, price, size):
		for i in range(len(self.orders) - 1, -1, -1):
			if self.orders[i]['side'] == 'buy' and side == 'sell' and self.orders[i]['price'] > price:
				if self.orders[i]['size'] > size:
					self.position += size * (1 - self.maker_fee)
					self.balance -= size * self.orders[i]['price']
					self.orders[i]['size'] -= size
				else:
					self.position += self.orders[i]['size'] * (1 - self.maker_fee)
					self.balance -= self.orders[i]['size'] * self.orders[i]['price']
					del self.orders[i]
			elif self.orders[i]['side'] == 'sell' and side == 'buy' and self.orders[i]['price'] < price:
				if self.orders[i]['size'] > size:
					self.position -= size * (1 - self.maker_fee)
					self.balance += size * self.orders[i]['price']
					self.orders[i]['size'] -= size
				else:
					self.position -= self.orders[i]['size'] * (1 - self.maker_fee)
					self.balance += self.orders[i]['size'] * self.orders[i]['price']
					del self.orders[i]
		self.pnl.append(self.balance + price * self.position)

# 疑似取引所APIハンドラクラス
class ExchangeAPIHandler(BaseHTTPRequestHandler):
	def do_GET(self):
		request = urlparse(self.path)
		params = parse_qs(request.query)

		# 建玉を取得
		if request.path == '/position':
			response = {"position": window.exchange.position}
			self.send_response(200)
			self.send_header('Content-Type', 'application/json')
			self.end_headers()
			self.wfile.write(json.dumps(response).encode('utf-8'))
			return

		# 最新価格を取得
		if request.path == '/price':
			response = {"price": window.exchange.prices[-1]}
			self.send_response(200)
			self.send_header('Content-Type', 'application/json')
			self.end_headers()
			self.wfile.write(json.dumps(response).encode('utf-8'))
			return
		
		# 成行注文を出す
		if request.path == '/marketorder':
			window.exchange.market_order(params['side'][0], float(params['size'][0]))
			response = {"status": 200}
			self.send_response(200)
			self.send_header('Content-Type', 'application/json')
			self.end_headers()
			self.wfile.write(json.dumps(response).encode('utf-8'))
			return

		# 指値注文を出す
		if request.path == '/limitorder':
			window.exchange.limit_order(params['side'][0], float(params['price'][0]), float(params['size'][0]))
			response = {"status": 200}
			self.send_response(200)
			self.send_header('Content-Type', 'application/json')
			self.end_headers()
			self.wfile.write(json.dumps(response).encode('utf-8'))
			return

		# 注文をキャンセルする
		if request.path == '/cancelorder':
			window.exchange.cancel_order(int(params['order_id'][0]))
			response = {"status": 200}
			self.send_response(200)
			self.send_header('Content-Type', 'application/json')
			self.end_headers()
			self.wfile.write(json.dumps(response).encode('utf-8'))
			return

		# すべての注文をキャンセルする
		if request.path == '/cancelallorders':
			window.exchange.cancel_all_orders()
			response = {"status": 200}
			self.send_response(200)
			self.send_header('Content-Type', 'application/json')
			self.end_headers()
			self.wfile.write(json.dumps(response).encode('utf-8'))
			return

		self.send_response(404)
		self.send_header('Content-Type', 'application/json')
		self.end_headers()
		response = {"status": 404}
		self.wfile.write(json.dumps(response).encode('utf-8'))

# 疑似取引所APIサーバークラス
class ExchangeAPIDaemon(QtCore.QThread):
	def run(self):
		self.server = HTTPServer(('', 80), ExchangeAPIHandler)
		while True:
			self.server.serve_forever()

# ウィンドウクラス
class Window(QtWidgets.QMainWindow):
	def __init__(self):
		super(Window, self).__init__()
		self.exchange = Exchange(min_ordersize=0.01, max_position=0.1, maker_fee=0, taker_fee=0)

		self.win = pg.GraphicsLayoutWidget(show=True)
		self.p1 = self.win.addPlot(x=list(range(len(self.exchange.prices))), y=self.exchange.prices, row=0, col=0)
		self.p2 = self.win.addPlot(x=list(range(len(self.exchange.prices))), y=self.exchange.prices, row=1, col=0)

		# 約定データ受信用のデーモンを起動
		self.thread = WebsocketListenerDaemon()
		self.thread.windowThread.connect(self.add_execution)
		self.thread.start()

		# 疑似取引所APIリクエスト受信用のデーモンを起動
		self.thread = ExchangeAPIDaemon()
		self.thread.start()
	
	def update(self):
		self.p1.clear()

		# 価格プロット
		prices_min = min(self.exchange.prices)
		prices_max = max(self.exchange.prices)
		self.p1.plot(list(range(len(self.exchange.prices))), self.exchange.prices, clear=True)
		self.p1.setRange(yRange=[prices_min-(prices_max-prices_min), prices_max+(prices_max-prices_min)])
		self.p1.setRange(xRange=[0, len(self.exchange.prices)])
		self.p1.showGrid(x=True, y=True)
		
		# 指値プロット
		for order in self.exchange.orders:
			self.p1.addItem(pg.InfiniteLine(pos=order['price'], angle=0, movable=False, pen=pg.mkPen(color=('#0f07' if order['side'] == 'buy' else '#f007'), width=1, style=QtCore.Qt.DashLine)))

		self.p1.addItem(pg.InfiniteLine(pos=self.exchange.prices[-1], angle=0, movable=False, pen=pg.mkPen(color='#ff0', width=1)))
		
		# 損益プロット
		self.p2.plot(list(range(len(self.exchange.pnl))), self.exchange.pnl, clear=True)
		self.p2.setRange(yRange=[min(self.exchange.pnl), max(self.exchange.pnl)])
		self.p2.showGrid(x=True, y=True)

		self.p2.setXLink(self.p1)
		
	def add_execution(self, side, price, size):
		self.exchange.update(side, price, size)
		self.exchange.prices.append(price)
		self.update()

# リアルタイム約定取得クラス
class WebsocketListenerDaemon(QtCore.QThread):
	windowThread = QtCore.pyqtSignal(str, float, float)
	
	def __init__(self, parent=None):
		super(WebsocketListenerDaemon, self).__init__(parent)
		websocket.enableTrace(False)
		self.ws = websocket.WebSocketApp("wss://ws.lightstream.bitflyer.com/json-rpc", on_open=self.on_open, on_message=self.on_message, on_error=self.on_error)

	def run(self):
		while True:
			self.ws.run_forever()

	def on_open(self, ws):
		ws.send(json.dumps({"method": "subscribe", "params": {"channel": CHANNEL}}))

	def on_message(self, ws, message):
		message = json.loads(message)
		if message["method"] == "channelMessage":
			for m in message["params"]["message"]:
				self.windowThread.emit(m['side'].lower(), float(m['price']), float(m['size']))

	def on_error(self, ws, error):
		print(error)

if __name__ == '__main__':
	app = QtWidgets.QApplication(sys.argv)
	QtWidgets.QApplication.setQuitOnLastWindowClosed(True)
	window = Window()
	sys.exit(app.exec_())

この記事が気に入ったらサポートをしてみませんか?