【问题标题】:Multiprocessing only using a single thread instead of multiple仅使用单个线程而不是多个线程进行多处理
【发布时间】:2020-05-10 18:00:10
【问题描述】:

这个问题最近被问过并解决了几次,但我有一个非常具体的例子......

我有一个多处理功能,昨天在完全隔离的情况下工作得非常好(在交互式笔记本中),但是,我决定参数化,以便我可以将它作为更大管道的一部分并用于抽象/更清洁的笔记本,现在它是只使用一个线程而不是 6 个。

import pandas as pd
import multiprocessing as mp
from multiprocessing import get_context
mp.set_start_method('forkserver')


def multiprocess_function(func, iterator, input_data):
    result_list = []

    def append_result(result):
        result_list.append(result)

    with get_context('fork').Pool(processes=6) as pool:
        for i in iterator:
            pool.apply_async(func, args = (i, input_data), callback = append_result)
        pool.close()
        pool.join()

    return result_list
multiprocess_function(count_live, run_weeks, base_df)

我以前版本的代码执行方式不同,而不是返回/调用,我在函数底部使用以下代码(现在我已经参数化了它根本不起作用 - 即使分配了 args)

if __name__ == '__main__':
    multiprocess_function()

该函数执行良好,只是根据顶部的输出在一个线程上运行。

抱歉,如果这是非常简单的事情 - 我不是程序员,我是分析师 :)

编辑:如果我在函数底部包含 if__name__ =='ma​​in': 等并执行单元格,则一切正常,但是,当我这样做时,我必须删除参数 - 也许只是与范围界定有关。如果我通过调用函数来执行,无论是否参数化,它都只在单个线程上运行。

【问题讨论】:

  • 为什么不使用starmap 而不是apply_async?您正在调用相同的函数,因此它无需使用callback 或初始化列表。它简化为:result_list = pool.starmap(func, ((i, input_date) for i in iterator))
  • 1) 最终,您一次更改了太多东西。希望您正在使用版本控制,并且可以回溯以找出哪些更改破坏了事情。 2)“现在我已经参数化了它根本不起作用 - 即使分配了参数”为什么不呢?以与旧功能类似的方式使用新功能应该很简单 3)如果您使用的是笔记本,那么大概是 Jupyter 或其他一些基于 iPython 的系统。你还在用 iPython 运行你的新脚本吗? 4) 考虑创建minimal reproducible example
  • "当我这样做时,我必须删除参数" -- 问题中所示的函数 cannot 在没有参数的情况下调用。你确定你运行的是同一个版本的函数吗?
  • 这可能与主要部分至关重要的特定 Windows 问题有关。见stackoverflow.com/questions/20222534/…

标签: python python-3.x multiprocessing python-multiprocessing


【解决方案1】:

你有两个问题:

  1. 您没有使用导入保护。

  2. 您没有在导入保护中设置默认启动方法。

在它们两者之间,您最终告诉 Python 在 forkserver 中生成 forkserver,这只会让您感到悲伤。将代码结构更改为:

import pandas as pd
import multiprocessing as mp
from multiprocessing import get_context


def multiprocess_function(func, iterator, input_data):
    result_list = []
    with get_context('fork').Pool(processes=6) as pool:
        for i in iterator:
            pool.apply_async(func, args=(i, input_data), callback=result_list.append)
        pool.close()
        pool.join()

    return result_list

if __name__ == '__main__':
    mp.set_start_method('forkserver')
    multiprocess_function(count_live, run_weeks, base_df)

由于您没有显示count_liverun_weeksbase_df 的来源,我只想说,对于编写的代码,它们应该在受保护的部分中定义(因为没有任何依赖他们作为一个全球性的)。

还有其他改进(apply_async 的使用方式让我觉得你真的只是想列出pool.imap_unordered 的结果,没有显式循环),但这解决了将破坏 spawnforkserver 启动方法的使用。

【讨论】:

    【解决方案2】:

    使用“get_context('spawn')”而不是“get_context('fork')”也许会解决你的问题

    【讨论】:

    • 离开if __name__ == '__main__': 守卫对于spawnfork 更糟糕(因为spawn 将模块导入为非__main__ 以设置类似于分叉工作方式的全局状态)。
    猜你喜欢
    • 2018-10-27
    • 2019-02-27
    • 2019-04-23
    • 1970-01-01
    • 1970-01-01
    • 2018-12-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多