【问题标题】:Prevent Celery Beat from running the same task防止 Celery Beat 运行相同的任务
【发布时间】:2015-10-22 05:38:44
【问题描述】:

我有一个计划的 celery 每 30 秒运行一次任务。我有一个每天作为任务运行,另一个每周在用户指定的时间和星期几运行。它检查“开始时间”和“下一个预定日期”。在任务完成之前,下一个计划日期不会更新。

但是,我想知道如何确保 celery beat 只运行一次任务。我现在看到,celery 将多次运行某个任务,直到该任务的下一个计划日期被更新。

【问题讨论】:

    标签: python django celery django-celery celerybeat


    【解决方案1】:

    为了做到这一点,您需要实现某种“分布式锁”,解决这个问题的一种简单可靠的方法是使用带有 memcached 后端的 django 缓存,并在任务开始时在其中设置一个“标志”就在它完成删除该标志之前。其他选择是使用“redis”锁作为“分布式锁”。 使用django缓存memcached作为后端的例子:

    @shared_task
    def my_task(arg1, arg2, lock_expire=300):
        lock_key = 'my_task'
        acquire_lock = lambda: cache.add(lock_key, '1', lock_expire)
        release_lock = lambda: cache.delete(lock_key)
    
        if acquire_lock():
            try:
                # Execute your code here!
            except Exception:
                # error handling here
            finally:
                # release allow other task to execute
                release_lock()
        else:
            print("Other task is running, skipping")
    

    上面的代码实现了一个“分布式锁”,以确保无论您尝试再次执行多少次,都只能运行一个任务。 锁只能由一个任务获取:),另一个将跳过“主块”并完成。 你觉得有意义吗?

    玩得开心!

    【讨论】:

    • 我试过了,但我的任务仍在重复。我还从我的 celery 日志中看到了这条消息:“ WARNING/Worker-3] /usr/lib/python2.7/site-packages/django/core/cache/backends/base.py:224: CacheKeyWarning: Cache key contains characters如果与 memcached 一起使用会导致错误:u':1:tasks object' CacheKeyWarning)"
    • 你用什么作为“钥匙”?您似乎正在尝试使用任务本身。在这种情况下,事情可能会出错。现在很好,您应该使用我之前指出的“字符串”,例如“my_only_one_task”。谢谢!
    • 嗨@mar​​tin-alderete,我对字符串应该做什么感到困惑。字符串应该与什么相关联?谢谢!
    • 嘿@shizznetz,我刚刚阅读了这条评论。根据您的要求,字符串“lock_key”应该是您的任务的标识符。例如,如果您想一次只为给定用户执行一项任务,在这种情况下,您可以使用 lock_key,如下所示: lock_key = 'my_task:{0}'.format(user.id) 希望您明白这一点!提前致谢!
    最近更新 更多