【问题标题】:Running multiple external programs运行多个外部程序
【发布时间】:2025-12-09 06:05:04
【问题描述】:

所以,我设置了一个简短的脚本来执行一个外部程序(用 Fortran 77 编写)。我想运行该程序的多个实例,因为我的计算机上有 8 个内核,所以我找到的最简单的解决方案是:

import subprocess


import os


i = n

while(i<n):
  dire = "dir/Run"+str(i)+"/"
  os.chdir(dire)
  p1 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+1)+"/"
  os.chdir(dire)
  p2 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+2)+"/"
  os.chdir(dire)
  p3 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+3)+"/"
  os.chdir(dire)
  p4 = subprocess.Popen(['./mej'])  
  dire = "dir/Run"+str(i+4)+"/"
  os.chdir(dire)
  p5 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+5)+"/"
  os.chdir(dire)
  p6 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+6)+"/"
  os.chdir(dire)
  p7 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+7)+"/"
  os.chdir(dire)
  p8 = subprocess.Popen(['./mej'])
  dire = "/Run"+str(i+8)+"/"
  os.chdir(dire)
  p3 = subprocess.Popen(['./mej'])
  exit_codes = [p.wait() for p in p1, p2, p3, p4, p5, p6, p7, p8]
  i = i + 8



print "Job's done!"

现在这最初工作得很好,但是我只是更改为可变时间步长,因此每次集成运行的时间差异很大。现在的问题是脚本将等待最慢的脚本完成,然后再启动一组新的集成。我如何编写它以便始终运行 8 个实例?

【问题讨论】:

    标签: python subprocess external-process


    【解决方案1】:

    您可以使用线程池来保持所有 CPU 忙碌:

    #!/usr/bin/env python
    import os
    import subprocess
    from multiprocessing.pool import ThreadPool
    
    def run(i):
        working_dir = "dir/Run/" + str(i + 1)
        return i, subprocess.call(os.path.join(working_dir, 'mej'), cwd=working_dir)
    
    results = ThreadPool().map(run, range(n))
    

    一旦一个mej 进程完成,下一个进程就会启动。一次运行的并发工作进程不超过 os.cpu_count() 个。

    【讨论】:

    【解决方案2】:

    虽然给定运行的执行时间可能会有很大差异,但通常可以安全地假设,例如10 次连续运行的方差会小得多。

    所以简单的解决方案A是启动8个进程,每个进程调用外部程序10次,然后等待这些进程完成。您仍然需要等待最慢的进程,但开销会小很多。

    当然有一个明显的解决方案B:创建一个待运行的池,其中 8 个进程在完成当前运行后从池中选择一个新的运行。这将真正减少开销,但您必须在此处处理同步原语。

    以下是这 3 种方法的一个小说明(您使用的一种和我正在谈论的两种):

    红色的小方块显示了改进的空间。基本上,方法 A 避免停止每个线程,而是在每次运行后停止。方法 B 更进一步,使一个已完成所有运行的线程能够从另一个线程获取一个。

    【讨论】:

    • 我从另一张海报中得到了一个很好的解决方案,但是感谢您的想法/插图,我以后会记住它。
    【解决方案3】:

    你可以写一些看起来像的东西。定义总运行次数和可用内核数量,以及检查是否完成的延迟。对于延迟,只需输入合理的秒数即可。如果一个进程平均在 10 分钟内运行,60 秒或更短的延迟就足够了。

    import subprocess
    import time
    import os
    
    def runIt(rootDir, prog, i):
        dire = "dir/Run/" + str(i + 1)
        os.chdir(dire)
        return subprocess.Popen(['./mej'])
    
    n=16    #total number of runs
    nProc = 8 # number of cores
    i = 0
    delay = 2 #delays in  second to check if one has returned
    
    pList = [runIt(p) for p in range(min(nProc, n))]
    i = len(pList)
    while(i<n):
        time.sleep(delay) # delays for delay seconds
        for j in range(len(pList)):
            pList[j].poll()
            if pList[j].returncode is not None and i<n:
                pList[j] = runIt(i)
                i = i+1
    print "Job's done!"
    

    【讨论】: