【问题标题】:Stop a celery worker processing tasks OR monitor 'unconsumed' tasks in Flower停止芹菜工人处理任务或监控 Flower 中的“未使用”任务
【发布时间】:2018-05-10 19:37:13
【问题描述】:

假设我在 celery 队列上的所有任务都在访问第 3 方 API。但是,API 有一个速率限制,我正在跟踪它(我需要遵守一天限制和每小时限制)。一旦达到速率限制,我想暂停新任务的消费,然后在我知道我很好时恢复。

我通过以下两个任务实现了这一点:

@celery.task()
def cancel_api_queue(minutes_to_resume):
    resume_api_queue.apply_async(countdown=minutes_to_resume*60, queue='celery')
    celery.control.cancel_consumer('third_party', reply=True)

@celery.task(default_retry_delay=300, max_retries=5)
def resume_api_queue():
    celery.control.add_consumer('third_party', destination=['y@local'])

然后我可以继续提交我的 3rd 方 API 任务,一旦我的消费者被添加回来,我的所有任务都会被消耗掉。太好了。

但是,由于我在此队列中没有消费者,这似乎意味着我再也看不到 Flower 中提交的作业(直到我的消费者被添加)。 p>

我做错了什么吗?我能否以另一种方式实现这种“暂停”,让我继续看到提交的作业?

附言也许这与这个问题有关,但不是 100% 肯定:https://github.com/celery/celery/issues/1452

如果有影响,我正在使用 amqp 代理。

感谢女孩和男孩。

【问题讨论】:

    标签: python celery flower


    【解决方案1】:

    我怀疑在工作人员获取队列消息之前查看它们的内容并不是 Flower 预期设计的一部分。因此,如果您停止使用队列中的任务,Flower 可以做的最好的事情是在“代理”窗格中显示其中有多少已作为单个数字排队。

    观察传入任务内部的一种骇人听闻的方法可能是添加一个中间虚拟“转发”任务,该任务只是将消息从一个队列(我们称之为query_inbox)转发到另一个队列(例如,@987654322 @)。

    例如类似:

    @celery.task(queue='query_inbox')
    def query(params):
        process_query.delay(params)
    
    @celery.task(queue='query_processing')
    def process_query(params):
        ... do rate-limited stuff ...
    

    现在您可以停止使用来自query_processing 的任务,但您仍然可以在它们流经query_inbox 工作线程时观察它们的参数。

    【讨论】:

    • 谢谢@KT。 - 所以我们确实想到了额外的排队技巧 - 很高兴看到我们并没有完全愚蠢 - 但看起来很可怕。 - 现在我只是接受我不知道消费者停车时提交的确切工作。事实上,同意我们可以在经纪人栏目中看到工作。似乎取消消费者不是最好的主意,但我想现在就可以了。
    • 我不认为拥有各种中间队列一定是愚蠢或可怕的。事实上,“企业架构”,就我从著名的 Fowler 的书中模糊地记得,很多关于以奇特的方式将消息从队列路由到队列:)
    猜你喜欢
    • 2015-05-28
    • 2014-05-29
    • 2015-04-11
    • 2015-04-01
    • 2014-10-02
    • 2017-04-23
    • 1970-01-01
    • 2013-05-05
    • 2018-07-04
    相关资源
    最近更新 更多