【问题标题】:Python subprocess spools too many processesPython子进程假脱机太多进程
【发布时间】:2012-11-21 19:21:36
【问题描述】:

希望有人可以提供帮助,我遇到了一个我似乎无法编写脚本的具有挑战性的情况。我的目标是自动将 SQL 文件加载到 PostgreSQL 中。

我不知道我有多少个 SQL 文件文件夹,所以我最初检查一个文件夹是否存在,然后循环遍历每个文件并使用 psql.exe 将其加载到 PostgreSQL 中

我当前的代码如下所示

if os.path.exists("sql1"):
    for files in os.listdir("sql1"):    
    load1 = subprocess.Popen("psql -d data -U postgres -f sql1\%s" %files)


if os.path.exists("sql2"):
    for files in os.listdir("sql2"):    
    load2 = subprocess.Popen("psql -d data -U postgres -f sql2\%s" %files)

但是,由于它为文件夹中的每个 SQL 文件创建了一个子进程,并且为每个文件夹创建了更多子进程,所以它会假脱机这么多子进程。

如果我将其更改为 subprocess.call,它当然会序列化加载并阻止从下一个文件夹加载文件,而不是为每个文件夹运行单个进程。

有谁知道我如何为每个存在的文件夹创建一个进程?

除此之外,我将在所有进程完成后运行索引。

我可以使用 load.wait() 但这仅适用于一个进程。

提前感谢您的建议和帮助

编辑添加:

听从 Steve 的建议,我引入了一些线程,但它仍然会导致索引在子进程完成之前开始

def threads(self):
  processors = multiprocessing.cpu_count()
  n = 1
  name = "sql%i" %n
  for i in range(processors):
      if os.path.exists(name):
     thread = Thread(target=self.loadData, args=(name,))
     thread.start()             
     n += 1
     name = "sql%i" %n

 def loadData(self, name):
    for files in os.listdir(name):
    load = subprocess.Popen("psql -d osdata -U postgres -f %s\%s" %(name, files))
    load.wait()                                                                                                                                                                                 

但是索引在进程完成之前就开始了。

任何想法如何防止这种情况

【问题讨论】:

  • 如果你想阻塞直到线程完成,你需要做 thread.wait() 。 Python 的 futures 模块可能是您想用来使“运行 X 线程”变得微不足道的东西。我已将此添加到我的答案中。
  • 与其启动单独的psql 会话,不如考虑使用subprocess 模块打开带有双向管道的单个会话。在 Python 中读取每个 sql 文件并将其写入 psql 的标准输入。从 stdout 和 stderr 读取结果。

标签: python postgresql subprocess


【解决方案1】:

我建议为每个文件夹创建一个线程。然后使用subprocess.call 在每个线程中序列化调用。

如果你想限制并发执行的线程数量,你应该看看 Python 的 futures 模块。

http://docs.python.org/dev/library/concurrent.futures.html

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as executor:
    for n in range(processors):
        name = "sql%i" % (n + 1)
        if os.path.exists(name):
            future = executor.submit(loadData, name)

【讨论】:

  • 嗨史蒂夫。感谢您的答复。请查看我上面的编辑以及您是否对解决方案有任何想法。
  • 嗨,史蒂夫看起来已经奏效了。当我使用 Python 2.7 时,我必须安装 PIP 和期货。我不完全理解它,但据我所知,它限制了进程的数量并等待所有进程完成,然后再继续代码的下一部分。非常感谢。
  • 如果您也想检测错误,subprocess.check_call 也很有用。