【Question Title】: Django Celery Periodic Tasks Run But RabbitMQ Queues Aren't Consumed
【Posted】: 2012-10-27 20:56:56
【Question】:

Problem

After tasks have been run via celery's periodic task scheduler, beat, why do I have so many unconsumed queues left over in RabbitMQ?

Setup

  • Django web app running on Heroku
  • Tasks scheduled via celery beat
  • Tasks run via celery worker
  • Message broker is RabbitMQ from CloudAMQP

Procfile

web: gunicorn --workers=2 --worker-class=gevent --bind=0.0.0.0:$PORT project_name.wsgi:application
scheduler: python manage.py celery worker --loglevel=ERROR -B -E --maxtasksperchild=1000
worker: python manage.py celery worker -E --maxtasksperchild=1000 --loglevel=ERROR

settings.py

CELERYBEAT_SCHEDULE = {
    'do_some_task': {
        'task': 'project_name.apps.appname.tasks.some_task',
        'schedule': datetime.timedelta(seconds=60 * 15),
        'args': ()
    },
}
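As a sanity check on the interval above, the `timedelta` works out to 15 minutes; Celery also accepts a plain number of seconds in place of a `timedelta` here. A minimal sketch using only the standard library:

```python
import datetime

# The schedule entry above fires every 15 minutes.
interval = datetime.timedelta(seconds=60 * 15)
print(interval.total_seconds())  # 900.0 seconds = 15 minutes
```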

tasks.py

@celery.task
def some_task():
    # Get some data from external resources
    # Save that data to the database
    # No return value specified
    pass

Results

Each time a task runs, I get (per the RabbitMQ web interface):


  • An additional message in the "Ready" state under my "Queued messages"
  • An additional queue with a single message in the "Ready" state
    • This queue has no listed consumers

【Question Comments】:

    标签: python django heroku rabbitmq celery


    【Solution 1】:

    It ended up being my setting for CELERY_RESULT_BACKEND.

    Previously, it was:

    CELERY_RESULT_BACKEND = 'amqp'
    

    After changing it to the following, I no longer had unconsumed messages / queues in RabbitMQ:

    CELERY_RESULT_BACKEND = 'database'
    

    What appears to have been happening is that, after a task executed, celery was sending information about that task back through RabbitMQ, but nothing was set up to consume those response messages, so a bunch of unread messages ended up sitting in queues.
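    To make that mechanism concrete: with the amqp result backend, each task result is published to its own queue, and that queue only goes away once something actually consumes the result. A minimal sketch, assuming celery is installed; the app name is illustrative, and eager mode is used only so the example runs in-process without a broker:

    ```python
    from celery import Celery

    # Illustrative app; task_always_eager runs tasks in-process,
    # so this sketch needs no broker or result backend.
    app = Celery('demo')
    app.conf.task_always_eager = True

    @app.task
    def some_task():
        return 'done'

    # With a real amqp result backend, the result would sit in a
    # per-task queue until something consumes it, e.g. via get()
    # (or forget(), to discard it without reading it):
    result = some_task.delay()
    print(result.get())  # 'done'
    ```

    If nothing ever calls get() or forget(), those per-task result queues accumulate, which matches the symptoms described in the question.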

    Note: this means celery will add database records logging the results of tasks. To keep my database from filling up with useless messages, I added:

    # Delete result records ("tombstones") from database after 4 hours
    # http://docs.celeryproject.org/en/latest/configuration.html#celery-task-result-expires
    CELERY_TASK_RESULT_EXPIRES = 14400
    

    Relevant portion of settings.py

    ########## CELERY CONFIGURATION
    import djcelery
    # https://github.com/celery/django-celery/
    djcelery.setup_loader()
    
    INSTALLED_APPS = INSTALLED_APPS + (
        'djcelery',
    )
    
    # Compress all the messages using gzip
    # http://celery.readthedocs.org/en/latest/userguide/calling.html#compression
    CELERY_MESSAGE_COMPRESSION = 'gzip'
    
    # See: http://docs.celeryproject.org/en/latest/configuration.html#broker-transport
    BROKER_TRANSPORT = 'amqplib'
    
    # Set this number to the amount of allowed concurrent connections on your AMQP
    # provider, divided by the amount of active workers you have.
    #
    # For example, if you have the 'Little Lemur' CloudAMQP plan (their free tier),
    # they allow 3 concurrent connections. So if you run a single worker, you'd
    # want this number to be 3. If you had 3 workers running, you'd lower this
    # number to 1, since 3 workers each maintaining one open connection = 3
    # connections total.
    #
    # See: http://docs.celeryproject.org/en/latest/configuration.html#broker-pool-limit
    BROKER_POOL_LIMIT = 3
    
    # See: http://docs.celeryproject.org/en/latest/configuration.html#broker-connection-max-retries
    BROKER_CONNECTION_MAX_RETRIES = 0
    
    # See: http://docs.celeryproject.org/en/latest/configuration.html#broker-url
    BROKER_URL = os.environ.get('CLOUDAMQP_URL')
    
    # Previously, had this set to 'amqp'; this resulted in many unread /
    # unconsumed queues and messages in RabbitMQ
    # See: http://docs.celeryproject.org/en/latest/configuration.html#celery-result-backend
    CELERY_RESULT_BACKEND = 'database'
    
    # Delete result records ("tombstones") from database after 4 hours
    # http://docs.celeryproject.org/en/latest/configuration.html#celery-task-result-expires
    CELERY_TASK_RESULT_EXPIRES = 14400
    ########## END CELERY CONFIGURATION
    

    【Comments】:

    • Hi, I'm trying to implement the same stack in my application but I haven't been able to get it working. Could you post all of your celery- and rabbitmq-related settings? I'd really appreciate it, and it would help other newcomers.
    • Sure, added. It's based on the excellent django-skel recommendations.
    【Solution 2】:

    It looks like you are getting responses back from your consumed tasks.

    You can avoid that by using:

    @celery.task(ignore_result=True)
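    A minimal self-contained sketch of the effect of ignore_result; the app and task names are illustrative, and eager mode is used only so the example runs without a broker:

    ```python
    from celery import Celery

    app = Celery('demo')
    app.conf.task_always_eager = True  # run in-process; no broker needed

    @app.task(ignore_result=True)
    def refresh_data():
        # Side effects only: with ignore_result=True, celery never
        # publishes a result message, so no result queue is created.
        pass

    # Calling the task object directly just runs the function body.
    refresh_data()
    ```

    Celery also supports disabling result storage globally with the CELERY_IGNORE_RESULT setting (named task_ignore_result in newer versions), which has the same effect for every task.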
    

    【Comments】: