【问题标题】:Reading large file with Python Multiprocessing使用 Python 多处理读取大文件
【发布时间】:2019-12-11 14:39:16
【问题描述】:

我正在尝试使用 python 读取 > 20Gb 的大型文本文件。 文件包含 400 帧的原子位置,就我在此代码中的计算而言,每一帧都是独立的。理论上,我可以将工作拆分为 400 个任务,而无需任何沟通。每帧有 1000000 行,因此文件有 1000 000* 400 行文本。 我最初的方法是对工人池使用多处理:

def main():
   """ main function
   """
   filename=sys.argv[1]
   nump = int(sys.argv[2])
   f = open(filename)
   s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
   cursor = 0
   framelocs=[]
   start = time.time()
   print (mp.cpu_count())
   chunks = []
   while True:
        initial = s.find(b'ITEM: TIMESTEP', cursor)
        if initial == -1:
            break
        cursor = initial + 14
        final = s.find(b'ITEM: TIMESTEP', cursor)
        framelocs.append([initial,final])
        #readchunk(s[initial:final])
        chunks.append(s[initial:final])
        if final == -1:
           break

这里基本上我正在寻找文件以查找框架的开头和结尾,以使用 python mmap 模块打开文件以避免将所有内容读入内存。

def readchunk(chunk):
   start = time.time()
   part = chunk.split(b'\n')
   timestep= int(part[1])
   print(timestep)

现在我想将文件块发送到工作人员池进行处理。 读取部分应该更复杂,但这些行将在稍后实现。

   print('Seeking file took %8.6f'%(time.time()-start))
   pool = mp.Pool(nump)
   start = time.time()
   results= pool.map(readchunk,chunks[0:16])
   print('Reading file took %8.6f'%(time.time()-start))

如果我通过将 8 个块发送到 8 个内核来运行它,则需要 0.8 sc 才能读取。 然而 如果我通过将 16 个块发送到 16 个内核来运行它,则需要 1.7 sc。似乎并行化并没有加快速度。如果相关,我正在 Oak Ridge 的 Summit 超级计算机上运行它,我正在使用这个命令:

jsrun -n1 -c16 -a1 python -u ~/Developer/DipoleAnalyzer/AtomMan/readlargefile.py DW_SET6_NVT.lammpstrj 16

这应该创建 1 个 MPI 任务并将 16 个内核分配给 16 个线程。 我在这里想念什么吗? 有更好的方法吗?

【问题讨论】:

  • (1) 我不确定这是否真的可以避免复制块。最好只将块边界发送到子进程并让它们读取实际块。 (2) 一个简单的测试代码可能比实际工作的开销更大,因此时间可能不具有代表性。
  • 并行化不会加快磁盘 I/O 到包含文件的单个物理驱动器的速度 - 并且使用多进程通常会自行引入相当多的开销。

标签: python python-multiprocessing large-files


【解决方案1】:

正如其他人所说,制作流程时会有一些开销,因此如果使用小样本进行测试,您可能会看到速度变慢。

这样的东西可能更整洁。确保您了解生成器函数的作用。

import multiprocessing as mp
import sys
import mmap


def do_something_with_frame(frame):
    print("processing a frame:")
    return 100


def frame_supplier(filename):
    """A generator for frames"""
    f = open(filename)
    s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

    cursor = 0
    while True:
        initial = s.find(b'ITEM: TIMESTEP', cursor)
        if initial == -1:
            break
        cursor = initial + 14
        final = s.find(b'ITEM: TIMESTEP', cursor)

        yield s[initial:final]

        if final == -1:
            break


def main():
    """Process a file of atom frames

    Args:
      filename: the file to process
      processes: the size of the pool
    """
    filename = sys.argv[1]
    nump = int(sys.argv[2])

    frames = frame_supplier(filename)

    pool = mp.Pool(nump)

    # play around with the chunksize
    for result in pool.imap(do_something_with_frame, frames, chunksize=10):
        print(result)

免责声明:这是一个建议。可能存在一些语法错误。我没有测试过。

编辑:

  • 听起来您的脚本变得 I/O 受限(即受限于您可以从磁盘读取的速率)。您应该能够通过将do_something_with_frame 的正文设置为pass 来验证这一点。如果程序是 I/O 绑定的,它仍然需要几乎一样长的时间。

  • 我认为 MPI 不会在这里产生任何影响。我认为文件读取速度可能是一个限制因素,我看不出 MPI 会有什么帮助。

  • 此时值得进行一些分析以找出哪些函数调用花费的时间最长。

  • 不使用 mmap() 也值得一试:

frame = []
with open(filename) as file:
    for line in file:
        if line.beginswith('ITEM: TIMESTEP'):
            yield frame
        else:
            frame.append(line)

【讨论】:

  • 你能解释一下吗?这里的 chunksize=10 意味着您要向每个池工作人员发送 10 帧?这里的结果也是有序的?
  • 我已经测试了你的代码,它比我的更简洁。但是,当我在 8 核或 16 核上运行它时,读取 20gb 文件的速度仍然相同,大约为 103 秒。池工作人员之间是否存在访问同一文件的竞争条件?你认为 MPI4PY 会更好吗?
  • @dundar yilmaz,是的,chunksize=10 意味着您要向每个池工作人员发送 10 帧。我不确定这对内存使用有什么影响。 IMO,这个问题的答案给出了一个很好的解释:stackoverflow.com/questions/53306927/…
猜你喜欢
  • 1970-01-01
  • 2016-09-02
  • 1970-01-01
  • 2017-04-14
  • 2022-06-12
  • 2018-11-11
  • 2019-12-13
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多