【问题标题】:How to stop python multiprocessing server when processes are done进程完成后如何停止python多处理服务器
【发布时间】:2012-10-20 17:34:23
【问题描述】:

我正在尝试使用 python 的多处理模块在几台机器上运行分布式任务,我一直使用 this blog post 作为参考。

但是,这篇文章的任务使用了一个作业队列,并将结果放入一个结果队列中,这两个队列都由 JobQueueManager(它是 SyncManager 的子类)管理。这个管理器有一个服务器,当它调用manager.shutdown()时,它会启动并持续运行直到结果队列被填满。

我的问题是我的任务不需要结果队列,所以我试图弄清楚如何知道何时停止服务器。我可以让服务器使用serve_forever 持续运行,然后手动停止它,或者创建一个以与示例相同的方式填充的虚拟队列,并在服务器与原始作业数一样大时停止服务器.

我不想手动停止它,但第二种解决方案似乎相当老套。似乎一种常见的方法(没有服务器)是在每个进程上调用join(),但我不知道管理器是否有办法找出哪个进程从队列中删除了每个作业。

我的后备计划是虚拟队列方法的变体,但共享计数器变量在每个进程的最后一步递增,但我想知道是否有任何建议使用多处理库中的方法,或者如果这不可靠。

谢谢

编辑:我没有提到我不使用结果队列的原因是我将处理结果存储到 Redis 数据库。

【问题讨论】:

  • 为什么不添加结果队列。您只需输入一个布尔值。

标签: python multiprocessing distributed-computing


【解决方案1】:

在给出的示例中:

outdict = shared_result_q.get()

结果队列用于异步等待结果。这是主要的沟通方式。没有它,您需要另一种信号机制来确认任务结束事件。只需将None 加入队列即可。

【讨论】:

    【解决方案2】:

    正如我的更新所示,我已经使用了一个 redis 数据库来存储我的任务结果,所以我不必担心管理不同机器之间的字典。

    我最终采用的解决方案也使用了 Redis 数据库。每当每个流程完成时,我都会将带有流程信息的字符串推送到列表中(redis-py 中的r_server.lpush(...))。在服务器端,我没有对结果队列使用阻塞 get 方法,而是使用 Redis 的阻塞 pop rs.blpop(),其工作方式相同。

    这与这里的博客文章和其他建议几乎相同,以创建一个虚拟队列并使用 get(),但只是使用 redis,所以我没有额外方法参数和注册额外方法的开销经理。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-02-01
      • 2017-02-14
      • 1970-01-01
      • 1970-01-01
      • 2015-06-07
      • 2021-09-05
      • 2021-12-25
      • 2019-07-12
      相关资源
      最近更新 更多