【问题标题】:How to correctly shut down Python RQ worker processes dynamically?如何正确地动态关闭 Python RQ 工作进程?
【发布时间】:2013-02-13 16:56:42
【问题描述】:

使用Python RQ,我们正在尝试动态管理工作进程。我们使用定制的工作脚本,其(简化形式)如下:

from rq import Connection, Worker

queues_to_listen_on = get_queues_to_listen_on()

with Connection(connection = get_worker_connection()):
    w = Worker(queues_to_listen_on)
    w.work()

我们对关闭工人特别感兴趣。我们主要关心的是如何优雅地关闭工作人员,以使当前工作能够在关闭之前完成。适当的Worker 对象上的request_stop(...) 信号处理程序似乎可以满足我们的需要,但似乎没有办法(至少据我所知)发出它,除非它通过在正在运行的工作进程上按下CTRL+C在终端中。

在我看来,有两种可能的解决方案(肯定有更多) - 按优先顺序排列:

  1. 使用rq 库以编程方式将信号发送到request_stop,从而触发正常关机。
  2. 以某种方式获取正确进程的 pid(不确定是主力进程还是工作侦听器进程)并使用其他方法向该进程发送适当的信号。我们有一些方法可以做到这一点,但它很可能需要更多的工作并将其他变量引入我宁愿被排除在外的问题(例如,使用Fabric 运行远程命令或类似的东西) .

如果有更好的方法来解决这个问题或可以实现相同目标的不同替代方案,我将不胜感激您的建议。

【问题讨论】:

  • 如果你需要 PID,你实际上可以从 w.pid 中得到它

标签: python asynchronous process signals


【解决方案1】:

选项 1 在设计方面肯定更好。

但是,为了解决您必须使用 CTRL + C 退出流程的特殊问题(我也讨厌这样),您可以为您的工作人员使用以下策略:

# WORKER_NAME.py
import os

PID = os.getpid()

@atexit.register
def clean_shut():
    print "Clean shut performed"

    try:
        os.unlink("WORKER_NAME.%d" % PID)
    except:
        pass

# Worker main
def main():
    f = open("WORKER_NAME.%d" % PID, "w")
    f.write("Delete this to end WORKER_NAME gracefully")
    f.close()

    while os.path.exists("WORKER_NAME.%d" % PID):
        # Worker working

在您的主脚本中,按照@Borys 的建议获取工作进程 PID,发送热停止请求,并 os.unlink("path/to/WORKER_NAME.%d" % worker_PID) 以确保正常关闭 :)

这仅适用于运行无限循环的工作人员。如果工作进程调用的事情甚至阻塞了普通的顺序一次性作业,您必须进一步追踪可能阻塞的例程以从那里解决,例如应用某种超时策略。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-07-08
    • 2017-03-01
    • 2020-07-08
    • 1970-01-01
    • 2016-12-23
    • 2015-12-05
    • 1970-01-01
    相关资源
    最近更新 更多