【问题标题】:How do I use multiprocessing on Python to speed up a for loop?如何在 Python 上使用多处理来加速 for 循环?
【发布时间】:2020-11-14 06:59:19
【问题描述】:

我有这段代码,我想使用多处理来加速:

matrix=[]

for i in range(len(datasplit)):
    matrix.append(np.array(np.asarray(datasplit[i].split()),dtype=float))

变量“datasplit”是一个逗号分隔的字符串列表。每个字符串有大约 50 个数字,由空格分隔。对于每个字符串,此代码在这些数字之间添加逗号而不是空格,将整个字符串转换为数组,并将每个单独的数字转换为字符串。这现在看起来像一个逗号分隔的字符串数组,其中每个字符串是 50 个数字中的 1 个。然后代码将这些字符串转换为浮点数,所以现在我们有一个由 50 个逗号分隔的数字组成的数组。代码运行后,打印“矩阵”会给出一个数组列表,其中每个数组有 50 个逗号分隔的数字。

现在我的问题是数据拆分的长度很大。它的长度约为 10^7。此代码大约需要 15 分钟才能运行。我需要为 124 个其他类似大小的样本运行此程序,因此我想使用多处理来加快运行时间。

我将如何使用多处理重新编写我的代码以使其运行得更快?

感谢您的帮助。

【问题讨论】:

    标签: python parallel-processing multiprocessing python-multiprocessing ipython-parallel


    【解决方案1】:

    这会将您的任务拆分为多个内核,并将您的性能提高至少 4-8 倍:

    from multiprocessing import Pool
    import os
    import numpy as np
    
    pool = Pool(os.cpu_count())
    
    # Add your data to the datasplit variable below:
    datasplit = []
    
    results = pool.map(lambda x: np.array(np.asarray(x.split()),dtype=float), datasplit)
    
    pool.close()
    pool.join()
    
    

    【讨论】:

      【解决方案2】:

      Python 标准库为多处理提供了两个选项:模块multiprocessingconcurrent.futures。第二个在第一个之上添加了一层抽象。对于像您这样的简单地图场景,用法非常简单。

      这里有一些东西可以试验:

      import numpy as np
      from time import time
      from os import cpu_count
      from multiprocessing import Pool
      from concurrent.futures import ProcessPoolExecutor
      
      def string_to_float(string):
          return np.array(np.asarray(string.split()), dtype=float)
      
      if __name__ == '__main__':
      
          # Example datasplit
          rng = np.random.default_rng()
          num_strings = 100000
          datasplit = [' '.join(str(n) for n in rng.random(50))
                       for _ in range(num_strings)]
      
          # Looping (sequential processing)
          start = time()
          matrix = []
          for i in range(len(datasplit)):
              matrix.append(np.array(np.asarray(datasplit[i].split()), dtype=float))
          print(f'Duration of sequential processing: {time() - start:.2f} secs')
      
          # Setting up multiprocessing
          num_workers = int(0.8 * cpu_count())
          chunksize = max(1, int(len(datasplit) / num_workers))
      
          # Multiprocessing with Pool
          start = time()
          with Pool(num_workers) as p:
              matrix = p.map(string_to_float, datasplit, chunksize)
          print(f'Duration of parallel processing (Pool): {time() - start:.2f} secs')
      
          # Multiprocessing with ProcessPoolExecutor 
          start = time()
          with ProcessPoolExecutor(num_workers) as ppe:
              matrix = list(ppe.map(string_to_float, datasplit, chunksize=chunksize))
          print(f'Duration of parallel processing (PPE): {time() - start:.2f} secs')
      

      您应该使用num_workers,更重要的是chunksize 变量。我在这里使用的那些在很多情况下对我都很有效。您也可以让系统决定选择什么,这些参数是可选的,但结果可能不是最理想的,尤其是当要处理的数据量很大时。

      对于 1000 万个字符串(您的范围)和chunksize=10000,我的机器产生了以下结果:

      Duration of sequential processing: 393.78 secs
      Duration of parallel processing (Pool): 73.76 secs
      Duration of parallel processing (PPE): 85.82 secs
      

      PS:你为什么用np.array(np.asarray(string.split()), dtype=float)而不是np.asarray(string.split(), dtype=float)

      【讨论】:

        猜你喜欢
        • 2022-12-04
        • 2016-03-31
        • 2023-04-06
        • 2021-08-08
        • 2021-01-30
        • 2021-05-25
        • 1970-01-01
        • 2016-11-07
        • 2020-05-20
        相关资源
        最近更新 更多