【问题标题】:How to multiprocess iteratively into several numpy arrays?如何迭代地多处理成几个numpy数组?
【发布时间】:2020-08-03 16:56:04
【问题描述】:

并行化的正确方法是什么?本质上,我有一个非常大的二维数组,我想将每一行线性拟合到一个具有相同长度 (x) 的单独数组中,这对于所有行都是恒定的。预期结果是具有线性拟合斜率的一维数组 (data_slopes)。此代码有效,但速度很慢:

for j in range(img1_data_r.shape[0]):

    y = img1_data_r[j,:]
    model = LinearRegression()
    model.fit(x.reshape((-1, 1)),y,1)
    data_slopes[j] = model.coef_[0]

我以前没有使用多处理池的经验,我一直在尝试失败

【问题讨论】:

  • “多进程迭代”到底是什么意思?

标签: python numpy python-multiprocessing


【解决方案1】:

如果你可以传入需要处理的数据的域,你可以使用xargs来并行运行你的程序。 xargs 允许您并行执行程序,传入从标准输入读取的不同参数。我已经成功地使用它使 bash shell 并行工作。

看看这个问题对你有没有帮助:Python read .json files from GCS into pandas DF in parallel

【讨论】:

    【解决方案2】:

    您可以尝试以下方法。我建议不要迭代范围,而是创建一个接收二维数组并返回预期的LinearRegression 输出的函数。然后您可以创建一个列表,其中包含您需要迭代的所有二维数组(迭代器)-

    #Function that works on a single object
    def fn(x):
        out = x**3 #your code here
        return out
    
    iterator = [1,2,3,4,5,6,7,8,9,10]  #list of objects that you need to run your function on
    
    pool = mp.Pool(processes=4)  #Number of cores you want to utilize
    results = [pool.apply_async(fn, args=(x,)) for x in iterator] #maps the iterator and the function to each core asynchronously
    output = [p.get() for p in results]   #collects and returns the results as a list of outputs.
    output
    
    [1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]
    

    pool.apply_async 与列表推导一起应该是超快的,因为它异步地将操作传递给内核,而无需等待所有内核在传递下一批之前完成它们的操作。

    【讨论】:

      【解决方案3】:

      这是一个示例,说明如何使用 multiprocessing 对二维 numpy 数组和常量向量的行进行操作。

      在此示例中,相同的向量 b(相当于您的 x)与 a 数组的每一行进行点积。

      import numpy as np
      from multiprocessing import Pool
      
      
      def dot_product(row, vec):
          return (row * vec).sum()
      
      a = np.array([[1, 2, 3],
                    [4, 5, 6],
                    [7, 8, 9],
                    [10, 11, 12]])
      
      b = np.array([10, 11, 12])
      
      p = Pool(3)  # max number of simultaneous processes
      
      print(p.starmap(dot_product, ((row, b) for row in a)))
      

      请注意,您只能将可提取对象传递给 multiprocessing.Pool。尽管 numpy 数组是可挑选的,并且函数(例如这里的 dot_product)是可挑选的,但实例方法却不是。所以你不能使用你的模型(LinearRegression())作为Pool.map(或Pool.starmap)的第一个参数。相反,您必须在每个进程的函数内分别实例化 LinearRegression

      为你把这些放在一起(虽然显然我没有足够的信息来测试这个),你会得到这样的东西:

      def get_data_slope(row, x):
          model = LinearRegression()
          model.fit(x.reshape((-1, 1)), row, 1)
          return model.coef_[0]
      
      p = Pool(3)
      
      data_slopes[:] = p.starmap(get_data_slope, ((row, x) for row in img1_data_r))
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2016-01-28
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-02-17
        • 2019-08-22
        • 2013-12-17
        相关资源
        最近更新 更多