【问题标题】:Django celery - start a periodic task after a set duration of the completion of the previous instanceDjango celery - 在完成前一个实例的设定持续时间后启动周期性任务
【发布时间】:2015-06-10 03:19:24
【问题描述】:

目前我有一个每 30 分钟运行一次的周期性任务。任务本身有时需要 30 多分钟才能完成。

我怎样才能改变周期性任务运行 30 分钟之后最后一次运行完成?

@periodic_task(run_every=timedelta(minutes=30), queue='activities', options={'queue': 'activities'})
def pull_activities_frequent_adaptors():
    adaptors_queryset = TrackingAppAdaptor.objects.adaptors_that_pull_activities_frequently()
    pull_activities_from_adaptors(adaptors_queryset)

【问题讨论】:

    标签: python django celery django-celery periodic-task


    【解决方案1】:

    Celery 不支持这种开箱即用的方法,但我过去不得不做类似的事情,我不得不自己偶然发现一个解决方案。

    根据我的经验,有两种比较直接的方法可以实现这一点,两者都需要权衡取舍。您还可以使用这些东西进入一些相当大的漏洞,因此请注意购买者

    选项 1:

    使用一些数据存储来保存有关何时运行任务和触发 celery beat 任务的信息。

    为此,您可以使用您的数据库和一个模型,其中包含有关周期性任务的一些信息。 (如果您想获得更多获取技术,您甚至可以直接与队列交谈并跳过模型路线。)

    from django.db import models
    
    class PeriodicTask(models.Model):
       lastrun = models.DateTimeField()
       nextrun = models.DateTimeField()
       notes = models.TextField()  # errors?
       task_id = models.CharField(max_length=100)
    

    这只是对模型可能存储的内容的粗略概念。您可以将任何有用的东西放在那里,但您需要一个日期时间对象来存储下一次运行的时间。

    接下来,您的周期性任务需要更频繁地运行以启动并查看是否有任何任务需要执行很快

    import datetime
    from .models import PeriodicTask
    
    @periodic_task(run_every=timedelta(minutes=2), queue='activities', options={'queue': 'activities'})
    def pull_activities_frequent_adaptors():
        now = datetime.datetime.utcnow()  # need to be clear about time-zones
        scheduled_tasks = PeriodicTask.objects.filter(nextrun__gte=now)
    
        if scheduled_tasks and scheduled_tasks.count() == 1: # more than one and we've erred somewhere
    
            timewindow = datetime.timedelta(minutes=5)
            if (scheduled_tasks[0].nextrun - now) <= timewindow:
                scheduled_tasks[0].delete()
                # Do the task
                # schedule the next one
                PeriodicTask.objects.create(
                     lastrun=now,
                     nextrun=now + datetime.timedelta(minutes=30))
    

    潜在问题:

    1) 如果您有多个具有主从设置的数据库,特别是如果您有滞后,您最终可能会进行双重调度(即使使用 count() == 1 部分)。因此,有一个值得考虑的竞争条件。

    2) 很难精确到 30 分钟,因为您必须使用时间窗口来查找要执行的任务。

    3) 任务需要比您的时间窗口更频繁地运行,否则您可能会错过它。这可能会浪费资源(但我想这并不算太糟糕),因为它通常会旋转而无所事事。

    4) 没有什么比处理日期时间更令人难以置信的了,因此您必须真正考虑时区问题并考虑所有变化并测试这段代码。

    5) 这是一个大问题:如果任务的运行时间比它计划的时间间隔长,那么您将有两个任务同时运行,这是一个问题。同样,在竞争条件下,事情可能会变得很冒险。

    选项 2)

    不要使用 celery beat:启动第一个任务并在 30 分钟后启动另一个任务。这有可能成为一个失控的巫师学徒类型的东西,所以我觉得它有点,嗯,可怕,虽然我做了第一个选择,但我从来没有真正说服自己去做下面的事情。但是,无论如何,我认为这是可以做到的:

    @task  # no longer a periodic task
    def your_task(args):
        # Whatever you want to do, then call itself again...
        your_task.apply_async(args=(args), countdown=1800)  
    

    现在你只需要在某个地方调用它,可能是在一个每周启动一次并杀死这个东西的任何以前版本的 cron 作业中(它是如何找到它们的?),然后启动第一个。

    我不得不说我不是很喜欢这个答案,即使我遇到过几次,这似乎是解决问题的一种更危险和不守规矩的方法。不过,如果有人这样做,我会很好奇。

    【讨论】:

      【解决方案2】:

      对不起,你不能完全做到这一点。

      由于有多个工作人员执行该作业,因此您实际上需要确保没有工作人员正在运行前一个任务。

      你可以做什么:

      1. 使用celery backend,您可以监控是否仍有任务在进行中并且不执行当前任务。这将允许您创建只执行一个任务的情况。

      2. 当一个任务完成后,你可以创建一个触发器来发送一个等待 30 分钟的新任务,你可以使用ETA实现它

      【讨论】:

        猜你喜欢
        • 2011-12-22
        • 1970-01-01
        • 2018-11-30
        • 2021-10-15
        • 2012-01-03
        • 2018-11-28
        • 1970-01-01
        • 1970-01-01
        • 2016-02-04
        相关资源
        最近更新 更多