你提到的inspect.reserved()/scheduled() 可能有效,但不是
总是准确的,因为它只考虑任务
工人已经预取。
Celery 不允许对队列进行带外操作,例如删除消息
从队列中取出,或重新排序它们,因为它不会在分布式系统中扩展。
消息可能尚未到达队列,这可能导致
在竞争条件下,实际上它不是具有事务性的顺序队列
操作,而是来自多个位置的消息流。
也就是说,Celery API 基于严格的消息传递语义。
可以在某些代理上直接访问队列
Celery 支持(如 Redis 或数据库),但这不是公共 API 的一部分,
并且不鼓励您这样做,但是当然,如果您不打算
支持大规模运营 你应该做任何对你来说最方便的事情
并放弃我的建议。
如果这只是为了让用户知道他的工作何时完成,那么
我相信你可以想出一个算法来预测任务何时执行,
如果您只知道队列的长度和每个任务的插入时间。
第一个只是redis.len("celery"),后者你可以
通过收听task_sent 信号来添加自己:
from celery.signals import task_sent
@task_sent.connect
def record_insertion_time(id, **kwargs):
redis.zadd("celery.insertion_times", id)
在此处使用排序集:http://redis.io/commands/zadd
对于纯消息传递解决方案,您可以使用专用监视器
它消耗 Celery 事件流并预测任务何时完成。
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#event-reference
(刚刚注意到发送的任务缺少时间戳字段
文档,但时间戳随该事件一起发送,所以我会修复它)。
事件还包含一个“时钟”字段,它是一个逻辑时钟
(见http://en.wikipedia.org/wiki/Lamport_timestamps),
这可用于检测分布式事件的顺序
系统不依赖于每台机器上的系统时间
同步(这是不可能实现的)。