方法 1:一个线程和一个 Python FIFO
正如我在 cmets 中提到的,您需要一个线程和一个 FIFO,而且它会变得很复杂。 (您可能需要在 FIFO 中实现其他方法,具体取决于您实际需要的 upload_from_file 函数)。
import os
import queue
import pickle
import threading
from typing import Union
class FIFOStream:
def __init__(self, maxsize=0):
self.queue = queue.Queue(maxsize)
def write(self, chunk: Union[bytes, None]):
if chunk:
print(f"Queued {len(chunk)} bytes")
self.queue.put(chunk)
def read(self):
chunk = self.queue.get(True)
if chunk is None: # EOF marker encountered
raise EOFError()
return chunk
def do_pickling(fifo: FIFOStream, obj):
pickle.dump(obj, fifo, protocol=pickle.HIGHEST_PROTOCOL)
fifo.write(None) # write EOF marker after Pickle is done
def upload_from_file(fifo):
n = 0
while True:
try:
chunk = fifo.read()
except EOFError:
break
n += len(chunk)
print(f"Uploading chunk of size {len(chunk)}")
print(f"Finished uploading {n} bytes!")
def main():
data = {a: os.urandom(1024) for a in range(500)}
fifo = FIFOStream(maxsize=3) # adjust maxsize to something larger in real use :)
dumper = threading.Thread(target=do_pickling, args=(fifo, data))
dumper.start()
upload_from_file(fifo)
dumper.join()
if __name__ == "__main__":
main()
这会打印出类似的东西
Queued 66062 bytes
Queued 66057 bytes
Uploading chunk of size 66062
Queued 66057 bytes
Uploading chunk of size 66057
Queued 66057 bytes
Queued 66121 bytes
Queued 66121 bytes
Uploading chunk of size 66057
Queued 66121 bytes
Uploading chunk of size 66057
Uploading chunk of size 66121
Uploading chunk of size 66121
Queued 53727 bytes
Uploading chunk of size 66121
Uploading chunk of size 53727
Finished uploading 516323 bytes!
方法2:一个线程和一个系统FIFO
正如 ShadowRanger 在 cmets 中指出的那样,您也可以使用 os.pipe() 来执行此操作,它为您提供了一个由系统管理的 FIFOesque 文件:
import os
import pickle
import threading
from typing import IO
def upload_from_file(io: IO[bytes]):
n = 0
while True:
try:
chunk = io.read(65536)
except EOFError:
break
if not chunk: # Empty chunk = EOF
break
n += len(chunk)
print(f"Uploading chunk of size {len(chunk)}")
print(f"Finished uploading {n} bytes!")
def dump_and_close(data, file: IO[bytes]):
pickle.dump(data, file, protocol=pickle.HIGHEST_PROTOCOL)
file.close() # so the reader end will be EOF
def main():
data = {a: os.urandom(1024) for a in range(500)}
r_fd, w_fd = os.pipe()
with open(r_fd, "rb") as r_file, open(w_fd, "wb") as w_file:
dumper = threading.Thread(target=dump_and_close, args=(data, w_file))
dumper.start()
upload_from_file(r_file)
dumper.join()
if __name__ == "__main__":
main()
虽然输出有点无聊。 ;-)
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 65536
Uploading chunk of size 57571
Finished uploading 516323 bytes!