【问题标题】:Using multiprocessing to concurrently execute a for-loop使用多处理并发执行一个 for 循环
【发布时间】:2022-01-03 23:01:58
【问题描述】:

我正在尝试在我的程序中实现多处理。
最初,我编写了这段代码。

pool = mp.Pool(mp.cpu_count())

for i in range(0, 10000):
    bid = i
    ask = i
    pool.apply_async(function1, args=(bid, ask,))
    pool.apply_async(function2, args=(bid, ask,))
    pool.apply_async(function3, args=(bid, ask,))
    pool.close()
    pool.join()

这给了我一个错误:

Python ValueError: Pool is still running

所以我将代码修改为:

for i in range(0, 10000):
    bid = i
    ask = i
    pool = mp.Pool(mp.cpu_count())
    pool.apply_async(function1, args=(bid, ask,))
    pool.apply_async(function2, args=(bid, ask,))
    pool.apply_async(function3, args=(bid, ask,))
    pool.close()
    pool.join()

这根本不执行并显示一个空白终端。

我想要实现的是对于范围内的每个值,我想并行运行 3 个函数,只有在执行这 3 个函数之后,它才应该移动到 range(0,1000) 中的下一个 i 值。

【问题讨论】:

  • 如果你想要同步工作流,为什么要使用异步方法?
  • 你应该使用 async io 而不是 mp
  • 考虑使用多处理模块。参考this答案
  • 你的循环中有pool.close()。池关闭后,您将永远无法向其发送更多工作。你的意思是closejoin 在循环之外吗?您将运行 30000 个任务。
  • 多处理用于 CPU 绑定任务。线程用于网络绑定任务。多处理在后台实现线程。如果他想并行调用多个函数,他应该使用 async io。

标签: python parallel-processing multiprocessing python-multiprocessing python-multithreading


【解决方案1】:

这是一个你可以适应的模式:

from concurrent.futures import ProcessPoolExecutor

def func1(a, b):
    pass

def func2(a, b):
    pass

def func3(a, b):
    pass

def main():
    with ProcessPoolExecutor() as executor: # work manager
        for i in range(1_000):
            futures = []
            for func in [func1, func2, func3]:
                futures.append(executor.submit(func, i, i))
            for future in futures:
                future.result() # wait for process to terminate

if __name__ == '__main__':
    main()

【讨论】:

    猜你喜欢
    • 2013-12-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-30
    • 2022-01-12
    • 1970-01-01
    • 2013-06-11
    相关资源
    最近更新 更多