【问题标题】:Pika: Consume the next message even the last message was not acknowledgedPika:即使最后一条消息没有被确认,也消费下一条消息
【发布时间】:2016-06-16 09:21:17
【问题描述】:

对于服务器自动化,我们正在尝试开发一种工具,该工具可以在不同的服务器上处理和执行大量任务。我们将任务和服务器主机名发送到队列中。然后从请求者使用队列,请求者将信息提供给 ansible api。为了实现这一点,我们可以一次执行多个任务,我们正在使用线程。

现在我们被消息的确认卡住了......

到目前为止我们做了什么:
requester.py 消耗队列并启动一个线程,ansible 任务正在其中运行。然后将结果发送到另一个队列。因此,每条新消息都会创建一个新线程。任务完成了吗,线程死了。
但现在是困难的部分。我们必须使消息持久化,以防我们的服务器死机。因此,每条消息都应该在 来自 ansible 的结果被发回之后得到确认。

我们现在的问题是,当我们尝试在线程本身中确认消息时,没有更多的“同时”工作完成,因为 pika 的consume 等待确认。那么我们如何实现consume 消费消息而不等待确认呢?或者我们如何解决或改进我们的小程序?

requester.py

  #!/bin/python

  from worker import *
  import ansible.inventory
  import ansible.runner
  import threading

  class Requester(Worker):
      def __init__(self):
          Worker.__init__(self)
          self.connection(self.selfhost, self.from_db)
          self.receive(self.from_db)

      def send(self, result, ch, method):
          self.channel.basic_publish(exchange='',
                                     routing_key=self.to_db,
                                     body=result,
                                     properties=pika.BasicProperties(
                                                     delivery_mode=2,
                                     ))

          print "[x] Sent \n" + result
          ch.basic_ack(delivery_tag = method.delivery_tag)

      def callAnsible(self, cmd, ch, method):
          #call ansible api pre 2.0

          result =  json.dumps(result, sort_keys=True, indent=4, separators=(',', ': '))
          self.send(result, ch, method)

      def callback(self, ch, method, properties, body):
          print(" [x] Received by requester %r" % body)
          t = threading.Thread(target=self.callAnsible, args=(body,ch,method,))
          t.start()

worker.py

  import pika
  import ConfigParser
  import json
  import os

  class Worker(object):
      def __init__(self):
          #read some config files

      def callback(self, ch, method, properties, body):
          raise Exception("Call method in subclass")

      def receive(self, queue):
          self.channel.basic_qos(prefetch_count=1)
          self.channel.basic_consume(self.callback,queue=queue)
          self.channel.start_consuming()

      def connection(self,server,queue):
          self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                 host=server,
                 credentials=self.credentials))
          self.channel = self.connection.channel()
          self.channel.queue_declare(queue=queue, durable=True)

我们正在使用 Python 2.7 和 pika 0.10.0。

是的,我们在鼠兔常见问题解答中注意到:http://pika.readthedocs.io/en/0.10.0/faq.html
鼠兔不是线程安全的。

【问题讨论】:

    标签: python multithreading rabbitmq pika


    【解决方案1】:

    禁用自动确认并将预取计数设置为大于 1,具体取决于您希望消费者接收多少条消息。

    这里是如何设置预取 channel.basic_qos(prefetch_count=1),找到here

    【讨论】:

    • 太棒了!谢谢!我怎样才能锁定这个预取计数。这一切都很神奇。
    • @Rumpli 我已将此添加到答案中。现在我要在这里自讨苦吃,但由于你是新来的,我将很快解释关于投票和接受答案的内容:如果答案对你有帮助,请投赞成票。如果它解决了您的问题,请给它一个赞成票并接受。在这里,您只接受了没有投票,但您没有尝试它是否有效:) 也许现在只是投票,一旦您验证接受也是如此。如果我没有正确解释此投票/接受,请有人纠正我。
    • 感谢您的解释。我尝试了这一点,并将“channel.basic_qos(prefetch_count=1)”设置为“1”以上,它当时执行的任务不止一项。我试图支持你的回答,但只要我没有 15 名声望,它就不会显示它...... :(
    猜你喜欢
    • 1970-01-01
    • 2015-12-02
    • 1970-01-01
    • 2019-04-22
    • 1970-01-01
    • 2012-10-08
    • 2016-11-13
    • 2021-11-25
    • 1970-01-01
    相关资源
    最近更新 更多