【问题标题】:Jupyter notebook never finishes processing using multiprocessing (Python 3)Jupyter notebook 永远不会使用多处理(Python 3)完成处理
【发布时间】:2018-04-29 01:14:01
【问题描述】:

Jupyter 笔记本

我基本上在使用多处理模块,我还在学习多处理的功能。我正在使用 Dusty Phillips 的书,这段代码属于它。

import multiprocessing  
import random
from multiprocessing.pool import Pool

def prime_factor(value):
    factors = []
    for divisor in range(2, value-1):
        quotient, remainder = divmod(value, divisor)
        if not remainder:
            factors.extend(prime_factor(divisor))
            factors.extend(prime_factor(quotient))
            break
        else:
            factors = [value]
    return factors

if __name__ == '__main__':
    pool = Pool()
    to_factor = [ random.randint(100000, 50000000) for i in range(20)]
    results = pool.map(prime_factor, to_factor)
    for value, factors in zip(to_factor, results):
        print("The factors of {} are {}".format(value, factors))

在 Windows PowerShell(不是 jupyter notebook)上,我看到以下内容

Process SpawnPoolWorker-5:
Process SpawnPoolWorker-1:
AttributeError: Can't get attribute 'prime_factor' on <module '__main__' (built-in)>

我不知道为什么单元格永远不会结束运行?

【问题讨论】:

    标签: python-3.x debugging multiprocessing jupyter


    【解决方案1】:

    严格来说,即使添加了if __name__="__main__",Windows Jupyter Notebook 也不支持 Python 多处理。

    Windows 10 中的一种解决方法是将 Windows 浏览器与 WSL 中的 Jupyter 服务器连接。

    您可以获得与 Linux 相同的体验。

    您可以手动设置或参考https://github.com/mszhanyi/gemini中的脚本

    【讨论】:

      【解决方案2】:

      为了处理让多进程在 Jupyter 会话中运行良好的许多怪癖,我创建了一个库 mpify,它允许一次性执行多进程函数,并使用简单的 API 将内容从笔记本传递到子进程.

      Jupyter shell 进程本身可以作为工作进程参与。用户可以选择从所有工作人员那里收集结果,也可以只从其中一个人那里收集结果。

      这里是:

      https://github.com/philtrade/mpify

      在底层,它使用multiprocess——一个来自标准pythonmultiprocessing库的积极支持的分支——允许在笔记本中本地定义的变量/函数,可以在子进程中访问。它还使用spawn start 方法,如果子进程要使用多个 GPU(这是一个越来越常见的用例),这是必要的。它使用Process() 而不是Pool(),来自multiprocess API。

      用户可以提供自定义上下文管理器来获取资源,设置/拆除围绕函数执行的执行环境。我提供了一个示例上下文管理器来支持 PyTorch 的分布式数据并行 (DDP) 设置,以及更多关于如何在 Jupyter 中使用 DDP 在多个 GPU 上训练 fastai v2 的示例。

      欢迎大家分享错误报告、PR、用例。

      绝不是一个花哨/强大的库,mpify 只打算支持单主机/多进程类型的分布式设置,并且只是生成-执行-终止。它也不支持持久的进程池和花哨的任务调度——ipyparalleldask 已经做到了。

      我希望它对那些在 Jupyter + 多处理方面苦苦挣扎的人有用,而且对于多 GPU 也是如此。谢谢。

      【讨论】:

        【解决方案3】:

        执行函数无需手动将其写入单独的文件:

        我们可以将要处理的任务动态写入临时文件,导入并执行函数。

        from multiprocessing import Pool
        from functools import partial
        import inspect
        
        def parallal_task(func, iterable, *params):
        
            with open(f'./tmp_func.py', 'w') as file:
                file.write(inspect.getsource(func).replace(func.__name__, "task"))
        
            from tmp_func import task
        
            if __name__ == '__main__':
                func = partial(task, params)
                pool = Pool(processes=8)
                res = pool.map(func, iterable)
                pool.close()
                return res
            else:
                raise "Not in Jupyter Notebook"
        

        然后我们可以像这样在笔记本单元格中简单地调用它:

        def long_running_task(params, id):
            # Heavy job here
            return params, id
        
        data_list = range(8)
        
        for res in parallal_task(long_running_task, data_list, "a", 1, "b"):
            print(res) 
        

        输出:

        ('a', 1, 'b') 0
        ('a', 1, 'b') 1
        ('a', 1, 'b') 2
        ('a', 1, 'b') 3
        ('a', 1, 'b') 4
        ('a', 1, 'b') 5
        ('a', 1, 'b') 6
        ('a', 1, 'b') 7
        

        注意:如果您正在使用 Anaconda,并且想查看繁重任务的进度,可以在 long_running_task() 中使用 print()。打印的内容将显示在 Anaconda Prompt 控制台中。

        【讨论】:

        • 我对这篇文章有一个后续问题。如果我有一本字典(比如 id 是字典)进入long_running_task 怎么办?我应该如何更改parallal_task 函数?
        • @H4dr1en。好把戏。我会添加这个轻微的修改:pool = Pool(processes=8) -> pool = Pool(processes=len(iterable))
        【解决方案4】:

        似乎 Jupyter notebook 中的问题与不同的 ide 中的问题是设计功能。因此,我们必须将函数(prime_factor)写入不同的文件并导入模块。此外,我们必须注意调整。例如,就我而言,我已将函数编码到名为 defs.py 的文件中

        def prime_factor(value):
            factors = []
            for divisor in range(2, value-1):
                quotient, remainder = divmod(value, divisor)
                if not remainder:
                    factors.extend(prime_factor(divisor))
                    factors.extend(prime_factor(quotient))
                    break
                else:
                    factors = [value]
            return factors
        

        然后在 jupyter notebook 中我写了以下几行

        import multiprocessing  
        import random
        from multiprocessing import Pool
        import defs
        
        
        
        if __name__ == '__main__':
            pool = Pool()
            to_factor = [ random.randint(100000, 50000000) for i in range(20)]
            results = pool.map(defs.prime_factor, to_factor)
            for value, factors in zip(to_factor, results):
                print("The factors of {} are {}".format(value, factors))
        

        这解决了我的问题

        【讨论】:

        • 它可以使用 Pool 但不能使用 Process。可能是什么原因?
        • 可能很明显,但对于下一个读者:如果问题中的池初始化函数如prime_factor() 调用另一个函数,它们也必须与prime_factor() 一起放在同一个包中
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-09-26
        • 1970-01-01
        相关资源
        最近更新 更多