【发布时间】:2014-11-06 21:24:22
【问题描述】:
使用txamqp客户端,具体场景如下:
- 声明一个名为“消息传递”的交换(type=topic)
yield amqp.chan.exchange_declare(exchange='messaging', type='topic')
- 设置消费者
yield amqp.named_queue_declare(queue="submit.sm_all") yield amqp.chan.queue_bind(queue="submit.sm_all", exchange="messaging", routing_key="submit.sm.*") yield amqp.chan.basic_consume(queue="submit.sm_all", no_ack=False, consumer_tag='qtag')
- 发布 50 条消息
for i in range(50): yield amqp.publish(exchange='messaging', routing_key="submit.sm.connector01", content=Content(str(i)))
- 使用回调启动消费者,将所有消费的消息重新排队
queue = yield amqp.client.queue('qtag') queue.get().addCallback(self._callback_reject_and_requeue_all).addErrback(self._errback)
- 5 秒后停止消费者:
yield queue.close()
在这个阶段,队列仍然充满了 50 条消息,因为它们都被拒绝并重新排队(回调被触发了很多次)。
- 再次启动消费者:
queue = yield amqp.client.queue('qtag') queue.get().addCallback(self._callback).addErrback(self._errback)
- 5 秒后停止消费者:
yield queue.close()
问题是在第 6 步启动消费者后从未触发回调,并且队列仍然充满 50 条消息。
注意:
- 消息被拒绝如下:
yield amqp.chan.basic_reject(delivery_tag=message.delivery_tag, requeue = 1)
【问题讨论】:
标签: python python-2.7 rabbitmq twisted amqp