【问题标题】:Python Multiprocessing: Writing to file every k iterationsPython多处理:每k次迭代写入文件
【发布时间】:2021-03-09 14:10:42
【问题描述】:

我正在使用python 3.7 中的multiprocessing 模块来并行重复调用一个函数。我想每 k 次迭代将结果写入一个文件。 (每次可以是不同的文件。)

下面是我的第一次尝试,它基本上循环遍历函数参数集,并行运行每个集合并将结果写入文件,然后再移动到下一个集合。这显然是非常低效的。实际上,我的函数运行所需的时间要长得多,并且取决于输入值,因此许多处理器在循环的迭代之间处于空闲状态。

有没有更有效的方法来实现这一点?

import multiprocessing as mp
import numpy as np
import pandas as pd

def myfunction(x): # toy example function
  return(x**2)

for start in np.arange(0,500,100):
    
    with mp.Pool(mp.cpu_count()) as pool:
        out = pool.map(myfunction, np.arange(start, start+100))
    
    pd.DataFrame(out).to_csv('filename_'+str(start//100+1)+'.csv', header=False, index=False) 

【问题讨论】:

    标签: python python-3.x multiprocessing file-writing


    【解决方案1】:

    我的第一条评论是,如果 myfunction 像您所展示的那样微不足道,那么使用多处理时您的性能会更差,因为创建进程池会产生开销(顺便说一下,您不必要地创建在每个循环迭代中结束)并将参数从一个进程传递到另一个进程。

    假设 myfunction 是纯 CPU 并且在 map 返回 100 个值之后,有机会重叠写入您没有利用的 CSV 文件(目前尚不清楚通过并发磁盘写入;这取决于您拥有的驱动器类型、磁头移动等),那么多线程和多处理的组合可能是解决方案。假设myfunction 是 100% CPU 并且不会释放全局解释器锁,因此无法利用大于您拥有的 CPU。无论如何,这是我的假设。例如,如果您要使用某些 numpy 函数,那么这是一个错误的假设。另一方面,众所周知numpy 将多处理用于其自身的某些处理,在这种情况下,将使用numpy 和您自己的多处理结合起来可能会导致性能下降。您当前的代码仅使用 numpy 生成范围。这似乎有点矫枉过正,因为还有其他生成范围的方法。我冒昧地通过定义STARTSTOP 值和N_SPLITS 以稍微不同的方式生成范围,该范围的相等(或尽可能相等)划分的数量并生成元组可以转换为范围的开始和停止值。我希望这不会太混乱。但这似乎是一种更灵活的方法。

    在下面的代码中,创建了一个线程池和一个处理池。任务被提交到线程池,其中一个参数是处理池,worker 使用 whish 来执行 CPU 密集型计算,然后在组装结果后,worker 写出 CSV 文件。

    from multiprocessing.pool import Pool, ThreadPool
    from multiprocessing import cpu_count
    import pandas as pd
    
    def worker(process_pool, index, split_range):
        out = process_pool.map(myfunction, range(*split_range))
        pd.DataFrame(out).to_csv(f'filename_{index}.csv', header=False, index=False)
    
    def myfunction(x): # toy example function
      return(x ** 2)
    
    def split(start, stop, n):
        k, m = divmod(stop - start, n)
        return [(i * k + min(i, m),(i + 1) * k + min(i + 1, m)) for i in range(n)]
    
    def main():
        RANGE_START = 0
        RANGE_STOP = 500
        N_SPLITS = 5
        n_processes = min(N_SPLITS, cpu_count())
        split_ranges = split(RANGE_START, RANGE_STOP, N_SPLITS) # [(0, 100), (100, 200), ... (400, 500)]
        process_pool = Pool(n_processes)
        thread_pool = ThreadPool(N_SPLITS)
        for index, split_range in enumerate(split_ranges):
            thread_pool.apply_async(worker, args=(process_pool, index, split_range))
        # wait for all threading tasks to complete:
        thread_pool.close()
        thread_pool.join()
    
    # required for Windows:
    if __name__ == '__main__':
        main()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-09-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-03-10
      • 2012-11-06
      • 2019-05-25
      • 2017-11-17
      相关资源
      最近更新 更多