InfluxDBのContinuous Queryを使ってOHLCVを自動生成する
下記のSQL文でexecutionsから5秒足を継続的に作成しohlcv_5sに保存する。
code:cq_ohlcv_5s.sql
CREATE CONTINUOUS QUERY "cq_ohlcv_5s" ON FX_BTC_JPY
BEGIN
SELECT MIN(price) as low, MAX(price) as high, FIRST(price) as open, LAST(price) as close, SUM(size) as volume INTO executions_policy.ohlcv_5s FROM executions_policy.executions GROUP BY time(5s)
END
以下のPythonスクリプトを動かして約定履歴を貯めている。dbnameはproduct_code(FX_BTC_JPY等)、measurement(テーブル名っぽい)はtickerとexecutionsとした。保存期間を別々に設定するため1日と7日のリテンションポリシーを作成。jsonデコード後の値段・サイズがintだったりfloatだったりするのでfloatに統一している。階層構造を持つjsonデータ(板情報とか)はフラットにしないと保存できないようだ。1度のwrite_pointsで複数データが描き込めない問題を修正した(2019/1/24)。
code:influxdbbuild.py
# -*- coding: utf-8 -*-
import aiohttp
import asyncio
import json
from datetime import datetime
from time import sleep
from influxdb import InfluxDBClient, SeriesHelper
from influxdb.line_protocol import make_lines, quote_ident, quote_literal
class PointBuffer:
def __init__(self, client, series, policy, fields, tags, bulksize=1000):
self.client = client
self.series = series
self.policy = policy
self.fields = fields
self.tags = tags
self.bulksize = bulksize
self.buffer = []
def append(self, p):
for f in self.fields:
self.buffer.append({
'measurement':self.series,
'tags':{t:pt for t in self.tags}, 'fields':{f:pf for f in self.fields}}) if len(self.buffer)>self.bulksize:
self.commit()
def commit(self):
if len(self.buffer)>0:
self.client.write_points(self.buffer,retention_policy=self.policy)
self.buffer = []
async def main(product_code, topics, dbname):
db = InfluxDBClient("localhost", "8086", 'root', 'root', dbname)
db.create_database(dbname)
db.create_retention_policy("ticker_policy", "1d", 1)
db.create_retention_policy("executions_policy", "7d", 1)
ticker = PointBuffer(
client = db,
series = 'ticker',
policy = 'ticker_policy',
bulksize = 10)
executions = PointBuffer(
client = db,
series = 'executions',
policy = 'executions_policy',
bulksize = 2000)
while True:
try:
async with aiohttp.ClientSession() as client:
ws = await client.ws_connect('wss://ws.lightstream.bitflyer.com/json-rpc')
for t in topics:
await ws.send_json({'method': 'subscribe', 'params': {'channel': 'lightning_'+t+'_'+product_code}})
while True:
msg = await ws.receive()
if msg.type == aiohttp.WSMsgType.TEXT:
if data'channel'.startswith('lightning_ticker_'): print('TICK {ltp} {best_bid}/{best_bid_size} {best_ask}/{best_ask_size}'.format(**data'message')) elif data'channel'.startswith('lightning_executions_'): print('EXEC {side} {price} {size}'.format(**data'message'-1)) executions.append(e)
executions.commit()
elif data'channel'.startswith('lightning_board_snapshot_'): pass
elif data'channel'.startswith('lightning_board_'): pass
elif msg.type == aiohttp.WSMsgType.CLOSED:
print('ws closed')
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print('ws error')
await asyncio.sleep(1)
except Exception as e:
print(e)
await asyncio.sleep(1)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="")
parser.add_argument("--product_code", dest='product_code', type=str, default='FX_BTC_JPY')
parser.add_argument("--dbname", dest='dbname', type=str, default='FX_BTC_JPY')
args = parser.parse_args()
asyncio.get_event_loop().run_until_complete(main(args.product_code, args.topics, args.dbname))