【问题标题】:How to test celery periodic_task in Django?如何在 Django 中测试 celery period_task?
【发布时间】:2018-09-10 05:39:00
【问题描述】:

我有一个简单的周期性任务:

from celery.decorators import periodic_task
from celery.task.schedules import crontab
from .models import Subscription

@periodic_task(run_every=crontab(minute=0, hour=0))
def deactivate_subscriptions():
    for subscription in Subscription.objects.filter(is_expired=True):
        print(subscription)
        subscription.is_active = False
        subscription.can_activate = False
        subscription.save()

我想用测试来覆盖它。

我找到了有关如何测试简单任务的信息,例如 @shared_task,但我找不到测试 @periodic_task 的示例

【问题讨论】:

  • 您可以像在单元测试框架中调用任何其他方法一样调用此方法...您究竟想在这里测试什么?这个任务会在每天午夜触发吗?还是方法中的路径?
  • 是的,我想检查一下 crontab 是否可以工作。
  • 这应该包含在celery自己的单元测试中......我不确定你是否能够对这个机制进行单元测试。
  • @NarnikGamarnik 你能做到吗?

标签: python django cron celery periodic-task


【解决方案1】:

当使用装饰器定义周期性任务时,您可以通过以下方式访问 crontab 配置:

tasks.py

@periodic_task(
    run_every=(crontab(minute="*/1")),
)
def my_task():
    pass

您需要访问的一些文件

from .tasks import my_task

crontab = my_task.run_every

hours_when_it_will_run = crontab.hour
minutes_when_it_will_run = crontab.minute
day_of_the_week_when_it_will_run = crontab.day_of_week
day_of_the_month_when_it_will_run = crontab.day_of_month
month_of_year_when_it_will_run = crontab.month_of_year

通过这种方式,您可以访问任务何时执行,如果预期时间到了,您可以签入测试。

【讨论】:

    【解决方案2】:

    使用函数apply()Documentation for apply().

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-03-10
      • 1970-01-01
      • 2020-01-28
      • 2018-12-31
      • 2016-02-09
      • 2018-11-07
      • 2020-04-25
      相关资源
      最近更新 更多