pe氏pybottersinagoflyer読み解き殴り書きメモ2
続き
main
async def main(args):
logger = loguru.logger
logger.add("log.txt", retention=3, rotation="10MB")
async with pybotters.Client(
args.api_key, base_url="https://api.bitflyer.com"
) as client:
store = pybotters.bitFlyerDataStore()
# time bar
def log_volume(bar: AbstractTimeBar):
"""最新足のbuy/sell volumeのログを計算するcallback"""
d = dict()
d["bv_log"] = np.log1p(bar.bv)
d["sv_log"] = np.log1p(bar.sv)
return d
bar_l = BitflyerTimeBar(
unit_seconds=args.bar_unit_seconds_long,
store=store.executions,
maxlen=args.bar_maxlen,
callbacks=[log_volume],
)
bar_s = BitflyerTimeBar(
unit_seconds=args.bar_unit_seconds_short,
store=store.executions,
maxlen=args.bar_maxlen,
callbacks=[log_volume],
)
# time barを約定履歴で初期化
resp = await client.get(
"/v1/getexecutions", params={"producet_code": args.symbol, "count": 500}
)
data = await resp.json()
await bar_l.init(data[::-1])
await bar_s.init(data[::-1])
# web socketに接続
await client.ws_connect(
"wss://ws.lightstream.bitflyer.com/json-rpc",
send_json=[
{
"method": "subscribe",
"params": {"channel": f"lightning_board_snapshot_{args.symbol}"},
"id": 1,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_board_{args.symbol}"},
"id": 2,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_executions_{args.symbol}"},
"id": 3,
},
{
"method": "subscribe",
"params": {"channel": "child_order_events"},
"id": 4,
},
],
hdlr_json=store.onmessage,
)
while not all([len(w) for w in [store.board, store.executions]]):
logger.debug("[WAITING SOCKET RESPONSE]")
await store.wait()
await BitflyerInagoBot(
client,
store,
bar_l,
bar_s,
lower_threshold=args.lower_threshold,
upper_threshold=args.upper_threshold,
entry_patience_seconds=args.entry_patience_seconds,
entry_price_change=args.entry_price_change,
trail_margin=args.trail_margin,
symbol=args.symbol,
size=args.size,
side=args.side,
logger=logger,
).loop()
loggerに関しては今後そんなに明記する必要はないかと思ってるけど、
最初にある2行は'log.txt'っていう名前で、10MBごと3世代分で出力する
pybottersの使い方に関しては、前の記事でも乗せたまちゅけんさんのwikiが一番わかりやすいので省略
log_volumeという関数に関して調べる
# time bar
def log_volume(bar: AbstractTimeBar):
"""最新足のbuy/sell volumeのログを計算するcallback"""
d = dict()
d["bv_log"] = np.log1p(bar.bv)
d["sv_log"] = np.log1p(bar.sv)
return d
"最新足のbuy/sell volumeのログを計算するcallback"とあるが、なんのことかさっぱりわからん
引数としてあるbarがAbstractTimeBarとアノテーションされてるので、その中身を順を追ってみてみる
class AbstractTimeBar:
"""Time-bar用の抽象クラス。
storeには約定情報を取得する``DataStore``を与える(例: bitFlyer -> ``pybotters.models.bitflyer.Executions``)
"""
def __init__(
self,
store: "pybotters.store.DataStore",
unit_seconds: int,
maxlen: int = 9999,
callbacks: list[Callable[[AbstractTimeBar], None]] = (),
):
self._store = store
self._seconds = unit_seconds
self._rule = f"{unit_seconds}S"
self._bar = StreamArray((maxlen, 7))
self._cur_bar = np.zeros(7)
self._timestamp = deque(maxlen=maxlen)
# callback
self._callbacks = callbacks or []
self._d = {}
# 確定足取得用
self._queue = asyncio.Queue(1)
self._task = asyncio.create_task(self.auto_update())
まずはinit処理。初期処理。クラス内で使う引数を定義やらなんやらしてる
Callableに関しては、前のpeさんのプログラムを読んでる際に調べたので参考にしてください
self._bar = StreamArray((maxlen, 7))
とあるが、何をしてるのか調べる
まずStreamArrayクラスをのぞく
class StreamArray:
"""queue-likeなnumpy array"""
def __init__(self, shape, array=None):
self._a = np.full(shape, np.nan)
if array is not None:
self.init(array)
def __repr__(self):
return self._a.__repr__()
def __getitem__(self, *args):
return self._a.__getitem__(*args)
def __len__(self):
return self._a.__len__()
def append(self, x):
def shift(arr: np.ndarray, num: int, fill_value=0) -> np.ndarray:
result = np.empty_like(arr)
if num > 0:
result[:num] = fill_value
result[num:] = arr[:-num]
elif num < 0:
result[num:] = fill_value
result[:num] = arr[-num:]
else:
result[:] = arr
return result
self._a = shift(self._a, -1)
self._a[-1] = x
def init(self, array):
if self._a.shape[0] <= array.shape[0]:
array = array[-self._a.shape[0] :]
self._a = array
else:
self._a[-array.shape[0] :] = array
@property
def a(self):
return self._a
…逃げ出したい。なんでもないです。
queue-likeなnumpy arrayとあるがどういう意味だろう。
入れては出すnumpyの配列を作ってるとみた(間違ってたらごめんなさい)
初期処理に関してはnp.fullをつかってself._aにshape分だけnanのndarrayを生成している。if文の後ろは後で覗く
def __repr__(self):
return self._a.__repr__()
def __getitem__(self, *args):
return self._a.__getitem__(*args)
def __len__(self):
return self._a.__len__()
特殊メソッドは一気にまとめる
def append(self, x):
def shift(arr: np.ndarray, num: int, fill_value=0) -> np.ndarray:
result = np.empty_like(arr)
if num > 0:
result[:num] = fill_value
result[num:] = arr[:-num]
elif num < 0:
result[num:] = fill_value
result[:num] = arr[-num:]
else:
result[:] = arr
return result
self._a = shift(self._a, -1)
self._a[-1] = x
append関数のなかでshift関数を作成してる。
おそらく引数xをenqueueして古いものをだしてるのかな?
適当に単体で動かしてみる
import numpy as np
a = np.zeros(6)
a[1] = 1
print(a)
def shift(arr, num, fill_value=0):
result = np.empty_like(arr)
if num > 0:
result[:num] = fill_value
result[num:] = arr[:-num]
elif num < 0:
result[num:] = fill_value
result[:num] = arr[-num:]
else:
result[:] = arr
return result
a = shift(a, -1)
a[-1] = 1
print(a)
[0. 1. 0. 0. 0. 0.]
[1. 0. 0. 0. 0. 1.]
となって1が入ってa[1]に入れてたものが前にずれていってる
というか、本人に確認とってFirst-In First-Outという手法ってことまで教えてもらいました。ありがとうございます。
本日は疲れた。ここまで。
参考
そもそもコールバックとはなんぞや??ってことで調べた
参考にしたサイトは以下
https://wa3.i-3-i.info/word12295.html
関数に渡す関数らしい。