【发布时间】:2015-04-14 15:50:46
【问题描述】:
我有一个 Python 程序,它充当 RabbitMQ 的消费者。一旦它从其队列中接收到作业,我希望程序使用多处理拆分作业,但我遇到了多处理的后勤问题。
为了便于阅读,我已经简化了代码。
我的 RabbitMQ 消费者功能:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue="JobReader", durable=True)
logging.info('Waiting for messages..')
def callback(ch, method, properties, body):
job_info = json.loads(body)
logging.info('Start Time: ' + time.strftime("%H:%M:%S"))
split_jobs = split_job(job_info)
process_manager.runProcesses(split_jobs)
ch.basic_ack(delivery_tag=method.delivery_tag)
我的多处理功能:
#!/usr/bin/python
import multiprocessing
import other_package
def worker_process(sub_job):
other_package.run_job(sub_job)
def runProcesses(jobs):
processes = []
for sub_job in jobs:
p = multiprocessing.Process(target=worker_process, args=(sub_job,))
processes.append(p)
p.start()
当然,我不能做if __name__ == '__main__':,因为它在一个函数中。
我不确定是否有使用多处理的解决方法,或者我是否只是以错误的方式处理这个问题。任何帮助将不胜感激。
【问题讨论】:
-
如果可能的话,我建议先拆分作业,然后再将它们放入队列。然后在他们自己的进程中运行几个消费者。如果这样的逻辑对您不起作用,请更详细地解释您的问题。
标签: python multiprocessing rabbitmq python-multiprocessing