【问题标题】:How to make a celery task fail from within the task?如何使芹菜任务从任务中失败?
【发布时间】:2011-12-02 02:34:36
【问题描述】:

在某些情况下,我想让 celery 任务在该任务中失败。我尝试了以下方法:

from celery.task import task
from celery import states

@task()
def run_simulation():
    if some_condition:
        run_simulation.update_state(state=states.FAILURE)
        return False

但是,任务仍然报告成功:

任务 sim.tasks.run_simulation[9235e3a7-c6d2-4219-bbc7-acf65c816e65] 1.17847704887s成功:False

似乎只能在任务运行时修改状态并且一旦完成 - celery 将状态更改为它认为是结果的任何内容(请参阅this question)。有什么方法可以在不通过引发异常而使任务失败的情况下让 celery 返回任务失败的情况?

【问题讨论】:

  • 您是否尝试从代码内部引发异常?
  • @hymloth 引发异常确实会导致任务失败,其中包括每次发生这种情况时都向我发送电子邮件——这是我想避免的。抱歉不清楚,我现在已经改变了问题。

标签: celery celery-task


【解决方案1】:

要将任务标记为失败而不引发异常,请将任务状态更新为FAILURE,然后引发Ignore 异常,因为返回任何值都会将任务记录为成功,例如:

from celery import Celery, states
from celery.exceptions import Ignore

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task(bind=True)
def run_simulation(self):
    if some_condition:
        # manually update the task state
        self.update_state(
            state = states.FAILURE,
            meta = 'REASON FOR FAILURE'
        )

        # ignore the task so no other state is recorded
        raise Ignore()

但最好的方法是从您的任务中引发异常,您可以创建自定义异常来跟踪这些失败:

class TaskFailure(Exception):
   pass

并从您的任务中引发此异常:

if some_condition:
    raise TaskFailure('Failure reason')

【讨论】:

  • 请注意,这将不会触发 task_failure 信号,因此您要附加到它们的任何处理程序都不会被触发,这不是 imo 好。
【解决方案2】:

我想进一步扩展 Pierre 的回答,因为我在使用建议的解决方案时遇到了一些问题。

要在将任务状态更新为 states.FAILURE 时允许自定义字段,还必须模拟 FAILURE 状态可能具有的一些属性(注意 exc_type 和 exc_message) 虽然解决方案将终止任务,但任何查询状态的尝试(例如 - 获取 'REASON FOR FAILURE' 值)都将失败。

以下是我从中获取的 sn-p 供参考: https://www.distributedpython.com/2018/09/28/celery-task-states/

@app.task(bind=True)
def task(self):
    try:
        raise ValueError('Some error')
    except Exception as ex:
        self.update_state(
            state=states.FAILURE,
            meta={
                'exc_type': type(ex).__name__,
                'exc_message': traceback.format_exc().split('\n')
                'custom': '...'
            })
        raise Ignore()

【讨论】:

    【解决方案3】:

    我收到了来自 Ask Solem 的关于这个问题的 interesting reply,他提出了一个“after_return”处理程序来解决这个问题。这可能是未来的一个有趣的选择。

    与此同时,当我想让它失败时,我通过简单地从任务中返回一个字符串“FAILURE”解决了这个问题,然后按如下方式进行检查:

    result = AsyncResult(task_id)
    if result.state == 'FAILURE' or (result.state == 'SUCCESS' and result.get() == 'FAILURE'):
        # Failure processing task 
    

    【讨论】:

    • 如果条件可以写成result.state in READY_STATES | EXCEPTION_STATES: where from celery.states import READY_STATES, EXCEPTION_STATES, UNREADY_STATES
    猜你喜欢
    • 1970-01-01
    • 2014-04-16
    • 1970-01-01
    • 2021-10-12
    • 2022-01-13
    • 2016-08-25
    • 2011-05-31
    • 2014-12-04
    • 2017-08-28
    相关资源
    最近更新 更多