【问题标题】:How to set retry tasks in case of failure in Django-Celery如何在 Django-Celery 失败的情况下设置重试任务
【发布时间】:2022-01-20 06:00:47
【问题描述】:

我正在尝试使用 celery 运行任务。 我需要在用户按下发送按钮时将发布请求发送到远程服务器,所以我尝试在此处使用带有 Redis 的 celery 并在设置文件中使用此配置:

BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'

根据apply_async 的文档,我可以定义重试选项,如下面的代码:

__task_expiration = 60
__interval_start = 1 * 60

api_generator.apply_async(args=(*args),
                                group=user_key,
                                expires=__task_expiration,
                                retry=True,
                                retry_policy={
                                  "max_retries": 3,
                                  "interval_start": __interval_start
                                })

在文档中我找到了 apply_async 的定义:

apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)

按照文档,我可以使用 retry 和 retry_policy 进行设置

以及如何定义重试选项的示例代码

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

我希望我的任务运行 3 次以在任何失败的情况下运行,并且每次重试之间的间隔为 60 秒。 我的任务定义如下所示:

@shared_task
def api_generator(*args):
    import requests
    import json
    url = os.environ.get("API_URL_CALL")
    api_access_key = os.environ.get("API_ACCESS_KEY")

    headers = {
        "Authorization": api_access_key,
        "Content-Type": "application/json"
    }

    json_schema = generate_json(*args)

    response = requests.request("POST", url, headers=headers, data=json.dumps(json_schema), timeout=30)

    if response.status_code != 200:
        raise NameError("API Response error")

    return response.status_code

但是当我的代码失败时,我在 celery 日志中看不到任何重试机制,这是什么问题?使用 apply_async 方法调用我的任务时如何定义重试?我正在提高 NameError("Exception") 告诉工人发生了错误。

【问题讨论】:

    标签: python django celery django-celery


    【解决方案1】:

    [编辑 1:添加 acks_late]

    将任务发送给 Celery 工作人员时可能会出现两件事:

    1. 代理和消息队列的连接问题。
    2. 工作人员引发异常。

    第一个问题可以通过定义retryretry_policy 来解决。

    第二种(也就是你要解决的问题),可以通过在任务失败时调用self.retry()来解决。

    根据您的问题类型,设置CELERY_ACKS_LATE = True 可能会有所帮助。

    查看这些链接了解更多信息:

    Retry Lost or Failed Tasks (Celery, Django and RabbitMQ)

    https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/

    【讨论】:

    • 感谢这帮助了很多,你是对的。我尝试使用装饰器在发生异常时设置重试。
    猜你喜欢
    • 2011-07-17
    • 2019-10-23
    • 1970-01-01
    • 2022-01-21
    • 2011-06-19
    • 2011-07-17
    • 2012-07-15
    • 2017-10-03
    • 2012-03-27
    相关资源
    最近更新 更多