【问题标题】:How to unacknowledge message when I run a celery task?运行芹菜任务时如何取消确认消息?
【发布时间】:2020-06-12 07:37:19
【问题描述】:

现在我有一些同步作业,它们是有状态的,所以如果任务失败,我必须取消确认消息,然后让它们转到RabbitMQ 的前面。但是当我尝试引发错误时,我发现 celery 仍然会确认此消息,并且队列已被清除。

@celery.task(bind=True)
def my_task(self, *args, **kwargs):
    raise ValueError

我发现 celery 任务有一个名为 retry 的方法,但它会将任务添加到队列的后面。这不是我想要的。

@celery.task(bind=True)
def my_task(self, *args, **kwargs):
    try:
        raise ValueError
    except Exception:
        self.retry(countdown=15)

即使我不能用终止信号来做到这一点:

os.kill(os.getpid(), signal.SIGKILL)

我该怎么办? celery 是否提供了一些错误,以便我可以引发此错误以通知 celery 不确认我的消息?

【问题讨论】:

标签: python rabbitmq celery


【解决方案1】:

根据文档 celery 能够RabbitMQs priority queueshttps://docs.celeryproject.org/en/latest/faq.html#does-celery-support-task-priorities

因此,您应该能够通过将重试任务的优先级高于常规任务来将它们推到队列的前面。

@celery.task(bind=True)
def my_task(self, *args, **kwargs):
    try:
        raise ValueError
    except Exception:
        self.retry(countdown=15, priority=9)

根据这个github issue,您还可以将重试的任务分配给一个新的专用队列,并分配您的资源来对该队列进行优先级排序。

@celery.task(bind=True)
def my_task(self, *args, **kwargs):
    try:
        raise ValueError
    except Exception:
        self.retry(countdown=15, queue='prioritized_queue_name') 

【讨论】:

  • 我想这是解决我问题的最接近的方法,我没有找到任何其他解决方案。
  • 但是,老实说,如果我将消息放到另一个队列中,它仍然会破坏队列的顺序——这条消息之后的消息将首先被消费。第一种解决方案可能需要指定--prefetch-multiplier=1 以保证优先级最高的消息将首先被消费。它更慢。所以我觉得最好还是用amql实现自己的任务调度系统。
【解决方案2】:

在文档https://docs.celeryproject.org/en/stable/userguide/configuration.html 上我发现:

task_acks_on_failure_or_timeout 默认为enabled


所以我认为你应该尝试结合

task_acks_late=True + task_acks_on_failure_or_timeout=False
实现NO acknowledgement when a task fails

【讨论】:

  • 我已经尝试过这个选项,但看起来不能正常工作。如果我的任务失败,它不会确认我的消息,但不会让消息返回。所以我不能尝试通过任何其他方式来解决这个任务,除了杀死这个工人。
  • 而且,我又试了一次,又一个大问题,如果我预取10条消息,而第一个失败,它不会停在这里,它会继续运行下一个任务,直到这10条消息完成,然后在这里等待。这意味着,第二条消息,第三条消息......将在第一条消息之前被消费。
  • 如果希望按提交的顺序处理任务,那么恐怕 celery 不是为这些用例设计的。但是,如果可以预先将所有这些任务链接在一起,您可以检查一下链接 celery 任务
最近更新 更多