見出し画像

Pybottersを使おう WebSocket編

前回の続きで、Pybottersを使ってWebSocketに接続してみます。

1.インストール

図1

PyPIに登録してあるので、pip installだけでインストール可能です。

pip install pybotters

2.使用方法

図1

2-1.RestAPI

画像3

2-2.WebSocket

画像3

bybitで試してみましょう。

import pybotters
import asyncio
from rich import print
import pandas as pd


#windowsのみ
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

apis = {
   'bybit'          :[''''],
   'bybit_testnet'  :[''''],
   'binance'        :[''''],
   'binance_testnet':[''''],
   'ftx'            :[''''],
   'bitflyer'       :[''''],
   'gmocoin'        :[''''],
   'liquid'         :[''''],
   'bitbank'        :['''']
}

RestAPI_url = {
   'bybit'          :'https://api.bybit.com',
   'bybit_testnet'  :'https://api-testnet.bybit.com',
   'binance'        :'https://fapi.binance.com',
   'binance_testnet':'https://testnet.binancefuture.com',
   'ftx'            :'https://ftx.com/api/',
   'bitflyer'       :'https://api.bitflyer.com/v1/',
   'gmocoin_public' :'https://api.coin.z.com/public',
   'gmocoin_private':'https://api.coin.z.com/private',
   'liquid'         :'https://api.liquid.com',
   'bitbank_public' :'https://public.bitbank.cc',
   'bitbank_private':'https://api.bitbank.cc/v1'
}

wss_url = {
   'bybit'          :'wss://stream.bybit.com/realtime',
   'bybit_testnet'  :'wss://stream-testnet.bybit.com/realtime',
   'binance'        :'wss://fstream.binance.com',
   'binance_testnet':'wss://stream.binancefuture.com',
   'ftx'            :'wss://ftx.com/ws/',
   'gmocoin'        :''
}


async def bybit_ws():
   async with pybotters.Client(apis=apis) as client:
       
       # データストアのインスタンスを生成する
       store = pybotters.BybitDataStore()

       # WebSocket接続
       wstask = await client.ws_connect(
           wss_url['bybit'],
           send_json={'op''subscribe',
                      'args': [
                           "orderBook_200.100ms.BTCUSD",
                           'trade.BTCUSD',
                           'instrument_info.100ms.BTCUSD',
                           ]
                      },
           hdlr_json=store.onmessage,
       )

       # WebSocketでデータを受信するまで待機
       while not all([
           len(store.orderbook),
           len(store.instrument)
       ]):
           await store.wait()

       # メインループ
       while True:
           # データ参照
           data = dict(
                       orderbook =store.orderbook.sorted(),
                       instrument=store.instrument.get({"symbol":"BTCUSD"}),
                       position  =store.position_inverse.find(),
                       trade     =store.trade.find(),
                       execution =store.execution.find(),
                       order     =store.order.find()
                       )

           print(data['instrument'])

           await asyncio.sleep(1)

if __name__ == '__main__':
   try:
       asyncio.run(bybit_ws())

   except KeyboardInterrupt:
       pass

メイン関数の解説をしていきます。
メイン関数前の辞書リストは前回の記事で説明しています。

図3

        # データストアのインスタンスを生成する
        store = pybotters.BybitDataStore()

まずはBybit専用のデータストアを作成します。
データストアはWSで取得したデータを保存しておく場所で、取り出す際に必要に応じてソートや最新のものを取り出す等ができます。

図3

        # WebSocket接続
        wstask = await client.ws_connect(
            wss_url['bybit'],
            send_json={'op''subscribe',
                       'args': [
                           "orderBook_200.100ms.BTCUSD",
                           'trade.BTCUSD',
                           'instrument_info.100ms.BTCUSD',
                           ]
                      },
           hdlr_json=store.onmessage,
        )

WebSocketに接続します。
send_json()の中に、購読したいトピックを記入して、WS接続を行います。
hdlr_jsonは受け取ったjson形式のデータをどう処理するか決める部分で、store.onmessageを指定するとデータストアにどんどんデータが突っ込まれていきます。
pybotters.print_handlerにすると文字列でコンソールに出力されます。

一度WSに接続したら、再接続処理等は不要です。
pybottersの仕様で自動でやってくれるらしいです(神か、、、)

図3

        # WebSocketでデータを受信するまで待機
       while not all(
           [
           len(store.orderbook),
           len(store.instrument)
           ]   
           ):
           await store.wait()

地味に重要な部分です。
WS接続を行うと、トピックごとに最初のデータを受信する時間が違います。
ここでほしいデータを受信したことを確認してからメインの処理に移ります。

図3

        # メインループ
       while True:
           # データ参照
           data = dict(
                       orderbook =store.orderbook.sorted(),
                       instrument=store.instrument.get({"symbol":"BTCUSD"}),
                       position  =store.position_inverse.find(),
                       trade     =store.trade.find(),
                       execution =store.execution.find(),
                       order     =store.order.find()
                       )

           print(data['instrument'])

           await asyncio.sleep(1)

続いてメインループです。

dataに辞書形式でデータストアから取り出した値を格納しています。
※dict(a=b)で{'a':'b'}と同じ扱いになります。

ループの待機時間にはtime.sleep()ではなく、await asyncio.sleep()を使います。こうしないとsleepが同期処理として扱われるので、全体の処理がスリープしていしまいます。

図3

if __name__ == '__main__':
   try:
       asyncio.run(bybit_ws())
       
   except KeyboardInterrupt:
       pass

最後にメイン関数の実行部分です。
前回説明したように、呼び出し時はasyncio.run()を使います。

・・・・・

とりあえずここまでにします。
冒頭のコードを動かすと、1秒ごとにコンソールにティッカー情報が表示されますので、ぜひ動かしてみてください。


この記事が気に入ったらサポートをしてみませんか?