【问题标题】:Multiprocessing slower than sequential with python多处理比使用 python 的顺序慢
【发布时间】:2015-03-10 10:42:21
【问题描述】:

我已经投入了大量时间来重写我的代码以利用更多内核,但是当我对它进行基准测试时,我发现我所做的只是让它比原始代码慢 7 倍,尽管它运行在 16 个内核而不是比一个!这让我相信我一定做错了什么。

代码有 4000 多行,需要大量输入文件,所以我无法发布重现问题的内容。但是,我可以说我调用的函数通常需要 0.1 秒才能运行并使用 ctypes 调用一些 c 库。它还在内存中传递了相当数量的数据——可能是 1 MB?一些看起来很慢的伪代码:

    def AnalyseSection(Args):
        Sectionsi,SectionNo,ElLoads,ElLoadsM,Materials,CycleCount,FlapF,EdgeF,Scaling,Time,FlapFreq,EdgeFreq=Args
        for i in range(len(Sections[Elements])):
           #Do some heavy lifting with ctypes
        return Result

     for i in range(10):
         for j in range(10):
             for k in range(10):
                 Args=[(Sections[i],SectionList[i],ElLoads,ElLoadsM,Materials,CycleCount,FlapF,EdgeF,Scaling,Time,FlapFreq,EdgeFreq) for n in SectionList]
                 pool=mp.Pool(processes=NoCPUs,maxtasksperchild=1)
                 result = pool.map(AnalyseSection,Args)
                 pool.close()
                 pool.join()

我希望有人能发现一个明显的错误,导致它运行得这么慢?该函数需要一段时间才能运行(每次调用通常为 0.1 秒),因此我认为与多处理相关的开销不会使其减慢很多。任何帮助将不胜感激!

【问题讨论】:

  • 启动线程(以及进程)会消耗 很多 时间。可能不是你的代码很慢,而是进程的产生。
  • SectionList 的大小是多少,因为看起来您的 Args 的大小为 SectionList 元组,这意味着您将从 16 个子进程开始 Args 的大小,这可能会很多?还有什么意思是“需要一段时间才能运行”棒球场图可能有用?列表中的下一个问题是您的瓶颈是什么?磁盘 i/o 不是很用力吗?
  • @user3012759 - 对于我目前正在运行的问题,“运行一段时间”大约需要 0.1 秒。至少这是我的估计——我在没有多处理和 16 个部分的情况下对整个事情进行计时(AnalyseSection(Args) 分析的部分,16 个是巧合——与 CPU 的数量无关)在没有多处理的情况下需要 1 到 2 秒和 7 和 8 秒。没有写入磁盘,但正如我暗示的那样,我可能正在访问相当多的 RAM。 ctypes 每次都可以从磁盘读取库吗?无论如何,它们只有大约 20kB,但通过网络。
  • 输入文件太重怎么办?它们在哪里被阅读?
  • 还有 10*10*10*~2*16 你正在创建大约 32k 子进程......如果你在一个进程中花费 0.1 秒,这将不是 CPU 时间的最佳利用,您可以尝试重新调整代码以将您的分叉削减为 32 之类的东西?看看这是否会提高速度

标签: python multiprocessing


【解决方案1】:

这个

 for i in range(10):
     for j in range(10):
         for k in range(10):
             Args=[(Sections[i],SectionList[i],ElLoads,ElLoadsM,Materials,CycleCount,FlapF,EdgeF,Scaling,Time,FlapFreq,EdgeFreq) for n in SectionList]
             pool=mp.Pool(processes=NoCPUs,maxtasksperchild=1)
             result = pool.map(AnalyseSection,Args)
             pool.close()
             pool.join()

可以而且应该转换成这个

 pool=mp.Pool(processes=NoCPUs)

 for i in range(10):
     for j in range(10):
         for k in range(10):
             Args=[(Sections[i],SectionList[i],ElLoads,ElLoadsM,Materials,CycleCount,FlapF,EdgeF,Scaling,Time,FlapFreq,EdgeFreq) for n in SectionList]
             result = pool.map(AnalyseSection,Args)

 pool.join()

这更符合您想要实现的目标。您有一个多处理池,您可以在其中提供数据并等待结果。您不必在每次迭代中启动/停止池。

请记住,开始处理会产生相关成本(如果您习惯于线程,则比线程大得多)。

【讨论】:

  • 谢谢,您发布了 @user3012759 帮助我完成的工作。这行得通,我以为我过去曾尝试过,但没有奏效,但我一定是弄错了。
猜你喜欢
  • 2014-11-12
  • 1970-01-01
  • 1970-01-01
  • 2021-02-22
  • 1970-01-01
  • 2013-09-24
  • 2016-11-08
  • 2015-01-12
  • 1970-01-01
相关资源
最近更新 更多