我的意思是,除非您想部署数据提供商业务,否则制作独立的自主机器人非常酷。
您必须实时实时聚合 OHLC 条形图。
- 通过 WebSocket(或其他流 API)的报价(交易); ->
- 队列持有 dem 刻度; ->
- 从队列中取出 em 记号的逻辑,将 em 发送到聚合器逻辑并将 em 从队列中弹出,这样我们就不会意外地多次处理一个记号; ->
- 实时聚合 OHLCV 的逻辑;
- 判断何时开始构建新柱的逻辑。
FTX 交易所和 Python,15 秒图表聚合。 FTX 有一个"official" WebSocket Client,我对其进行了修改以正确处理所有刻度;
def _handle_trades_message(self, message: Dict) -> None:
self._trades[message['market']].extend(reversed(message['data']))
在那之后,这就是我得到的:
import ftx
import time
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
def process_tick(tick):
global data
global flag
global frontier
if (flag == True) and (tick['time'] >= frontier):
start_time = datetime.utcnow().isoformat() #"almost"
time_ = time.time() * 1000 # with higher precision
op = tick['price']
hi = tick['price']
lo = tick['price']
cl = tick['price']
vol = tick['size' ]
row = {'startTime' : start_time,
'time' : time_ ,
'open' : op ,
'high' : hi ,
'low' : lo ,
'close' : cl ,
'volume' : vol }
data = data.append(row, True)
flag = False
print('Opened')
print(tick)
else:
if (tick['price'] > data['high'].iloc[-1]):
data['high'].iloc[-1] = tick['price']
elif (tick['price'] < data['low' ].iloc[-1]):
data['low' ].iloc[-1] = tick['price']
data['close' ].iloc[-1] = tick['price']
data['volume'].iloc[-1] += tick['size' ]
print(tick)
def on_tick():
while True:
try:
try:
process_tick(trades[-1])
trades.pop()
except IndexError:
pass
time.sleep(0.001)
except KeyboardInterrupt:
client_ws._reset_data()
scheduler.remove_job('onClose')
scheduler.shutdown()
print('Shutdown')
break
def on_close():
global flag
global frontier
flag = True
frontier = pd.Timestamp.utcnow().floor('15S').isoformat() #floor to 15 secs
print('Closed')
ASSET = 'LTC-PERP'
RES = '15'
flag = True
frontier = pd.Timestamp.utcnow().floor('15S').isoformat() #floor to 15 secs
cols = ['startTime', 'time', 'open', 'high', 'low', 'close', 'volume']
data = pd.DataFrame(columns=cols)
client_ws = ftx.FtxClientWs()
client_ws._subscribe({'channel': 'trades', 'market': ASSET})
trades = client_ws._trades[ASSET]
scheduler = BackgroundScheduler()
scheduler.configure(timezone='utc')
scheduler.add_job(on_close, trigger='cron', second='0, 15, 30, 45', id='onClose')
scheduler.start()
on_tick()
除了我的东西,只需让on_tick() 在自己的线程中运行,data 变量将在后台不断更新并保存 OHLCV,因此您将能够在同一程序中使用此数据.