pe氏pybottersinagoflyer読み解き殴り書きメモ2

続き

main

async def main(args):
    logger = loguru.logger
    logger.add("log.txt", retention=3, rotation="10MB")

    async with pybotters.Client(
        args.api_key, base_url="https://api.bitflyer.com"
    ) as client:
        store = pybotters.bitFlyerDataStore()

        # time bar
        def log_volume(bar: AbstractTimeBar):
            """最新足のbuy/sell volumeのログを計算するcallback"""
            d = dict()
            d["bv_log"] = np.log1p(bar.bv)
            d["sv_log"] = np.log1p(bar.sv)
            return d

        bar_l = BitflyerTimeBar(
            unit_seconds=args.bar_unit_seconds_long,
            store=store.executions,
            maxlen=args.bar_maxlen,
            callbacks=[log_volume],
        )
        bar_s = BitflyerTimeBar(
            unit_seconds=args.bar_unit_seconds_short,
            store=store.executions,
            maxlen=args.bar_maxlen,
            callbacks=[log_volume],
        )

        # time barを約定履歴で初期化
        resp = await client.get(
            "/v1/getexecutions", params={"producet_code": args.symbol, "count": 500}
        )
        data = await resp.json()
        await bar_l.init(data[::-1])
        await bar_s.init(data[::-1])

        # web socketに接続
        await client.ws_connect(
            "wss://ws.lightstream.bitflyer.com/json-rpc",
            send_json=[
                {
                    "method": "subscribe",
                    "params": {"channel": f"lightning_board_snapshot_{args.symbol}"},
                    "id": 1,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": f"lightning_board_{args.symbol}"},
                    "id": 2,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": f"lightning_executions_{args.symbol}"},
                    "id": 3,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": "child_order_events"},
                    "id": 4,
                },
            ],
            hdlr_json=store.onmessage,
        )
        while not all([len(w) for w in [store.board, store.executions]]):
            logger.debug("[WAITING SOCKET RESPONSE]")
            await store.wait()

        await BitflyerInagoBot(
            client,
            store,
            bar_l,
            bar_s,
            lower_threshold=args.lower_threshold,
            upper_threshold=args.upper_threshold,
            entry_patience_seconds=args.entry_patience_seconds,
            entry_price_change=args.entry_price_change,
            trail_margin=args.trail_margin,
            symbol=args.symbol,
            size=args.size,
            side=args.side,
            logger=logger,
        ).loop()

loggerに関しては今後そんなに明記する必要はないかと思ってるけど、
最初にある2行は'log.txt'っていう名前で、10MBごと3世代分で出力する

pybottersの使い方に関しては、前の記事でも乗せたまちゅけんさんのwikiが一番わかりやすいので省略

log_volumeという関数に関して調べる

        # time bar
        def log_volume(bar: AbstractTimeBar):
            """最新足のbuy/sell volumeのログを計算するcallback"""
            d = dict()
            d["bv_log"] = np.log1p(bar.bv)
            d["sv_log"] = np.log1p(bar.sv)
            return d

"最新足のbuy/sell volumeのログを計算するcallback"とあるが、なんのことかさっぱりわからん

引数としてあるbarがAbstractTimeBarとアノテーションされてるので、その中身を順を追ってみてみる

class AbstractTimeBar:
    """Time-bar用の抽象クラス。

    storeには約定情報を取得する``DataStore``を与える(例: bitFlyer -> ``pybotters.models.bitflyer.Executions``)

    """

    def __init__(
        self,
        store: "pybotters.store.DataStore",
        unit_seconds: int,
        maxlen: int = 9999,
        callbacks: list[Callable[[AbstractTimeBar], None]] = (),
    ):
        self._store = store
        self._seconds = unit_seconds
        self._rule = f"{unit_seconds}S"
        self._bar = StreamArray((maxlen, 7))
        self._cur_bar = np.zeros(7)
        self._timestamp = deque(maxlen=maxlen)

        # callback
        self._callbacks = callbacks or []
        self._d = {}

        # 確定足取得用
        self._queue = asyncio.Queue(1)

        self._task = asyncio.create_task(self.auto_update())

まずはinit処理。初期処理。クラス内で使う引数を定義やらなんやらしてる
Callableに関しては、前のpeさんのプログラムを読んでる際に調べたので参考にしてください

self._bar = StreamArray((maxlen, 7))

とあるが、何をしてるのか調べる
まずStreamArrayクラスをのぞく

class StreamArray:
    """queue-likeなnumpy array"""

    def __init__(self, shape, array=None):
        self._a = np.full(shape, np.nan)
        if array is not None:
            self.init(array)

    def __repr__(self):
        return self._a.__repr__()

    def __getitem__(self, *args):
        return self._a.__getitem__(*args)

    def __len__(self):
        return self._a.__len__()

    def append(self, x):
        def shift(arr: np.ndarray, num: int, fill_value=0) -> np.ndarray:
            result = np.empty_like(arr)

            if num > 0:
                result[:num] = fill_value
                result[num:] = arr[:-num]
            elif num < 0:
                result[num:] = fill_value
                result[:num] = arr[-num:]
            else:
                result[:] = arr

            return result

        self._a = shift(self._a, -1)
        self._a[-1] = x

    def init(self, array):
        if self._a.shape[0] <= array.shape[0]:
            array = array[-self._a.shape[0] :]
            self._a = array
        else:
            self._a[-array.shape[0] :] = array

    @property
    def a(self):
        return self._a

…逃げ出したい。なんでもないです。
queue-likeなnumpy arrayとあるがどういう意味だろう。

queueとは、データ構造の一つ(データを格納する入れ物)で、入ってきたデータを順番に格納し、先に格納したデータから順に取り出す、 先入れ先出し(FIFO:First-In First-Out)方式のデータ構造です。

https://medium-company.com/queue/

入れては出すnumpyの配列を作ってるとみた(間違ってたらごめんなさい)

初期処理に関してはnp.fullをつかってself._aにshape分だけnanのndarrayを生成している。if文の後ろは後で覗く

    def __repr__(self):
        return self._a.__repr__()

    def __getitem__(self, *args):
        return self._a.__getitem__(*args)

    def __len__(self):
        return self._a.__len__()

特殊メソッドは一気にまとめる

__repr__() メソッドを定義することで、この関数によりそのクラスのインスタンスが返すものを制御することができます。

http://mukaimame.blog111.fc2.com/blog-entry-880.html?sp

クラスに__getitem__を実装すると添え字やスライスで要素にアクセスできるようになります。

https://deecode.net/?p=1600

__len__(self): 組み込み関数len()を呼び出したときに実行されるメソッド

https://qiita.com/y518gaku/items/07961c61f5efef13cccc
def append(self, x):
        def shift(arr: np.ndarray, num: int, fill_value=0) -> np.ndarray:
            result = np.empty_like(arr)

            if num > 0:
                result[:num] = fill_value
                result[num:] = arr[:-num]
            elif num < 0:
                result[num:] = fill_value
                result[:num] = arr[-num:]
            else:
                result[:] = arr

            return result

        self._a = shift(self._a, -1)
        self._a[-1] = x

append関数のなかでshift関数を作成してる。
おそらく引数xをenqueueして古いものをだしてるのかな?
適当に単体で動かしてみる

import numpy as np
a = np.zeros(6)
a[1] = 1
print(a)

def shift(arr, num, fill_value=0):
    result = np.empty_like(arr)

    if num > 0:
        result[:num] = fill_value
        result[num:] = arr[:-num]
    elif num < 0:
        result[num:] = fill_value
        result[:num] = arr[-num:]
    else:
        result[:] = arr

    return result

a = shift(a, -1)
a[-1] = 1

print(a)

[0. 1. 0. 0. 0. 0.]
[1. 0. 0. 0. 0. 1.]
となって1が入ってa[1]に入れてたものが前にずれていってる
というか、本人に確認とってFirst-In First-Outという手法ってことまで教えてもらいました。ありがとうございます。

本日は疲れた。ここまで。

参考


そもそもコールバックとはなんぞや??ってことで調べた
参考にしたサイトは以下

https://wa3.i-3-i.info/word12295.html

関数に渡す関数らしい。

この記事が気に入ったらサポートをしてみませんか?