【问题标题】:Python - Asynch Multiprocessing from RabbitMQ ConsumerPython - 来自 RabbitMQ 消费者的异步多处理
【发布时间】:2015-04-14 15:50:46
【问题描述】:

我有一个 Python 程序,它充当 RabbitMQ 的消费者。一旦它从其队列中接收到作业,我希望程序使用多处理拆分作业,但我遇到了多处理的后勤问题。

为了便于阅读,我已经简化了代码。

我的 RabbitMQ 消费者功能:

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue="JobReader", durable=True)
logging.info('Waiting for messages..')


def callback(ch, method, properties, body):
    job_info = json.loads(body)

    logging.info('Start Time: ' + time.strftime("%H:%M:%S"))

    split_jobs = split_job(job_info)

    process_manager.runProcesses(split_jobs)

    ch.basic_ack(delivery_tag=method.delivery_tag)

我的多处理功能:

#!/usr/bin/python

import multiprocessing
import other_package


def worker_process(sub_job):
    other_package.run_job(sub_job)


def runProcesses(jobs):
    processes = []
    for sub_job in jobs:
        p = multiprocessing.Process(target=worker_process, args=(sub_job,))
        processes.append(p)

        p.start()

当然,我不能做if __name__ == '__main__':,因为它在一个函数中。

我不确定是否有使用多处理的解决方法,或者我是否只是以错误的方式处理这个问题。任何帮助将不胜感激。

【问题讨论】:

  • 如果可能的话,我建议先拆分作业,然后再将它们放入队列。然后在他们自己的进程中运行几个消费者。如果这样的逻辑对您不起作用,请更详细地解释您的问题。

标签: python multiprocessing rabbitmq python-multiprocessing


【解决方案1】:

您可以重构 multiprocessing 片段,以便从主脚本初始化其状态:

import process_manager
...

def callback(ch, method, properties, body):
    job_info = json.loads(body)
    logging.info('Start Time: ' + time.strftime("%H:%M:%S"))
    split_jobs = split_job(job_info)
    manager.runProcesses(split_jobs)
    ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == "__main__":
    manager = process_manager.get_manager()
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue="JobReader", durable=True)
    logging.info('Waiting for messages..')

然后process_manager 看起来像这样:

import multiprocessing
import other_package

def worker_process(sub_job):
    other_package.run_job(sub_job)

_manager = None

def get_manager(): # Note that you don't have to use a singleton here
    global _manager
    if not _manager:
        _manager = Manager()
    return _manager


class Manager(object):
    def __init__(self):
        self._pool = multiprocessing.Pool()

    def runProcesses(self, jobs):
        self._pool.map_async(worker_process, jobs)

请注意,我使用 Pool 而不是为每个作业生成 Process,因为这可能无法很好地扩展。

【讨论】:

  • 啊,这更有意义。谢谢!
猜你喜欢
  • 1970-01-01
  • 2021-04-30
  • 1970-01-01
  • 2016-05-04
  • 1970-01-01
  • 2019-01-20
  • 1970-01-01
  • 2015-10-03
  • 1970-01-01
相关资源
最近更新 更多