【问题标题】:How to convert an iterable to a stream?如何将可迭代对象转换为流?
【发布时间】:2011-10-03 05:21:30
【问题描述】:

如果我有一个包含字符串的可迭代对象,有没有一种简单的方法可以将其转换为流?我想做这样的事情:

def make_file():
    yield "hello\n"
    yield "world\n"

output = tarfile.TarFile(…)
stream = iterable_to_stream(make_file())
output.addfile(…, stream)

【问题讨论】:

  • 我不太了解流,但你想要stream = io.StringIO("".join(make_file())) 吗?
  • 不——我不想那样做。 make_file() 可能会返回一个大文件,我宁愿不把它加载到内存中。
  • @TokenMacGuy:抱歉,我不认为我看到了那个链接的意义……

标签: python stream iterator


【解决方案1】:

一个伟大的Mechanical snail 的答案的一点修改版本。在这里,readinto(b) 实现对底层迭代器进行了多次调用,以便为给定的可写字节类对象b 的大小收集尽可能多的字节。

class IteratorReader(io.RawIOBase):

    def __init__(self, iterator):
        self.iterator = iterator
        self.leftover = []

    def readinto(self, buffer: bytearray) -> Optional[int]:
        size = len(buffer)
        while len(self.leftover) < size:
            try:
                self.leftover.extend(next(self.iterator))
            except StopIteration:
                break

        if len(self.leftover) == 0:
            return 0

        output, self.leftover = self.leftover[:size], self.leftover[size:]
        buffer[:len(output)] = output
        return len(output)

    def readable(self) -> bool:
        return True

及用法:

def iterator1():
    for i in ('a', 'b', 'c', 'd', 'e', 'f', 'g'):
        res = i * 3
        yield res.encode("utf8")


iterreader = IteratorReader(iterator1())
while True:
    r = iterreader.read(4)
    if not r:
        break
    print(r)

【讨论】:

    【解决方案2】:

    这是我的流式迭代器 urllib3 的实验分支,支持通过可迭代的流式分块请求:

    class IterStreamer(object):
        """
        File-like streaming iterator.
        """
        def __init__(self, generator):
            self.generator = generator
            self.iterator = iter(generator)
            self.leftover = ''
    
        def __len__(self):
            return self.generator.__len__()
    
        def __iter__(self):
            return self.iterator
    
        def next(self):
            return self.iterator.next()
    
        def read(self, size):
            data = self.leftover
            count = len(self.leftover)
    
            if count < size:
                try:
                    while count < size:
                        chunk = self.next()
                        data += chunk
                        count += len(chunk)
                except StopIteration:
                    pass
    
            self.leftover = data[size:]
    
            return data[:size]
    

    带有上下文的来源: https://github.com/shazow/urllib3/blob/filepost-stream/urllib3/filepost.py#L23

    相关单元测试: https://github.com/shazow/urllib3/blob/filepost-stream/test/test_filepost.py#L9

    唉,这个代码还没有进入稳定分支,因为对无大小块请求的支持很差,但它应该是你尝试做的一个很好的基础。有关如何使用它的示例,请参阅源链接。

    【讨论】:

    • 这有一个错误,它将永远继续发出最后剩余的数据。
    • pass 换成return data,错误就消失了。
    • 没有。将pass 替换为self.leftover = ''; return data,错误就消失了。
    • 修复了你们提到的错误。很抱歉没有回复,很长时间没有注意到 Stackoverflow 的通知。 :)
    • read 仍然有一个过时剩余的错误,已通过此差异中的更改修复github.com/jennyyuejin/Kaggle/commit/…
    【解决方案3】:

    Python 3 具有 a new I/O stream API (library docs),取代了旧的类文件对象协议。 (新的 API 在 Python 2 中的 io 模块中也可用,并且它向后兼容类文件对象协议。)

    Here's an implementation for the new API,在 Python 2 和 3 中:

    import io
    
    def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
        """
        Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
        input stream.
    
        The stream implements Python 3's newer I/O API (available in Python 2's io module).
        For efficiency, the stream is buffered.
        """
        class IterStream(io.RawIOBase):
            def __init__(self):
                self.leftover = None
            def readable(self):
                return True
            def readinto(self, b):
                try:
                    l = len(b)  # We're supposed to return at most this much
                    chunk = self.leftover or next(iterable)
                    output, self.leftover = chunk[:l], chunk[l:]
                    b[:len(output)] = output
                    return len(output)
                except StopIteration:
                    return 0    # indicate EOF
        return io.BufferedReader(IterStream(), buffer_size=buffer_size)
    

    示例用法:

    with iterable_to_stream(str(x**2).encode('utf8') for x in range(11)) as s:
        print(s.read())
    

    【讨论】:

    • 在 2020 年和 Python 3.8 中,仍然是最好的方法吗?试过了,它仍然有效,但也许它可以简化?
    【解决方案4】:

    由于看起来没有“标准”的做法,我拼凑了一个简单的实现:

    class iter_to_stream(object):
        def __init__(self, iterable):
            self.buffered = ""
            self.iter = iter(iterable)
    
        def read(self, size):
            result = ""
            while size > 0:
                data = self.buffered or next(self.iter, None)
                self.buffered = ""
                if data is None:
                    break
                size -= len(data)
                if size < 0:
                    data, self.buffered = data[:size], data[size:]
                result += data
            return result
    

    【讨论】:

      【解决方案5】:

      TarFile 接受任何提供file-like interface 的东西——因此您可以使用StringIO(如果您使用的是Python 3.X,则使用io.StringIO)来生成您需要的TarFile.addfile() 或者您可以创建自己的提供file-like interface 并产生您需要的类。

      【讨论】:

      • 对——但是有没有办法通过 StringIO 流式传输迭代器?在将其写入 StringIO 之前,我宁愿不将整个输入文件加载到内存中。
      • @David——我不知道。我会给你一个在StringIO 周围包装一个类的例子,但看起来你已经得到了你需要的东西:-)
      【解决方案6】:

      起点:

      class iterable_to_stream:
          def __init__(self, iterable):
              self.iter = iter(iterable)
      
          def read(self):
              try:
                  return self.iter.next()
              except StopIteration:
                  return ""
      

      【讨论】:

      • 嗯……虽然这肯定会自行爆炸(如果next(iter) 返回"" 怎么办?如果有人有胆量将大小传递给read(…) 怎么办)……我想我可以使用BufferedReader 来处理这些细节……
      • 对不起,伙计,这似乎行不通。 BufferedReader 需要一个RawIOBase 的实例,而这与实现该接口相去甚远……而且它甚至没有实现基本的流API(例如,read() 不接受大小)。
      • @David Wolever:似乎为您的可迭代对象编写类似RawIOBase 的包装器并将其传递给BufferReader 是可行的。 RawIOBase 对象只有 4 种方法,您可能只需要实现 3 种 read...() 即可。
      猜你喜欢
      • 2017-12-30
      • 2021-05-26
      • 2015-02-21
      • 2021-03-08
      • 2018-10-29
      • 2016-08-12
      • 2017-05-28
      相关资源
      最近更新 更多