【发布时间】:2018-02-21 16:21:25
【问题描述】:
我编写了一个脚本来启动多个并行运行的进程(简单的单元测试)。它将一次执行N 和num_workers 并行进程的作业。
我的第一个实现以num_workers 的批次运行进程并且似乎工作正常(我在这里使用false 命令来测试行为)
import subprocess
errors = 0
num_workers = 10
N = 100
i = 0
while i < N:
processes = []
for j in range(i, min(i+num_workers, N)):
p = subprocess.Popen(['false'])
processes.append(p)
[p.wait() for p in processes]
exit_codes = [p.returncode for p in processes]
errors += sum(int(e != 0) for e in exit_codes)
i += num_workers
print(f"There were {errors}/{N} errors")
但是,测试不会花费相同的时间,所以我有时会等待缓慢的测试完成。因此,我重写了它,以便在任务完成后继续分配任务
import subprocess
import os
errors = 0
num_workers = 40
N = 100
assigned = 0
completed = 0
processes = set()
while completed < N:
if assigned < N:
p = subprocess.Popen(['false'])
processes.add((assigned, p))
assigned += 1
if len(processes) >= num_workers or assigned == N:
os.wait()
for i, p in frozenset(processes):
if p.poll() is not None:
completed += 1
processes.remove((i, p))
err = p.returncode
print(i, err)
if err != 0:
errors += 1
print(f"There were {errors}/{N} errors")
但是,这会在最后几个进程中产生错误的结果。例如,在上面的示例中,它产生 98/100 错误而不是 100。我检查过,这与并发无关;由于某种原因,最近 2 个作业以退出代码 0 返回。
为什么会这样?
【问题讨论】:
-
考虑使用multiprocessing,而不是管理您自己的并行进程。
-
@ndmeiri 什么比赛条件?只有主线程读取/写入
errors变量。 -
不是竞争条件;
os.wait()破坏了poll()的返回码。 -
@NathanVērzemnieks 好眼光!
标签: python subprocess exit-code