【发布时间】:2022-01-20 06:00:47
【问题描述】:
我正在尝试使用 celery 运行任务。 我需要在用户按下发送按钮时将发布请求发送到远程服务器,所以我尝试在此处使用带有 Redis 的 celery 并在设置文件中使用此配置:
BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'
根据apply_async 的文档,我可以定义重试选项,如下面的代码:
__task_expiration = 60
__interval_start = 1 * 60
api_generator.apply_async(args=(*args),
group=user_key,
expires=__task_expiration,
retry=True,
retry_policy={
"max_retries": 3,
"interval_start": __interval_start
})
在文档中我找到了 apply_async 的定义:
apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)
按照文档,我可以使用 retry 和 retry_policy 进行设置
以及如何定义重试选项的示例代码
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
我希望我的任务运行 3 次以在任何失败的情况下运行,并且每次重试之间的间隔为 60 秒。 我的任务定义如下所示:
@shared_task
def api_generator(*args):
import requests
import json
url = os.environ.get("API_URL_CALL")
api_access_key = os.environ.get("API_ACCESS_KEY")
headers = {
"Authorization": api_access_key,
"Content-Type": "application/json"
}
json_schema = generate_json(*args)
response = requests.request("POST", url, headers=headers, data=json.dumps(json_schema), timeout=30)
if response.status_code != 200:
raise NameError("API Response error")
return response.status_code
但是当我的代码失败时,我在 celery 日志中看不到任何重试机制,这是什么问题?使用 apply_async 方法调用我的任务时如何定义重试?我正在提高 NameError("Exception") 告诉工人发生了错误。
【问题讨论】:
标签: python django celery django-celery