【问题标题】:Nesting parallel processes using multiprocessing使用多处理嵌套并行进程
【发布时间】:2021-04-07 20:33:12
【问题描述】:

有没有办法在已经并行化的函数中并行运行函数?我知道使用 multiprocessing.Pool() 这是不可能的,因为守护进程无法创建子进程。我对并行计算还很陌生,正在努力寻找解决方法。

我目前有数千个计算需要使用我与之交互的其他一些商用量子力学代码并行运行。每次计算,在父计算正常终止时,后续计算需要并行执行,如果父计算没有正常终止,则该点计算结束。我总是可以将这三个后续计算合并为一个大计算并正常运行 - 尽管我更喜欢单独并行运行。

Main 目前看起来像这样,run() 是首先为一系列点并行运行的父计算,par_nacmes() 是我想要在正常终止后并行运行三个子计算的函数父母。

  def par_nacmes(nacme_input_data):                                                                                                                                                                                                                                                        
      nacme_dir, nacme_input, index = nacme_input_data  # Unpack info in tuple for the calculation                                                                                                                                                                                                                                    
      axes_index = get_axis_index(nacme_input)                                                                                                                                                                                                                                             
      [norm_term, nacme_outf] = util.run_calculation(molpro_keys, pwd, nacme_dir, nacme_input, index)  # Submit child calculation                                                                                                                                                                                      
      if norm_term:                                                                                                                                                                                                                                                                        
          data.extract_nacme(nacme_outf, molpro_keys['nacme_regex'], index, axes_index)                                                                                                                                                                                                    
      else:                                                                                                                                                                                                                                                                                
          with open('output.log', 'w+') as f:                                                                                                                                                                                                                                              
              f.write('NACME Crashed for GP%s - axis %s' % (index, axes_index))                                                                                                                                                                                                            
                                                                                                                                                                                                                                                                                           
                                                                                                                                                                                                                                                                                           
  def run(grid_point):                                                                                                                                                                                                                                                                     
      index, geom = grid_point                                                                                                                                                                                                                                                             
      if inputs['code'] == 'molpro':                                                                                                                                                                                                                                                       
          [spe_dir, spe_input] = molpro.setup_spe(inputs, geom, pwd, index)                                                                                                                                                                                                                
          [norm_term, spe_outf] = util.run_calculation(molpro_keys, pwd, spe_dir, spe_input, index)  # Run each parent calculation                                                                                                                                                                                        
          if norm_term:  # If parent calculation terminates normally - Extract data and continue with subsequent calculations for each point                                                                                                                                                                                                                                                                   
              data.extract_energies(spe_dir+spe_outf, inputs['spe'], molpro_keys['energy_regex'],                                                                                                                                                                                          
                                    molpro_keys['cas_prog'], index)                                                                                                                                                                                                                        
              if inputs['nacme'] == 'yes':                                                                                                                                                                                                                                                 
                  [nacme_dir, nacmes_inputs] = molpro.setup_nacme(inputs, geom, spe_dir, index)                                                                                                                                                                                                                                                                                                                                                                                                      
                  nacmes_data = [(nacme_dir, nacme_inp, index) for nacme_inp in nacmes_inputs] # List of three tuples - each with three elements. Each tuple describes a child calculation to be run in parallel                                                                                                                                                                                             
                  nacme_pool = multiprocessing.Pool()                                                                                                                                                                                                                                      
                  nacme_pool.map(par_nacmes, [nacme_input for nacme_input in nacmes_data]) # Run each calculation in list of tuples in parallel                                                                                                                                                                                                
                                                                                                                                                                                                                                                                                           
              if inputs['grad'] == 'yes':                                                                                                                                                                                                                                                  
                  pass                                                                                                                                                                                                                                                                     
                                                                                                                                                                                                                                                                                           
          else:                                                                                                                                                                                                                                                                            
              with open('output.log', 'w+') as f:                                                                                                                                                                                                                                          
                  f.write('SPE crashed for GP%s' % index)                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                                                                
      elif inputs['code'] == 'molcas':   # TO DO                                                                                                                                                                                                                                           
          pass                                                                                                                                                                                                                                                                             
                                                                                                                                                                                                                                                                                           
                                                                                                                                                                                                                                                                                           
  if __name__ == "__main__":                                                                                                                                                                                                                                                               
      try:                                                                                                                                                                                                                                                                                 
          pwd = os.getcwd()  # parent dir                                                                                                                                                                                                                                                  
          f = open(inp_geom, 'r')                                                                                                                                                                                                                                                          
          ref_geom = np.genfromtxt(f, skip_header=2, usecols=(1, 2, 3), encoding=None)                                                                                                                                                                                                     
          f.close()                                                                                                                                                                                                                                                                        
          geom_list = coordinate_generator(ref_geom)  # Generate nuclear coordinates                                                                                                                                                                                                                                      
          if inputs['code'] == 'molpro':                                                                                                                                                                                                                                                   
              couplings = molpro.coupled_states(inputs['states'][-1])                                                                                                                                                                                                                      
          elif inputs['code'] == 'molcas':                                                                                                                                                                                                                                                 
              pass                                                                                                                                                                                                                                                                         
          data = setup.global_data(ref_geom, inputs['states'][-1], couplings, len(geom_list))                                                                                                                                                                                              
          run_pool = multiprocessing.Pool()                                                                                                                                                                                                                                                
          run_pool.map(run, [(k, v) for k, v in enumerate(geom_list)])  # Run each parent calculation for each set of coordinates                                                                                                                                                                                                                   
                                                                                                                                                                                                                                                                                           
      except StopIteration:                                                                                                                                                                                                                                                                
          print('Please ensure goemetry file is correct.')    

任何有关如何为每个点并行运行这些子计算的见解都会有很大帮助。我看到有人建议改用多线程或将 daemon 设置为 false,尽管我不确定这是否是最好的方法。

【问题讨论】:

    标签: python multithreading parallel-processing multiprocessing python-multiprocessing


    【解决方案1】:

    首先,我不知道为什么你必须并行运行 par_nacmes,但如果你必须这样做,你可以:
    a 使用线程而不是进程来运行它们 或者 b 使用 multiprocessing.Process 来运行 run 但是这会涉及很多开销,所以我个人不会这样做。

    你所要做的就是 替换

    nacme_pool = multiprocessing.Pool()                                                                                                                                                                                                                                      
                      nacme_pool.map(par_nacmes, [nacme_input for nacme_input in nacmes_data])
    

    在运行中() 与

    threads = []
    for nacme_input in nacmes_data:
         t = Thread(target=par_nacmes, args=(nacme_input,)); t.start()
         threads.append(t)
    for t in threads: t.join()
    

    或者您是否不在乎胎面是否完成

    for nacme_input in nacmes_data:
         t = Thread(target=par_nacmes, args=(nacme_input,)); t.start()
    

    【讨论】:

      最近更新 更多