【问题标题】:Python multiprocessing run x processes at a timePython 多处理一次运行 x 个进程
【发布时间】:2021-09-19 15:24:20
【问题描述】:

我有很多 python 函数(每一个都是不同类的成员函数),并且希望每时每刻都并行运行其中的 x 个(除非剩下的少于 x 个)。

换句话说,我想要一个包含所有应该执行的任务和 x 个子进程的队列。主进程会从队列中弹出任务直到队列为空,然后交给子进程执行。子进程空闲时会通知主进程,得到另一个任务。

我想过使用多进程模块,但不知道如何知道每个子进程何时完成并为下一个任务做好准备。

我尝试使用共享队列 - 用主进程填充它的类对象,并在每个子进程上执行以下操作:

def subprocess(shared_queue):
    while not shared_queue.empty():
          class_obj = shared_queue.pop()
          class_obj.main_func()

但是,事实证明我无法用我的复杂类填满队列。

编辑:我认为 pool 不起作用,因为我想运行许多不同的功能,每个功能一次。我在 pool 中看到的示例使用不同的参数多次运行一个函数。

编辑 2:池将按照 cmets 的建议通过将函数作为参数传递来解决原始问题。但我仍然想要一个管理队列的解决方案,因为稍后我想给每个任务一个权重并运行任务,以使它们的权重总和不超过阈值。所以我仍然需要知道子流程何时完成它们的任务。

【问题讨论】:

  • 您需要查看pool,这将使x 进程从可能更大的任务列表中运行。另见multithreaded pool
  • 但是池运行 1 个函数,参数不同。我看到了具有 2 个不同功能的示例,例如 link,但我需要许多不同的功能,每个功能都运行一次
  • 函数可以是参数。编写一个调用函数参数的函数。
  • 谢谢。我仍然想要一个管理队列的解决方案,因为稍后我想给每个任务一个权重并运行任务,以使它们的权重总和不超过阈值。所以我仍然需要知道子流程何时完成它们的任务。

标签: python multiprocessing


【解决方案1】:

imapimap_unordered 适合你吗?您可以通过将它们用作迭代器来检索完成的结果,并且可以按照此处所述停止正在运行的任务:How to kill threads spawned when using the multiprocessing's Pool imap_unordered

完整的结果看起来像这样(未经测试):

import multiprocessing

def function_caller(func_and_args): #Edited, thanks for the comment
   func_and_args[0](*func_and_args)

funcs_and_args = [[func1, args_for_func1], [func2, args_for_func2], ...]
with multiprocessing.Pool() as pool:
   for result in pool.imap(function_caller, funcs_and_args):
      do_something_with_the_result()
      if weight_reached():
         pool.terminate()

想法:您首先创建一个包含函数和输入参数的列表。接下来,您需要一个包装器函数,它运行给定相应参数的函数,这就是包装器所做的。接下来,您打开一个池并将 imap 作为迭​​代器运行。一旦达到您想要达到的权重,您就可以终止池,从而结束所有仍在运行的子进程。请注意,这意味着您的子功能应该优雅地终止(即它们在运行时不能更改任何文件或类似的东西,否则您将不知道它们在工作中走了多远)。

编辑1: 现在有了新的解释,我不确定是否有内置的方法来实现你所追求的。尽管如此,你可以做的是:

import multiprocessing
import time

def function_caller(func, args): #Edited, thanks for the comment
   func(*args)

funcs_and_args = [[func1, args_for_func1], [func2, args_for_func2], ...]
results = []
running_processes = []
index = 0
while len(results) != len(funcs_and_args):
   if all_weight_used():
      time.sleep(10)
   else:
      p = multiprocessing.Process(target=function_caller, args=funcs_and_args[i])
      p.start()
      running_processes.append(p)
   for p in running_processes:
      check if alive and retrieve result if not

Tbh 我不能 100% 确定最后一部分的正确代码是什么,但 p.is_alive()multiprocessing.Queue() 的某种组合应该可以完成这项工作。也许其他人可以编辑答案以完成它。

【讨论】:

  • 谢谢!我想我没有正确解释自己。所有任务最终都应该运行。权重只是确定可以同时运行多少个任务。当一项任务完成后,我想从总和中减去它的权重,并为新任务腾出空间。在任何给定时刻,正在运行的任务的权重总和不应超过权重阈值
  • 当您说您的代码未经测试时我相信您,因为您不能使用 lambda 函数作为 Pool.imapfunc 参数。
  • @user287263 multiprocessing.pool.Pool.apply_async 方法有一个 callback 参数,该参数指定了一个函数,当它可用时,它的结果将被调用。这是了解任务何时完成的一种方式。
  • @Booboo:感谢您的提示。我编辑了答案以包含一个正常的功能。
  • @user287263:感谢您的澄清。我编辑了我的答案以解决您的问题。我不能 100% 确定您检索结果的部分是如何工作的,但您可以通过一些尝试和错误来找到它。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2022-01-25
  • 2013-01-24
  • 2020-07-09
  • 1970-01-01
  • 2020-03-04
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多