【问题标题】:How to fix this IO bound python operation on 12GB .bin file?如何在 12GB .bin 文件上修复此 IO 绑定 python 操作?
【发布时间】:2020-11-25 09:53:37
【问题描述】:

我正在阅读这本书Hands-On Machine Learning for Algorithmic Trading,我遇到了一个脚本,它应该解析一个大的.bin 二进制文件并将其转换为.h5。该文件包含称为 ITCH 数据的内容,您可以找到数据here 的技术文档。该脚本效率非常低,它一次读取 2 个字节的 12GB(12952050754 字节)文件,这非常慢(在一些体面的 4cpu GCP 实例上可能需要长达 4 小时),这并不奇怪。你可以找到整个笔记本here

我的问题是我不明白这个.bin 文件是如何被读取的缓冲区很大,但我不知道该怎么做,或者如果在优化这个脚本之后,它仍然很慢,我什至不知道该怎么做,如果我了解这个 I/O 过程的内部工作,我可以做到这一点,有人有建议吗?

这里是 ITCH 数据文件source 的链接,如果您需要试验代码,可以找到时间较短的小文件(300 mb 或更少)。


瓶颈:

with file_name.open('rb') as data:
    while True:

        # determine message size in bytes
        message_size = int.from_bytes(data.read(2), byteorder='big', signed=False)
        
        # get message type by reading first byte
        message_type = data.read(1).decode('ascii')        
        message_type_counter.update([message_type])

        # read & store message
        record = data.read(message_size - 1)
        message = message_fields[message_type]._make(unpack(fstring[message_type], record))
        messages[message_type].append(message)
        
        # deal with system events
        if message_type == 'S':
            seconds = int.from_bytes(message.timestamp, byteorder='big') * 1e-9
            print('\n', event_codes.get(message.event_code.decode('ascii'), 'Error'))
            print(f'\t{format_time(seconds)}\t{message_count:12,.0f}')
            if message.event_code.decode('ascii') == 'C':
                store_messages(messages)
                break
        message_count += 1

        if message_count % 2.5e7 == 0:
            seconds = int.from_bytes(message.timestamp, byteorder='big') * 1e-9
            d = format_time(time() - start)
            print(f'\t{format_time(seconds)}\t{message_count:12,.0f}\t{d}')
            res = store_messages(messages)
            if res == 1:
                print(pd.Series(dict(message_type_counter)).sort_values())
                break
            messages.clear()

这是store_messages() 函数:

def store_messages(m):
    """Handle occasional storing of all messages"""
    with pd.HDFStore(itch_store) as store:
        for mtype, data in m.items():
            # convert to DataFrame
            data = pd.DataFrame(data)

            # parse timestamp info
            data.timestamp = data.timestamp.apply(int.from_bytes, byteorder='big')
            data.timestamp = pd.to_timedelta(data.timestamp)

            # apply alpha formatting
            if mtype in alpha_formats.keys():
                data = format_alpha(mtype, data)

            s = alpha_length.get(mtype)
            if s:
                s = {c: s.get(c) for c in data.columns}
            dc = ['stock_locate']
            if m == 'R':
                dc.append('stock')
            try:
                store.append(mtype,
                         data,
                         format='t',
                         min_itemsize=s,
                         data_columns=dc)
            except Exception as e:
                print(e)
                print(mtype)
                print(data.info())
                print(pd.Series(list(m.keys())).value_counts())
                data.to_csv('data.csv', index=False)
                return 1
    return 0

【问题讨论】:

    标签: python python-3.x io binaryfiles algorithmic-trading


    【解决方案1】:

    根据代码,文件格式看起来像它的 2 个字节的消息大小,一个字节的消息类型,然后是 n 个字节的实际消息(由先前读取的消息大小定义)。

    优化这一点的简单方法是先将 3 个字节读入列表,将 [0:1] 转换为消息大小 int,将 [2] 转换为消息类型,然后读取消息..

    为了进一步消除所需的读取量,您可以从文件中读取固定数量的数据到一个列表中并开始从中提取。提取时,保留已处理字节的索引,一旦该索引或索引+要读取的数据量超过列表的大小,您就可以预填充列表。如果没有正确考虑,这可能会导致巨大的内存需求..

    【讨论】:

    • 感谢您的建议,让我反映一下我的想法:假设我一次读取 100 mb(100000000 字节),然后我会做类似current_read = data.read(100000000) 的操作,然后我可能会使用另一个知道在哪里的函数对列表进行分区,使用第三个和第四个函数转换和存储列表内容,然后可能在最后,清除列表并将最后一个索引保存在变量中。问题是如何确保它的行为正确?我的意思是即使在调试器中运行它也不会表明它是否工作正常。有更好/更快的方法吗?
    • 你能提供一些代码来进一步澄清你的解释吗?
    • FWIW 不太可能做任何事情:默认情况下,Python 的二进制文件 IO 是完全缓冲的,如果可用则使用 os.stat(path).st_blksize 的缓冲区,否则使用 io.DEFAULT_BUFFER_SIZE。所以小的reads 只是要更新缓冲区,底层文件一次读取 4k/8k。如果您想要更大的缓冲区(以消耗更多内存为代价),您可以为open 提供更大的缓冲区大小,但我不会乐观。我建议使用分析器来尝试找出热点是什么(如果有热点)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-16
    • 2014-07-15
    • 2016-06-11
    • 1970-01-01
    相关资源
    最近更新 更多