【问题标题】:Django Celery tasks queueDjango Celery 任务队列
【发布时间】:2015-06-16 12:43:17
【问题描述】:

我有一个应用程序使用 django 使用 redis 和 celery 完成一些异步任务。 我正在使用 celery 任务来执行一些存储过程。此 SP 需要 5 分钟到 30 分钟才能完全执行(取决于记录的数量)。 一切都很好。 但我需要能够多次执行任务。但是现在当我运行任务并且另一个用户也运行任务时,这两个任务同时执行。 我需要任务进入队列并且仅在第一个任务完成时执行。 我的设置.py:

BROKER_URL = 'redis://localhost:6379/0'
CELERY_IMPORTS = ("pc.tasks", )
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_RESULT_BACKEND='djcelery.backends.cache:CacheBackend'

tasks.py

from __future__ import absolute_import
from celery.decorators import task
from celery import task, shared_task
from .models import Servicio, Proveedor, Lectura_FTP, Actualizar_Descarga
from .models import Lista_Archivos, Lista_Final, Buscar_Conci

@task
def carga_ftp():
    tabla = Proc_Carga()
    sp = tabla.carga()
    return None

@task
def conci(idprov,pfecha):
    conci = Buscar_Conci()
    spconc = conci.buscarcon(idprov,pfecha)

我是这样调用我视图中的任务的:

conci.delay(prov,DateV);

如何创建或设置任务的队列列表,并且每个任务仅在之前的任务完成时执行

提前致谢

【问题讨论】:

    标签: python django redis celery django-celery


    【解决方案1】:

    您可以限制工作人员的任务,因为我假设您一次只需要一名工作人员,所以在调用 djcelery 时只需启动一名工作人员。

    python manage.py celery worker -B --concurrency=1
    

    【讨论】:

    • 我只有一个疑问。我可以同时执行不同的任务来限制工人。仅对于“conci”任务我需要一个队列,对于其他任务没关系多次调用该任务
    【解决方案2】:

    您可以使用锁,例如(来自我的一个项目):

    def send_queued_emails(*args, **kwargs):
      from mailer.models import Message
      my_lock = redis.Redis().lock("send_mail")
    
      try:
        have_lock = my_lock.acquire(blocking=False)
        if have_lock:
            logging.info("send_mail lock ACQUIRED")
            from celery import group
    
            if Message.objects.non_deferred().all().count() > 0:
                t = EmailSenderTask()
                g = (group(t.s(message=msg) for msg in Message.objects.non_deferred().all()[:200]) | release_redis_lock.s(lock_name="send_mail"))
                g()
            else:
                logging.info("send_mail lock RELEASED")
                my_lock.release()
        else:
            logging.info("send_mail lock NOT ACQUIRED")
    
      except redis.ResponseError as e:
            logging.error("Redis throw exception : {}".format(e))
      except:
        my_lock.release()
        logging.error("send_mail lock RELEASED because of exception")
    

    【讨论】: