見出し画像

ペアトレボット

稼働していますが、あまり儲かっていないボットの公開です。スケーラビリティーが低くて、今後注力する予定が無いので公開しました。公開用に整えてないので、かなり雑です。自前で書いたクラスへの依存とかあるので、そのままだと動かないと思います。ロジックのヒントになれば幸いです。

原理

KfEstimatorがコアロジックです。ペアトレできそうな2つの銘柄A, Bを用意。カルマンフィルターでBの対数価格からAの対数価格を予測。ポジションを予測誤差(resid)に比例させてペアトレ。

発注量を計算するためのクラス

import time
import datetime
import threading
import traceback
import pandas as pd
import numpy as np
import copy
from pykalman import KalmanFilter
from concurrent.futures import ThreadPoolExecutor

class KfEstimator:
    """Online Kalman-filter regression of one series on the others.

    The hidden state is a vector of regression coefficients that follows a
    random walk (identity transition).  On every `update` the first element
    of the input is the observed value and the remaining elements are the
    regressors; the state's first component acts as the intercept.  The
    standardized, exponentially smoothed prediction error is kept in
    `self.resid`.
    """

    def __init__(self, delta=None, delta2=None, market_count=2):
        """Build the filter.

        Args:
            delta: Controls the process noise of every state component except
                the first; the variance used is ``delta / (1 - delta)``.
            delta2: Same, but for the first (intercept) component.
            market_count: Number of series, which is also the state dimension.
        """
        dim = market_count

        # Diagonal process noise; the intercept gets its own variance.
        process_noise = delta / (1 - delta) * np.eye(dim)
        process_noise[0, 0] = delta2 / (1 - delta2)

        self.state_mean = np.zeros(dim)
        self.state_cov = np.eye(dim)

        self.kf = KalmanFilter(
            n_dim_obs=1,
            n_dim_state=dim,
            initial_state_mean=self.state_mean,
            initial_state_covariance=self.state_cov,
            transition_matrices=np.eye(dim),
            transition_covariance=process_noise,
            # Placeholder only: `update` passes the real observation matrix.
            observation_matrices=np.ones((1, dim)),
            observation_covariance=1.0,
        )

        self.resid = 0.0

    def update(self, x=None):
        """Feed one sample and refresh `state_mean`, `state_cov` and `resid`.

        Args:
            x: 1-D numpy array; ``x[0]`` is the observed value, ``x[1:]`` are
                the regressors.  The array itself is not modified.
        """
        # Regressor row: constant 1 for the intercept, then the other series.
        regressors = x.copy()
        regressors[0] = 1

        updated_mean, updated_cov = self.kf.filter_update(
            self.state_mean,
            self.state_cov,
            observation=x[0:1],
            observation_matrix=regressors.reshape(1, -1),
        )
        self.state_mean = updated_mean
        self.state_cov = updated_cov

        # Error of the prediction made with the *updated* state, scaled by the
        # standard deviation of the intercept component.
        error = x[0] - np.sum(self.state_mean * regressors)
        error /= self.state_cov[0, 0] ** 0.5

        # Exponential smoothing with weight 1/12 on the newest error.
        smoothing = 1.0 / 12
        self.resid = error * smoothing + (1 - smoothing) * self.resid

class FtxPairPlugin:
    """Strategy plugin that converts Kalman-filter residuals into target positions.

    `initialize` starts a background thread that keeps a candle history in
    `self.df_ftx`, feeds every new bar to a `KfEstimator` and publishes the
    resulting per-symbol `self.weights`.  The trader polls `get_order_info`
    and `get_status`.  ``symbols[0]`` is the series being predicted; the
    remaining symbols are the regressors.
    """

    def __init__(self, jsonl_logger=None, prod=False, symbols=None, leverage=20, max_weight=0.25):
        """Store configuration and build the estimator.

        Args:
            jsonl_logger: Object with a ``write(dict)`` method used for loop logs.
            prod: Stored as-is; not read by this class.
            symbols: Market names; the first one is the predicted leg.
            leverage: Multiplier applied to collateral when sizing positions.
            max_weight: Absolute cap applied to every weight.
        """
        self.jsonl_logger = jsonl_logger
        self.prod = prod
        # A private copy: avoids the shared-mutable-default pitfall and
        # decouples the plugin from later changes to the caller's list.
        self.symbols = list(symbols) if symbols is not None else []
        self.lock = threading.Lock()

        # About two months are needed (x_diff window + Kalman filter warm-up).
        self.history_period = datetime.timedelta(hours=24 * 30 * 2)
        self.last_loop_at = None
        self.start_at = time.time()

        self.leverage = leverage
        self.max_weight = max_weight
        self.max_positions = {}
        self.max_position_update_interval = 5 * 60
        # Defined here so `loop` can always read it, even if the exchange
        # returned none of our symbols during the first refresh.
        self.max_position_updated_at = 0
        self.kf_timestamp = pd.to_datetime('2000-01-01 00:00:00Z')

        self.weights = {symbol: 0 for symbol in self.symbols}

        # Candle width in seconds (60 and 3600 were tried as alternatives).
        self.time_resolution = 300

        self.kf_estimator = KfEstimator(
            delta=1e-2 / 12 ** 2,
            delta2=1e-2 / 12 ** 2,
            market_count=len(self.symbols),
        )

    def allocate_ftx_client(self):
        """Borrow an exchange client from the trader (use as a context manager)."""
        return self.trader.allocate_ftx_client()

    def initialize(self, trader=None):
        """Attach to the trader, load the initial history and start the worker thread."""
        self.trader = trader
        self.logger = trader.logger
        self.rate_limiter = trader.rate_limiter

        self.df_ftx = pd.DataFrame()
        self.fetch_new_candles()

        self.update_max_position()

        self.logger.info('plugin thread start')
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    # plugin interface
    def get_order_info(self, positions=None, size_units=None, price_units=None):
        """Return the desired buy/sell amounts and prices for every symbol.

        Args:
            positions: Mapping symbol -> current signed position.
            size_units: Mapping symbol -> size increment.
            price_units: Mapping symbol -> price increment.

        Returns:
            Mapping symbol -> dict with ``buy_amount``, ``sell_amount``,
            ``buy_price``, ``sell_price`` and ``raw_target_pos``; an empty
            dict until the first loop iteration has completed.
        """
        if self.last_loop_at is None:
            # Always a dict, so callers can index the result by symbol.
            return {}

        with self.lock:
            pinned_weights = dict(self.weights)
            pinned_max_positions = dict(self.max_positions)
            # `.last()` is keyed by market on every pandas version, whereas
            # `nth(-1)` keeps the original row index on pandas >= 2.
            pinned_cls = self.df_ftx.groupby('market')['cl'].last().to_dict()

        result = {}
        for symbol in self.symbols:
            target_pos = pinned_weights[symbol] * pinned_max_positions[symbol]

            size_unit = size_units[symbol]
            min_amount = 10 * size_unit  # stay clear of the minimum order size

            buy_amount = target_pos - positions[symbol]
            buy_amount = round(buy_amount / size_unit) * size_unit
            if buy_amount < min_amount:
                buy_amount = 0

            sell_amount = -target_pos + positions[symbol]
            sell_amount = round(sell_amount / size_unit) * size_unit
            if sell_amount < min_amount:
                sell_amount = 0

            result[symbol] = {
                'buy_amount': buy_amount,
                'sell_amount': sell_amount,
                # Quote one tick away from the last confirmed close.
                'buy_price': pinned_cls[symbol] - price_units[symbol],
                'sell_price': pinned_cls[symbol] + price_units[symbol],
                'raw_target_pos': target_pos
            }

        return result

    # plugin interface
    def get_status(self):
        """Return the current weights, position caps and a liveness flag.

        ``live`` is true while the last loop is younger than 1.5 hours and the
        last Kalman update is younger than 2.5 hours (start-up time is used
        while either timestamp is still unset).
        """
        if self.last_loop_at is None:
            live = time.time() - self.start_at < 1.5 * 60 * 60
        else:
            live = time.time() - self.last_loop_at.timestamp() < 1.5 * 60 * 60

        if self.kf_timestamp is None:
            live = live and time.time() - self.start_at < 1.5 * 60 * 60
        else:
            live = live and time.time() - self.kf_timestamp.timestamp() < 2.5 * 60 * 60

        return {
            'weights': self.weights,
            'max_positions': self.max_positions,
            'live': live
        }

    def run(self):
        """Worker-thread body: call `loop` forever, logging and backing off on errors."""
        while True:
            try:
                self.loop()
            except KeyboardInterrupt:
                raise
            except Exception:
                self.logger.error('exception ' + traceback.format_exc())
                # Many API errors in a short time can get the client throttled.
                time.sleep(20)
            time.sleep(0.1)

    def update_max_position(self):
        """Recompute the per-symbol position cap from collateral, mark price and leverage."""
        with self.allocate_ftx_client() as ftx:
            self.rate_limiter.rate_limit(tags=['all'])
            account = ftx.privateGetAccount()['result']
            self.rate_limiter.rate_limit(tags=['all'])
            futures = ftx.publicGetFutures()['result']

        collateral = account['collateral']

        with self.lock:
            for future in futures:
                symbol = future['name']
                if symbol not in self.symbols:
                    continue
                mark_price = future['mark']

                self.max_positions[symbol] = collateral / mark_price * self.leverage
                self.logger.info('max_position updated {} {} collateral {} mark_price {} leverage {}'.format(
                    symbol, self.max_positions[symbol], collateral, mark_price, self.leverage))
            # Recorded once per refresh, whether or not a symbol matched.
            self.max_position_updated_at = time.time()

    def loop(self):
        """Run one iteration: refresh caps, and once per new bar fetch candles and update weights."""
        latest_tf = latest_time_frame(self.time_resolution)

        if self.max_position_updated_at < time.time() - self.max_position_update_interval:
            self.update_max_position()

        # Nothing to do until a new bar has started.
        if self.last_loop_at is not None and self.last_loop_at >= latest_tf:
            return

        self.logger.debug('plugin loop')

        start = time.time()
        fetch_ftx_sec = self.wait_for_ftx() - start

        start = time.time()
        self.update_ml_states()
        update_ml_sec = time.time() - start

        log = {
            'timestamp': str(now()),
            'strategy_id': 'mm_ftx_portfolio_plugin',
            'weights': self.weights,
            'max_position': self.max_positions,
            'ftx_size': self.df_ftx.shape[0],
            'ftx_min_of_max_time': str(self.df_ftx.groupby('market')['timestamp_col'].max().min()),
            'ftx_max_time': str(self.df_ftx['timestamp_col'].max()),
            'ftx_min_time': str(self.df_ftx['timestamp_col'].min()),
            'ftx_sec': fetch_ftx_sec,
            'update_ml_sec': update_ml_sec,
        }
        self.jsonl_logger.write(log)

        # Drop candles older than the history window.
        min_index = latest_tf - self.history_period
        self.df_ftx = self.df_ftx[self.df_ftx['timestamp_col'] >= min_index]

        self.last_loop_at = latest_tf

    def wait_for_ftx(self):
        """Poll the REST API until every market has its latest confirmed candle.

        Returns:
            ``time.time()`` at the moment the data was complete.
        """
        latest_tf = latest_time_frame(self.time_resolution)
        self.logger.debug('wait_for_ftx latest_tf {} df_ftx.shape {}'.format(latest_tf, self.df_ftx.shape))

        # The newest confirmed candle starts one resolution before latest_tf.
        required_start = latest_tf - datetime.timedelta(seconds=self.time_resolution)

        first = True
        while self.df_ftx.groupby('market')['timestamp_col'].max().min() < required_start:
            self.fetch_new_candles()
            if not first:
                time.sleep(1.0)
            first = False

        return time.time()

    def fetch_new_candles(self):
        """Download candles for every symbol and merge them into `self.df_ftx`.

        With existing history only the last few candles are requested; on the
        first call enough overlapping pages are requested to cover
        `self.history_period`.
        """
        have_history = bool(self.df_ftx.shape[0])
        limit = 10 if have_history else 5000

        if have_history:
            page_count = 1
        else:
            # Derived from the configured window and resolution instead of a
            # constant that silently assumed 5-minute candles.
            bars_needed = int(self.history_period.total_seconds() // self.time_resolution)
            page_count = bars_needed // limit + 1

        def do_fetch_new_candles(params):
            symbol = params['symbol']
            end_time = params['end_time']

            self.rate_limiter.rate_limit(tags=['all'])
            with self.allocate_ftx_client() as ftx:
                data = ftx.publicGetMarketsMarketNameCandles({
                    'market_name': symbol,
                    'end_time': end_time,  # needed to defeat response caching
                    'resolution': self.time_resolution,
                    'limit': limit,
                })['result']

            if len(data) == 0:
                return None

            df = pd.DataFrame(data)
            df = df.rename(columns={
                'open': 'op',
                'high': 'hi',
                'low': 'lo',
                'close': 'cl',
                'startTime': 'timestamp',
            })[['timestamp', 'op', 'hi', 'lo', 'cl', 'volume']]
            df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)

            # The last candle is still forming, so drop it.  Copy so the
            # column assignment below does not write into a filtered view.
            df = df[df['timestamp'] != df['timestamp'].max()].copy()

            df['market'] = symbol
            df = df.set_index(['market', 'timestamp'], drop=False)
            df = df.rename(columns={
                'market': 'market_col',
                'timestamp': 'timestamp_col',
            })

            return df

        # A future end_time returns nothing (a float would also be accepted).
        end_time_base = int(np.floor(time.time() - 0.1))

        params = []
        for symbol in self.symbols:
            for i in range(page_count):
                params.append({
                    'symbol': symbol,
                    # "- 10" makes consecutive pages overlap.
                    'end_time': end_time_base - self.time_resolution * (limit - 10) * i,
                })

        with ThreadPoolExecutor() as executor:
            dfs = list(executor.map(do_fetch_new_candles, params))

        for df in dfs:
            if df is not None:
                self.df_ftx = smart_append(self.df_ftx, df)

    def update_ml_states(self):
        """Feed unseen bars to the Kalman filter and publish new clipped weights."""
        start = time.time()

        self.logger.debug('df_ftx.shape {}'.format(self.df_ftx.shape))

        df = self.df_ftx.reset_index().copy()

        df['x'] = np.log(df['cl'])
        # Log price minus its per-market rolling mean over 24 * 30 * 12 bars.
        # `transform` keeps the result aligned with `df` by index rather than
        # relying on the rows already being grouped by market.
        rolling_mean = df.groupby('market')['x'].transform(
            lambda series: series.rolling(24 * 30 * 12, min_periods=1).mean()
        )
        df['x_diff'] = df['x'] - rolling_mean
        df = df.dropna()

        df_x_diff = df.pivot(index='timestamp', columns='market', values='x_diff')
        df_x_diff = df_x_diff.ffill().dropna()

        # Column order defines which series is predicted (first) and which
        # are regressors.
        df_x_diff = df_x_diff[self.symbols]

        # Only bars the filter has not consumed yet.
        df_x_diff = df_x_diff[self.kf_timestamp < df_x_diff.index]
        if df_x_diff.shape[0] == 0:
            return

        for row in df_x_diff.to_numpy():
            self.kf_estimator.update(row)

        self.kf_timestamp = df_x_diff.index[-1]
        self.logger.info('kf_timestamp updated {}'.format(self.kf_timestamp))

        pos = -self.kf_estimator.resid

        # Build the complete mapping first, then swap it in under the lock so
        # readers never observe a half-filled dict.
        new_weights = {self.symbols[0]: pos}
        for i in range(1, len(self.symbols)):
            new_weights[self.symbols[i]] = -pos * self.kf_estimator.state_mean[i]
        for symbol in new_weights:
            new_weights[symbol] = np.clip(new_weights[symbol], -self.max_weight, self.max_weight)

        with self.lock:
            self.weights = new_weights

        self.logger.debug('update_ml_states elapsed sec {}'.format(time.time() - start))

# https://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object/10854034#10854034
def floor_time(dt=None, roundTo=60):
    """Floor a datetime down to a multiple of ``roundTo`` seconds.

    dt : datetime.datetime object, default now (naive local time).
    roundTo : Step in seconds, default 1 minute.  Multiples are counted from
        midnight of ``dt``'s own day, so steps that do not divide a day
        evenly restart at every midnight.
    Returns ``dt`` moved back to the step boundary, microseconds cleared and
    tzinfo preserved.
    Author: Thierry Husson 2012 - Use it as you want but don't blame me.
    """
    if dt is None:
        dt = datetime.datetime.now()
    # `.seconds` is the seconds-within-the-day part of the difference.
    seconds = (dt.replace(tzinfo=None) - dt.min).seconds
    rounding = seconds // roundTo * roundTo
    return dt + datetime.timedelta(0, rounding - seconds, -dt.microsecond)

def now():
    """Return the current time as a timezone-aware UTC datetime."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)

def latest_time_frame(resolution=None):
    """Return the start of the current time frame of ``resolution`` seconds (UTC)."""
    current = now()
    return floor_time(current, resolution)

def smart_append(df, other):
    """Merge ``other`` into ``df``, letting ``other`` win on duplicate index keys.

    Args:
        df: Existing frame (may be empty).
        other: Frame with newer rows (may be empty).

    Returns:
        A new frame sorted by index in which every index value appears once.
    """
    # `DataFrame.append` no longer exists in pandas 2, so concatenate instead.
    # Empty frames are skipped so that a bare `pd.DataFrame()` cannot
    # influence the index type of the result.
    frames = [frame for frame in (df, other) if len(frame)]
    if not frames:
        return df.copy()
    combined = pd.concat(frames)
    # https://stackoverflow.com/questions/13035764/remove-rows-with-duplicate-indices-pandas-dataframe-and-timeseries
    # De-duplicate before sorting: "last" then always means "from other",
    # independent of whether the sort is stable.
    combined = combined[~combined.index.duplicated(keep='last')]
    return combined.sort_index()

発注するクラス

import time
import threading
import logging
import pandas as pd
import ccxt
from functools import reduce, partial
from concurrent.futures import ThreadPoolExecutor
from ccxt.base.errors import BadRequest, OrderNotFound, ExchangeError, ExchangeNotAvailable
from sync_executor import SyncExecutor
from panic_manager import PanicManager
from instance_pool import InstancePool
import ccxt_rate_limiter

class DefaultPlugin:
    """Fallback plugin that quotes both sides of XTZ-PERP around a fixed cap."""

    def __init__(self):
        pass

    def initialize(self, trader=None):
        """Nothing to set up; present to satisfy the plugin interface."""
        pass

    def get_order_info(self, positions=None, size_units=None, price_units=None):
        """Return buy/sell amounts that keep the position within +/- 40 size units.

        Amounts are rounded to the size increment and dropped to 0 when they
        are below 20 size units.  No prices are supplied.
        """
        market = 'XTZ-PERP'
        held = positions[market]
        step = size_units[market]

        cap = 40 * step
        floor_amount = 20 * step

        def quantize(raw):
            # Never negative, snapped to the size increment, zero if too small.
            snapped = round(max([0, raw]) / step) * step
            return 0 if snapped < floor_amount else snapped

        return {
            market: {
                'buy_amount': quantize(cap - held),
                'sell_amount': quantize(cap + held)
            }
        }

    def get_status(self):
        """Always report the plugin as alive."""
        return {'live': True}

class MmFtxPortfolioTrader:
   def __init__(self, api_key=None, api_secret=None, prod=False,
                ftx_ws=None, logger=None, plugin=None, symbols=['XTZ-PERP'], subaccount=None):
       self.panic_manager = PanicManager(logger=logger)
       self.panic_manager.register(tag='trader', start_time=5 * 60, interval=60)

       rate_limits = ccxt_rate_limiter.ftx.ftx_limits()
       rate_limits = ccxt_rate_limiter.scale_limits(rate_limits, 0.75)
       self.rate_limiter = ccxt_rate_limiter.rate_limiter_group.RateLimiterGroup(limits=rate_limits)

       def create_ftx():
           headers = {}
           if prod:
               headers['FTX-SUBACCOUNT'] = subaccount
           else:
               headers['FTX-SUBACCOUNT'] = 'bottest'
           ftx = ccxt.ftx({
               'apiKey': api_key,
               'secret': api_secret,
               'enableRateLimit': False,
               'headers': headers,
           })
           ccxt_rate_limiter.wrap_object(
               ftx,
               rate_limiter_group=self.rate_limiter,
               wrap_defs=ccxt_rate_limiter.ftx.ftx_wrap_defs()
           )
           return ftx

       self.ftx_pool = InstancePool(create_fn=create_ftx)
       # self.ftx_ws = ftx_ws
       self.logger = logger
       self.symbols = symbols

       self.orders = []
       self.prev_log_status = time.time()
       self.lock = threading.Lock()
       self.plugin = plugin or DefaultPlugin()
       self.exchange_error_at = 0
       self.current_positions = {}
       for symbol in self.symbols:
           self.current_positions[symbol] = 0.0
       self.last_position_updated_at = time.time()
       self.position_update_interval = 10
       self.last_order_updated_at = time.time()
       self.order_update_interval = 1
       self.replace_order_at = {}

       with self.allocate_ftx_client() as ftx:
           ftx.privateDeleteOrders({})
           markets = ftx.publicGetMarkets()['result']

       self.price_units = {}
       self.size_units = {}
       for market in markets:
           if market['name'] in self.symbols:
               self.price_units[market['name']] = market['priceIncrement']
               self.size_units[market['name']] = market['sizeIncrement']
               self.logger.info(market)

       self.update_position()

       self.plugin.initialize(trader=self)

   def allocate_ftx_client(self):
       return self.ftx_pool.allocate()

   def trade(self):
       try:
           self.do_trade()
       except ExchangeNotAvailable as e:
           self.exchange_error_at = time.time()
           self.logger.error('ExchangeNotAvailable occured. hard circuit break enabled. retrying after sleep {}'.format(e))
           time.sleep(0.5) # https://www.bitmex.com/app/restAPI#Overload

   def update_position(self):
       with self.allocate_ftx_client() as ftx:
           positions = ftx.privateGetPositions()['result']
       for position in positions:
           if position['future'] not in self.symbols:
               continue
           pos = side_to_int(position['side']) * position['size']
           if self.current_positions[position['future']] != pos:
               self.logger.info('current_position force updated {} {} -> {}'.format(position['future'], self.current_positions[position['future']], pos))
           self.current_positions[position['future']] = pos

   def reserve_update_orders(self):
       # openapi系の変更APIは現在の値が返ってこないので、
       # 次のループでちゃんと同期する
       self.last_order_updated_at = 1

   def order_updated(self, old_order=None, new_order=None):
       with self.lock:
           symbol = old_order['market']
           old_filled_size = old_order.get('filledSize', 0)
           executed_size = new_order.get('filledSize', 0) - old_filled_size
           signed_executed_size = executed_size * side_to_int(new_order['side'])

           if executed_size:
               self.logger.info('{} {} order executed {} signed_executed_size {}'.format(symbol, new_order['side'], new_order['id'], signed_executed_size))
               pos = self.current_positions[symbol] + signed_executed_size
               self.logger.info('current_position updated by execution {} {} -> {}'.format(symbol, self.current_positions[symbol], pos))
               self.current_positions[symbol] = pos

           status = new_order['status']
           if status == 'closed':
               if new_order['size'] == new_order.get('filledSize', 0):
                   self.logger.info('order fully executed {}'.format(new_order))
               else:
                   self.logger.info('order canceled {}'.format(new_order))
               new_order = None

           return new_order

   def update_orders(self):
       def update_order(order):
           with self.allocate_ftx_client() as ftx:
               new_order = ftx.privateGetOrdersOrderId({
                   'order_id': order['id'],
               })['result']

           return self.order_updated(old_order=order, new_order=new_order)

       if len(self.orders) <= 1:
           results = list(map(update_order, self.orders))
       else:
           with ThreadPoolExecutor() as executor:
               results = executor.map(update_order, self.orders)

       self.orders = [x for x in results if x is not None]

       self.logger.debug('order force updated')

   def fetch_quote(self, symbol=None):
       with self.allocate_ftx_client() as ftx:
           ob = ftx.publicGetMarketsMarketNameOrderbook({
               'market_name': symbol,
               'depth': 1,
           })['result']

       return {
           'ask_price': ob['asks'][0][0],
           'ask_size': ob['asks'][0][1],
           'bid_price': ob['bids'][0][0],
           'bid_size': ob['bids'][0][1],
       }

   def do_trade(self):
       self.logger.debug('trade loop')

       if self.last_order_updated_at < time.time() - self.order_update_interval:
           self.update_orders()
           self.last_order_updated_at = time.time()

       # 注文が無い状態がしばらく続いたらポジション更新 (不整合を防ぐため)
       if self.orders:
           self.last_position_updated_at = time.time()
       else:
           if self.last_position_updated_at < time.time() - self.position_update_interval:
               self.update_position()
               self.last_position_updated_at = time.time()

       if self.logger.isEnabledFor(logging.DEBUG):
           self.logger.debug('orders {}'.format(self.orders))

       order_infos = self.plugin.get_order_info(
           positions=self.current_positions,
           size_units=self.size_units,
           price_units=self.price_units,
       )

       # すでに注文が出ているか、発注要求があれば、active_symbolsに加える
       active_symbols = []
       for symbol in order_infos:
           order_info = order_infos[symbol]
           if (order_info['buy_amount'] or order_info['sell_amount']) and symbol not in active_symbols:
               active_symbols.append(symbol)
       for order in self.orders:
           if order['market'] not in active_symbols:
               active_symbols.append(order['market'])

       with self.create_executor(sync=len(active_symbols) <= 1) as executor:
           order_results = []

           for symbol in active_symbols:
               order_info = order_infos[symbol]

               buy_amount = order_info['buy_amount']
               sell_amount = order_info['sell_amount']
               buy_price = order_info.get('buy_price')
               sell_price = order_info.get('sell_price')
               if self.logger.isEnabledFor(logging.DEBUG):
                   self.logger.debug('buy_amount {}'.format(buy_amount))
                   self.logger.debug('sell_amount {}'.format(sell_amount))
                   self.logger.debug('buy_price {}'.format(buy_price))
                   self.logger.debug('sell_price {}'.format(sell_price))

               target_buy_order = reduce(partial(target_buy, symbol), self.orders, None)
               if target_buy_order:
                   self.orders.remove(target_buy_order)
               order_future = executor.submit(self.update_limit_order, order=target_buy_order, side='buy', amount=buy_amount, symbol=symbol, price=buy_price)
               order_results.append({
                   'order_future': order_future,
                   'original_order': target_buy_order,
               })

               target_sell_order = reduce(partial(target_sell, symbol), self.orders, None)
               if target_sell_order:
                   self.orders.remove(target_sell_order)
               order_future = executor.submit(self.update_limit_order, order=target_sell_order, side='sell', amount=sell_amount, symbol=symbol, price=sell_price)
               order_results.append({
                   'order_future': order_future,
                   'original_order': target_sell_order,
               })

           cancel_results = []
           for order in self.orders:
               cancel_results.append(executor.submit(self.cancel_order, order))

           for idx, result in enumerate(cancel_results):
               try:
                   result.result()
                   self.orders[idx] = None
               except Exception as e:
                   self.logger.error('error during cancel {}'.format(e))

           for result in order_results:
               try:
                   self.orders.append(result['order_future'].result())
               except Exception as e:
                   self.logger.error('error during ordering {}'.format(e))
                   self.orders.append(result['original_order'])

       self.orders = [x for x in self.orders if x]

       now = time.time()
       if now - self.prev_log_status > 10:
           plugin_status = self.plugin.get_status()
           self.logger.info('api/min {} position {} order_info {} plugin {}'.format(self.rate_limiter.status_info(), self.current_positions, order_infos, plugin_status))
           self.prev_log_status = now
           if plugin_status['live']:
               self.panic_manager.ping('trader')

       time.sleep(0.1)

   def create_executor(self, sync=False):
       if sync:
           return SyncExecutor()
       else:
           return ThreadPoolExecutor()

   def cancel_order(self, order=None):
       self.logger.info('cancel_order {} {}'.format(order['market'], order['id']))
       with self.allocate_ftx_client() as ftx:
           self.reserve_update_orders()
           ftx.privateDeleteOrdersOrderId({
               'order_id': order['id'],
           })
       self.logger.info('cancel_order done')

   def is_size_equal(self, a, b, symbol):
       return round(a / self.size_units[symbol]) == round(b / self.size_units[symbol])

   def is_price_equal(self, a, b, symbol):
       return round(a / self.price_units[symbol]) == round(b / self.price_units[symbol])

   def update_limit_order(self, order=None, side=None, amount=None, symbol=None, price=None):
       quote = self.fetch_quote(symbol=symbol)
       price_unit = self.price_units[symbol]

       if price is None:
           # old logic
           if order:
               if side.lower() == 'buy':
                   price = max([order['price'], quote['bid_price']])
               else:
                   price = min([order['price'], quote['ask_price']])
           else:
               if side.lower() == 'buy':
                   price = min([quote['ask_price'] - price_unit, quote['bid_price'] + price_unit])
               else:
                   price = max([quote['bid_price'] + price_unit, quote['ask_price'] - price_unit])
       else:
           if side.lower() == 'buy':
               price = min([quote['ask_price'] - price_unit, price])
           else:
               price = max([quote['bid_price'] + price_unit, price])

       if self.logger.isEnabledFor(logging.DEBUG):
           self.logger.debug('update_limit_order {} side {} amount {} price {}'.format(symbol, side, amount, price))

       if order:
           if self.is_size_equal(amount, order['remainingSize'], symbol) and self.is_price_equal(price, order['price'], symbol):
               self.logger.debug('same order. do nothing')
               return order
           else:
               # 注文変更はキャンセルと発注によって実装されているので、
               # キャンセル完了するまで待つ
               # キャンセル前に約定することを考慮してorderはそのまま返す
               self.cancel_order(order=order)
               return order
           # elif amount == 0:
           #     self.cancel_order(order=order)
           #     return None
           # else:
           #     quote_delay = time.time() - self.last_ob_updated_at
           #     self.logger.info('edit_order {} limit side {} amount {} price {} quote_delay(sec) {:.6f}'.format(order['id'], side, amount, price, quote_delay))
           #     params = {
           #         'order_id': order['id'],
           #     }
           #     if not self.is_size_equal(amount, order['remainingSize']):
           #         params['size'] = order.get('filledSize', 0) + amount
           #     if not self.is_price_equal(price, order['price']):
           #         params['price'] = price
           #     params_str = str(params)
           #     if self.replace_order_at.get(params_str, 0) < time.time() - 1:
           #         self.rate_limiter.rate_limit(tags=['all'])
           #         with self.allocate_ftx_client() as ftx:
           #             new_order = ftx.privatePostOrdersOrderIdModify(params)['result']
           #         order = self.order_updated(old_order=order, new_order=new_order)
           #         self.replace_order_at[params_str] = time.time()
           #         self.logger.info('edit_order done')
           #     else:
           #         self.logger.debug('edit_order same order in short time skipped')
           #     return order
       else:
           if amount == 0:
               self.logger.debug('amount 0. do nothing')
               return None
           else:
               self.logger.info('create_order {} limit side {} amount {} price {}'.format(symbol, side, amount, price))
               params = {
                   'market': symbol,
                   'type': "limit",
                   'side': side,
                   'size': amount,
                   'price': price,
                   'postOnly': True
               }
               with self.allocate_ftx_client() as ftx:
                   order = ftx.privatePostOrders(params)['result']
               self.logger.info('create_order done')
               return order

       raise Exception('should not reach here')

def target_buy(market, existing, target):
    """Reducer that selects the oldest open buy order for ``market``.

    Intended for ``functools.reduce`` over the tracked orders.

    Args:
        market: Market name to match.
        existing: Best candidate so far, or ``None``.
        target: Order being considered.

    Returns:
        ``target`` if it is an open buy order on ``market`` and older than
        ``existing`` (or there is no ``existing``); otherwise ``existing``.
    """
    def created(order):
        # NOTE(review): the other order fields read in this file are camelCase
        # ('filledSize', 'remainingSize'), so the creation time is presumably
        # 'createdAt' - confirm against the exchange payload.  'created_at'
        # is still accepted for orders that carry that key.
        stamp = order.get('createdAt')
        if stamp is None:
            stamp = order['created_at']
        return pd.to_datetime(stamp)

    # Required conditions
    if target['side'].lower() != 'buy':
        return existing
    if target['status'] == 'closed':
        return existing
    if target['market'] != market:
        return existing
    if not existing:
        return target
    # Ordering: prefer the older order
    if created(target) < created(existing):
        return target
    return existing

def target_sell(market, existing, target):
    """Reducer that selects the oldest open sell order for ``market``.

    Intended for ``functools.reduce`` over the tracked orders.

    Args:
        market: Market name to match.
        existing: Best candidate so far, or ``None``.
        target: Order being considered.

    Returns:
        ``target`` if it is an open sell order on ``market`` and older than
        ``existing`` (or there is no ``existing``); otherwise ``existing``.
    """
    def created(order):
        # NOTE(review): the other order fields read in this file are camelCase
        # ('filledSize', 'remainingSize'), so the creation time is presumably
        # 'createdAt' - confirm against the exchange payload.  'created_at'
        # is still accepted for orders that carry that key.
        stamp = order.get('createdAt')
        if stamp is None:
            stamp = order['created_at']
        return pd.to_datetime(stamp)

    # Required conditions
    if target['side'].lower() != 'sell':
        return existing
    if target['status'] == 'closed':
        return existing
    if target['market'] != market:
        return existing
    if not existing:
        return target
    # Ordering: prefer the older order
    if created(target) < created(existing):
        return target
    return existing

def side_to_int(side):
    """Map an order side to its sign: -1 for 'sell' (any case), +1 for anything else."""
    if side.lower() == 'sell':
        return -1
    return 1

セットアップ

# Wire the pair-trading plugin into the order-placing trader.
# NOTE(review): `ftx_config`, `args`, `jsonl_logger` and `logger` are not
# defined in this snippet - presumably they come from the launch script.
trader = MmFtxPortfolioTrader(
   api_key=ftx_config()['key'],
   api_secret=ftx_config()['secret'],
   # With prod false the trader uses the 'bottest' subaccount instead of `subaccount`.
   prod=args.prod,
   plugin=FtxPairPlugin(
       jsonl_logger=jsonl_logger,
       prod=args.prod,
       # The first symbol is the predicted series, the second the regressor.
       symbols=['BSV-PERP', 'BCH-PERP'],
       max_weight=0.05,
       leverage=20.0,
   ),
   # Must list the same markets as the plugin: the trader indexes its
   # positions and increments by the symbols the plugin reports.
   symbols=['BSV-PERP', 'BCH-PERP'],
   logger=logger,
   subaccount='my_subaccount1',
)

ライセンス

CC0

Q and A

Q. SyncExecutorって?

A. ThreadPoolExecutorと同じインターフェースで普通にシングルスレッドで同期で実行するものです。パフォーマンスが重要なボットを作っていたときに、マルチスレッドコストを消すために作ったもののなごりです。コピペで作っているので、他ボットのなごりがかなり含まれています。

Q. InstancePoolって必要?

A. InstancePoolは以下のクラスです。使う理由は2つあります。

・ccxtのインスタンスはスレッドセーフではない(注意して使えば使えるかもしれないが)。

・たしか、ccxtのインスタンスを使い回すとコネクションを使い回せた気がする(調べて確認した気がするが、正確には忘れた)。これもパフォーマンスが重要なボットのなごりです。

import threading

class InstanceAllocation:
    """Context manager that lends out one pooled instance and returns it on exit."""

    def __init__(self, pool, instance):
        self.instance = instance
        self.pool = pool

    def __enter__(self):
        """Hand the borrowed instance to the ``with`` body."""
        borrowed = self.instance
        return borrowed

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Give the instance back to the pool; exceptions are not suppressed."""
        owner = self.pool
        owner.free(self.instance)

class InstancePool:
    """Lock-protected stack of reusable instances, created on demand by ``create_fn``."""

    def __init__(self, create_fn=None):
        self.lock = threading.Lock()
        self.instances = []
        self.create_fn = create_fn

    def allocate(self):
        """Return an `InstanceAllocation` wrapping a pooled or newly created instance."""
        with self.lock:
            # Reuse the most recently returned instance when one is available.
            instance = self.instances.pop(-1) if self.instances else None
        if instance is None:
            # Created outside the lock, so construction does not block others.
            instance = self.create_fn()
        return InstanceAllocation(self, instance)

    def free(self, instance):
        """Put ``instance`` back so that a later `allocate` can reuse it."""
        with self.lock:
            self.instances.append(instance)