【问题标题】:Run parallel Stata do files in python using multiprocess and subprocess使用多进程和子进程在python中运行并行Stata do文件
【发布时间】:2017-02-01 19:23:12
【问题描述】:

我有一个 stata do 文件 pyexample3.do,它使用它的参数作为回归量来运行回归。回归的 F 统计量保存在文本文件中。代码如下:

clear all
set more off        
local y `1'        
display `"first parameter: `y'"'

sysuse auto
regress price `y'
local f=e(F)
display "`f'"
file open myhandle using test_result.txt, write append
file write myhandle "`f'" _n
file close myhandle
exit, STATA clear

现在我正在尝试在 python 中并行运行 stata do 文件,并将所有 F 统计信息写入一个文本文件中。我的cpu有4个核心。

    import multiprocessing
    import subprocess

    def work(staname):
        dofile = "pyexample3.do"
        cmd = ["StataMP-64.exe","/e", "do", dofile,staname]
        return subprocess.call(cmd, shell=False)

    if __name__ == '__main__':

        my_list =[ "mpg","rep78","headroom","trunk","weight","length","turn","displacement","gear_ratio" ]

        my_list.sort()

        print my_list

        # Get the number of processors available
        num_processes = multiprocessing.cpu_count()

        threads = []

        len_stas = len(my_list)

        print "+++ Number of stations to process: %s" % (len_stas)

        # run until all the threads are done, and there is no data left

        for list_item in my_list:

            # if we aren't using all the processors AND there is still data left to
            # compute, then spawn another thread

            if( len(threads) < num_processes ):

                p = multiprocessing.Process(target=work,args=[list_item])

                p.start()

                print p, p.is_alive()

                threads.append(p)

            else:
                for thread in threads:

                if not thread.is_alive():

                   threads.remove(thread)

虽然 do 文件应该运行 9 次,因为 my_list 中有 9 个字符串,但它只运行了 4 次。那么哪里出错了?

【问题讨论】:

    标签: python subprocess multiprocessing stata


    【解决方案1】:

    在您的 for list_item in my_list 循环中,在启动前 4 个进程后,它会进入 else

    for thread in threads:
        if not thread.is_alive():
            threads.remove(thread)
    

    正如您所见,由于thread.is_alive() 不会阻塞,因此该循环会立即执行,而这 4 个进程中的任何一个都不会完成其任务。因此,总共只执行了前 4 个进程。

    您可以简单地使用while 循环以小间隔不断检查进程状态:

    keep_checking = True
    
    while keep_checking:
        for thread in threads:
            if not thread.is_alive():
               threads.remove(thread)
               keep_checking = False
    
        time.sleep(0.5) # wait 0.5s
    

    【讨论】:

    • 现在 do 文件运行了 7 次,但仍然缺少 2 次。当 0.5s 改变时,数字也会改变。是同时写入文本文件造成的吗?
    • 有可能。由于写入同一个文件不是线程安全的,这意味着如果您同时从不同进程写入文件,可能会出现意外结果,因此您可能需要使用RLock 来确保只有一个进程同时处理文件。
    猜你喜欢
    • 1970-01-01
    • 2014-02-11
    • 1970-01-01
    • 2020-06-16
    • 2022-01-25
    • 1970-01-01
    • 2017-05-31
    • 2012-11-11
    • 2011-12-16
    相关资源
    最近更新 更多