Bitflyer FXのデータをInfluxDBに格納し続けるプログラム

Bitflyer FXのTicker情報と約定情報をInfluxDBに格納し続けるプログラムです。

PubnubがBitflyerで廃止予定なので、記念にnoteに貼りました。数年前にPythonのパの字も知らないまま書いたので、とてもぐちゃぐちゃなコードですが、半年前は動いていました。Systemdで設定しておくと安心して常時稼働させられるはずです。

# encoding: utf-8
import json
import datetime
import time
import urllib.request
import re

from influxdb import InfluxDBClient
import requests
from pubnub.callbacks import SubscribeCallback
from pubnub.enums import PNStatusCategory
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_tornado import PubNubTornado
from pubnub.pnconfiguration import PNReconnectionPolicy
from datetime import datetime, timezone, timedelta

config = PNConfiguration()
config.subscribe_key = 'sub-c-52a9ab50-291b-11e5-baaa-0619f8945a4f'
config.reconnect_policy = PNReconnectionPolicy.LINEAR
pubnub = PubNubTornado(config)

from tornado import gen

class influxDB():
	def __init__(self,dbName="bitflyer_pubnub",dbname_1s="bitflyer_pubnub_1s"):
		self.user = 'root'
		self.password = 'root'
		self.dbname = dbName
		self.client = InfluxDBClient("localhost", "8086", self.user, self.password, self.dbname)

	def insert_db(self, json_body):

		#下記4行は最初だけ有効にしておけばおっけ
		dbs = self.client.get_list_database()
		sample_db = {'name' : self.dbname}
		if sample_db not in dbs:
			self.client.create_database(self.dbname)
			rp_query = 'CREATE RETENTION POLICY ' + '"rp_60day" ON "' +  self.dbname+ '" DURATION 60d REPLICATION 1'
			self.client.query(rp_query)

		#rs = self.client.query('Select * From "' + coinName + '" order by time Desc Limit 1')
		#tmp = list(rs.get_points(measurement=coinName))
		#if len(tmp)!=0:
		#	if tmp[0]['time']==json_body[0]['time']:
		#		return 'Duplication'
		#print(json_body)
		self.client.write_points(json_body)

def convertTimestamp(unixTime):
	utcDate = datetime.datetime.utcfromtimestamp(unixTime)
	utcDate = str(utcDate).split(" ")[0] + "T" + str(utcDate).split(" ")[1] + "Z"
	return utcDate

db_global = influxDB("bitflyer_pubnub")

@gen.coroutine
def main(channels):
    class BitflyerSubscriberCallback(SubscribeCallback):
        def presence(self, pubnub, presence):
            pass  # handle incoming presence data

        def status(self, pubnub, status):
            if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory:
                pass  # This event happens when radio / connectivity is lost

            elif status.category == PNStatusCategory.PNConnectedCategory:
                # Connect event. You can do stuff like publish, and know you'll get it.
                # Or just use the connected event to confirm you are subscribed for
                # UI / internal notifications, etc
                pass
            elif status.category == PNStatusCategory.PNReconnectedCategory:
                pass
                # Happens as part of our regular operation. This event happens when
                # radio / connectivity is lost, then regained.
            elif status.category == PNStatusCategory.PNDecryptionErrorCategory:
                pass
                # Handle message decryption error. Probably client configured to
                # encrypt messages and on live data feed it received plain text.

        def message(self, pubnub, message):
            # Handle new message stored in message.message

            try:
                task(message.channel, message.message)
            except Exception as e:
                print("error: ", e)

    listener = BitflyerSubscriberCallback()
    pubnub.add_listener(listener)
    pubnub.subscribe().channels(channels).execute()

def task(channel, message):
    global db_global
    if isinstance(message,list):
        for i in message:
            tmpDictData = []
            json_body = []
            if 'tick_id' in i:
                tmpDictData = {"best_bid":float(i["best_bid"]),"best_bid_size": float(i["best_bid_size"]),"bid_JPY_volume":float(i["best_bid"])*float(i["best_bid_size"]),"best_ask":float(i["best_ask"]),"best_ask_size": float(i["best_ask_size"]),"ask_JPY_volume":float(i["best_ask"])*float(i["best_ask_size"]), 'total_bid_depth': i["total_bid_depth"],'total_ask_depth': i["total_ask_depth"],'ltp':i["ltp"],"volume_by_product":i["volume_by_product"],"volume":i["volume"],"Symbol": "FX_BTC_JPY", 'timestamp':i['timestamp']}
                json_body = [{'measurement': "lightning_ticker_FX_BTC_JPY", 'time' : tmpDictData['timestamp'],'fields':tmpDictData}]
                #print("ticker")
            #lightning_executions_FX_BTC_JPY
            elif 'buy_child_order_acceptance_id' in i:
                tmpDictData = {"side":str(i['side']),"price": float(i["price"]),"size":float(i["size"]),"JPY_volume":float(i["price"])*float(i["size"]),"Symbol": "FX_BTC_JPY", 'timestamp':i['exec_date']}
                json_body = [{'measurement': "lightning_executions_FX_BTC_JPY", 'time' : tmpDictData['timestamp'],'fields':tmpDictData}]
                #print("excecutions")
            db_global.insert_db(json_body)
    else:
        i = message
        #lightning_ticker_FX_BTC_JPY
        tmpDictData = []
        json_body = []
        if 'tick_id' in i:
            tmpDictData = {"best_bid":float(i["best_bid"]),"best_bid_size": float(i["best_bid_size"]),"bid_JPY_volume":float(i["best_bid"])*float(i["best_bid_size"]),"best_ask":float(i["best_ask"]),"best_ask_size": float(i["best_ask_size"]),"ask_JPY_volume":float(i["best_ask"])*float(i["best_ask_size"]), 'total_bid_depth': i["total_bid_depth"],'total_ask_depth': i["total_ask_depth"],'ltp':i["ltp"],"volume_by_product":i["volume_by_product"],"volume":i["volume"],"Symbol": "FX_BTC_JPY", 'timestamp':i['timestamp']}
            json_body = [{'measurement': "lightning_ticker_FX_BTC_JPY", 'time' : tmpDictData['timestamp'],'fields':tmpDictData}]
            #print("ticker")
        #lightning_executions_FX_BTC_JPY
        elif 'buy_child_order_acceptance_id' in i:
            tmpDictData = {"side":str(i['side']),"price": float(i["price"]),"size":float(i["size"]),"JPY_volume":float(i["price"])*float(i["size"]),"Symbol": "FX_BTC_JPY", 'timestamp':i['exec_date']}
            json_body = [{'measurement': "lightning_executions_FX_BTC_JPY", 'time' : tmpDictData['timestamp'],'fields':tmpDictData}]
            #print("excecutions")
        db_global.insert_db(json_body)

if __name__ == "__main__":
    #mainmain()
    #main(['lightning_executions_FX_BTC_JPY'])
    main(['lightning_executions_FX_BTC_JPY','lightning_ticker_FX_BTC_JPY'])
    pubnub.start()

この記事が気に入ったらサポートをしてみませんか?