【问题标题】:Running "unique" tasks with celery用 celery 运行“独特”的任务
【发布时间】:2011-05-05 00:01:42
【问题描述】:

我使用 celery 来更新我的新闻聚合网站中的 RSS 提要。我为每个提要使用了一个@task,一切似乎都很好。

虽然有一个细节我不确定如何处理好:所有提要每分钟使用@periodic_task 更新一次,但是如果启动新任务时提要仍在从上一个定期任务更新怎么办? (例如,如果提要真的很慢,或者离线并且任务处于重试循环中)

目前我存储任务结果并检查它们的状态,如下所示:

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed


_results = {}


@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
    for feed in Feed.objects.all():
        if feed.pk in _results:
            if not _results[feed.pk].ready():
                # The task is not finished yet
                continue
        _results[feed.pk] = update_feed.delay(feed)


@task()
def update_feed(feed):
    try:
        feed.fetch_articles()
    except socket.error, exc:
        update_feed.retry(args=[feed], exc=exc)

也许有一种更复杂/更强大的方法可以使用我错过的一些 celery 机制来实现相同的结果?

【问题讨论】:

    标签: python django celery


    【解决方案1】:

    根据 MattH 的回答,您可以使用这样的装饰器:

    def single_instance_task(timeout):
        def task_exc(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                lock_id = "celery-single-instance-" + func.__name__
                acquire_lock = lambda: cache.add(lock_id, "true", timeout)
                release_lock = lambda: cache.delete(lock_id)
                if acquire_lock():
                    try:
                        func(*args, **kwargs)
                    finally:
                        release_lock()
            return wrapper
        return task_exc
    

    那么,就这样使用吧……

    @periodic_task(run_every=timedelta(minutes=1))
    @single_instance_task(60*10)
    def fetch_articles()
        yada yada...
    

    【讨论】:

    • 正是我需要的!谢谢!
    • 谢谢;为我工作!但是请注意,这实际上不适用于默认的 django CACHES,因为默认设置为本地内存缓存,这意味着每个进程都有自己的缓存,因此每个 celery worker(进程)都将拥有自己的缓存......
    【解决方案2】:

    【讨论】:

    • 我没有看到这种方法有什么优越之处,它更复杂但基本上做同样的事情(并且使用 django 缓存来存储锁似乎很尴尬)
    • 哦,我错过了一个大细节,它使锁定过程和线程安全。
    • @LuperRouch 另一个与您的锁定机制相关的问题:它仅在只有一个工作人员在运行时才有效:)
    • 这里有一个使用redis存储锁的方法:loose-bits.com/2010/10/distributed-task-locking-in-celery.html
    • official documentation 中的此链接在 django 环境中不运行 celery 时非常无用,因为它依赖于设置缓存键并在任务完成后释放它。有没有人尝试过使用 multiprocessing.Semaphore 的方法来防止单个工作人员的任务同时执行?
    【解决方案3】:

    使用https://pypi.python.org/pypi/celery_once 似乎做得非常好,包括报告错误和针对某些参数进行唯一性测试。

    您可以执行以下操作:

    from celery_once import QueueOnce
    from myapp.celery import app
    from time import sleep
    
    @app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
    def start_billing(customer_id, year, month):
        sleep(30)
        return "Done!"
    

    只需要在您的项目中进行以下设置:

    ONCE_REDIS_URL = 'redis://localhost:6379/0'
    ONCE_DEFAULT_TIMEOUT = 60 * 60  # remove lock after 1 hour in case it was stale
    

    【讨论】:

      【解决方案4】:

      如果您正在寻找不使用 Django 的示例,那么 try this example(警告:使用 Redis 代替,我已经在使用)。

      装饰器代码如下(完全归功于文章作者,去阅读吧)

      import redis
      
      REDIS_CLIENT = redis.Redis()
      
      def only_one(function=None, key="", timeout=None):
          """Enforce only one celery task at a time."""
      
          def _dec(run_func):
              """Decorator."""
      
              def _caller(*args, **kwargs):
                  """Caller."""
                  ret_value = None
                  have_lock = False
                  lock = REDIS_CLIENT.lock(key, timeout=timeout)
                  try:
                      have_lock = lock.acquire(blocking=False)
                      if have_lock:
                          ret_value = run_func(*args, **kwargs)
                  finally:
                      if have_lock:
                          lock.release()
      
                  return ret_value
      
              return _caller
      
          return _dec(function) if function is not None else _dec
      

      【讨论】:

      • 这可以在rabbitMQ中做到吗?
      【解决方案5】:

      我想知道为什么没有人提到使用celery.app.control.inspect().active() 来获取当前正在运行的任务列表。不是实时的吗?因为否则它会很容易实现,例如:

      def unique_task(callback,  *decorator_args, **decorator_kwargs):
          """
          Decorator to ensure only one instance of the task is running at once.
          """
          @wraps(callback)
          def _wrapper(celery_task, *args, **kwargs):
              active_queues = task.app.control.inspect().active()
              if active_queues:
                  for queue in active_queues:
                      for running_task in active_queues[queue]:
                          # Discard the currently running task from the list.
                          if task.name == running_task['name'] and task.request.id != running_task['id']:
                              return f'Task "{callback.__name__}()" cancelled! already running...'
      
              return callback(celery_task, *args, **kwargs)
      
          return _wrapper
      
      

      然后只是将装饰器应用于相应的任务:

      @celery.task(bind=True)
      @unique_task
      def my_task(self):
          # task executed once at a time.
          pass
      
      

      【讨论】:

        【解决方案6】:

        此解决方案适用于 celery 在单个主机上工作,并发性大于 1。其他类型(没有 redis 等依赖项)基于文件的锁差异不适用于并发性大于 1。

        class Lock(object):
            def __init__(self, filename):
                self.f = open(filename, 'w')
        
            def __enter__(self):
                try:
                    flock(self.f.fileno(), LOCK_EX | LOCK_NB)
                    return True
                except IOError:
                    pass
                return False
        
            def __exit__(self, *args):
                self.f.close()
        
        
        class SinglePeriodicTask(PeriodicTask):
            abstract = True
            run_every = timedelta(seconds=1)
        
            def __call__(self, *args, **kwargs):
                lock_filename = join('/tmp',
                                     md5(self.name).hexdigest())
                with Lock(lock_filename) as is_locked:
                    if is_locked:
                        super(SinglePeriodicTask, self).__call__(*args, **kwargs)
                    else:
                        print 'already working'
        
        
        class SearchTask(SinglePeriodicTask):
            restart_delay = timedelta(seconds=60)
        
            def run(self, *args, **kwargs):
                print self.name, 'start', datetime.now()
                sleep(5)
                print self.name, 'end', datetime.now()
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2011-09-15
          • 1970-01-01
          • 2015-10-24
          • 2020-05-24
          • 2017-02-02
          • 2019-02-16
          • 2014-03-17
          • 1970-01-01
          相关资源
          最近更新 更多