【问题标题】:In celery how to get task position in queue?在芹菜中如何获得队列中的任务位置?
【发布时间】:2012-06-04 23:00:11
【问题描述】:

我使用 Celery 和 Redis 作为代理,我可以看到队列实际上是一个以序列化任务作为项目的 redis 列表。

我的问题是,如果我有一个 AsyncResult 对象作为调用 <task>.delay() 的结果,有没有办法确定该项目在队列中的位置?

更新:

我终于能够通过以下方式获得职位:

from celery.task.control import inspect
i = inspect()
i.reserved()

但它有点慢,因为它需要与所有工作人员沟通。

【问题讨论】:

    标签: python redis celery django-celery djcelery


    【解决方案1】:

    你提到的inspect.reserved()/scheduled() 可能有效,但不是 总是准确的,因为它只考虑任务 工人已经预取。

    Celery 不允许对队列进行带外操作,例如删除消息 从队列中取出,或重新排序它们,因为它不会在分布式系统中扩展。 消息可能尚未到达队列,这可能导致 在竞争条件下,实际上它不是具有事务性的顺序队列 操作,而是来自多个位置的消息流。 也就是说,Celery API 基于严格的消息传递语义。

    可以在某些代理上直接访问队列 Celery 支持(如 Redis 或数据库),但这不是公共 API 的一部分, 并且不鼓励您这样做,但是当然,如​​果您不打算 支持大规模运营 你应该做任何对你来说最方便的事情 并放弃我的建议。

    如果这只是为了让用户知道他的工作何时完成,那么 我相信你可以想出一个算法来预测任务何时执行, 如果您只知道队列的长度和每个任务的插入时间。

    第一个只是redis.len("celery"),后者你可以 通过收听task_sent 信号来添加自己:

    from celery.signals import task_sent
    
    @task_sent.connect
    def record_insertion_time(id, **kwargs):
       redis.zadd("celery.insertion_times", id)
    

    在此处使用排序集:http://redis.io/commands/zadd

    对于纯消息传递解决方案,您可以使用专用监视器 它消耗 Celery 事件流并预测任务何时完成。 http://docs.celeryproject.org/en/latest/userguide/monitoring.html#event-reference

    (刚刚注意到发送的任务缺少时间戳字段 文档,但时间戳随该事件一起发送,所以我会修复它)。

    事件还包含一个“时钟”字段,它是一个逻辑时钟 (见http://en.wikipedia.org/wiki/Lamport_timestamps), 这可用于检测分布式事件的顺序 系统不依赖于每台机器上的系统时间 同步(这是不可能实现的)。

    【讨论】: