pe氏pybottersinagoflyer読み解き殴り書きメモ4

続き

引き続きmain関数をみていく

        # time barを約定履歴で初期化
        resp = await client.get(
            "/v1/getexecutions", params={"producet_code": args.symbol, "count": 500}
        )
        data = await resp.json()
        await bar_l.init(data[::-1])
        await bar_s.init(data[::-1])

RESTAPIをつかって、約定履歴を取得し、そのデータを用いて初期化してる
[::-1]というのはすべての要素を逆に返す
[1,2,3,4] → [4,3,2,1]みたいな
それをbar_l,sのオーバーライドしたinitにいれて処理してる。

        # web socketに接続
        await client.ws_connect(
            "wss://ws.lightstream.bitflyer.com/json-rpc",
            send_json=[
                {
                    "method": "subscribe",
                    "params": {"channel": f"lightning_board_snapshot_{args.symbol}"},
                    "id": 1,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": f"lightning_board_{args.symbol}"},
                    "id": 2,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": f"lightning_executions_{args.symbol}"},
                    "id": 3,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": "child_order_events"},
                    "id": 4,
                },
            ],
            hdlr_json=store.onmessage,
        )
        while not all([len(w) for w in [store.board, store.executions]]):
            logger.debug("[WAITING SOCKET RESPONSE]")
            await store.wait()

pybottersおなじみの書き方。
websocketに接続してそれをstoreの中に入れていく。
なお最後の3行はstoreのなかに欲しいデータが入ってくるまで、waitして次のプログラムが動かないようにしている。

        await BitflyerInagoBot(
            client,
            store,
            bar_l,
            bar_s,
            lower_threshold=args.lower_threshold,
            upper_threshold=args.upper_threshold,
            entry_patience_seconds=args.entry_patience_seconds,
            entry_price_change=args.entry_price_change,
            trail_margin=args.trail_margin,
            symbol=args.symbol,
            size=args.size,
            side=args.side,
            logger=logger,
        ).loop()

最後にBitflyerInagoBotに最初に指定した引数、今まで作ってきたbar等を入れて、ループ処理
なお、bar等は勝手に更新されていくようになってるのでいちいち呼びだす必要がない

クラスBitflyerInagoBotをみていく

class BitflyerInagoBot(AbstractInagoBot):

AbstractInagoBotの子クラス。AbstractInagoBotをみていく

class AbstractInagoBot:
    """イナゴBOT用の抽象クラス(仮)"""

    def __init__(
        self,
        client: pybotters.Client,
        is_inago_start_fn: Callable[[AbstractInagoBot], tuple[bool, str]] = None,
        is_inago_end_fn: Callable[[AbstractInagoBot], bool] = None,
        off_loop_interval: float = 0.5,
        logger=loguru.logger,
    ):
        self._client = client
        self._is_inago_start_fn = is_inago_start_fn
        self._is_inago_end_fn = is_inago_end_fn
        self._off_loop_interval = off_loop_interval
        self._logger = logger
        self._side = None

    # 要オーバーライド
    async def inago_stream(self) -> Generator[dict]:
        """ "イナゴ"(主に約定情報になると思う)を流す"""
        raise NotImplementedError

    async def on_inago(self, inago):
        """各イナゴに対する処理"""
        pass

    async def is_inago_start(self) -> tuple[bool, str]:
        """イナゴ到来判定"""
        if self._is_inago_start_fn is None:
            raise NotImplementedError
        if asyncio.iscoroutinefunction(self._is_inago_start_fn):
            return await self._is_inago_start_fn(self)
        else:
            return self._is_inago_start_fn(self)

    async def is_inago_end(self) -> bool:
        """イナゴ終了判定"""
        if self._is_inago_end_fn is None:
            raise NotImplementedError
        if asyncio.iscoroutinefunction(self._is_inago_end_fn):
            return await self._is_inago_end_fn(self)
        else:
            return self._is_inago_end_fn(self)

    async def loop(self):
        """イナゴBOTのメインループ

        - off_loop(): イナゴ待機ループ(検知)
        - on_loop(): イナゴ処理ループ(トレード)

        この二つを繰り返す。

        - *_begin(), *_end(): 各関数の前後で呼ばれるhook

        """
        while True:
            self._logger.debug("BEGIN LOOP")
            await self.on_loop_begin()
            self._logger.debug("BEGIN OFF LOOP")
            await self.off_loop()
            self._logger.debug(f"END OFF LOOP")

            self._logger.debug(f"BEGIN ON LOOP: {self._side}")
            await self.on_loop()
            self._logger.debug("END ON LOOP")
            await self.on_loop_end()
            self._logger.debug("END LOOP")

    async def off_loop(self):
        """イナゴオフ時のループ=イナゴ検知"""
        await self.on_off_loop_begin()
        while True:
            is_start, side = await self.is_inago_start()
            if is_start:
                self._side = side
                break
            await asyncio.sleep(self._off_loop_interval)
        await self.on_off_loop_end()

    async def on_loop(self):
        """イナゴオン時のループ=トレード"""
        await self.on_on_loop_begin()
        async for inago in self.inago_stream():
            await self.on_inago_begin(inago)
            await self.on_inago(inago)
            is_end = await self.is_inago_end()
            if is_end:
                self._side = None
                break
            await self.on_inago_end(inago)
        await self.on_on_loop_end()

    async def on_loop_begin(self):
        pass

    async def on_loop_end(self):
        pass

    async def on_off_loop_begin(self):
        pass

    async def on_off_loop_end(self):
        pass

    async def on_on_loop_begin(self):
        pass

    async def on_on_loop_end(self):
        pass

    async def on_inago_begin(self, inago):
        pass

    async def on_inago_end(self, inago):
        pass

    @property
    def client(self):
        return self._client

    @property
    def side(self):
        return self._side

イナゴBOT用の抽象クラスとある。おそらくこれをもとにほかの取引所等でもプログラムがつくれそう。
初期処理から見てく

    def __init__(
        self,
        client: pybotters.Client,
        is_inago_start_fn: Callable[[AbstractInagoBot], tuple[bool, str]] = None,
        is_inago_end_fn: Callable[[AbstractInagoBot], bool] = None,
        off_loop_interval: float = 0.5,
        logger=loguru.logger,
    ):
        self._client = client
        self._is_inago_start_fn = is_inago_start_fn
        self._is_inago_end_fn = is_inago_end_fn
        self._off_loop_interval = off_loop_interval
        self._logger = logger
        self._side = None

引数はいつなににつかうのかこのままだとわからないが、とりあえず複雑なとこがないから飛ばす

inago_streamとon_inagoは書いてある説明で完結

    async def is_inago_start(self) -> tuple[bool, str]:
        """イナゴ到来判定"""
        if self._is_inago_start_fn is None:
            raise NotImplementedError
        if asyncio.iscoroutinefunction(self._is_inago_start_fn):
            return await self._is_inago_start_fn(self)
        else:
            return self._is_inago_start_fn(self)

引数をもらってイナゴの判定をおこなう。
asyncio.iscoroutinefunctionに関して調べる

func がコルーチン関数であると判断された場合、真を返す。コルーチン関数は、装飾された生成関数または非同期 def 関数である可能性がある。(DeepLで翻訳)

https://man.plustar.jp/python/library/asyncio-task.html

つまり、self._is_inago_end_fnがコルーチン関数だったらawait処理、そうじゃないとそのままreturnで値を返す

終了判定も同じなので省略

async def loop(self):
        """イナゴBOTのメインループ

        - off_loop(): イナゴ待機ループ(検知)
        - on_loop(): イナゴ処理ループ(トレード)

        この二つを繰り返す。

        - *_begin(), *_end(): 各関数の前後で呼ばれるhook

        """
        while True:
            self._logger.debug("BEGIN LOOP")
            await self.on_loop_begin()
            self._logger.debug("BEGIN OFF LOOP")
            await self.off_loop()
            self._logger.debug(f"END OFF LOOP")

            self._logger.debug(f"BEGIN ON LOOP: {self._side}")
            await self.on_loop()
            self._logger.debug("END ON LOOP")
            await self.on_loop_end()
            self._logger.debug("END LOOP")

ここは説明通りっぽい
スタート→待機→トレード→終了みないな流れ

    async def off_loop(self):
        """イナゴオフ時のループ=イナゴ検知"""
        await self.on_off_loop_begin()
        while True:
            is_start, side = await self.is_inago_start()
            if is_start:
                self._side = side
                break
            await asyncio.sleep(self._off_loop_interval)
        await self.on_off_loop_end()

    async def on_loop(self):
        """イナゴオン時のループ=トレード"""
        await self.on_on_loop_begin()
        async for inago in self.inago_stream():
            await self.on_inago_begin(inago)
            await self.on_inago(inago)
            is_end = await self.is_inago_end()
            if is_end:
                self._side = None
                break
            await self.on_inago_end(inago)
        await self.on_on_loop_end()

今まで見てきたものを元にイナゴの検知とトレードを行ってるっぽい
その他、関数に関しては省略

今回は本当に殴り書きが多かった。というか純粋にもう書いてあるコメントよめば理解ができた。
実際に動かした際の流れは何となく理解できた。
次の記事でおそらく最後になると思う

この記事が気に入ったらサポートをしてみませんか?