【问题标题】:Consuming from queues based upon external event (event queues)基于外部事件从队列中消费(事件队列)
【发布时间】:2013-04-28 09:22:48
【问题描述】:

我遇到了一个用例,我想控制 celery 工作人员如何以及何时将任务从 rabbitmq 中取出来进行处理。出队将与 celery 上下文之外发生的外部事件同步,但我担心的是 celery 是否给我任何灵活性来控制任务的出队?我试图调查,以下是一些可能性:

  • 使用basic.get 代替basic.consume,其中basic.get 是根据外部事件触发的。但是,我看到 celery 默认为 basic.consume(推送)语义。我可以在不直接修改核心的情况下覆盖此行为吗?

  • 在触发外部事件时自定义远程控制工作人员。但是,从文档中我不太清楚远程控制命令如何帮助我控制任务的出列。

我非常倾向于继续使用 celery,并且可能避免在 AMQP 之上编写自定义队列处理解决方案。

【问题讨论】:

    标签: python erlang rabbitmq celery eventqueue


    【解决方案1】:

    需要考虑两个更奇特的选项:(1) 在 Rabbit 层中定义自定义交换类型。这允许您创建控制哪些任务发送到哪些队列的路由规则。 (2) 定义一个自定义的 Celery 中介。这允许您控制哪些任务从队列移动到工作池。

    【讨论】:

      【解决方案2】:

      使用远程控制命令,您可以pause or resume 来自给定队列的消息消费。

      celery.control.cancel_consumer('celery')
      

      上面的命令指示所有工作人员停止从默认 celery 队列中消费(出列)消息

      celery.control.add_consumer('celery')
      

      远程命令接受 destination 参数,允许向特定工作人员发送请求

      【讨论】:

      • 感谢您的意见。远程控制命令看起来确实是一个合理的选择。但是,看起来工作人员仍然会使用prefetch_count > 1 来处理任务,这违背了我的目的。有没有办法让这些消费者使用prefetch_count=1?具体来说,我希望 a)添加消费者 b)从指定队列中取出单个任务 c)禁用该队列的消费者,直到进一步的外部事件
      • 要为工作人员配置预取计数,请使用 CELERYD_PREFETCH_MULTIPLIER=1
      猜你喜欢
      • 2018-09-18
      • 2014-05-07
      • 1970-01-01
      • 1970-01-01
      • 2015-07-20
      • 2012-07-10
      • 1970-01-01
      • 1970-01-01
      • 2012-02-10
      相关资源
      最近更新 更多