【问题标题】:Multiprocessing script gets stuck多处理脚本卡住
【发布时间】:2018-01-15 08:04:33
【问题描述】:

我有以下 Python 代码:

def workPackage(args):
   try:      
    outputdata                  = dict()
    iterator                        = 1
    for name in outputnames:
        outputdata[name]            = []
    for filename in filelist:
        read_data                   = np.genfromtxt(filename, comments="#", unpack=True, names=datacolnames, delimiter=";")
        mean_va1                    = np.mean(read_data["val1"])
        mean_va2                    = np.mean(read_data["val2"])
        outputdata[outputnames[0]].append(read_data["setpoint"][0])
        outputdata[outputnames[1]].append(mean_val1)
        outputdata[outputnames[2]].append(mean_val2)        
        outputdata[outputnames[3]].append(mean_val1-mean_val2)
        outputdata[outputnames[4]].append((mean_val1-mean_val2)/read_data["setpoint"][0]*100)
        outputdata[outputnames[5]].append(2*np.std(read_data["val1"]))
        outputdata[outputnames[6]].append(2*np.std(read_data["val2"]))      


        print("Process "+str(identifier+1)+": "+str(round(100*(iterator/len(filelist)),1))+"% complete")
        iterator    = iterator+1

    queue.put (outputdata)
 except:
 some message

if __name__ == '__main__':
"Main script"

此代码用于评估大量测量数据。我总共在多个目录中获得了大约 900 个文件(总共约 13GB)。 主脚本确定所有文件路径并将它们存储在 4 个块中。每个块(文件路径列表)都分配给一个进程。

    try:
      print("Distributing the workload on "+str(numberOfProcesses)+" processes...")                     
      for i in range(0,numberOfProcesses):
        q[i]                = multiprocessing.Queue()
        Processes[i]        = multiprocessing.Process(target=workPackage, args=(filelistChunks[i], colnames, outputdatanames, i, q[i]))
        Processes[i].start()
      for i in range(0,numberOfProcesses):
        Processes[i].join()
    except:
       print("Exception while processing stuff...")

然后从队列中读取结果并将其存储到输出文件中。 现在这是我的问题: 该脚本启动了 4 个进程,每个进程都运行到 100%(参见 workPackage 函数中的打印)。它们不会同时完成,而是在大约 2 分钟内完成。 但随后脚本就停止了。 如果我通过简单地剪切文件列表来限制要处理的数据量,它有时会运行到最后,但有时不会。 我不明白,为什么脚本在所有进程达到 100% 后就卡住了。

我真的不知道那里发生了什么。

【问题讨论】:

    标签: python multiprocessing


    【解决方案1】:

    您使用 queue.put() 将项目添加到队列中,然后调用 queue.join(),但我看不到您在哪里调用 queue.get() 或 queue.task_done()。 Join 不会释放线程,直到队列为空并且每个项目都调用了 task_done()。

    【讨论】:

    • 在“try/except”块之后我基本上是这样做的: for i in range(0,numberOfProcesses): outcontainer[i] = q[i].get() 将队列元素推入外容器。我从不使用“task.done()”,应该吗?如果是,在哪里?
    • 啊,对不起,我看错了你的代码。您调用的是 Process.join,而不是 Queue.join,因此这里不需要 task_done。听起来你可能陷入了僵局。线程之间是否有任何数据结构共享,或者任何依赖于线程按特定顺序完成的数据结构?另外,您确定每个工作线程在完成后都会终止吗?是否有任何数据写入输出文件?
    • 我通过简单地使用 apply_async 而不是进程来解决它。在每个工人启动后,我调用 pool.close() 和 pool.join()。
    猜你喜欢
    • 2016-03-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-01-03
    • 2015-12-23
    • 1970-01-01
    • 2019-07-10
    • 2020-09-22
    相关资源
    最近更新 更多