【问题标题】:Write data to hdf file using multiprocessing使用多处理将数据写入 hdf 文件
【发布时间】:2013-03-20 04:47:39
【问题描述】:

这似乎是一个简单的问题,但我无法理解它。

我有一个模拟,它在双 for 循环中运行并将结果写入 HDF 文件。该程序的简单版本如下所示:

import tables as pt

a = range(10)
b = range(5)

def Simulation():
    hdf = pt.openFile('simulation.h5',mode='w')
    for ii in a:
        print(ii)
        hdf.createGroup('/','A%s'%ii)
        for i in b:
            hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
        hdf.close()
    return
Simulation()

此代码完全符合我的要求,但由于该过程可能需要很长时间才能运行,因此我尝试使用多处理模块并使用以下代码:

import multiprocessing
import tables as pt

a = range(10)
b = range(5)

def Simulation(ii):
    hdf = pt.openFile('simulation.h5',mode='w')
    print(ii)
        hdf.createGroup('/','A%s'%ii)
        for i in b:
            hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i])
        hdf.close()
    return

if __name__ == '__main__':
    jobs = []
    for ii in a:
        p = multiprocessing.Process(target=Simulation, args=(ii,))
        jobs.append(p)       
        p.start()

然而,这只会将最后一次模拟打印到 HDF 文件中,不知何故它会覆盖所有其他组。

【问题讨论】:

    标签: python multiprocessing hdf5


    【解决方案1】:

    每次以写入 (w) 模式打开文件时,都会创建一个新文件——因此,如果文件内容已经存在,则会丢失该文件的内容。只有最后一个文件句柄才能成功写入文件。即使您将其更改为附加模式,您也不应该尝试从多个进程写入同一个文件——如果两个进程同时尝试写入,输出将会出现乱码。

    相反,让所有工作进程将输出放入队列中,并让单个专用进程(子进程或主进程)处理队列的输出并写入文件:


    import multiprocessing as mp
    import tables as pt
    
    
    num_arrays = 100
    num_processes = mp.cpu_count()
    num_simulations = 1000
    sentinel = None
    
    
    def Simulation(inqueue, output):
        for ii in iter(inqueue.get, sentinel):
            output.put(('createGroup', ('/', 'A%s' % ii)))
            for i in range(num_arrays):
                output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
    
    
    def handle_output(output):
        hdf = pt.openFile('simulation.h5', mode='w')
        while True:
            args = output.get()
            if args:
                method, args = args
                getattr(hdf, method)(*args)
            else:
                break
        hdf.close()
    
    if __name__ == '__main__':
        output = mp.Queue()
        inqueue = mp.Queue()
        jobs = []
        proc = mp.Process(target=handle_output, args=(output, ))
        proc.start()
        for i in range(num_processes):
            p = mp.Process(target=Simulation, args=(inqueue, output))
            jobs.append(p)
            p.start()
        for i in range(num_simulations):
            inqueue.put(i)
        for i in range(num_processes):
            # Send the sentinal to tell Simulation to end
            inqueue.put(sentinel)
        for p in jobs:
            p.join()
        output.put(None)
        proc.join()
    

    为了比较,这里有一个使用mp.Pool的版本:

    import multiprocessing as mp
    import tables as pt
    
    
    num_arrays = 100
    num_processes = mp.cpu_count()
    num_simulations = 1000
    
    
    def Simulation(ii):
        result = []
        result.append(('createGroup', ('/', 'A%s' % ii)))
        for i in range(num_arrays):
            result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
        return result
    
    
    def handle_output(result):
        hdf = pt.openFile('simulation.h5', mode='a')
        for args in result:
            method, args = args
            getattr(hdf, method)(*args)
        hdf.close()
    
    
    if __name__ == '__main__':
        # clear the file
        hdf = pt.openFile('simulation.h5', mode='w')
        hdf.close()
        pool = mp.Pool(num_processes)
        for i in range(num_simulations):
            pool.apply_async(Simulation, (i, ), callback=handle_output)
        pool.close()
        pool.join()
    

    看起来更简单不是吗?然而,有一个显着的区别。原始代码使用output.put 将参数发送到handle_outputhandle_output 在其自己的子进程中运行。 handle_output 将从output 队列中取出args 并立即处理它们。使用上面的池代码,Simulationresult 中积累了一大堆args,并且resultSimulation 返回之前不会发送到handle_output

    如果Simulation 需要很长时间,则将有很长的等待时间,而simulation.h5 没有写入任何内容。

    【讨论】:

    • 除了这个问题,我已经成功使用了上面的代码,但现在我扩展了这个模拟,由 a = range(1000) 定义的 for 循环和 b = range ( 100)。然而,这导致我的记忆被广泛使用。我有 8 个 CPU 和 16 Gb RAM,但是当我运行文件时(即使没有真正的模拟),我的 RAM 使用率达到 100%,这导致我的系统停止运行。
    • 我认为我们需要将子流程的数量与任务的数量分开。听起来您想要 1000 个任务,但可能不是 1000 个子流程。我将编辑帖子以建议您可以做到这一点的方法。
    • Pool 是更高级别的设置,而 Process 为您提供基本控制。 Pool 具有timeout parameter 之类的功能,可用于终止耗时过长的任务。 apply_async 方法也有一个 callback parameter ,可用于在任务结束时在调用进程中运行代码。您可以使用它而不是使 handle_output 成为子进程。
    • 如果您需要这些功能或者只是更喜欢语法,请使用Pool。我有点矛盾。如果 Pool 具有我需要的功能,我会使用 Pool。但有时我喜欢只使用进程和队列。
    • @PirateApp:我对芹菜不是很熟悉。据我了解,它用于farming out asynchronous tasks across a distributed cluster,上面没有实现many features
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-06-18
    • 2022-07-06
    • 2022-07-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多