【问题标题】:Reloading celery code without losing reserved tasks在不丢失保留任务的情况下重新加载 celery 代码
【发布时间】:2017-10-03 05:10:28
【问题描述】:

我在 Celery 中保留了数百万个任务(ETA 尚未到期),每次我想更新我的 Celery 代码库时,我都必须重新启动它,这会切断与 RabbitMQ 的连接并导致 RabbitMQ 再次重新分配任务(我我正在使用延迟确认)。

是否可以重新加载新的代码库但仍保留我保留的任务?我在 Django 中使用 Celery。

【问题讨论】:

  • 你的意思是数以百万计的任务被保留在一个工人身上吗?如果是这种情况,似乎队列并没有真正被使用。在这种情况下,重新分配不是预期的行为吗?我很好奇你的用例。

标签: python rabbitmq celery django-celery


【解决方案1】:

简短回答:可以,但您必须编写自己的队列排空逻辑。

更长的答案:当您想要进行代码更新时(取决于您如何处理),您必须使用 celery remote control api 告诉您的所有工作人员停止使用任务。 RabbitMQ brokers 支持远程控制接口so you're in luck

from my_app.celery import app
inspector = app.control.inspect()
controller = app.control

# get a list of current workers
workers = inspector.ping()
active_queues = inspector.active_queues()
all_queues = set()
for worker, queues in active_queues.items():
    for queue in queues:
        all_queues.add(queue['name'])
for queue in all_queues:
    controller.cancel_consumer(queue)

这将阻止您的工作人员使用任务。现在您必须监控您的工作人员,直到他们完成所有活动任务的处理。

import time
done = False
while not done:
    active_count = 0
    active = inspector.active()
    active_count = sum(map(lambda l: len(l), active.values()))
    done = active_count > 0
    if not done:  
        time.sleep(60)  # wait a minute between checks

一旦您的工作人员完成,您就可以清楚地部署您的代码,而不必担心丢失任务。

【讨论】:

    猜你喜欢
    • 2019-07-25
    • 2021-09-03
    • 2020-07-16
    • 2022-01-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-21
    • 1970-01-01
    相关资源
    最近更新 更多