【问题标题】:Setting up and calling a group of celery tasks with individual countdown设置和调用一组带有单独倒计时的 celery 任务
【发布时间】:2022-01-06 20:29:26
【问题描述】:

使用:Django==2.2.24Python=3.6celery==4.3.0

这是我目前正在做的事情:

from celery import group

the_group_of_tasks = group(
  some_task.s(an_object.the_data_dict)
  for an_object in AnObject.objects.all()
)
the_group_of_tasks.delay()

我想做的事:
group 文档:celery docs link
我想将the_group_of_tasks 个人some_task 电话分散到某个时间范围内。
最好我可以使用countdown 功能,并将任务分散在可变的秒数上(例如一小时,3600 秒)。
分配将在 0 到 3600 之间的随机秒整数中完成,想象一下,一旦我有了范围,它就可以很容易地计算出来。

我认为我可以添加 countdown 参数,并在我的范围内使用随机数生成器,这样它将被“打包”并准备在 group 中执行,并准备好单独的任务?

some_task.s(an_object.the_data_dict, countdown=some_generator_call)

这样行吗?

In the docs,看来signature应该支持countdown

>>> from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)

谢谢!

编辑:
我尝试添加倒计时,如下所示:

the_group_of_tasks = group(
  some_task.s(an_object.the_data_dict, countdown=10)
  for an_object in AnObject.objects.all()
)
the_group_of_tasks.delay()

但是看到这个错误:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib64/python3.6/site-packages/newrelic/hooks/application_celery.py", line 85, in wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 648, in __protected_call__
    return self.run(*args, **kwargs)
TypeError: run() got an unexpected keyword argument 'countdown'

【问题讨论】:

    标签: python django celery


    【解决方案1】:

    所以问题是 countdown 被假定为任务的参数,而它应该是 apply_async/delay 调用的参数。

    这解决了问题:

    the_group_of_tasks = group(
      some_task.signature((an_object.the_data_dict), countdown=10)
      for an_object in AnObject.objects.all()
    )
    the_group_of_tasks.delay()
    
    1. .s() 更改为 .signature
    2. 以不再模棱两可的方式分隔参数

    谢谢!

    【讨论】: