【问题标题】:Applying multi processing to different versions f multi processing将多处理应用于不同版本的多处理
【发布时间】:2024-01-07 01:46:01
【问题描述】:

我想与multiprocessing 函数同步运行所有formatting 函数。我如何能够添加此功能,以便它与下面的多处理功能同步运行,我尝试这样做但是它不起作用。本质上,我想同时运行formating(Numbers, limit1)formating(Numbers, limit2)formating(Numbers, limit3)

代码:

import multiprocessing as mp
import numpy as np
def formating(a, b):
    # Formating goes here
    x = np.sort(b);
    # digitize
    l = np.digitize(a, x)
    # output:
    result = np.bincount(l, weights=a)
    return result

Numbers = np.array([3, 4, 5, 7, 8, 10,20])
limit1 = np.array([0, 2 , 5, 12, 15])
limit2 = np.array([0, 2 , 5, 12])
limit3 = np.array([0, 2 , 5, 12, 15, 22])
formating(Numbers, limit1)
formating(Numbers, limit2)
formating(Numbers, limit3)

【问题讨论】:

    标签: python arrays numpy multiprocessing format


    【解决方案1】:

    我可以告诉你怎么做,但是你会对结果很不满意。有两个问题。第一个问题是,在创建进程池和将参数传递给工作函数并获取返回值时存在一定的开销,因为工作函数“存在”在不同的地址空间中,并且参数和返回值必须是“腌制”和“未腌制”用于传输。因此,在您的情况下,工作函数 formating 需要非常重要,以使产生的开销值得。其次,您的工作函数使用numpy,它本身取决于它在做什么,有时会在内部对其某些方法调用使用多处理。在它之上使用你自己的多处理不会给你买任何东西。 worker 函数很短,numpy 方法是用 C 语言编写的,执行速度很快,这是一个相当琐碎的 worker 函数的例子。

    以下是在循环中进行 3 次 formating 调用的基准,该循环迭代 100 次并计时经过的时间,然后使用大小为 3 的多处理池和方法 Pool.map 重复相同的事情,然后再重复一次使用方法Pool.apply_async(对于这个例子,我希望最后两个多处理案例在运行时间上或多或少相同):

    import multiprocessing as mp
    import numpy as np
    from functools import partial
    import time
    
    def formating(a, b):
        # Formating goes here
        x = np.sort(b);
        # digitize
        l = np.digitize(a, x)
        # output:
        result = np.bincount(l, weights=a)
        return result
    
    # needed for Windows:
    if __name__ == '__main__':
        Numbers = np.array([3, 4, 5, 7, 8, 10,20])
        limit1 = np.array([0, 2 , 5, 12, 15])
        limit2 = np.array([0, 2 , 5, 12])
        limit3 = np.array([0, 2 , 5, 12, 15, 22])
    
        TRIALS = 100
    
        # non-multiprocessing:
        t = time.time()
        for _ in range(TRIALS):
            result1 = formating(Numbers, limit1)
            result2 = formating(Numbers, limit2)
            result3 = formating(Numbers, limit3)
        elapsed = time.time() - t
        print(result1, result2, result3, elapsed)
    
        # multiprocessing version 1 (using method map):
        # since first argument to formating is always the same:
        worker = partial(formating, Numbers)
        t = time.time()
        for _ in range(TRIALS):
            with mp.Pool(3) as pool:
                result1, result2, result3 = pool.map(worker, [limit1, limit2, limit3])
        elapsed = time.time() - t
        print(result1, result2, result3, elapsed)
    
        # multiprocessing version 2 (using method apply_async)
        t = time.time()
        for _ in range(TRIALS):
            with mp.Pool(3) as pool:
                results = [pool.apply_async(formating, args=(Numbers, limit)) for limit in [limit1, limit2, limit3]]
                result1, result2, result3 = [result.get() for result in results]
        elapsed = time.time() - t
        print(result1, result2, result3, elapsed)
    

    打印:

    [ 0.  0.  7. 30.  0. 20.] [ 0.  0.  7. 30. 20.] [ 0.  0.  7. 30.  0. 20.] 0.00299835205078125
    [ 0.  0.  7. 30.  0. 20.] [ 0.  0.  7. 30. 20.] [ 0.  0.  7. 30.  0. 20.] 27.002381324768066
    [ 0.  0.  7. 30.  0. 20.] [ 0.  0.  7. 30. 20.] [ 0.  0.  7. 30.  0. 20.] 27.023000240325928
    

    结果

    多处理版本的运行速度仅慢了大约 9000 倍(使用 mapapply_async 没有区别)。

    如果我从基准测试中剔除创建池的开销,情况会大大改善:

    import multiprocessing as mp
    import numpy as np
    from functools import partial
    import time
    
    def formating(a, b):
        # Formating goes here
        x = np.sort(b);
        # digitize
        l = np.digitize(a, x)
        # output:
        result = np.bincount(l, weights=a)
        return result
    
    # needed for Windows:
    if __name__ == '__main__':
        Numbers = np.array([3, 4, 5, 7, 8, 10,20])
        limit1 = np.array([0, 2 , 5, 12, 15])
        limit2 = np.array([0, 2 , 5, 12])
        limit3 = np.array([0, 2 , 5, 12, 15, 22])
    
        TRIALS = 100
    
        # multiprocessing version 1 (using method map):
        # since first argument to formating is always the same:
        worker = partial(formating, Numbers)
        with mp.Pool(3) as pool:
            t = time.time()
            for _ in range(TRIALS):
                result1, result2, result3 = pool.map(worker, [limit1, limit2, limit3])
            elapsed = time.time() - t
        print(result1, result2, result3, elapsed)
    

    打印:

    [ 0.  0.  7. 30.  0. 20.] [ 0.  0.  7. 30. 20.] [ 0.  0.  7. 30.  0. 20.] 0.32500314712524414
    

    但与 0.003 秒相比,它仍然需要 0.325 秒。这只是向您展示了主要开销在于池的创建——但您仍然必须创建池并考虑该开销。

    这是如何你这样做的,但不要在这种情况下

    【讨论】:

    • 我有一个比formatting 函数规模大得多的函数,就像这个函数一样,所有函数都包含numpy。运行一次大约需要 600 毫秒,但我使用的 for 循环将运行大约 500k 次。你认为在这种情况下我应该使用多处理吗?
    • 抱歉,我上周有点忙,回复晚了。非常感谢示例代码和详细解释。如果我可以问Trial 函数的用途?
    • 首先回答您的第一个问题:根据您使用的numpy 函数,您可能看不到任何改进,因为numpy 在内部为某些函数使用了多处理本身(我不能告诉你其中)并且您在此之上使用多处理是弄巧成拙的。但除此之外,如果您现在提交 500K 任务而不是循环 500K 次,我希望您会看到很大的改进。如果可以的话,尝试使用mapstarmapimap(带有明确的chunksize 参数),其中任务将以适当的大块提交并使用默认池大小。跨度>
    • Trial 有什么功能?有一个 TRIALS 常量来运行每个代码的多次迭代,以获得更长的时间进行比较。
    最近更新 更多