【问题标题】:multiprocessing stops program execution - python多处理停止程序执行 - python
【发布时间】:2021-06-24 12:01:26
【问题描述】:

我是多处理的菜鸟,我正在尝试加快我的旧算法。它工作得非常好,没有多重处理,但在我尝试实现它的那一刻,程序停止工作:它一直待命,直到我中止脚本。 另一个问题是它没有填充数据框:同样,它通常可以工作,但是使用多处理它只返回 NaN。

func 运行良好。

stockUniverse = list(map(lambda s: s.strip(), Stocks)) #Stocks = list

def func(i):

    df.at[i, 'A'] = 1
    df.at[i, 'B'] = 2
    df.at[i, 'C'] = 3
    print(i, 'downloaded')
    return True


if __name__ == "__main__":
    print('Start')

    pool = mp.Pool(mp.cpu_count())
    pool.imap(func, stockUniverse)
    print(df)

结果是:

Index 19  NaN  NaN  NaN
index 20  NaN  NaN  NaN

然后它就停在那里,直到我点击Ctrl+C。 谢谢

【问题讨论】:

  • 你必须向我们展示stockUniverse的代码
  • 我编辑了代码,谢谢
  • d.func的代码 - 对不起
  • imap 返回一个迭代器。您必须迭代迭代器以确保stockUniverse 中的所有隐含任务都已提交、执行并返回结果。您的第二个问题是池中的每个进程都有自己的地址空间,并且没有更新主进程中的df 实例。
  • @Aaron 感谢您的回复。是的,imap通常最终会提交任务,但只有在有机会的情况下。我的意思是,在 OP 发布的代码中,有对 imapprint 的调用,然后程序终止,因此任务将没有机会在程序结束时开始(当然也不会按时间调用print 函数)。可以在调用 print 之前插入对 time.sleep 的调用,猜测任务开始和完成需要多长时间,但我认为我们同意“迭代迭代器”确实是要走的路。

标签: python multiprocessing


【解决方案1】:

map 函数会阻塞,直到所有提交的任务都完成并返回工作函数的返回值列表。但是imap 函数立即返回一个迭代器,该迭代器必须被迭代以逐个返回返回值当每个都可用时。您的原始代码没有迭代该迭代器,而是立即打印出它所期望的更新后的df。但是您不会给任务足够的时间来启动和完成df 的修改。理论上,如果您在print 语句之前插入了对time.sleep 的调用足够长的时间,那么这些任务将在您打印出df 之前开始并完成。但显然,迭代迭代器是确保所有任务都已完成的最有效方法,也是获取返回值的唯一方法。

但是,正如我在评论中提到的,你有一个更大的问题。您提交的任务由您创建的进程池中的进程调用的工作函数func 执行,每个进程都在自己的地址空间中执行。您没有使用您正在运行的平台标记您的问题(每当您使用 multiprocessing 标记问​​题时,您也应该使用平台标记问题),但我可能会推断您正在运行的平台下运行使用spawn 方法创建新进程,例如Windows,这就是为什么你有if __name__ == "__main__": 块控制代码来创建新进程(即处理池)。当spawn 用于创建新进程时,会创建一个新的空地址空间,启动一个新的 Python 解释器并从顶部重新执行源代码(没有 if __name__ == "__main__": 块控制创建新进程的代码,你会进入一个无限的递归循环,创建新的进程)。但这意味着在 if __name__ == "__main__": 块之外的全局范围内对 df 的任何定义(如果您在 Windows 下运行,则必须省略它)将为每个进程在创建每个进程时在池中。

如果您改为在Linux 下运行,其中fork 用于创建新进程,情况会有所不同。新进程将从主进程和所有声明的变量继承原始地址空间,但使用写时复制。这意味着一旦子进程尝试修改此继承存储中的任何变量,就会制作页面的副本,并且该进程现在将在其自己的副本上工作。再次重申,不能出于更新目的共享任何内容。

因此,您应该修改您的程序,让您的工作函数返回值返回到主进程,这将进行必要的更新:

import multiprocessing as mp
import pandas as pd

def func(stock):
    return (stock, (('A', 1), ('B', 1), ('C', 1)))

if __name__ == "__main__":
    stockUniverse = ['abc', 'def', 'ghi', 'klm']
    d = {col: pd.Series(index=stockUniverse, dtype='int32') for col in ['A', 'B', 'C']}
    df = pd.DataFrame(d)

    pool_size = min(mp.cpu_count(), len(stockUniverse))
    pool = mp.Pool(pool_size)
    for result in pool.imap_unordered(func, stockUniverse):
        stock, col_values = result # unpack
        for col_value in col_values:
            col, value = col_value # unpack
            df.at[stock, col] = value
    print(df)

打印:

     A  B  C
abc  1  1  1
def  1  1  1
ghi  1  1  1
klm  1  1  1

请注意,我使用了imap_unordered 而不是imap。前一种方法允许以任意顺序(即它们变得可用)返回结果,并且通常更有效,并且由于返回值包含设置df的正确行所需的所有信息,我们不再需要任何特定的订购。

但是:

如果您的工作函数除了从网站下载和很少的 CPU 密集型处理之外什么都不做,那么您可以(应该)通过以下简单替换来使用线程池:

from multiprocessing.pool import ThreadPool
...
    MAX_THREADS_TO_USE = 100 # or maybe even larger!!!
    pool_size = min(MAX_THREADS_TO_USE, len(stockUniverse))
    pool = ThreadPool(pool_size)

而且由于所有线程共享相同的地址空间,您可以按原样使用您原来的工作函数 func

【讨论】:

  • 非常感谢,非常清晰全面的解释。你一直很友善地向我解释一切,不用担心篇幅。对不起,如果我让你疯狂回答,我刚开始使用多处理。您的代码运行良好,只有一个问题留给我:为什么脚本没有关闭?它仍然在那里等待,程序没有结束。 (对不起,如果这是一个愚蠢的问题,但我就是不明白)。添加pool.close() 工作正常,但即使在官方文档中,它似乎也没有被使用。再次感谢您的宝贵时间。
  • 如果您想向池提交更多任务,则无需调用close()close 阻止提交更多任务并导致进程终止)。如果你知道你已经完成了池,那么你当然可以显式调用 pool.close()pool.terminate() (它会立即停止工作进程而不完成未完成的工作——但在这种情况下没有任何工作)或隐式调用pool.terminate() 通过使用pool 作为上下文管理器:with Pool() as pool: 在这种情况下,当块退出时,terminate() 将被调用。 (...更多)
  • 但最后,您的程序将在print(df) 语句之后终止(假设)实际上没有更多代码跟随。在这种情况下,pool 将被垃圾收集,当这种情况发生时,pool.terminate() 将被调用,进程将结束,等等。所以再次没有真正需要做任何特别的事情。但是,如果后面有更多代码,并且您想立即释放资源并且您没有进一步使用该池,那么您当然可以调用pool.close()
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-11-12
  • 2015-03-08
  • 1970-01-01
  • 2017-01-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多