【问题标题】:How to restart consumer and consume rejected messages如何重启消费者并消费被拒绝的消息
【发布时间】:2014-11-06 21:24:22
【问题描述】:

使用txamqp客户端,具体场景如下:

  1. 声明一个名为“消息传递”的交换(type=topic)
yield amqp.chan.exchange_declare(exchange='messaging', type='topic')
  1. 设置消费者
yield amqp.named_queue_declare(queue="submit.sm_all")
yield amqp.chan.queue_bind(queue="submit.sm_all", exchange="messaging", routing_key="submit.sm.*")
yield amqp.chan.basic_consume(queue="submit.sm_all", no_ack=False, consumer_tag='qtag')
  1. 发布 50 条消息
   for i in range(50):
       yield amqp.publish(exchange='messaging', routing_key="submit.sm.connector01", 
           content=Content(str(i)))
  1. 使用回调启动消费者,将所有消费的消息重新排队
queue = yield amqp.client.queue('qtag')
queue.get().addCallback(self._callback_reject_and_requeue_all).addErrback(self._errback)
  1. 5 秒后停止消费者:
yield queue.close()

在这个阶段,队列仍然充满了 50 条消息,因为它们都被拒绝并重新排队(回调被触发了很多次)。

  1. 再次启动消费者:
queue = yield amqp.client.queue('qtag')
queue.get().addCallback(self._callback).addErrback(self._errback)
  1. 5 秒后停止消费者:
yield queue.close()

问题是在第 6 步启动消费者后从未触发回调,并且队列仍然充满 50 条消息。

注意:

  • 消息被拒绝如下:
yield amqp.chan.basic_reject(delivery_tag=message.delivery_tag, requeue = 1)

【问题讨论】:

    标签: python python-2.7 rabbitmq twisted amqp


    【解决方案1】:

    消息是否被拒绝没有区别 - 它将驻留在队列顶部并且可以被任何消费者选择(或者如果您使用 TTL 或长度限制并且将达到这样的限制,则将从队列中删除) .

    您也不能只使用以前被拒绝的消息,因为它不能在服务器端定义。实际上,您只能从队列中消费一条消息(它们是严格的 FIFO 队列)。

    作为一种解决方法,您可以设置Dead Letter Exchanges 并拒绝带有requeue=false 的邮件,然后它们将根据 DLX 路由流程移动到您的目标队列。然后你可以从那里消费被拒绝的消息,但一般来说,除非需要特殊的逻辑,否则通常会通过重新排队拒绝消息到它们被消费的同一队列。

    而且你还可以在你想拒绝的地方重新发布你想要拒绝的消息,它甚至听起来有点生疏。

    附注:

    请注意,当您在函数上调用 yield 时,它不会运行函数体,而是返回生成器对象

    【讨论】:

    • 你的意思是被拒绝的消息会进入当前队列吗?如果是这种情况,那么当我用同一个队列再次启动我的消费者时,为什么它看不到被拒绝的消息?
    • 我添加了关于产量声明的注释。用return 代替yield 怎么样?它应该会有所帮助。
    • 与yield无关,它只是阻塞直到我从调用中得到响应,我用它来简化代码而不是使用回调(amqp对象是一个使用twisted框架的txamqp客户端,一个基于事件)
    【解决方案2】:

    为了彻底停止消费者(第 5 步),必须使用 basic_cancel:

    1. 5 秒后停止消费者:

      产生 amqp.chan.basic_cancel(consumer_tag = 'qtag')

    2. 再次启动消费者:

      yield amqp.chan.basic_consume(queue="submit.sm_all", no_ack=False, consumer_tag='qtag') queue = yield amqp.client.queue('qtag') queue.get().addCallback(self._callback).addErrback(self._errback)

    【讨论】:

      猜你喜欢
      • 2020-08-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-05-01
      • 1970-01-01
      • 2023-02-20
      相关资源
      最近更新 更多