pe氏pybottersinagoflyer読み解き殴り書きメモ5

続き

前回はAbstractInagoBotの中身を見ていった。
今回はそれの子クラスのBitflyerInagoBotを見ていく
まずは初期処理から

    def __init__(
        self,
        client: pybotters.Client,
        store: pybotters.bitFlyerDataStore,
        bar_l: BitflyerTimeBar,
        bar_s: BitflyerTimeBar,
        *,
        lower_threshold: float,
        upper_threshold: float,
        entry_patience_seconds: int,
        entry_price_change: int,
        trail_margin: int,
        symbol: str = "FX_BTC_JPY",
        size: float = 0.01,
        side: str = "BOTH",
        **kwargs,
    ):
        super(BitflyerInagoBot, self).__init__(client, **kwargs)
        self._store = store
        self._bar_l = bar_l
        self._bar_s = bar_s
        self._lower_threshold = lower_threshold
        self._upper_threshold = upper_threshold
        self._entry_patience_seconds = entry_patience_seconds
        self._entry_price_change = entry_price_change
        self._trail_margin = trail_margin
        self._symbol = symbol
        self._size = size
        self._entry_side = side
        self._entry_order_info = None
        self._exit_order_info = None

        self._asks, self._bids = None, None
        asyncio.create_task(self.auto_ask_bid_update())

基本的にはここまでで作成されてるbarと、argsの引数がここに渡される
また、ここでもcreate_taskを用いてる。auto_ask_bid_updateの中身をみる

    async def auto_ask_bid_update(self):
        """板情報の自動更新タスク"""
        while True:
            await self._store.board.wait()
            self._asks, self._bids = self._store.board.sorted().values()

板情報が返ってくるまでwait,その返ってきた板情報をsortedしてる。
このソートはpybottersのもので、ask,bidに板の順番通りに分けてくれる

    async def on_loop_end(self):
        """トレードログ"""
        assert self._entry_order_info is not None
        assert self._exit_order_info is not None
        pnl = self._exit_order_info["price"] - self._entry_order_info["price"]
        if self._entry_order_info["side"] == "SELL":
            pnl *= -1
        pnl *= self._entry_order_info["size"]
        self._logger.debug(f"[LOOP FINISH] pnl={pnl}")
        self._entry_order_info = None
        self._exit_order_info = None

ログの出力。pnlを計算して出力してるっぽい

    async def inago_stream(self):
        with self._store.executions.watch() as stream:
            async for msg in stream:
                yield msg.data

約定データのwatch。オーバーライド。

    async def is_inago_start(self) -> tuple[bool, str]:
        """イナゴ検知ロジック。

        2段階で検知する。

        (1)閾値判定:短期足(秒足)でのボリュームが閾値をクリア
        (2)経過判定:n秒(``self._entry_patience_seconds``)後にイナゴ方向に値動き(``self._entry_price_change``)があるか否か

        """
        d = self._bar_s.d

        if len(d) == 0:
            self._logger.warning(f"[INFORMATION IS EMPTY] {d}")
            return False, None

        self._logger.debug(f"[WAITING INAGO] {d}")

        async def _primary_check():
            """閾値判定"""
            if (
                self._entry_side in ("BUY", "BOTH")
                and d["sv_log"]
                < self._lower_threshold
                < d["bv_log"]
                < self._upper_threshold
            ):
                self._logger.debug("[PRIMARY CHECK] YES BUY")
                return "BUY"
            elif (
                self._entry_side in ("SELL", "BOTH")
                and d["bv_log"]
                < self._lower_threshold
                < d["sv_log"]
                < self._upper_threshold
            ):
                self._logger.debug("[PRIMARY CHECK] YES SELL")
                return "SELL"
            else:
                return None

        async def _secondary_check(s):
            """時間経過判定"""

            # 仲値を値動きの参照値に使う
            mark_price_start = int(self.mid)
            self._logger.debug(f"[SECONDARY CHECK] mark_price={mark_price_start}")

            while True:
                mark_price = int(self.mid)
                price_change = mark_price - mark_price_start
                if s == "SELL":
                    price_change *= -1

                print(
                    f"\r\033[31m>>> [SECONDARY CHECK] {mark_price_start}/{mark_price}/{price_change:+.0f}\033[0m",
                    end="",
                )

                if price_change > self._entry_price_change:
                    # イナゴ方向への値動きがあった
                    break

                await asyncio.sleep(0.1)

        # 閾値判定
        side = await _primary_check()

        if side:
            try:
                # 経過判定
                await asyncio.wait_for(
                    _secondary_check(side), timeout=self._entry_patience_seconds
                )
                # carriage return調整してるだけ
                print()
                # イナゴ検知
                return True, side
            except asyncio.TimeoutError as e:
                # carriage return調整してるだけ
                print()
                # 指定秒数以内に値動きがみられなかったのでスルー
                self._logger.debug(f"[CANCEL] mark_price={self.mid}")
                return False, None
        else:
            return False, None

ロジックは以下二つ

閾値判定:短期足(秒足)でのボリュームが閾値をクリア
経過判定:n秒(``self._entry_patience_seconds``)後にイナゴ方向に値動き(``self._entry_price_change``)があるか否か

        d = self._bar_s.d

        if len(d) == 0:
            self._logger.warning(f"[INFORMATION IS EMPTY] {d}")
            return False, None

        self._logger.debug(f"[WAITING INAGO] {d}")

スタート処理

        async def _primary_check():
            """閾値判定"""
            if (
                self._entry_side in ("BUY", "BOTH")
                and d["sv_log"]
                < self._lower_threshold
                < d["bv_log"]
                < self._upper_threshold
            ):
                self._logger.debug("[PRIMARY CHECK] YES BUY")
                return "BUY"
            elif (
                self._entry_side in ("SELL", "BOTH")
                and d["bv_log"]
                < self._lower_threshold
                < d["sv_log"]
                < self._upper_threshold
            ):
                self._logger.debug("[PRIMARY CHECK] YES SELL")
                return "SELL"
            else:
                return None

ここはそのまま理解

        async def _secondary_check(s):
            """時間経過判定"""

            # 仲値を値動きの参照値に使う
            mark_price_start = int(self.mid)
            self._logger.debug(f"[SECONDARY CHECK] mark_price={mark_price_start}")

            while True:
                mark_price = int(self.mid)
                price_change = mark_price - mark_price_start
                if s == "SELL":
                    price_change *= -1

                print(
                    f"\r\033[31m>>> [SECONDARY CHECK] {mark_price_start}/{mark_price}/{price_change:+.0f}\033[0m",
                    end="",
                )

                if price_change > self._entry_price_change:
                    # イナゴ方向への値動きがあった
                    break

                await asyncio.sleep(0.1)

midpriceがどう動いたかで判定している
ここもそのまま読めばわかる

        # 閾値判定
        side = await _primary_check()

        if side:
            try:
                # 経過判定
                await asyncio.wait_for(
                    _secondary_check(side), timeout=self._entry_patience_seconds
                )
                # carriage return調整してるだけ
                print()
                # イナゴ検知
                return True, side
            except asyncio.TimeoutError as e:
                # carriage return調整してるだけ
                print()
                # 指定秒数以内に値動きがみられなかったのでスルー
                self._logger.debug(f"[CANCEL] mark_price={self.mid}")
                return False, None
        else:
            return False, None

閾値を超えれた場合、返ってくるのは、"BUY","SELL"のどちらか
もしsideの中にどちらかがあればif文の処理
if文では時間経過の値動きの関数を起動していて、閾値を超えればTrue,asyncio.TimeoutErrorで、指定した時間を超えても帰ってこない場合はFalseとしている

    async def on_loop(self):
        """ロジック

        - 色々とhookを用意したものの、ロジック的に当てはめられなかったのでon_loop丸ごとオーバーライドしている(爆)
        - 「約定情報を参照して決済注文を出す」といったロジックであれば以下のように分けて実装できると思う(元々はそう考えていた)
            - ``on_on_loop_begin``で新規注文
            - ``is_inago_endo``で終了判定
            - ``on_on_loop_end``で決済注文

        """

        # 新規注文
        order_id = await market_order(self.client, self._symbol, self.side, self._size)
        self._entry_order_info = await watch_execution(
            self.store.childorderevents, order_id
        )
        self._logger.debug(f"[ENTRY ORDER] {self._entry_order_info}")

        entry_price = self._entry_order_info["price"]

        

実際のロジック。market_orderをみていく

# 注文ヘルパー
async def market_order(client, symbol, side, size):
    res = await client.post(
        "/v1/me/sendchildorder",
        data={
            "product_code": symbol,
            "side": side,
            "size": f"{size:.8f}",
            "child_order_type": "MARKET",
        },
    )

    data = await res.json()

    if res.status != 200:
        raise RuntimeError(f"Invalid request: {data}")
    else:
        return data["child_order_acceptance_id"]

成行注文のヘルパー。idを返してる

async def watch_execution(execution: pybotters.models.bitflyer.ChildOrders, order_id):
    with execution.watch() as stream:
        async for msg in stream:
            if (
                msg.operation == "insert"
                and msg.data["child_order_acceptance_id"] == order_id
                and msg.data["event_type"] == "EXECUTION"
            ):
                return msg.data

約定したかどうかをwatch機能を用いて確認してる
成行のidを参照して、それがwatchで返ってきたデータのidと一致した場合、その注文のデータを返す

        # 建値±``_trail_margin`` を初期ストップ値としてトレイルスタート
        if self.side == "BUY":
            stop_price = entry_price - self._trail_margin
        else:
            stop_price = entry_price + self._trail_margin

        trailer = BarBasedPriceTrailer(
            stop_price, self.side, self._bar_l, self._trail_margin, self._logger
        )

        self._logger.debug(f"[TRAIL START] entry={entry_price} stop={stop_price}")

BarBasedPriceTrailerに関してみていく

class AbstractPriceTrailer:
    """面倒になったのでここは箱だけ..."""


class BarBasedPriceTrailer(AbstractPriceTrailer):
    """足が確定するたびに最後の足のclose ± marginのところにstopを置き直す(ストップ値が悪化(?)する場合は更新なし)"""

    def __init__(
        self,
        price: int,
        side: str,
        bar: AbstractTimeBar,
        margin: int,
        logger=loguru.logger,
    ):
        check_side(side)
        self._price = price
        self._side = side
        self._bar = bar
        self._margin = margin
        self._logger = logger
        self._task = asyncio.create_task(self.auto_trail())

    def __del__(self):
        # gcの機嫌次第でいつ呼ばれるか(はたまた本当に呼ばれるのか)わからないが一応オブジェクト破棄時に
        # Taskをキャンセルをするようにする。
        self.cancel()

    async def auto_trail(self):
        while True:
            # 最新足確定まで待機
            bar = await self._bar.get_bar_at_settled()
            last_close = bar[-1, 3]

            if self._side == "BUY":
                new_price = last_close - self._margin
                if new_price > self._price:
                    self._price = new_price
            else:
                new_price = last_close + self._margin
                if new_price < self._price:
                    self._price = new_price

            self._logger.debug(
                f"[TRAIL] {self._price:.0f} (last_close={last_close:.0f})"
            )

    def cancel(self, msg=None):
        return self._task.cancel(msg)

    @property
    def price(self):
        return self._price

親クラスはめんどくさくなったらしい
でもここまで読んで最初は箱だけの意味すらわからなかったが、今ならわかるようになってる。うれしい

引数はそのまま
delの中身を見るとgcがなんたらとかいてある
gcに関して調べる

Python では,言語処理系(CPython)の機能として実装されている「ガベージコレクション」(以下,GC)によって,不要になったメモリ領域を自動的に解放している

https://pyteyon.hatenablog.com/entry/2020/04/29/014008

GCが行われるときに、taskとして残っていたらそれを破棄するものっぽい

auto_trailは最新足をもらったあとに自動的にtrail価格を計算してくれる
それをtaskとしておいとくことで、並列化してる

        while True:
            await asyncio.sleep(0.1)

            # 最良気配値がストップ値を割ったら決済
            # ストップ値はtrailerが長期足(e.g., 1分足)の確定毎に更新

            if self.side == "BUY":
                mark_price = self.best_bid
                pnl = mark_price - entry_price
                if mark_price <= trailer.price:
                    break
            else:
                mark_price = self.best_ask
                pnl = entry_price - mark_price
                if mark_price >= trailer.price:
                    break

            print(
                f"\r\033[31m>>> [TRAILING] entry={entry_price:.0f} stop={trailer.price:.0f} mark={mark_price:.0f} pnl={pnl:+.0f}\033[0m",
                end="",
            )

ここはコメント通りっぽい

        # trailタスクが回り続けてしまうので明示的にキャンセルする
        # asyncio.Taskはオブジェクトがスコープを外れて破壊されてもキャンセルされない
        trailer.cancel()

        side = "SELL" if self._entry_order_info["side"] == "BUY" else "BUY"
        order_id = await market_order(self.client, self._symbol, side, self._size)
        self._exit_order_info = await watch_execution(
            self.store.childorderevents, order_id
        )
        self._logger.debug(f"[EXIT ORDER] {self._exit_order_info}")

決済の指令がでてるのにtrailが回り続けないようにタスクをキャンセルしてる

成行で決済注文して、ちゃんと決済されたかをwatchしてる

    @property
    def store(self):
        return self._store

    @property
    def best_ask(self):
        if self._asks is None:
            return -1
        return self._asks[0]["price"]

    @property
    def best_bid(self):
        if self._bids is None:
            return -1
        return self._bids[0]["price"]

    @property
    def mid(self):
        return (self.best_ask + self.best_bid) / 2

ここは見たまま

これで一通りの流れがわかった。
まとめると

  • 高頻度のローソク足を作成するために、約定データをwatchして、使いやすいように加工する工程

  • 自炊したローソク足を用いてロジックを動かす(inagoを検知する)工程

  • inagoを検知したら成成で処理する工程

これを、非同期処理を用いて作成してる
自分だけだと到底思いつかない処理の方法が詰め込まれててためになった。
これを完全に自分に落とし込むために次はFTXで動かせないか試してみます。
メモは気が向いたら乗せる




この記事が気に入ったらサポートをしてみませんか?