【问题标题】:Python Pickle StreamingPython Pickle 流式传输
【发布时间】:2021-11-02 12:42:27
【问题描述】:

我想弄清楚如何以可流式方式编写此代码:

data = pickle.dumps(obj)
fp = io.BytesIO(data)

通常可以调用pickle.dump,但这需要您提供编写器文件作为参数。相反,我有一个“upload_from_file”函数,所以我想将文件指针传递给该函数,因此是 BytesIO 代码。

这种方式的问题是我在内存中复制了数据,如果它是可流式的,我更喜欢它。

【问题讨论】:

  • 这会有点棘手,因为pickle.dump()(以及底层的底层函数调用)是同步的;您需要一个自定义的“fifo”样式的类文件对象,在一个线程中运行转储,该线程馈入该 fifo 并从该 fifo 上传到主线程中。
  • 如果问题只是“内存中的数据过多”,使用tempfile.TemporaryFile 可能会有所帮助。 pickle.dump(obj, temp),然后是temp.seek(0)(返回指向文件开头的文件指针),然后将temp 传递给upload_from_file
  • (但这意味着数据将被写入磁盘,如果你不走运,会写入由内存支持的/tmp tmpfs...)
  • @AKX:是的。虽然没有太多选择;如果他们想使用需要类似文件的对象的东西,他们需要一个类似文件的对象。数据需要在某个地方,无论是在内存中还是在磁盘上。您的解决方案可以避免其中任何一种(某种程度上,通过在使用数据的同时实时生成数据),但它会变得更加复杂。请注意,将tempfile.NamedTemporaryFile 与明确的dir 一起使用应该避免使用由内存支持的临时存储的风险(以降低操作系统在不需要写入磁盘时避免写入磁盘的能力为代价)。
  • 您可以通过执行fp = io.BytesIO()pickle.dump(obj, fp)fp.seek(0) 然后将fp 传递给任何想要的对象来避免内存中重复数据的两个副本。

标签: python pickle


【解决方案1】:

方法 1:一个线程和一个 Python FIFO

正如我在 cmets 中提到的,您需要一个线程和一个 FIFO,而且它会变得很复杂。 (您可能需要在 FIFO 中实现其他方法,具体取决于您实际需要的 upload_from_file 函数)。

import os
import queue
import pickle
import threading
from typing import Union


class FIFOStream:
    def __init__(self, maxsize=0):
        self.queue = queue.Queue(maxsize)

    def write(self, chunk: Union[bytes, None]):
        if chunk:
            print(f"Queued {len(chunk)} bytes")
        self.queue.put(chunk)

    def read(self):
        chunk = self.queue.get(True)
        if chunk is None:  # EOF marker encountered
            raise EOFError()
        return chunk


def do_pickling(fifo: FIFOStream, obj):
    pickle.dump(obj, fifo, protocol=pickle.HIGHEST_PROTOCOL)
    fifo.write(None)  # write EOF marker after Pickle is done


def upload_from_file(fifo):
    n = 0
    while True:
        try:
            chunk = fifo.read()
        except EOFError:
            break
        n += len(chunk)
        print(f"Uploading chunk of size {len(chunk)}")
    print(f"Finished uploading {n} bytes!")


def main():
    data = {a: os.urandom(1024) for a in range(500)}
    fifo = FIFOStream(maxsize=3)  # adjust maxsize to something larger in real use :)
    dumper = threading.Thread(target=do_pickling, args=(fifo, data))
    dumper.start()
    upload_from_file(fifo)
    dumper.join()


if __name__ == "__main__":
    main()

这会打印出类似的东西

Queued 66062 bytes
Queued 66057 bytes
Uploading chunk of size 66062
Queued 66057 bytes
Uploading chunk of size 66057
Queued 66057 bytes
Queued 66121 bytes
Queued 66121 bytes
Uploading chunk of size 66057
Queued 66121 bytes
Uploading chunk of size 66057
Uploading chunk of size 66121
Uploading chunk of size 66121
Queued 53727 bytes
Uploading chunk of size 66121
Uploading chunk of size 53727
Finished uploading 516323 bytes!

方法2:一个线程和一个系统FIFO

正如 ShadowRanger 在 cmets 中指出的那样,您也可以使用 os.pipe() 来执行此操作,它为您提供了一个由系统管理的 FIFOesque 文件:

import os
import pickle
import threading
from typing import IO


def upload_from_file(io: IO[bytes]):
    n = 0
    while True:
        try:
            chunk = io.read(65536)
        except EOFError:
            break
        if not chunk:  # Empty chunk = EOF
            break
        n += len(chunk)
        print(f"Uploading chunk of size {len(chunk)}")
    print(f"Finished uploading {n} bytes!")


def dump_and_close(data, file: IO[bytes]):
    pickle.dump(data, file, protocol=pickle.HIGHEST_PROTOCOL)
    file.close()  # so the reader end will be EOF


def main():
    data = {a: os.urandom(1024) for a in range(500)}
    r_fd, w_fd = os.pipe()
    with open(r_fd, "rb") as r_file, open(w_fd, "wb") as w_file:
        dumper = threading.Thread(target=dump_and_close, args=(data, w_file))
        dumper.start()
        upload_from_file(r_file)
        dumper.join()


if __name__ == "__main__":
    main()

虽然输出有点无聊。 ;-)

Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 57571
Finished uploading 516323 bytes!

【讨论】:

  • 如果目标是尽量减少内存使用,我强烈建议在queue.Queue 上设置maxsize;否则,在任何基于网络的上传代码处理超过少量块之前,大部分数据很可能会被腌制并在内存中等待。
  • 也就是说,这里有使用 Python 级别队列的理由吗?似乎os.pipe + os.fdopen 可以用来解决这个问题,这样阅读器就是一个真正的文件对象(例如,它可以与许多直接操作底层文件描述符的东西一起工作,只要他们不这样做需要一个可查找的文件),并且管道的性质意味着您可以“免费”阻塞(避免无限的内存消耗)。 upload_from_file 很可能会做类似的事情(而且它很可能不喜欢不接受 readsize 参数的 API)。
  • 添加了 Maxsize,很重要。至于使用os.pipe...是的,这也可以,但我会小心溢出管道缓冲区。
  • 嗯,用os.pipe() 实现这一点也不难——也编辑了它。感谢您的想法!
  • 不错。别客气!我会注意到,如果upload_from_file 明确地执行类似chunk = io.read1() (可能指定自定义大小)之类的操作,以模拟执行单个系统调用以获取可用数据的更合理的上传代码,那么输出将不那么无聊(不阻塞读取整个文件)并在请求另一个块之前将其写入网络。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-05-03
  • 2016-09-18
  • 1970-01-01
  • 1970-01-01
  • 2020-07-17
  • 2017-04-27
相关资源
最近更新 更多