【Title】: Migrate from Multiprocess to MPI in python
【Posted】: 2018-11-03 20:17:56
【Question】:

I am trying to move my Python code from multiprocessing to MPI, because I am moving it to an HPC cluster. That is why I want to span the processes across several nodes rather than being limited to the memory of a single node.

Can you help me implement this? This is what I have tried so far.

The multiprocessing Pool.map code:

import Home
import pickle
from multiprocessing import Pool
from functools import partial
import time
import os

def run(a):
    name=a['name']
    people=a['people']
    save_path='res_semi_def'
    path=os.getcwd()
    if not os.path.exists(os.path.join(os.getcwd(),save_path)):
        os.mkdir(os.path.join(os.getcwd(),save_path))
    if sum(people.values())>0:
        start=time.time()
        home=Home.Home(people)
        try:
            home.simulate()
            print name,time.time()-start, '[s]'
            fname=str(name)+'.pkl'
            with open(os.path.join(os.getcwd(),save_path,fname), "wb") as f:
                pickle.dump(home.Consumption,f)
            #fname=str(name)+'_person.pkl'
            #with open(os.path.join(os.getcwd(),save_path,fname), "wb") as f:
            #    pickle.dump(home.personsList,f)
        except Exception as e:
            print name, 'error', e
            pass



def main():
    p = Pool(6)     # pool of 6 worker processes, one task per house dict
    fname='censimento2011_full_par.pkl'
    with open(fname,'r') as f:
        houses=pickle.load(f)
    house=[{'name':name, 'people':people} for name, people in zip(houses.iterkeys(),houses.itervalues())]
    #name,people=zip(*houses.iteritems())
    #p.map(partial(run,int(name)), people)
    start_t=time.time()
    a=p.map(run, house)
    p.close()
    print time.time()-start_t, 'Total time[s]'

if __name__ == "__main__":
    main()

And here is the main function using mpi4py:

from mpi4py import MPI

def main():
    comm=MPI.COMM_WORLD
    if comm.rank==0:    # only the root rank loads the input data
        fname='censimento2011_full_par.pkl'
        with open(fname,'r') as f:
            houses=pickle.load(f)
        house=[{'name':name, 'people':people} for name, people in zip(houses.iterkeys(),houses.itervalues())]
    else:
        house=None

    my_work=comm.scatter(house)

if __name__ == "__main__":
    main()

But with this code I get the following error:

===================================================================================
=   BAD TERMINATION OF ONE OF YOUR APPLICATION PROCESSES
=   PID 54118 RUNNING AT compute-0-8
=   EXIT CODE: 4
=   CLEANING UP REMAINING PROCESSES
=   YOU CAN IGNORE THE BELOW CLEANUP MESSAGES
===================================================================================
[proxy:0:0@compute-0-5] HYD_pmcd_pmip_control_cmd_cb (pm/pmiserv/pmip_cb.c:909): assert (!closed) failed
[proxy:0:0@compute-0-5] HYDT_dmxu_poll_wait_for_event (tools/demux/demux_poll.c:76): callback returned error status
[proxy:0:2@compute-0-7] HYD_pmcd_pmip_control_cmd_cb (pm/pmiserv/pmip_cb.c:909): assert (!closed) failed
[proxy:0:2@compute-0-7] HYDT_dmxu_poll_wait_for_event (tools/demux/demux_poll.c:76): callback returned error status
[proxy:0:2@compute-0-7] main (pm/pmiserv/pmip.c:206): demux engine error waiting for event
[proxy:0:1@compute-0-6] HYD_pmcd_pmip_control_cmd_cb (pm/pmiserv/pmip_cb.c:909): assert (!closed) failed
[proxy:0:1@compute-0-6] HYDT_dmxu_poll_wait_for_event (tools/demux/demux_poll.c:76): callback returned error status
[proxy:0:1@compute-0-6] main (pm/pmiserv/pmip.c:206): demux engine error waiting for event
[proxy:0:0@compute-0-5] main (pm/pmiserv/pmip.c:206): demux engine error waiting for event
srun: error: compute-0-5: task 0: Exited with exit code 7
srun: error: compute-0-7: task 2: Exited with exit code 7
srun: error: compute-0-6: task 1: Exited with exit code 7
[mpiexec@compute-0-5] HYDT_bscu_wait_for_completion (tools/bootstrap/utils/bscu_wait.c:76): one of the processes terminated badly; aborting
[mpiexec@compute-0-5] HYDT_bsci_wait_for_completion (tools/bootstrap/src/bsci_wait.c:23): launcher returned error waiting for completion
[mpiexec@compute-0-5] HYD_pmci_wait_for_completion (pm/pmiserv/pmiserv_pmci.c:218): launcher returned error waiting for completion
[mpiexec@compute-0-5] main (ui/mpich/mpiexec.c:344): process manager error waiting for completion

What is the correct way to implement the processes using mpi4py?

【Question comments】:

    Tags: python mpi hpc multiprocess mpi4py


    【Solution 1】:

    I see two problems with your code.

    1. You need to specify the root in the scatter operation: house = comm.scatter(house, root=0)

    2. The recommended way to get the rank is rank = comm.Get_rank()

    Here is an example from https://mpi4py.scipy.org/docs/usrman/tutorial.html:

    from mpi4py import MPI
    
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()
    
    if rank == 0:
        data = [(i+1)**2 for i in range(size)]
    else:
        data = None
    data = comm.scatter(data, root=0)
    assert data == (rank+1)**2
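
    One detail worth adding when applying this fix to the code in the question: comm.scatter hands exactly one element of the scattered list to each rank, so a list with thousands of houses first has to be split into comm.Get_size() chunks, one per process. Below is a minimal sketch of how that could look; the chunk() helper, the placeholder run() body and the Python 3 syntax are illustrative assumptions, not part of the original question or answer.

    from mpi4py import MPI
    import pickle

    def run(entry):
        # placeholder for the run() function from the question
        return entry['name']

    def chunk(seq, n):
        # split seq into n roughly equal pieces, one per rank,
        # because comm.scatter expects exactly one item per process
        return [seq[i::n] for i in range(n)]

    def main():
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        size = comm.Get_size()

        if rank == 0:
            # only the root rank reads the input file
            with open('censimento2011_full_par.pkl', 'rb') as f:
                houses = pickle.load(f)
            house = [{'name': n, 'people': p} for n, p in houses.items()]
            work = chunk(house, size)              # len(work) == size
        else:
            work = None

        my_work = comm.scatter(work, root=0)       # each rank receives one sub-list
        results = [run(entry) for entry in my_work]

        # optionally collect the per-rank results back on the root
        all_results = comm.gather(results, root=0)
        if rank == 0:
            print(len(all_results), 'chunks gathered')

    if __name__ == '__main__':
        main()

    Launched with, for example, mpiexec -n 8 python script.py, each rank then loops over its own share of the houses, much like a single multiprocessing worker does in the original code.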
    

    【Comments】:
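
    A further note, not part of the original answer: if you want to stay close to the Pool.map style of the original script, recent versions of mpi4py (3.0+) also ship an executor interface, mpi4py.futures.MPIPoolExecutor, whose map() behaves much like multiprocessing.Pool.map. A minimal sketch, again assuming the run() function and the pickle file from the question:

    import pickle
    from mpi4py.futures import MPIPoolExecutor

    def run(entry):
        # placeholder for the run() function from the question
        return entry['name']

    def main():
        with open('censimento2011_full_par.pkl', 'rb') as f:
            houses = pickle.load(f)
        house = [{'name': n, 'people': p} for n, p in houses.items()]

        # the workers are MPI processes, possibly spread over several nodes
        with MPIPoolExecutor() as executor:
            results = list(executor.map(run, house))
        print(len(results), 'houses processed')

    if __name__ == '__main__':
        main()

    This variant is started with mpiexec -n <N> python -m mpi4py.futures script.py; the -m mpi4py.futures part is what spawns the worker ranks behind the executor.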
