【问题标题】:Problem with Periodic tasks in django with Celery在 django 中使用 Celery 执行周期性任务的问题
【发布时间】:2020-09-27 05:13:14
【问题描述】:

我的 celery 定期任务不起作用。我希望根据日期每晚更新我的数据库。这是我在应用程序目录中的 ptasks.py 文件:

'''
import datetime
import celery
from celery.task.schedules import crontab
from celery.decorators import periodic_task
from django.utils import timezone
@periodic_task(run_every=crontab(hour=0, minute=0))
def every_night():
    tasks=Task.objects.all()
    form=TaskForm()
    if form.deadline<timezone.now() and form.staus=="ONGOING":
        form.status="PENDING"
        form.save()
'''

我在 settings.py 中使用 ampq:

'''
CELERY_BROKER_URL = 'amqp://guest:guest@localhost'
CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler'
'''

这是我的models.py:

'''
from django.db import models
import datetime
import pytz
from django.utils import timezone
# Create your models here.
class Task(models.Model):
    title=models.CharField(max_length=30)
    complete=models.BooleanField(default=False)
    created=models.DateTimeField(default=timezone.now())
    comment=models.CharField(max_length=500, default="")
    Tag1=models.CharField(max_length=10, default="Tag1")
    deadline=models.DateTimeField(default=timezone.now())
    status=models.CharField(max_length=15,default="ONGOING")
    def __str__(self):
        return self.title
'''

这是我的forms.py:

'''
from django import forms
from django.forms import ModelForm

from .models import *

class TaskForm(forms.ModelForm):
    class Meta:
        model=Task
        fields='__all__'

'''

【问题讨论】:

  • 运行 celery 的输出是什么?它找到你的任务了吗?与芹菜无关 - 但你为什么要使用表格?
  • @ohrstrom 我的管理员确实找到了任务,我正在使用一个表单,因为我想创建一个每晚自动更新和刷新的表单/清单。
  • 您不需要表单来以编程方式更新实例 - 表单用于用户输入(和验证等)。如果您不想通过某些条件更新查询集,可以使用类似:Task.objects.filter(deadline__lt=timezone.now(), status="ONGOING").update(status="PENDING") 但是 - 在这种情况下,我认为直接过滤查询会更有意义。

标签: django django-models django-forms celery periodic-task


【解决方案1】:

解决方案 1

要按照您的计划更新数据库记录,您可以这样做:

&lt;your_app&gt;/models.py

from django.db import models
from django.utils import timezone


class Task(models.Model):

    STATUS_PENDING = "pending"
    STATUS_ONGOING = "ongoing"

    STATUS_CHOICES = (
        (STATUS_ONGOING, "ongoing"),
        (STATUS_PENDING, "pending"),
    )

    status = models.CharField(
        max_length=15, default=STATUS_ONGOING, choices=STATUS_CHOICES
    )
    title = models.CharField(max_length=30)
    deadline = models.DateTimeField(auto_now_add=True)
    complete = models.BooleanField(default=False)

    def __str__(self):
        return self.title

&lt;your_app&gt;/tasks.py

from django.utils import timezone
from celery.task.schedules import crontab
from celery.decorators import periodic_task

from .models import Task


@periodic_task(run_every=crontab(hour=0, minute=0))
def every_night():

    qs = Task.objects.filter(deadline__lt=timezone.now(), status=Task.STATUS_ONGOING)

    # Update whole queryset at once
    qs.update(status=Task.STATUS_PENDING)

    # Alternatively update one by one (e.g. if you need signals to be fired)
    # for task in qs:
    #     task.status = Task.STATUS_PENDING
    #     task.save()

确保运行 celery "beat" 来触发周期性任务:

celery -A app worker -B 

解决方案 2

但是 - 对于所描述的情况,老实说,我不明白为什么要绕道在一个不同的数据库字段中添加信息,无论如何每个记录都可以轻松地得出这些信息。这增加了数据库本身的冗余,似乎不需要。

为什么不直接使用Manager 来轻松获取所需的模型实例?

from django.db import models
from django.utils import timezone


class TaskManager(models.Manager):
    def pending(self):
        return (
            self.get_queryset()
            .filter(deadline__lt=timezone.now())
            .exclude(complete=True)
        )


class Task(models.Model):

    title = models.CharField(max_length=30)
    deadline = models.DateTimeField(auto_now_add=True)
    complete = models.BooleanField(default=False)

    objects = TaskManager()

    def __str__(self):
        return self.title

并获得“待定”查询集:Task.objects.pending()

【讨论】:

    猜你喜欢
    • 2018-11-28
    • 1970-01-01
    • 2015-10-28
    • 2017-06-15
    • 2017-05-12
    • 2020-09-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多