【问题标题】:Celery task does not appear in queue after task.apply_async using countdown在 task.apply_async 使用倒计时后,芹菜任务没有出现在队列中
【发布时间】:2019-02-21 13:50:36
【问题描述】:

我希望这个问题符合堆栈溢出的条件。

使用 celery 4.2.0 和 Redis 作为代理和后端。

有任务

@shared_task()
def add(a, b):
    return a+b

worker 处于活动状态时,运行休闲命令:

add.apply_async(countdown=60)

导致任务未注册到默认的 celery 队列, 但在倒计时

中规定的时间段后仍在执行

为什么会这样?如何查找所有待处理的任务? 如果将任务注册到队列中,这样做会起作用:

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange('celery', 0, -1)

如果我在任务尚未开始的情况下终止工作人员,我会得到休耕:

[WARNING/MainProcess] Restoring 1 unacknowledged message(s)

这告诉我任务保存在队列以外的其他地方,但我不知道在哪里

【问题讨论】:

  • 要监控所有即将到来和已完成的任务,您可以使用Flower 工具。 flower.readthedocs.io/en/latest
  • Flower 是另一个进程..我希望能够从应用程序内部知道任务是否存在,如果不存在,请重新提交任务

标签: python python-3.x celery


【解决方案1】:

事实证明,这实际上是预期的行为,因为工作人员甚至在执行之前就抓取待处理的任务以加快处理速度,从而将它们从队列中移除。

为了实现我想要的,即禁止工作人员在实际开始处理任务之前从队列中获取任务,并且忘记了该任务,我不得不一起使用 2 个 celery 设置

task_acks_late = True
worker_prefetch_multiplier = 1

这样一个任务会从它的原始队列中移除,但仍然存在于一个名为“unacked”的队列中,这让我可以监控它。 Optimizing celery

请注意,使用 'acks_late' acks late 设置有一些您需要注意的副作用,例如,如果工作人员意外终止,任务将恢复并在下次工作人员恢复时重新运行。

【讨论】:

    最近更新 更多