【问题标题】:Can't get multiprocessing to run processes concurrently无法让多处理同时运行进程
【发布时间】:2025-12-16 07:50:02
【问题描述】:

下面的代码似乎没有同时运行,我也不确定具体原因:

def run_normalizers(config, debug, num_threads, name=None):

    def _run():
        print('Started process for normalizer')
        sqla_engine = init_sqla_from_config(config)
        image_vfs = create_s3vfs_from_config(config, config.AWS_S3_IMAGE_BUCKET)
        storage_vfs = create_s3vfs_from_config(config, config.AWS_S3_STORAGE_BUCKET)

        pp = PipedPiper(config, image_vfs, storage_vfs, debug=debug)

        if name:
            pp.run_pipeline_normalizers(name)
        else:
            pp.run_all_normalizers()
        print('Normalizer process complete')

    threads = []
    for i in range(num_threads):
        threads.append(multiprocessing.Process(target=_run))
    [t.start() for t in threads]
    [t.join() for t in threads]


run_normalizers(...)

config 变量只是在_run() 函数之外定义的字典。所有的进程似乎都被创建了——但它并不比我用一个进程创建的快。基本上,run_**_normalizers() 函数中发生的事情是从数据库(SQLAlchemy)中的队列表中读取数据,然后发出一些 HTTP 请求,然后运行规范化器的“管道”来修改数据,然后将其保存回数据库。我来自 JVM 领域,那里的线程“繁重”并且经常用于并行性 - 我对此有点困惑,因为我认为多进程模块应该绕过 Python 的 GIL 的限制。

【问题讨论】:

  • 多处理模块使用进程,而不是线程。因此它不受 GIL 的影响。
  • 我已经测试了你的代码,基本技术没问题。我不确定共享的config,如果config 字典被大量使用,理论上可能会减慢速度。处理器可能不是您的瓶颈。
  • 我只在我的工作站上运行过它,8 核 16GB RAM Linux。对于 1 或 1、8 或 16 个进程,没有任何变化 - 系统资源很好。
  • 平均负载:0.42、0.31、0.24(这是不运行应用程序时)。这是在运行应用程序时:平均负载:0.59、0.53、0.34。我认为它不受 CPU 限制。
  • 而平均负载最终下降到预运行应用程序的速率,并且在某些时候略低于(我也有 X11 和 Firefox 以及我的工作站上没有运行的东西,所以我没有不认为这甚至在注册)。

标签: python multithreading concurrency multiprocess


【解决方案1】:

修复了我的多处理问题 - 并且实际上切换了线程。不知道实际上是什么修复了它的想法——我只是重新设计了所有东西,制作了工人和任务,还有什么不是,现在事情正在飞速发展。这是我所做的基本工作:

import abc
from Queue import Empty, Queue
from threading import Thread

class AbstractTask(object):
    """
        The base task
    """
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def run_task(self):
        pass

class TaskRunner(object):

    def __init__(self, queue_size, num_threads=1, stop_on_exception=False):
        super(TaskRunner, self).__init__()
        self.queue              = Queue(queue_size)
        self.execute_tasks      = True
        self.stop_on_exception  = stop_on_exception

        # create a worker
        def _worker():
            while self.execute_tasks:

                # get a task
                task = None
                try:
                    task = self.queue.get(False, 1)
                except Empty:
                    continue

                # execute the task
                failed = True
                try:
                    task.run_task()
                    failed = False
                finally:
                    if failed and self.stop_on_exception:
                        print('Stopping due to exception')
                        self.execute_tasks = False
                    self.queue.task_done()

        # start threads
        for i in range(0, int(num_threads)):
            t = Thread(target=_worker)
            t.daemon = True
            t.start()


    def add_task(self, task, block=True, timeout=None):
        """
            Adds a task
        """
        if not self.execute_tasks:
            raise Exception('TaskRunner is not accepting tasks')
        self.queue.put(task, block, timeout)


    def wait_for_tasks(self):
        """
            Waits for tasks to complete
        """
        if not self.execute_tasks:
            raise Exception('TaskRunner is not accepting tasks')
        self.queue.join()

我所做的只是创建一个 TaskRunner 并向其中添加任务(数千个),然后调用 wait_for_tasks()。所以,显然在我所做的重新架构中,我“修复”了我遇到的其他一些问题。不过很奇怪。

【讨论】:

    【解决方案2】:

    如果您仍在寻找多处理解决方案,您可能首先想了解如何使用工作人员池,然后您就不必自己管理 num_threads 进程:http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

    对于减速问题,您是否尝试过将配置对象作为参数传递给 _run 函数?我不知道这是否/如何在内部做出改变,但猜测它可能会改变一些东西。

    【讨论】: