【发布时间】:2018-05-10 19:37:13
【问题描述】:
假设我在 celery 队列上的所有任务都在访问第 3 方 API。但是,API 有一个速率限制,我正在跟踪它(我需要遵守一天限制和每小时限制)。一旦达到速率限制,我想暂停新任务的消费,然后在我知道我很好时恢复。
我通过以下两个任务实现了这一点:
@celery.task()
def cancel_api_queue(minutes_to_resume):
resume_api_queue.apply_async(countdown=minutes_to_resume*60, queue='celery')
celery.control.cancel_consumer('third_party', reply=True)
@celery.task(default_retry_delay=300, max_retries=5)
def resume_api_queue():
celery.control.add_consumer('third_party', destination=['y@local'])
然后我可以继续提交我的 3rd 方 API 任务,一旦我的消费者被添加回来,我的所有任务都会被消耗掉。太好了。
但是,由于我在此队列中没有消费者,这似乎意味着我再也看不到 Flower 中提交的作业(直到我的消费者被添加)。 p>
我做错了什么吗?我能否以另一种方式实现这种“暂停”,让我继续看到提交的作业?
附言也许这与这个问题有关,但不是 100% 肯定:https://github.com/celery/celery/issues/1452
如果有影响,我正在使用 amqp 代理。
感谢女孩和男孩。
【问题讨论】: