【问题标题】:Python multiprocessing Queue failurePython多处理队列失败
【发布时间】:2012-07-11 16:12:37
【问题描述】:

我创建了 100 个子进程

proc_list = [
    Process(target = simulator, args=(result_queue,))
    for i in xrange(100)]

并启动它们

for proc in proc_list: proc.start()

每个进程在做一些处理后放入result_queue(multiprocessing.Queue的实例)10000个元组。

def simulate(alg_instance, image_ids, gamma, results,
                     simulations, sim_semaphore):
  (rs, qs, t_us) =  alg_instance.simulate_multiple(image_ids, gamma,
                                             simulations)
  all_tuples = zip(rs, qs, t_us)
  for result in all_tuples:
    results.put(result)
  sim_semaphore.release()

我应该(?)在队列中获得 1000000 个元组,但经过多次运行后,我得到了这些(样本)大小: 14912 19563 12952 13524 7487 18350 15986 11928 14281 14282 7317

有什么建议吗?

【问题讨论】:

  • 你确定simulate() 实际返回 10,000 个元组吗?
  • 是的。对其进行了广泛的测试...
  • 您是否加入了每个流程以确保您正在等待所有流程完成?
  • 命名变量tuple 是一个非常糟糕的主意。 tuple 是一种基本类型,您不应在命名空间中将其替换为其他类型。
  • @K.Brafford 我还没有真正将它们命名为元组——只是写了函数的草图。我已经更新了功能代码。

标签: python queue multiprocessing


【解决方案1】:

我对多处理问题的解决方案几乎总是使用 Manager 对象。虽然暴露的接口是相同的,但底层实现要简单得多,并且错误更少。

from multiprocessing import Manager
manager = Manager()
result_queue = manager.Queue()

尝试一下,看看它是否不能解决您的问题。

【讨论】:

  • 我开始对 multiprocessing.Queue 失去信心,预想我一定犯了一些错误,但实际上没有任何改变,只是将它与 manager.Queue 交换,它开始完美地工作:\ -- 非常感谢建议
【解决方案2】:

multiprocessing.Queue 在其文档中被称为是线程安全的。但是当你用Queue做进程间通信时,应该和multiprocessing.Manager().Queue()一起使用

【讨论】:

    【解决方案3】:

    OP 帖子中没有证据表明multiprocessing.Queue 不起作用。 OP 发布的代码根本不足以理解发生了什么:他们是否加入了所有流程?他们是否正确地将队列传递给子进程(如果它在 Windows 上,则必须作为参数)?他们的子进程是否验证他们实际上得到了 10000 个元组?等等

    有可能 OP 确实在 mp.Queue 中遇到了一个难以重现的错误,但考虑到 CPython 已经经历的测试量,以及我刚刚运行 100 个进程 x 10000 个结果而没有任何问题的事实,我怀疑 OP 实际上在他们自己的代码中存在一些问题。

    是的,其他答案中提到的Manager().Queue() 是共享数据的完美方式,但没有理由根据未经证实的报告“它有问题”来避免multiprocessing.Queue()

    【讨论】:

      猜你喜欢
      • 2012-08-03
      • 2018-06-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-04-20
      • 2012-01-23
      • 2021-11-11
      相关资源
      最近更新 更多