【发布时间】:2017-05-08 05:16:31
【问题描述】:
假设我已经连接到 RabbitMQ,如下所示:
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue=getting_from_this_queue)
channel.basic_consume(
callback, queue=getting_from_this_queue, no_ack=False)
channel.basic_qos( prefetch_count = 3 )
为了实现更好的并发性,我尝试将每个作业都放在一个内部队列中,并创建了一个 while 循环来为从这个内部队列中检索到的每个作业异步调度工作人员:
from Queue import Queue
from multiprocessing.dummy import Pool as ThreadPool
task_queue = Queue(10)
pool = Pool(20)
def worker(ch, method, job):
# ...some heavy lifting...
if job_gets_done: # some abstraction
print "job success"
ch.basic_ack(delivery_tag=method.delivery_tag) # PROBLEM : this seems not working
else:
print "job failed"
def callback(ch, method, properties, job):
task_queue.put((ch,method,dn)) # put job in internal queue, block if full.
@threaded
def async_process_jobs(): # loop to get job and start thread worker.
while True:
params = task_queue.get()
pool.apply_async( worker, params ) # param = (ch,method, job)
async_process_jobs()
channel.start_consuming()
问题是,当正在处理作业时,没有一个正在正确发送确认(即使执行流程确实通过了它,即打印“作业成功”)。 rabbitmq 上的队列大小保持不变,为什么?
在somewhat official tutorial 中,basic_ack() 被放置在 callback() 中,但我的没有。这可能是问题的根源吗?
详细行为(可能不重要):假设我在队列中有 10000 个作业,一开始,一些 2000 条消息进入未确认状态,然后所有消息都返回就绪状态,即使我的工人仍在处理和打印“作业成功”(acking)。
【问题讨论】:
标签: rabbitmq multiprocessing pika