【发布时间】:2020-11-25 09:53:37
【问题描述】:
我正在阅读这本书Hands-On Machine Learning for Algorithmic Trading,我遇到了一个脚本,它应该解析一个大的.bin 二进制文件并将其转换为.h5。该文件包含称为 ITCH 数据的内容,您可以找到数据here 的技术文档。该脚本效率非常低,它一次读取 2 个字节的 12GB(12952050754 字节)文件,这非常慢(在一些体面的 4cpu GCP 实例上可能需要长达 4 小时),这并不奇怪。你可以找到整个笔记本here。
我的问题是我不明白这个.bin 文件是如何被读取的缓冲区很大,但我不知道该怎么做,或者如果在优化这个脚本之后,它仍然很慢,我什至不知道该怎么做,如果我了解这个 I/O 过程的内部工作,我可以做到这一点,有人有建议吗?
这里是 ITCH 数据文件source 的链接,如果您需要试验代码,可以找到时间较短的小文件(300 mb 或更少)。
瓶颈:
with file_name.open('rb') as data:
while True:
# determine message size in bytes
message_size = int.from_bytes(data.read(2), byteorder='big', signed=False)
# get message type by reading first byte
message_type = data.read(1).decode('ascii')
message_type_counter.update([message_type])
# read & store message
record = data.read(message_size - 1)
message = message_fields[message_type]._make(unpack(fstring[message_type], record))
messages[message_type].append(message)
# deal with system events
if message_type == 'S':
seconds = int.from_bytes(message.timestamp, byteorder='big') * 1e-9
print('\n', event_codes.get(message.event_code.decode('ascii'), 'Error'))
print(f'\t{format_time(seconds)}\t{message_count:12,.0f}')
if message.event_code.decode('ascii') == 'C':
store_messages(messages)
break
message_count += 1
if message_count % 2.5e7 == 0:
seconds = int.from_bytes(message.timestamp, byteorder='big') * 1e-9
d = format_time(time() - start)
print(f'\t{format_time(seconds)}\t{message_count:12,.0f}\t{d}')
res = store_messages(messages)
if res == 1:
print(pd.Series(dict(message_type_counter)).sort_values())
break
messages.clear()
这是store_messages() 函数:
def store_messages(m):
"""Handle occasional storing of all messages"""
with pd.HDFStore(itch_store) as store:
for mtype, data in m.items():
# convert to DataFrame
data = pd.DataFrame(data)
# parse timestamp info
data.timestamp = data.timestamp.apply(int.from_bytes, byteorder='big')
data.timestamp = pd.to_timedelta(data.timestamp)
# apply alpha formatting
if mtype in alpha_formats.keys():
data = format_alpha(mtype, data)
s = alpha_length.get(mtype)
if s:
s = {c: s.get(c) for c in data.columns}
dc = ['stock_locate']
if m == 'R':
dc.append('stock')
try:
store.append(mtype,
data,
format='t',
min_itemsize=s,
data_columns=dc)
except Exception as e:
print(e)
print(mtype)
print(data.info())
print(pd.Series(list(m.keys())).value_counts())
data.to_csv('data.csv', index=False)
return 1
return 0
【问题讨论】:
标签: python python-3.x io binaryfiles algorithmic-trading