【问题标题】:Celery / RabbitMQ - Find out the No Acks - Unacknowledged messagesCelery / RabbitMQ - 找出 No Acks - 未确认的消息
【发布时间】:2014-04-11 12:57:47
【问题描述】:

我正在尝试弄清楚如何获取有关未确认消息的信息。这些存储在哪里?在玩 celery inspect 时,似乎一旦消息得到确认,它就会处理,你可以跟踪状态。假设您有一个结果后端,那么您可以看到它的结果。但是从你应用延迟到它被确认它在一个黑洞中。

  1. noAcks 存储在哪里?
  2. 如何确定 noAcks 列表的“深度”?换句话说,有多少人,我的任务在列表中的哪个位置。

虽然我正在处理的问题与这里的问题不完全相关。

from celery.app import app_or_default

app = app_or_default()
inspect = app.control.inspect()

# Now if I want "RECEIVED" jobs.. 
data = inspect.reserved()

# or "ACTIVE" jobs.. 
data = inspect.active()

# or "REVOKED" jobs.. 
data = inspect.revoked()

# or scheduled jobs.. (Assuming these are time based??)
data = inspect.scheduled()

# FILL ME IN FOR UNACK JOBS!!
# data = inspect.??

# This will never work for tasks that aren't in one of the above buckets..
pprint.pprint(inspect.query_task([tasks]))

非常感谢您在这方面的建议和帮助。

【问题讨论】:

    标签: rabbitmq celery django-celery


    【解决方案1】:

    它们是inspect.reserved() 中具有'acknowleged': False 的那些任务

    from celery.app import app_or_default
    
    app = app_or_default()
    inspect = app.control.inspect()
    
    # those that have been sent to a worker and are thus reserved
    # from being sent to another worker, but may or may not be acknowledged as received by that worker
    data = inspect.reserved()
    
    {'celery.tasks': [{'acknowledged': False,
                   'args': '[]',
                   'delivery_info': {'exchange': 'tasks',
                                     'priority': None,
                                     'routing_key': 'celery'},
                   'hostname': 'celery.tasks',
                   'id': '527961d4-639f-4002-9dc6-7488dd8c8ad8',
                   'kwargs': '{}',
                   'name': 'globalapp.tasks.task_loop_tick',
                   'time_start': None,
                   'worker_pid': None},
                  {'acknowledged': False,
                   'args': '[]',
                   'delivery_info': {'exchange': 'tasks',
                                     'priority': None,
                                     'routing_key': 'celery'},
                   'hostname': 'celery.tasks',
                   'id': '09d5b726-269e-48d0-8b0e-86472d795906',
                   'kwargs': '{}',
                   'name': 'globalapp.tasks.task_loop_tick',
                   'time_start': None,
                   'worker_pid': None},
                  {'acknowledged': False,
                   'args': '[]',
                   'delivery_info': {'exchange': 'tasks',
                                     'priority': None,
                                     'routing_key': 'celery'},
                   'hostname': 'celery.tasks',
                   'id': 'de6d399e-1b37-455c-af63-a68078a9cf7c',
                   'kwargs': '{}',
                   'name': 'globalapp.tasks.task_loop_tick',
                   'time_start': None,
                   'worker_pid': None}],
     'fastlane.tasks': [],
     'images.tasks': [],
     'mailer.tasks': []}
    

    【讨论】:

    • 很好 - 这是在哪里记录的?
    • 我刚从您的代码开始,然后注意到未确认的代码被保留了。昨天我发现如果你安装 celery 3.0.11,它会安装 billiard 3.x,它实际上不适用于 celery 3.0;所以任务没有得到确认并且没有开始时间。台球
    • 这很棒。谢谢,我不知道为什么我之前没有看到或想到这个。您能否确认这是否适用于 Celery 3.1x(当前)?
    • celery 是一团混乱的选项、文档、多种做事方法和许多版本。不知道它是否适用于 3.1 抱歉:)
    【解决方案2】:

    经过数小时审查芹菜后,我得出结论,使用纯芹菜是不可能的。但是,可以松散地跟踪整个过程。这是我用来查找未确认计数的代码。大部分这些都可以使用 celery 中的实用程序来完成。

    我仍然无法通过 id 查询底层未确认的任务,但是..

    如果您安装了RabbitMQ management plug-in,您可以查询API

        data = {}
        base_url = "http://localhost:55672"
        url = base_url + "/api/queues/{}/".format(vhost)
        req = requests.get(url, auth=(settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD))
        if req.status_code != 200:
            log.error(req.text)
        else:
            request_data = req.json()
            for queue in request_data:
                # TODO if we know what queue the task is then we can nail this.
                if queue.get('name') == "celery":
                    data['state'] = "Unknown"
                    if queue.get('messages'):
                        data['messages'] = queue.get('messages')
                        data['messages_ready'] = queue.get('messages_ready')
                        data['messages_unacknowledged'] = queue.get('messages_unacknowledged')
                    break
        return data 
    

    【讨论】: