【问题标题】:Reading binary file and looping over each byte读取二进制文件并遍历每个字节
【发布时间】:2010-11-05 08:10:42
【问题描述】:

在 Python 中,如何读取二进制文件并遍历该文件的每个字节?

【问题讨论】:

    标签: python file-io binary


    【解决方案1】:

    Python 2.4 及更早版本

    f = open("myfile", "rb")
    try:
        byte = f.read(1)
        while byte != "":
            # Do stuff with byte.
            byte = f.read(1)
    finally:
        f.close()
    

    Python 2.5-2.7

    with open("myfile", "rb") as f:
        byte = f.read(1)
        while byte != "":
            # Do stuff with byte.
            byte = f.read(1)
    

    请注意,with 语句在低于 2.5 的 Python 版本中不可用。要在 v 2.5 中使用它,您需要导入它:

    from __future__ import with_statement
    

    在 2.6 中这是不需要的。

    Python 3

    在 Python 3 中,它有点不同。我们将不再以字节模式从流中获取原始字符,而是字节对象,因此我们需要更改条件:

    with open("myfile", "rb") as f:
        byte = f.read(1)
        while byte != b"":
            # Do stuff with byte.
            byte = f.read(1)
    

    或者正如本霍伊特所说,跳过不等于并利用b"" 评估为假的事实。这使得代码在 2.6 和 3.x 之间兼容,无需任何更改。如果您从字节模式转到文本或相反,它还可以避免更改条件。

    with open("myfile", "rb") as f:
        byte = f.read(1)
        while byte:
            # Do stuff with byte.
            byte = f.read(1)
    

    python 3.8

    从现在开始感谢 := 操作符,上面的代码可以用更短的方式编写。

    with open("myfile", "rb") as f:
        while (byte := f.read(1)):
            # Do stuff with byte.
    

    【讨论】:

    • 按字节读取文件是一场性能噩梦。这不是 python 中可用的最佳解决方案。应谨慎使用此代码。
    • @usr:文件对象是在内部缓冲的,即便如此,这也是我们所要求的。并非每个脚本都需要最佳性能。
    • @mezhaka:所以你把它从 read(1) 改成 read(bufsize) 并且在 while 循环中你做了一个 for-in ......这个例子仍然有效。
    • @usr: 性能差异可以高达for the code I've tried的200倍。
    • @usr - 这取决于您要处理的字节数。如果它们足够少,那么性能“差”但易于理解的代码可能会更受欢迎。 CPU 周期的浪费是为了在维护代码时节省“阅读器 CPU 周期”而得到补偿。
    【解决方案2】:

    此生成器从文件中生成字节,以块的形式读取文件:

    def bytes_from_file(filename, chunksize=8192):
        with open(filename, "rb") as f:
            while True:
                chunk = f.read(chunksize)
                if chunk:
                    for b in chunk:
                        yield b
                else:
                    break
    
    # example:
    for b in bytes_from_file('filename'):
        do_stuff_with(b)
    

    有关 iteratorsgenerators 的信息,请参阅 Python 文档。

    【讨论】:

    • @codeape 正是我想要的。但是,你如何确定块大小?可以是任意值吗?
    • @swdev:该示例使用 8192 Bytes 的块大小。 file.read() 函数的参数只是指定大小,即要读取的字节数。 codeape 选择了8192 Byte = 8 kB(实际上是KiB,但这并不常见)。该值是“完全”随机的,但 8 kB 似乎是一个合适的值:没有浪费太多内存,而且仍然没有像 Skurmedel 接受的答案那样“太多”读取操作......
    • 文件系统已经缓冲了数据块,所以这段代码是多余的。最好一次读取一个字节。
    • 虽然已经比公认的答案快,但通过用 yield from chunk 替换整个最内层的 for b in chunk: 循环,可以再加快 20-25%。这种yield 的形式是在Python 3.3 中添加的(参见Yield Expressions)。
    • 这对我来说比接受的答案要慢。我不知道为什么。
    【解决方案3】:

    如果文件不是太大,将其保存在内存中是个问题:

    with open("filename", "rb") as f:
        bytes_read = f.read()
    for b in bytes_read:
        process_byte(b)
    

    其中 process_byte 表示您要对传入的字节执行的一些操作。

    如果你想一次处理一个块:

    with open("filename", "rb") as f:
        bytes_read = f.read(CHUNKSIZE)
        while bytes_read:
            for b in bytes_read:
                process_byte(b)
            bytes_read = f.read(CHUNKSIZE)
    

    with 语句在 Python 2.5 及更高版本中可用。

    【讨论】:

    • 您可能对我刚刚发布的benchmark 感兴趣。
    【解决方案4】:

    要读取文件——一次一个字节(忽略缓冲)——你可以使用two-argument iter(callable, sentinel) built-in function

    with open(filename, 'rb') as file:
        for byte in iter(lambda: file.read(1), b''):
            # Do stuff with byte
    

    它调用file.read(1),直到它没有返回任何东西b''(空字节串)。对于大文件,内存不会无限增长。您可以将buffering=0 传递给open(),以禁用缓冲——它保证每次迭代只读取一个字节(慢)。

    with-statement 自动关闭文件——包括下面的代码引发异常的情况。

    尽管默认情况下存在内部缓冲,但一次处理一个字节仍然是低效的。例如,下面是 blackhole.py 实用程序,它会吃掉所有给定的东西:

    #!/usr/bin/env python3
    """Discard all input. `cat > /dev/null` analog."""
    import sys
    from functools import partial
    from collections import deque
    
    chunksize = int(sys.argv[1]) if len(sys.argv) > 1 else (1 << 15)
    deque(iter(partial(sys.stdin.detach().read, chunksize), b''), maxlen=0)
    

    例子:

    $ dd if=/dev/zero bs=1M count=1000 | python3 blackhole.py
    

    chunksize == 32768 在我的机器上时它处理 ~1.5 GB/s,而当 chunksize == 1 时只处理 ~7.5 MB/s。也就是说,一次读取一个字节要慢 200 倍。如果您可以重写您的处理以一次使用多个字节并且如果您需要性能,请考虑到这一点。

    mmap 允许您同时将文件视为bytearray 和文件对象。如果您需要访问这两个接口,它可以作为将整个文件加载到内存中的替代方法。特别是,您可以只使用普通的for-loop 一次在内存映射文件上迭代一个字节:

    from mmap import ACCESS_READ, mmap
    
    with open(filename, 'rb', 0) as f, mmap(f.fileno(), 0, access=ACCESS_READ) as s:
        for byte in s: # length is equal to the current file size
            # Do stuff with byte
    

    mmap 支持切片表示法。例如,mm[i:i+len] 从位置i 开始的文件中返回len 字节。 Python 3.2 之前不支持上下文管理器协议;在这种情况下,您需要显式调用mm.close()。使用mmap 迭代每个字节比file.read(1) 消耗更多的内存,但mmap 快一个数量级。

    【讨论】:

    • 我发现最后一个例子很有趣。太糟糕了,没有等效的 numpy 内存映射(字节)数组。
    • @martineau 有numpy.memmap(),您可以一次获取一个字节的数据(ctypes.data)。您可以将 numpy 数组视为内存中的 blob + 元数据。
    • jfs:谢谢,好消息!不知道它存在这样的东西。很好的答案,顺便说一句。
    【解决方案5】:

    在 Python 中读取二进制文件并遍历每个字节

    Python 3.5 中的新功能是pathlib 模块,它有一个方便的方法专门将文件作为字节读取,允许我们遍历字节。我认为这是一个不错的(如果又快又脏)答案:

    import pathlib
    
    for byte in pathlib.Path(path).read_bytes():
        print(byte)
    

    有趣的是,这是提到 pathlib 的唯一答案。

    在 Python 2 中,您可能会这样做(正如 Vinay Sajip 也建议的那样):

    with open(path, 'b') as file:
        for byte in file.read():
            print(byte)
    

    如果文件可能太大而无法在内存中进行迭代,您可以习惯性地将其分块,使用带有callable, sentinel 签名的iter 函数 - Python 2 版本:

    with open(path, 'b') as file:
        callable = lambda: file.read(1024)
        sentinel = bytes() # or b''
        for chunk in iter(callable, sentinel): 
            for byte in chunk:
                print(byte)
    

    (其他几个答案都提到了这一点,但很少有人提供合理的读取大小。)

    大文件或缓冲/交互式阅读的最佳实践

    让我们创建一个函数来执行此操作,包括 Python 3.5+ 标准库的惯用用法:

    from pathlib import Path
    from functools import partial
    from io import DEFAULT_BUFFER_SIZE
    
    def file_byte_iterator(path):
        """given a path, return an iterator over the file
        that lazily loads the file
        """
        path = Path(path)
        with path.open('rb') as file:
            reader = partial(file.read1, DEFAULT_BUFFER_SIZE)
            file_iterator = iter(reader, bytes())
            for chunk in file_iterator:
                yield from chunk
    

    请注意,我们使用file.read1file.read 阻塞,直到它获得所有请求的字节或EOFfile.read1 可以让我们避免阻塞,因此它可以更快地返回。没有其他答案也提到这一点。

    最佳实践用法演示:

    让我们用一兆字节(实际上是兆字节)的伪随机数据创建一个文件:

    import random
    import pathlib
    path = 'pseudorandom_bytes'
    pathobj = pathlib.Path(path)
    
    pathobj.write_bytes(
      bytes(random.randint(0, 255) for _ in range(2**20)))
    

    现在让我们遍历它并在内存中实现它:

    >>> l = list(file_byte_iterator(path))
    >>> len(l)
    1048576
    

    我们可以检查数据的任何部分,例如,最后 100 个字节和前 100 个字节:

    >>> l[-100:]
    [208, 5, 156, 186, 58, 107, 24, 12, 75, 15, 1, 252, 216, 183, 235, 6, 136, 50, 222, 218, 7, 65, 234, 129, 240, 195, 165, 215, 245, 201, 222, 95, 87, 71, 232, 235, 36, 224, 190, 185, 12, 40, 131, 54, 79, 93, 210, 6, 154, 184, 82, 222, 80, 141, 117, 110, 254, 82, 29, 166, 91, 42, 232, 72, 231, 235, 33, 180, 238, 29, 61, 250, 38, 86, 120, 38, 49, 141, 17, 190, 191, 107, 95, 223, 222, 162, 116, 153, 232, 85, 100, 97, 41, 61, 219, 233, 237, 55, 246, 181]
    >>> l[:100]
    [28, 172, 79, 126, 36, 99, 103, 191, 146, 225, 24, 48, 113, 187, 48, 185, 31, 142, 216, 187, 27, 146, 215, 61, 111, 218, 171, 4, 160, 250, 110, 51, 128, 106, 3, 10, 116, 123, 128, 31, 73, 152, 58, 49, 184, 223, 17, 176, 166, 195, 6, 35, 206, 206, 39, 231, 89, 249, 21, 112, 168, 4, 88, 169, 215, 132, 255, 168, 129, 127, 60, 252, 244, 160, 80, 155, 246, 147, 234, 227, 157, 137, 101, 84, 115, 103, 77, 44, 84, 134, 140, 77, 224, 176, 242, 254, 171, 115, 193, 29]
    

    二进制文件不要逐行迭代

    不要执行以下操作 - 这会拉取任意大小的块,直到它到达换行符 - 当块太小并且可能太大时太慢:

        with open(path, 'rb') as file:
            for chunk in file: # text newline iteration - not for bytes
                yield from chunk
    

    以上内容仅适用于您应该在没有'b' 标志。

    【讨论】:

    • 这好多了...谢谢你这样做。我知道回到两年前的答案并不总是很有趣,但我很感激你这样做了。我特别喜欢“不要逐行迭代”副标题 :-)
    • 嗨 Aaron,您有什么理由选择使用 path = Path(path), with path.open('rb') as file: 而不是使用内置的 open 函数吗?他们都做同样的事情对吗?
    • @JoshuaYonathan 我使用Path 对象,因为它是处理路径的一种非常方便的新方法。我们可以简单地调用路径对象上的方法,而不是将字符串传递给精心挑选的“正确”函数,它本质上包含您想要的大部分重要功能,语义上是路径字符串。使用可以检查的 IDE,我们也可以更轻松地获得自动完成功能。我们可以使用内置的open 完成相同的操作,但是在编写程序以供程序员使用Path 对象时有很多好处。
    • 你提到的最后一个使用函数file_byte_iterator的方法比我在这个页面上尝试过的所有方法都要快得多。向你致敬!
    • @RickM:您可能对我刚刚发布的benchmark 感兴趣。
    【解决方案6】:

    总结 chrispy、Skurmedel、Ben Hoyt 和 Peter Hansen 的所有亮点,这将是一次处理一个字节的二进制文件的最佳解决方案:

    with open("myfile", "rb") as f:
        while True:
            byte = f.read(1)
            if not byte:
                break
            do_stuff_with(ord(byte))
    

    对于python 2.6及以上版本,因为:

    • python 内部缓冲区 - 无需读取块
    • DRY 原则——不重复读行
    • with 语句确保一个干净的文件关闭
    • 当没有更多字节时(而不是当一个字节为零时),'byte' 的计算结果为 false

    或使用 J. F. Sebastians 解决方案来提高速度

    from functools import partial
    
    with open(filename, 'rb') as file:
        for byte in iter(partial(file.read, 1), b''):
            # Do stuff with byte
    

    或者,如果您希望它作为生成器函数,如 codeape 演示的那样:

    def bytes_from_file(filename):
        with open(filename, "rb") as f:
            while True:
                byte = f.read(1)
                if not byte:
                    break
                yield(ord(byte))
    
    # example:
    for b in bytes_from_file('filename'):
        do_stuff_with(b)
    

    【讨论】:

    • 正如链接的答案所说,即使读取被缓冲,在 Python 中一次读取/处理一个字节仍然很慢。如果可以像链接答案中的示例那样一次处理几个字节,则性能可以大大提高:1.5GB/s 与 7.5MB/s。
    【解决方案7】:

    这篇文章本身并不是对这个问题的直接回答。相反,它是一个数据驱动的可扩展基准,可用于比较已发布到此问题的许多答案(以及利用后来更现代的 Python 版本中添加的新功能的变体)——因此应该有助于确定哪个性能最佳。

    在某些情况下,我修改了引用答案中的代码,使其与基准框架兼容。

    首先,以下是当前最新版本的 Python 2 和 3 的结果:

    Fastest to slowest execution speeds with 32-bit Python 2.7.16
      numpy version 1.16.5
      Test file size: 1,024 KiB
      100 executions, best of 3 repetitions
    
    1                  Tcll (array.array) :   3.8943 secs, rel speed   1.00x,   0.00% slower (262.95 KiB/sec)
    2  Vinay Sajip (read all into memory) :   4.1164 secs, rel speed   1.06x,   5.71% slower (248.76 KiB/sec)
    3            codeape + iter + partial :   4.1616 secs, rel speed   1.07x,   6.87% slower (246.06 KiB/sec)
    4                             codeape :   4.1889 secs, rel speed   1.08x,   7.57% slower (244.46 KiB/sec)
    5               Vinay Sajip (chunked) :   4.1977 secs, rel speed   1.08x,   7.79% slower (243.94 KiB/sec)
    6           Aaron Hall (Py 2 version) :   4.2417 secs, rel speed   1.09x,   8.92% slower (241.41 KiB/sec)
    7                     gerrit (struct) :   4.2561 secs, rel speed   1.09x,   9.29% slower (240.59 KiB/sec)
    8                     Rick M. (numpy) :   8.1398 secs, rel speed   2.09x, 109.02% slower (125.80 KiB/sec)
    9                           Skurmedel :  31.3264 secs, rel speed   8.04x, 704.42% slower ( 32.69 KiB/sec)
    
    Benchmark runtime (min:sec) - 03:26
    

    Fastest to slowest execution speeds with 32-bit Python 3.8.0
      numpy version 1.17.4
      Test file size: 1,024 KiB
      100 executions, best of 3 repetitions
    
    1  Vinay Sajip + "yield from" + "walrus operator" :   3.5235 secs, rel speed   1.00x,   0.00% slower (290.62 KiB/sec)
    2                       Aaron Hall + "yield from" :   3.5284 secs, rel speed   1.00x,   0.14% slower (290.22 KiB/sec)
    3         codeape + iter + partial + "yield from" :   3.5303 secs, rel speed   1.00x,   0.19% slower (290.06 KiB/sec)
    4                      Vinay Sajip + "yield from" :   3.5312 secs, rel speed   1.00x,   0.22% slower (289.99 KiB/sec)
    5      codeape + "yield from" + "walrus operator" :   3.5370 secs, rel speed   1.00x,   0.38% slower (289.51 KiB/sec)
    6                          codeape + "yield from" :   3.5390 secs, rel speed   1.00x,   0.44% slower (289.35 KiB/sec)
    7                                      jfs (mmap) :   4.0612 secs, rel speed   1.15x,  15.26% slower (252.14 KiB/sec)
    8              Vinay Sajip (read all into memory) :   4.5948 secs, rel speed   1.30x,  30.40% slower (222.86 KiB/sec)
    9                        codeape + iter + partial :   4.5994 secs, rel speed   1.31x,  30.54% slower (222.64 KiB/sec)
    10                                        codeape :   4.5995 secs, rel speed   1.31x,  30.54% slower (222.63 KiB/sec)
    11                          Vinay Sajip (chunked) :   4.6110 secs, rel speed   1.31x,  30.87% slower (222.08 KiB/sec)
    12                      Aaron Hall (Py 2 version) :   4.6292 secs, rel speed   1.31x,  31.38% slower (221.20 KiB/sec)
    13                             Tcll (array.array) :   4.8627 secs, rel speed   1.38x,  38.01% slower (210.58 KiB/sec)
    14                                gerrit (struct) :   5.0816 secs, rel speed   1.44x,  44.22% slower (201.51 KiB/sec)
    15                 Rick M. (numpy) + "yield from" :  11.8084 secs, rel speed   3.35x, 235.13% slower ( 86.72 KiB/sec)
    16                                      Skurmedel :  11.8806 secs, rel speed   3.37x, 237.18% slower ( 86.19 KiB/sec)
    17                                Rick M. (numpy) :  13.3860 secs, rel speed   3.80x, 279.91% slower ( 76.50 KiB/sec)
    
    Benchmark runtime (min:sec) - 04:47
    

    我还使用一个更大的 10 MiB 测试文件(运行了将近一个小时)运行它,并获得了与上面显示的性能相当的结果。

    这是用于进行基准测试的代码:

    from __future__ import print_function
    import array
    import atexit
    from collections import deque, namedtuple
    import io
    from mmap import ACCESS_READ, mmap
    import numpy as np
    from operator import attrgetter
    import os
    import random
    import struct
    import sys
    import tempfile
    from textwrap import dedent
    import time
    import timeit
    import traceback
    
    try:
        xrange
    except NameError:  # Python 3
        xrange = range
    
    
    class KiB(int):
        """ KibiBytes - multiples of the byte units for quantities of information. """
        def __new__(self, value=0):
            return 1024*value
    
    
    BIG_TEST_FILE = 1  # MiBs or 0 for a small file.
    SML_TEST_FILE = KiB(64)
    EXECUTIONS = 100  # Number of times each "algorithm" is executed per timing run.
    TIMINGS = 3  # Number of timing runs.
    CHUNK_SIZE = KiB(8)
    if BIG_TEST_FILE:
        FILE_SIZE = KiB(1024) * BIG_TEST_FILE
    else:
        FILE_SIZE = SML_TEST_FILE  # For quicker testing.
    
    # Common setup for all algorithms -- prefixed to each algorithm's setup.
    COMMON_SETUP = dedent("""
        # Make accessible in algorithms.
        from __main__ import array, deque, get_buffer_size, mmap, np, struct
        from __main__ import ACCESS_READ, CHUNK_SIZE, FILE_SIZE, TEMP_FILENAME
        from functools import partial
        try:
            xrange
        except NameError:  # Python 3
            xrange = range
    """)
    
    
    def get_buffer_size(path):
        """ Determine optimal buffer size for reading files. """
        st = os.stat(path)
        try:
            bufsize = st.st_blksize # Available on some Unix systems (like Linux)
        except AttributeError:
            bufsize = io.DEFAULT_BUFFER_SIZE
        return bufsize
    
    # Utility primarily for use when embedding additional algorithms into benchmark.
    VERIFY_NUM_READ = """
        # Verify generator reads correct number of bytes (assumes values are correct).
        bytes_read = sum(1 for _ in file_byte_iterator(TEMP_FILENAME))
        assert bytes_read == FILE_SIZE, \
               'Wrong number of bytes generated: got {:,} instead of {:,}'.format(
                    bytes_read, FILE_SIZE)
    """
    
    TIMING = namedtuple('TIMING', 'label, exec_time')
    
    class Algorithm(namedtuple('CodeFragments', 'setup, test')):
    
        # Default timeit "stmt" code fragment.
        _TEST = """
            #for b in file_byte_iterator(TEMP_FILENAME):  # Loop over every byte.
            #    pass  # Do stuff with byte...
            deque(file_byte_iterator(TEMP_FILENAME), maxlen=0)  # Data sink.
        """
    
        # Must overload __new__ because (named)tuples are immutable.
        def __new__(cls, setup, test=None):
            """ Dedent (unindent) code fragment string arguments.
            Args:
              `setup` -- Code fragment that defines things used by `test` code.
                         In this case it should define a generator function named
                         `file_byte_iterator()` that will be passed that name of a test file
                         of binary data. This code is not timed.
              `test` -- Code fragment that uses things defined in `setup` code.
                        Defaults to _TEST. This is the code that's timed.
            """
            test =  cls._TEST if test is None else test  # Use default unless one is provided.
    
            # Uncomment to replace all performance tests with one that verifies the correct
            # number of bytes values are being generated by the file_byte_iterator function.
            #test = VERIFY_NUM_READ
    
            return tuple.__new__(cls, (dedent(setup), dedent(test)))
    
    
    algorithms = {
    
        'Aaron Hall (Py 2 version)': Algorithm("""
            def file_byte_iterator(path):
                with open(path, "rb") as file:
                    callable = partial(file.read, 1024)
                    sentinel = bytes() # or b''
                    for chunk in iter(callable, sentinel):
                        for byte in chunk:
                            yield byte
        """),
    
        "codeape": Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while True:
                        chunk = f.read(chunksize)
                        if chunk:
                            for b in chunk:
                                yield b
                        else:
                            break
        """),
    
        "codeape + iter + partial": Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    for chunk in iter(partial(f.read, chunksize), b''):
                        for b in chunk:
                            yield b
        """),
    
        "gerrit (struct)": Algorithm("""
            def file_byte_iterator(filename):
                with open(filename, "rb") as f:
                    fmt = '{}B'.format(FILE_SIZE)  # Reads entire file at once.
                    for b in struct.unpack(fmt, f.read()):
                        yield b
        """),
    
        'Rick M. (numpy)': Algorithm("""
            def file_byte_iterator(filename):
                for byte in np.fromfile(filename, 'u1'):
                    yield byte
        """),
    
        "Skurmedel": Algorithm("""
            def file_byte_iterator(filename):
                with open(filename, "rb") as f:
                    byte = f.read(1)
                    while byte:
                        yield byte
                        byte = f.read(1)
        """),
    
        "Tcll (array.array)": Algorithm("""
            def file_byte_iterator(filename):
                with open(filename, "rb") as f:
                    arr = array.array('B')
                    arr.fromfile(f, FILE_SIZE)  # Reads entire file at once.
                    for b in arr:
                        yield b
        """),
    
        "Vinay Sajip (read all into memory)": Algorithm("""
            def file_byte_iterator(filename):
                with open(filename, "rb") as f:
                    bytes_read = f.read()  # Reads entire file at once.
                for b in bytes_read:
                    yield b
        """),
    
        "Vinay Sajip (chunked)": Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    chunk = f.read(chunksize)
                    while chunk:
                        for b in chunk:
                            yield b
                        chunk = f.read(chunksize)
        """),
    
    }  # End algorithms
    
    #
    # Versions of algorithms that will only work in certain releases (or better) of Python.
    #
    if sys.version_info >= (3, 3):
        algorithms.update({
    
            'codeape + iter + partial + "yield from"': Algorithm("""
                def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                    with open(filename, "rb") as f:
                        for chunk in iter(partial(f.read, chunksize), b''):
                            yield from chunk
            """),
    
            'codeape + "yield from"': Algorithm("""
                def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                    with open(filename, "rb") as f:
                        while True:
                            chunk = f.read(chunksize)
                            if chunk:
                                yield from chunk
                            else:
                                break
            """),
    
            "jfs (mmap)": Algorithm("""
                def file_byte_iterator(filename):
                    with open(filename, "rb") as f, \
                         mmap(f.fileno(), 0, access=ACCESS_READ) as s:
                        yield from s
            """),
    
            'Rick M. (numpy) + "yield from"': Algorithm("""
                def file_byte_iterator(filename):
                #    data = np.fromfile(filename, 'u1')
                    yield from np.fromfile(filename, 'u1')
            """),
    
            'Vinay Sajip + "yield from"': Algorithm("""
                def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                    with open(filename, "rb") as f:
                        chunk = f.read(chunksize)
                        while chunk:
                            yield from chunk  # Added in Py 3.3
                            chunk = f.read(chunksize)
            """),
    
        })  # End Python 3.3 update.
    
    if sys.version_info >= (3, 5):
        algorithms.update({
    
            'Aaron Hall + "yield from"': Algorithm("""
                from pathlib import Path
    
                def file_byte_iterator(path):
                    ''' Given a path, return an iterator over the file
                        that lazily loads the file.
                    '''
                    path = Path(path)
                    bufsize = get_buffer_size(path)
    
                    with path.open('rb') as file:
                        reader = partial(file.read1, bufsize)
                        for chunk in iter(reader, bytes()):
                            yield from chunk
            """),
    
        })  # End Python 3.5 update.
    
    if sys.version_info >= (3, 8, 0):
        algorithms.update({
    
            'Vinay Sajip + "yield from" + "walrus operator"': Algorithm("""
                def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                    with open(filename, "rb") as f:
                        while chunk := f.read(chunksize):
                            yield from chunk  # Added in Py 3.3
            """),
    
            'codeape + "yield from" + "walrus operator"': Algorithm("""
                def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                    with open(filename, "rb") as f:
                        while chunk := f.read(chunksize):
                            yield from chunk
            """),
    
        })  # End Python 3.8.0 update.update.
    
    
    #### Main ####
    
    def main():
        global TEMP_FILENAME
    
        def cleanup():
            """ Clean up after testing is completed. """
            try:
                os.remove(TEMP_FILENAME)  # Delete the temporary file.
            except Exception:
                pass
    
        atexit.register(cleanup)
    
        # Create a named temporary binary file of pseudo-random bytes for testing.
        fd, TEMP_FILENAME = tempfile.mkstemp('.bin')
        with os.fdopen(fd, 'wb') as file:
             os.write(fd, bytearray(random.randrange(256) for _ in range(FILE_SIZE)))
    
        # Execute and time each algorithm, gather results.
        start_time = time.time()  # To determine how long testing itself takes.
    
        timings = []
        for label in algorithms:
            try:
                timing = TIMING(label,
                                min(timeit.repeat(algorithms[label].test,
                                                  setup=COMMON_SETUP + algorithms[label].setup,
                                                  repeat=TIMINGS, number=EXECUTIONS)))
            except Exception as exc:
                print('{} occurred timing the algorithm: "{}"\n  {}'.format(
                        type(exc).__name__, label, exc))
                traceback.print_exc(file=sys.stdout)  # Redirect to stdout.
                sys.exit(1)
            timings.append(timing)
    
        # Report results.
        print('Fastest to slowest execution speeds with {}-bit Python {}.{}.{}'.format(
                64 if sys.maxsize > 2**32 else 32, *sys.version_info[:3]))
        print('  numpy version {}'.format(np.version.full_version))
        print('  Test file size: {:,} KiB'.format(FILE_SIZE // KiB(1)))
        print('  {:,d} executions, best of {:d} repetitions'.format(EXECUTIONS, TIMINGS))
        print()
    
        longest = max(len(timing.label) for timing in timings)  # Len of longest identifier.
        ranked = sorted(timings, key=attrgetter('exec_time')) # Sort so fastest is first.
        fastest = ranked[0].exec_time
        for rank, timing in enumerate(ranked, 1):
            print('{:<2d} {:>{width}} : {:8.4f} secs, rel speed {:6.2f}x, {:6.2f}% slower '
                  '({:6.2f} KiB/sec)'.format(
                        rank,
                        timing.label, timing.exec_time, round(timing.exec_time/fastest, 2),
                        round((timing.exec_time/fastest - 1) * 100, 2),
                        (FILE_SIZE/timing.exec_time) / KiB(1),  # per sec.
                        width=longest))
        print()
        mins, secs = divmod(time.time()-start_time, 60)
        print('Benchmark runtime (min:sec) - {:02d}:{:02d}'.format(int(mins),
                                                                   int(round(secs))))
    
    main()
    

    【讨论】:

    • 你是否假设我使用yield from chunk 而不是for byte in chunk: yield byte?我想我应该加强我的回答。
    • @Aaron:您在 Python 3 结果中的答案有两个版本,其中一个使用yield from
    • 好的,我已经更新了我的答案。我也建议你放弃enumerate,因为应该理解迭代完成 - 如果没有,最后我检查过 - 与使用 += 1 为索引进行簿记相比,枚举有一些开销,因此你可以选择执行在您自己的代码中记账。甚至可以使用maxlen=0 传递给双端队列。
    • @Aaron:同意enumerate。感谢您的反馈。将在我的帖子中添加一个没有它的更新(尽管我认为它不会对结果产生太大影响)。还将添加@Rick M. 的基于numpy 的答案。
    • 更多代码审查:我认为此时编写 Python 2 的答案没有任何意义 - 我会考虑删除 Python 2,因为我希望您使用 64 位 Python 3.7 或 3.8。您可以使用 atexit 和部分应用程序将清理设置为最后。错字:“验证”。我认为测试字符串的重复没有任何意义——它们完全不同吗?我想如果你在__new__ 中使用super(). 而不是tuple.,你可以使用namedtuple 属性名称而不是索引。
    【解决方案8】:

    Python 3,一次读取所有文件:

    with open("filename", "rb") as binary_file:
        # Read the whole file at once
        data = binary_file.read()
        print(data)
    

    您可以使用data 变量来迭代任何您想要的东西。

    【讨论】:

      【解决方案9】:

      在尝试了以上所有方法并使用@Aaron Hall 的答案后,我在运行 Window 10、8 Gb RAM 和 Python 3.5 32 位的计算机上遇到了一个 ~90 Mb 文件的内存错误。一位同事建议我改用numpy,效果非常好。

      到目前为止,读取整个二进制文件(我已经测试过)的最快速度是:

      import numpy as np
      
      file = "binary_file.bin"
      data = np.fromfile(file, 'u1')
      

      Reference

      目前比任何其他方法都快。希望它可以帮助某人!

      【讨论】:

      • @Nirmal:问题是关于循环到达字节,所以不清楚您对不同数据类型的评论是否有任何影响。
      • Rick:你的代码做的事情和其他的不太一样——即循环遍历每个字节。如果将其添加到其中,至少根据我的benchmark 中的结果,它不会比大多数其他人快。事实上,它似乎是较慢的方法之一。如果对每个字节的处理(无论是什么)都可以通过numpy 完成,那么可能是值得的。
      • @martineau 感谢您的 cmets,是的,我确实理解问题是关于循环遍历每个字节,而不仅仅是一次性加载所有内容,但这个问题中还有其他答案也指向阅读所有内容,因此我的回答
      • @Nirmal 你也错了。文件中的 numpy 可以使用 dtypes 读取不同的类型: ==================================== dtheader= np.dtype ([('起始名称','b', (4,)), ('消息类型', np.int32, (1,)), ('Instance', np.int32, (1,)), ( 'NumItems', np.int32, (1,)), ('Length', np.int32, (1,)), ('ComplexArray', np.int32, (1,))]) dtheader=dtheader.newbyteorder ('>') headerinfo = np.fromfile(iqfile, dtype=dtheader, count=1)
      • @KurtPeters 哦,我不知道您可以传递自定义 dtype。谢谢!
      【解决方案10】:

      如果您要读取大量二进制数据,您可能需要考虑struct module。它被记录为“在 C 和 Python 类型之间”进行转换,但是当然,字节就是字节,而这些是否被创建为 C 类型并不重要。例如,如果您的二进制数据包含两个 2 字节整数和一个 4 字节整数,您可以按如下方式读取它们(示例取自 struct 文档):

      >>> struct.unpack('hhl', b'\x00\x01\x00\x02\x00\x00\x00\x03')
      (1, 2, 3)
      

      您可能会发现这比显式循环文件内容更方便、更快捷,或两者兼而有之。

      【讨论】:

        【解决方案11】:

        如果您正在寻找快速的东西,这是我多年来一直在使用的一种方法:

        from array import array
        
        with open( path, 'rb' ) as file:
            data = array( 'B', file.read() ) # buffer the file
        
        # evaluate it's data
        for byte in data:
            v = byte # int value
            c = chr(byte)
        

        如果你想迭代chars而不是ints,你可以简单地使用data = file.read(),它应该是py3中的一个bytes()对象。

        【讨论】:

        • 'array'由'from array import array'导入
        • @quanly_mc 是的,谢谢你看到这个,很抱歉我忘了包括那个,现在编辑。
        【解决方案12】:

        对于大尺寸我认为使用生成器不会不好,这个答案是为了读取文件之类的东西,尽管@codeapp 有一个类似的答案我认为删除内部循环会更有意义。

        def read_chunk(file_object, chunk_size=125):
            while True:
                file =  file_object.read(chunk_size)
                if not file:
                    break
                yield file
        
        
        #sample use 
        buffer = io.BytesIO()
        file = open('myfile', 'r')
        for chunk in read_chunk(file):
            buffer.write(chunk)
        buffer.seek(0)
        // save the file or do whatever you want here
        

        您仍然可以将其用作普通列表,我认为这没有任何用处,但是

        file_list = list(read_chunk(file, chunk_size=10000))
        for i in file_list:
            # do something
        

        并且还得到每个块的索引

        for index, chunk in enumurate(read_chunk(file, chunk_size=10000)):
            #use the index as a number index
            # you can try and get the size of each chunk with this 
            length = len(chunk)
        

        请注意,支付注意文件的大小,注意chunk_size总是以字节为单位。

        【讨论】:

          【解决方案13】:

          这是一个使用 Numpy fromfile 寻址上述@Nirmal cmets 读取网络字节序数据的示例:

          dtheader= np.dtype([('Start Name','b', (4,)),
                          ('Message Type', np.int32, (1,)),
                          ('Instance', np.int32, (1,)),
                          ('NumItems', np.int32, (1,)),
                          ('Length', np.int32, (1,)),
                          ('ComplexArray', np.int32, (1,))])
          dtheader=dtheader.newbyteorder('>')
          
          headerinfo = np.fromfile(iqfile, dtype=dtheader, count=1)
          
          print(raw['Start Name'])
          

          我希望这会有所帮助。问题是 fromfile 无法识别 EOF 并允许优雅地跳出任意大小的文件的循环。

          【讨论】:

            猜你喜欢
            • 2016-03-21
            • 2013-12-06
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2012-07-11
            • 2011-08-30
            • 1970-01-01
            相关资源
            最近更新 更多