【发布时间】:2022-11-14 11:52:21
【问题描述】:
在 Django 应用程序中,我有一个表单可以安排发送电子邮件。它有四个字段:名称、电子邮件、正文、发送日期。我想动态创建一个 Celery 任务(电子邮件)以在指定时间运行另一个 Celery 任务。
我已经能够使用以下代码根据表单定期(每 30 秒)发送电子邮件:
schedule, _ = IntervalSchedule.objects.update_or_create(every=30, period=IntervalSchedule.SECONDS)
@shared_task(name="schedule_interval_email")
def schedule_email_interval(name, email, body, send_date):
PeriodicTask.objects.update_or_create(
defaults={
"interval": schedule,
"task": "email_task"
},
name="Send message at interval",
args=json.dumps(['name', 'test@test.com', 'body']),
)
但是,当我尝试通过 ClockedSchedule 安排任务在特定时间(当前时间 3 分钟后)运行时,Celery beat 会记录任务并保存所有相关设置。该任务在 Django 管理区域中显示为活动状态。但是,电子邮件从未真正发送过。
clocked = ClockedSchedule.objects.create(clocked_time=datetime.now() + timedelta(minutes=3))
@shared_task(name="schedule_clock_email")
def schedule_email_clocked(name, email, body, send_date):
PeriodicTask.objects.create(
clocked=clocked,
name="Send message at specific date/time",
task="email_task",
one_off=True,
args=json.dumps(['name', 'test@test.com', 'body']),
)
我最终想根据用户输入表单的日期时间动态设置时钟字段,因此当前代码只是试图测试 Celery 的工作方式。不过,我认为我遗漏了一些有关其工作原理的信息。任何想法将不胜感激。
【问题讨论】:
标签: python django celery django-celery celerybeat