【发布时间】:2020-09-27 05:13:14
【问题描述】:
我的 celery 定期任务不起作用。我希望根据日期每晚更新我的数据库。这是我在应用程序目录中的 ptasks.py 文件:
'''
import datetime
import celery
from celery.task.schedules import crontab
from celery.decorators import periodic_task
from django.utils import timezone
@periodic_task(run_every=crontab(hour=0, minute=0))
def every_night():
tasks=Task.objects.all()
form=TaskForm()
if form.deadline<timezone.now() and form.staus=="ONGOING":
form.status="PENDING"
form.save()
'''
我在 settings.py 中使用 ampq:
'''
CELERY_BROKER_URL = 'amqp://guest:guest@localhost'
CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler'
'''
这是我的models.py:
'''
from django.db import models
import datetime
import pytz
from django.utils import timezone
# Create your models here.
class Task(models.Model):
title=models.CharField(max_length=30)
complete=models.BooleanField(default=False)
created=models.DateTimeField(default=timezone.now())
comment=models.CharField(max_length=500, default="")
Tag1=models.CharField(max_length=10, default="Tag1")
deadline=models.DateTimeField(default=timezone.now())
status=models.CharField(max_length=15,default="ONGOING")
def __str__(self):
return self.title
'''
这是我的forms.py:
'''
from django import forms
from django.forms import ModelForm
from .models import *
class TaskForm(forms.ModelForm):
class Meta:
model=Task
fields='__all__'
'''
【问题讨论】:
-
运行 celery 的输出是什么?它找到你的任务了吗?与芹菜无关 - 但你为什么要使用表格?
-
@ohrstrom 我的管理员确实找到了任务,我正在使用一个表单,因为我想创建一个每晚自动更新和刷新的表单/清单。
-
您不需要表单来以编程方式更新实例 - 表单用于用户输入(和验证等)。如果您不想通过某些条件更新查询集,可以使用类似:
Task.objects.filter(deadline__lt=timezone.now(), status="ONGOING").update(status="PENDING")但是 - 在这种情况下,我认为直接过滤查询会更有意义。
标签: django django-models django-forms celery periodic-task