【问题标题】:when using Pika BlockingConnection, Does basic_ack() has to be placed in callback function使用 Pika BlockingConnection 时,basic_ack() 是否必须放在回调函数中
【发布时间】:2017-05-08 05:16:31
【问题描述】:

假设我已经连接到 RabbitMQ,如下所示:

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue=getting_from_this_queue)
channel.basic_consume(
    callback, queue=getting_from_this_queue, no_ack=False)
channel.basic_qos( prefetch_count = 3 )

为了实现更好的并发性,我尝试将每个作业都放在一个内部队列中,并创建了一个 while 循环来为从这个内部队列中检索到的每个作业异步调度工作人员:

from Queue import Queue
from multiprocessing.dummy import Pool as ThreadPool

task_queue = Queue(10)
pool = Pool(20)

def worker(ch, method, job):
    # ...some heavy lifting...
     if job_gets_done:         # some abstraction
        print "job success"
        ch.basic_ack(delivery_tag=method.delivery_tag)   # PROBLEM : this seems not working
     else:
        print "job failed"

def callback(ch, method, properties, job):
     task_queue.put((ch,method,dn))     # put job in internal queue, block if full.

@threaded
def async_process_jobs():              # loop to get job and start thread worker.
    while True:
         params = task_queue.get()
         pool.apply_async( worker, params )   # param = (ch,method, job)


async_process_jobs()
channel.start_consuming()

问题是,当正在处理作业时,没有一个正在正确发送确认(即使执行流程确实通过了它,即打印“作业成功”)。 rabbitmq 上的队列大小保持不变,为什么?

somewhat official tutorial 中,basic_ack() 被放置在 callback() 中,但我的没有。这可能是问题的根源吗?


详细行为(可能不重要):假设我在队列中有 10000 个作业,一开始,一些 2000 条消息进入未确认状态,然后所有消息都返回就绪状态,即使我的工人仍在处理和打印“作业成功”(acking)。

【问题讨论】:

    标签: rabbitmq multiprocessing pika


    【解决方案1】:

    来自FAQ of pika

    Pika 在代码中没有任何线程概念。如果你想 将 Pika 与线程一起使用,确保每个都有 Pika 连接 线程,在该线程中创建。分享一只鼠兔是不安全的 跨线程连接。

    【讨论】:

    • 顺便说一句,有许多类似的 Python 库实际上确实支持线程;例如rabbitpy。
    【解决方案2】:

    我遇到了类似的问题,我注意到: 如果工作很快完成,那么 ack 就可以了 但是如果这项工作花费更多时间,那么 ack 就不起作用,即使它发送出去。

    【讨论】:

      猜你喜欢
      • 2020-02-26
      • 1970-01-01
      • 2018-10-30
      • 2017-05-08
      • 1970-01-01
      • 1970-01-01
      • 2011-05-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多