見出し画像

[Python]Bybit websocket

こんな記事にたどり着いてしまう変態仮想通貨botterの皆さんこんにちは

年中自粛botterのOneです。

仮想通貨取引所BybitのwebsocketAPIのコードを張るだけの記事です。

BITMEX以外の海外取引所のサンプルコードって少ない気がするので時短のためにコピーしちゃう人用です。

まちゅけんさんのコードにかなり影響を受けています

Bybitだけでなく他のコードもすごく読みやすくていつも参考にさせて頂いています。自分のコードはカスタマイズしやすいようにベーシックなところしか書いていません。使いやすいようにアレンジしてください。

動くとは思いますが変なところがあればごめんなさい!

注意点ですが、positionを購読するのであれば最初にhttpで取得するか、強引にinputで突っ込むかした方がいいと思います。(自分は使わないので放置しています)

それでは早速コードです。ありがとうございました!

# -*- coding: utf-8 -*-
import hmac
import hashlib
import json
import logging
import websocket
import threading
from time import time, sleep
from collections import deque

class Bybit(object):

   def __init__(self, api_key, secret, symbol='BTCUSD'):
       self.api_key = api_key
       self.secret = secret
       self.symbol = symbol
   
       self.endpoint = 'wss://stream.bybit.com/realtime'
       # 不要なチャンネルは行ごと消してください
       self.channel_list = ['trade.' + self.symbol, 
                            'instrument.' + self.symbol, 
                            'orderBookL2_25.' + self.symbol, 
                            'position', 
                            'execution', 
                            'order'
                            ]
       self.logger = logging.getLogger(__name__)
       self.logger.debug("Initializing WebSocket.")
                
       self.data = {
                'connection':False,
                'last_price':None,
                'timestamp':{},
                'execution':deque(maxlen=200),
                'instrument':None,
                'board_snapshot':{
                        'asks':[],
                        'bids':[]
                        },
                'position':{},
                'my_execution':deque(maxlen=50),
                'my_order':deque(maxlen=50)
                }
       for i in self.channel_list:
           self.data['timestamp'][i] = None
   
       self.board_snapshot = {}
       self.board_snapshot_bids_dict = {}
       self.board_snapshot_asks_dict = {}
   
       self.__connect(self.endpoint)
       self.ping_th = threading.Thread(target=self.send_ping)
       self.ping_th.daemon = True
       self.ping_th.start()
   
   def __connect(self, endpoint):
       while not self.data['connection']:
           self.ws = websocket.WebSocketApp(endpoint,
                                            on_message=self.__on_message,
                                            on_close=self.__on_close,
                                            on_open=self.__on_open,
                                            on_error=self.__on_error,
                                            header=None)
    
           self.ws_th = threading.Thread(target=lambda: self.ws.run_forever())
           self.ws_th.daemon = True
           self.ws_th.start()
           sleep(10)
            
       self.__wait_first_data
       self.logger.info("WebSocket Opend.")
   
   def __exit(self):
       self.ws.close()
       self.data['connection'] = False
       for i in self.data['timestamp']:
           self.data['timestamp'][i] = None
   
   def __wait_first_data(self):
       for i in self.data['timestamp']:
           # 実際に使う際はポジションは取得してからの方がいいです
           if i in ['position', 'execution', 'order']:
               continue
           while not self.data['timestamp'][i]:
               sleep(0.1)
   
   def __on_open(self):
       if 'position' in self.channel_list or 'order' in self.channel_list or 'execution' in self.channel_list:
           # timestamp足してあげないとAuthエラーが出る
           timestamp = int((time() + 10.0) * 1000)
           param_str = 'GET/realtime' + str(timestamp)
           sign = hmac.new(self.secret.encode('utf-8'),
                           param_str.encode('utf-8'), hashlib.sha256).hexdigest()
           self.ws.send(json.dumps(
                   {'op': 'auth', 'args': [self.api_key, timestamp, sign]}))
   
       self.ws.send(json.dumps(
               {'op': 'subscribe', 'args': self.channel_list}))
   
   def __on_close(self):
       self.logger.info('Websocket Closed.')
   
   def __on_error(self, error):
       self.logger.error("Error : %s" % error)
       self.__exit()
       self.__connect(self.endpoint)
   
   def __on_message(self, message):
       try:
           message = json.loads(message)
           topic = message.get('topic')
           data = message.get('data')
           ret_msg = message.get('ret_msg')
           self.data['timestamp'][topic] = time()
    
   
           if topic == 'trade.' + self.symbol:
               for d in data:
                   self.data['last_price'] = d['price']
                   self.data['execution'].append(d)
   
           elif topic == 'instrument.' + self.symbol:
               self.data['instrument'] = data[0]
   
           elif topic == 'orderBookL2_25.' + self.symbol:
               if message['type'] == 'snapshot':
                   self.board_snapshot_bids_dict.clear()
                   self.board_snapshot_asks_dict.clear()
                   for d in data:
                       if d['side'] == 'Buy':
                           self.board_snapshot_bids_dict[float(d['price'])] = [float(d['price']), float(d['size'])]
                       elif d['side'] == 'Sell':
                           self.board_snapshot_asks_dict[float(d['price'])] = [float(d['price']), float(d['size'])]
                
               else:
                   if data['delete']:
                       for d in data['delete']:
                           if d['side'] == 'Buy':
                               del self.board_snapshot_bids_dict[float(d['price'])]
                           elif d['side'] == 'Sell':
                               del self.board_snapshot_asks_dict[float(d['price'])]
   
                   if data['insert']:
                       for i in data['insert']:
                           if i['side'] == 'Buy':
                               self.board_snapshot_bids_dict[float(i['price'])] = [float(i['price']), float(i['size'])]
                           elif i['side'] == 'Sell':
                               self.board_snapshot_asks_dict[float(i['price'])] = [float(i['price']), float(i['size'])]
   
                   if data['update']:
                       for u in data['update']:
                           if u['side'] == 'Buy':
                               self.board_snapshot_bids_dict[float(u['price'])] = [float(u['price']), float(u['size'])]
                           elif u['side'] == 'Sell':
                               self.board_snapshot_asks_dict[float(u['price'])] = [float(u['price']), float(u['size'])]
                            
               self.data['board_snapshot']['bids'] = [i[1] for i in sorted(self.board_snapshot_bids_dict.items(), key=lambda bid: bid[1][0],reverse=True)]
               self.data['board_snapshot']['asks'] = [i[1] for i in sorted(self.board_snapshot_asks_dict.items(), key=lambda ask: ask[1][0],reverse=False)]
   
           elif topic == 'position':
               if data[0]['symbol'] == self.symbol:
                   self.data['position'] = data[0]
   
           elif topic == 'execution':
               for d in data:
                   if d['symbol'] == self.symbol:
                       self.data['my_execution'].append(d)
   
           elif topic == 'order':
               for d in data:
                   if d['symbol'] == self.symbol:
                       self.data['my_order'].append(d)
   
           elif 'success' in message.keys():
               if message['success'] == True:
                   if ret_msg == 'pong':
                       pass
                   elif len(message['request']['args']) == len(self.channel_list):
                       self.data['connection'] = True
               else:
                   raise Exception("Connection failed: %s" % message)
   
           else:
               raise Exception("Unknown message: %s" % message)
   
       except Exception as e:
            self.logger.error(e)
   
   


   def send_ping(self):
       # 30~60秒ごとにピンポンした方が良いらしい
       while True:
           self.ws.send('{"op":"ping"}')
           sleep(30)
   
   def reconnect(self):
       self.__exit()
       self.__connect(self.endpoint)


if __name__ == '__main__':    
   from pprint import pprint

   symbol = 'BTCUSD'
   api_key = ''
   secret = ''
   bybit = Bybit(api_key, secret, symbol=symbol)

   while True:
       pprint(bybit.data)
       sleep(30)