【问题标题】:Reading a single file from multiple threads in python从python中的多个线程读取单个文件
【发布时间】:2012-02-07 12:02:31
【问题描述】:

我正在用 python 编写一个多线程解压缩器。每个线程需要访问输入文件的不同块。

注意 1:无法加载整个文件,因为它的范围从 15 Gb 到 200 Gb;我不是使用多线程来加速数据读取,而是数据解压,我只是想确保数据读取不会减慢解压速度。

注意2:GIL不是问题,这里,因为主解压器函数是一个C扩展,它调用Py_ALLOW_THREADS,所以GIL在解压的时候被释放。第二阶段解压使用同样无 GIL 的 numpy。

1) 我认为简单地共享一个 Decompressor 对象(它基本上包装一个文件对象)是行不通的,因为如果线程 A 调用以下内容:

decompressor.seek(x)
decompressor.read(1024)

和线程 B 做同样的事情,线程 A 可能最终从线程 B 偏移读取。这是正确的吗?

2) 现在我只是让每个线程创建自己的 Decompressor 实例,它似乎可以工作,但我不确定这是最好的方法。 我考虑了这些可能性:

  • 添加类似的东西

    seekandread(from_where, length)
    

    到获取锁、寻找、读取和释放锁的 Decompressor 类;

  • 创建一个等待读取请求并按正确顺序执行它们的线程。

那么,我错过了一个明显的解决方案吗?这些方法之间是否存在显着的性能差异?

谢谢

【问题讨论】:

  • 如果您有硬盘驱动器,以多线程方式读取文件实际上会减慢进程。针必须从一个地方跳到另一个地方,而不是以迭代的方式工作。您应该在处理之前加载文件。
  • 不可能加载整个文件,因为它的范围从 15 Gb 到 200 Gb;我不是使用多线程来加速数据读取,而是数据解压,我只是想确保数据读取不会减慢解压速度。
  • 当然,这可能适用于 SSD,也可能不适用于 SSD。我对这个主题一无所知。你不应该依靠硬件来做到这一点。一旦 SSD 足够普遍,以多线程方式执行 I/O 可能会很有效。
  • 能否请您在问题中澄清一下?
  • 好的,我已经添加了注释

标签: python multithreading file-io


【解决方案1】:

您可以使用 mmap。见mmap() vs. reading blocks

正如 Tim Cooper 所说,当您具有随机访问权限时,mmap 是一个好主意(多个线程会让您看起来像这样),并且它们将能够共享相同的物理页面。

【讨论】:

  • 这看起来很棒!我查看了 mmap 的 python 文档,但找不到关于线程安全的参考。如果 2 个线程同时执行 a=mappedfile[x:y] 之类的操作,它会按预期工作吗?
  • 回答我自己,看来 python mmap 切片表示法实际上是线程安全的。我创建了一个测试程序,它从不同的线程访问映射文件的不同部分,并检查结果。如果我使用切片表示法它通过测试,如果我使用查找/读取它会失败。不过,我仍然需要检查性能。
  • @Alberto:在我看来,任何已经被处理的给定段都应该至少受到一个互斥体的保护,如果不是抛出条件信号量的话。通过抛出条件信号量,我的意思是一个信号量,如果不满足预进入条件,它不会等待,而是抛出异常。它是信号量和信号量的混合体。您可能只想在不满足条件 B 时抛出,并在满足条件 A 时等待。
  • @Alberto:您需要一种锁定机制,当在段上打开一个新线程时,该机制会将段分成两个段。例如。一个线程从 0 - 1024 读取,并创建一个新线程并将其分配给 0 - 1024。第一个线程已经处理了前 100 个字节,因此您可以将工作分配给第二个线程。仅当您确实需要优化时才使用它。如果你愿意,我可以用更详细的算法留下另一个答案。
  • @Alberto:是的,我是认真的。但我也意味着两个线程可以分配给相同的数据,它们只会分配给它的一部分。
【解决方案2】:

如果您还没有这样做,您可能想要使用Leader/Follower 模式。
领导者线程将知道哪些段已被处理,哪些尚未处理,并将分配给自己下一个未处理的段,然后成为追随者,将领导权留给池中的下一个可用线程。

【讨论】:

  • 谢谢,我会调查的。
【解决方案3】:

CPython 具有 GIL,因此多线程不会提高 CPU 密集型任务的性能。

如果问题不是 IO 限制的(磁盘提供/存储数据的速度比 CPU 解压缩它的速度快),您可以使用multiprocessing module:每个进程打开文件并解压缩给定的字节范围。

【讨论】:

  • 主要的解压器函数是一个C扩展,它调用Py_ALLOW_THREADS,所以GIL在解压的同时被释放。第二阶段解压使用同样不含 gil 的 numpy。我已经测量了一个很好的加速。
  • (也许这个澄清 - 关于你已经“照顾”了 GIL - 也可以进入问题主体)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-04-01
  • 2013-08-20
  • 1970-01-01
  • 2010-11-28
  • 1970-01-01
  • 2011-12-28
相关资源
最近更新 更多