【问题标题】:Check if celery warm shutdown is in progress from a task检查某个任务是否正在进行 celery 热关机
【发布时间】:2016-09-06 09:07:02
【问题描述】:

TL;DR

有没有办法判断我们的 celery worker 是否正在热关机?换句话说,我可以检查是否有SIGTERM 待处理吗?我有一个重新安排自己的任务,但如果有一个待处理的关机,我想避免重新安排自己,以避免推迟热关机。像这样的:

if not self.shutdown_pending():
    self.retry(countdown=5, max_retries=3)

实际上,除了重新安排工作之外,我希望能够在收到SIGTERM 后立即退出当前的工作,以便在新代码部署时尽快重新启动我的工作人员:

@app.task(bind=True)
def my_work_task(self):
    work = get_work()
    for item in work:
        if self.shutdown_pending():
            logger.info("Shutdown detected. Bailing.")
            return
        item.process()

背景

我有一项任务需要花费不同的时间(从几秒到几分钟不等)。我最初使用分钟 celery-beat 计划来调用任务,但如果我只完成少量工作,只需要十秒钟即可完成,那么我想立即重新调用任务次,以避免等待 50 秒等待下一个 celery beat 到来,因为在此期间很可能会有新的工作。

所有这些都是为了尽量减少处理我的工作项的延迟。我想避免工人坐在那里无所事事的 50 秒时间,因为在此期间可能有一些工作可用。请注意,基于数据库中项目的“过期”,工作变得“准备就绪”,这就是为什么我使用 celery beat 只是在它们可用时将它们清扫,而不是直接触发任务。

我的任务如下所示:

@app.task(bind=True)
def my_work_task(self):
    work = get_work()
    do_some_work(work)
    # if this was just a short bit of work reschedule ourselves
    # immediately to avoid wasting time waiting for the
    # next celery beat.
    if len(work) < SMALL_WORK_THRESHOLD:
        self.retry(countdown=5, max_retries=3)

这一切都很好,除了一件事:当我重新加载我的工人时(通过发送SIGTERM),我最终可能会等待一个工人重新安排自己,可能每次都有大量的工作。每次调用可能需要几分钟,直到我达到我的 max_retries 值。这使得部署新代码成为一个问题,因为工作处理几乎停止了长达几分钟。

【问题讨论】:

    标签: python celery


    【解决方案1】:

    遗憾的是,没有像其他类似question 中提到的那样简单的解决方案。

    您唯一能做的就是改变方法或使用SIGKILL,在这种情况下,只需确保使用任务结果后端来了解您可能丢失了哪些任务,或者您是否在db 你可能不需要它。

    在我个人的经验中,我总是使用 mongodb 来注册星星和任务的结束。这让我可以看到由于机器崩溃而从未完成的任务(我使用的是 CELERY_ACKS_LATE),并且如果我希望某个任务在整个云上一次只运行一次,还可以执行全局锁定。 这样,如果 SIGTERM 在一段时间后没有工作,我会发送 SIGKILL 而不会丢失工作。

    希望对你有帮助

    【讨论】:

      【解决方案2】:

      其实我也遇到了同样的问题,想出了一个解决办法:

      @worker_ready.connect
      def my_long_running_task(signal, sender, **kwargs):
          is_running = True
      
          def get_shutdown_signal(**kwargs):
           # here is the magic with nonelocal keyword
              nonlocal is_running
              is_running = False
      
          worker_shutting_down.connect(get_shutdown_signal)
          while is_running:
               # do stuff
      

      【讨论】:

      • 你能详细说明你为什么要@worker_ready.connect吗?并且还应该有一个@app.task 注释吗?我没有运气让我正在运行的任务“听到”worker_shutting_down 信号......
      猜你喜欢
      • 1970-01-01
      • 2012-11-11
      • 2012-04-07
      • 1970-01-01
      • 1970-01-01
      • 2017-07-21
      • 1970-01-01
      • 1970-01-01
      • 2014-07-06
      相关资源
      最近更新 更多