【问题标题】:live output from subprocess command子进程命令的实时输出
【发布时间】:2013-08-27 14:56:09
【问题描述】:

我正在使用 python 脚本作为流体动力学代码的驱动程序。运行模拟时,我使用subprocess.Popen 运行代码,将stdoutstderr 的输出收集到subprocess.PIPE --- 然后我可以打印(并保存到日志文件) 输出信息,并检查是否有任何错误。问题是,我不知道代码是如何进行的。如果我直接从命令行运行它,它会输出关于它的迭代时间、时间、下一个时间步长等信息。

有没有办法既存储输出(用于日志记录和错误检查)又生成实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

最初我将run_command 传送到tee 以便副本直接进入日志文件,并且流仍然直接输出到终端——但这样我就无法存储任何错误(到我的知识)。


到目前为止我的临时解决方案:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端中,运行 tail -f log.txt (s.t. log_file = 'log.txt')。

【问题讨论】:

  • 也许你可以像a previous Stack Overflow question一样使用Popen.poll
  • 一些显示进度指示的命令(例如,git)只有在它们的输出是“tty 设备”(通过 libc isatty() 测试)时才会这样做。在这种情况下,您可能必须打开一个伪 tty。
  • @torek 什么是(伪)tty?
  • 类 Unix 系统上的设备,允许进程在串行端口上伪装成用户。例如,这就是 ssh(服务器端)的工作方式。见python pty library,也见pexpect
  • 临时解决方案:不需要调用flush,如果子进程产生大量stderr输出,需要从stderr管道读取。评论区没有足够的空间来解释这个......

标签: python shell logging error-handling subprocess


【解决方案1】:

适用于 Python 3 的 TLDR:

import subprocess
import sys
with open('test.log', 'wb') as f: 
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), b''): 
        sys.stdout.buffer.write(c)
        f.buffer.write(c)

您有两种方法可以做到这一点,通过从 readreadline 函数创建迭代器并执行以下操作:

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), ''):  # replace '' with b'' for Python 3
        sys.stdout.write(c)
        f.write(c)

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for line in iter(process.stdout.readline, ''):  # replace '' with b'' for Python 3
        sys.stdout.write(line)
        f.write(line)

或者您可以创建一个reader 和一个writer 文件。将writer 传递给Popen 并从reader 读取

import io
import time
import subprocess
import sys

filename = 'test.log'
with io.open(filename, 'wb') as writer, io.open(filename, 'rb', 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

这样您就可以将数据写入test.log 以及标准输出中。

文件方法的唯一优点是您的代码不会阻塞。因此,您可以同时做任何您想做的事情,并以非阻塞方式随时从reader 阅读。当您使用PIPEreadreadline 时,函数将阻塞,直到分别将一个字符写入管道或将一行写入管道。

【讨论】:

  • 呃 :-) 写入文件,从中读取,然后在循环中休眠?在您完成阅读文件之前,该过程也有可能结束。
  • 使用 Python 3,您需要 iter(process.stdout.readline, b'')(即传递给 iter 的标记必须是二进制字符串,因为 b'' != ''
  • 对于二进制流,这样做:for line in iter(process.stdout.readline, b''): sys.stdout.buffer.write(line)
  • 添加到@JohnMellor 的答案,在 Python 3 中需要进行以下修改:process = subprocess.Popen(command, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) for line in iter(process.stdout.readline, b'') sys.stdout.write(line.decode(sys.stdout.encoding))
  • 但是输出不是实时的,是吗?以我的经验,它只是等到进程完成执行,然后才打印到控制台。链接->stackoverflow.com/questions/30026045/…
【解决方案2】:

执行摘要(或“tl;dr”版本):最多只有一个 subprocess.PIPE 时很容易,否则很难。

也许是时候解释一下subprocess.Popen是如何工作的了。

(警告:这是针对 Python 2.x 的,虽然 3.x 类似;而且我对 Windows 变体很模糊。我对 POSIX 的理解要好得多。)

Popen 函数需要同时处理零到三个 I/O 流。像往常一样,这些标记为 stdinstdoutstderr

您可以提供:

  • None,表示不想重定向流。它将像往常一样继承这些。请注意,至少在 POSIX 系统上,这并不意味着它将使用 Python 的 sys.stdout,而只是使用 Python 的 actual 标准输出;见最后的演示。
  • int 值。这是一个“原始”文件描述符(至少在 POSIX 中)。 (旁注:PIPESTDOUT 在内部实际上是 ints,但它们是“不可能的”描述符,-1 和 -2。)
  • 流——实际上,任何具有fileno 方法的对象。 Popen 将使用stream.fileno() 找到该流的描述符,然后按照int 值继续。
  • subprocess.PIPE,表示 Python 应该创建一个管道。
  • subprocess.STDOUT(仅适用于 stderr):告诉 Python 使用与 stdout 相同的描述符。这仅在您为stdout 提供(非None)值时才有意义,即便如此,只有在您设置stdout=subprocess.PIPE 时才需要。 (否则,您可以提供与 stdout 相同的参数,例如 Popen(..., stdout=stream, stderr=stream)。)

最简单的情况(没有管道)

如果您什么都不重定向(将所有三个保留为默认的 None 值或提供明确的 None),Pipe 很容易。它只需要剥离子进程并让它运行。或者,如果您重定向到非PIPE——int 或流的fileno()——仍然很容易,因为操作系统会完成所有工作。 Python 只需要剥离子进程,将其标准输入、标准输出和/或标准错误连接到提供的文件描述符。

仍然很简单的案例:一根管道

如果你只重定向一个流,Pipe 仍然很容易。让我们一次选择一个流并观看。

假设您想提供一些stdin,但让stdoutstderr 不重定向,或者转到文件描述符。作为父进程,您的 Python 程序只需要使用write() 将数据发送到管道中。您可以自己执行此操作,例如:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc

或者您可以将标准输入数据传递给proc.communicate(),然后它会执行上面显示的stdin.write。没有输出返回,所以communicate() 只有另一项真正的工作:它还会为您关闭管道。 (如果您不调用proc.communicate(),则必须调用proc.stdin.close() 来关闭管道,以便子进程知道没有更多数据通过。)

假设您想捕获stdout,但不理会stdinstderr。同样,这很简单:只需调用proc.stdout.read()(或等效项),直到没有更多输出。由于proc.stdout() 是一个普通的 Python I/O 流,您可以在其上使用所有普通的构造,例如:

for line in proc.stdout:

或者,同样,您可以使用proc.communicate(),它只是为您执行read()

如果您只想捕获stderr,它的工作原理与stdout 相同。

在事情变得艰难之前,还有一个技巧。假设您要捕获stdout,还要捕获stderr,但在与标准输出相同的管道上:

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

在这种情况下,subprocess“作弊”!好吧,它必须这样做,所以它并不是真正的作弊:它启动子进程时,它的 stdout 和它的 stderr 都指向(单个)管道描述符,该管道描述符反馈给它的父(Python)进程。在父端,再次只有一个管道描述符用于读取输出。所有“stderr”输出都显示在proc.stdout 中,如果您调用proc.communicate(),则stderr 结果(元组中的第二个值)将为None,而不是字符串。

困难的情况:两个或多个管道

当您想使用至少两个管道时,所有问题都会出现。其实subprocess代码本身就有这个位:

def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

但是,可惜的是,我们已经制作了至少两个,也许是三个不同的管道,所以count(None) 返回 1 或 0。我们必须努力做事。

在 Windows 上,这使用 threading.Thread 来累积 self.stdoutself.stderr 的结果,并让父线程传递 self.stdin 输入数据(然后关闭管道)。

在 POSIX 上,如果可用,则使用 poll,否则使用 select,以累积输出并提供标准输入。所有这些都在(单个)父进程/线程中运行。

这里需要线程或轮询/选择以避免死锁。例如,假设我们已将所有三个流重定向到三个单独的管道。进一步假设在写入过程暂停之前可以将多少数据填充到管道中的限制很小,等待读取过程从另一端“清除”管道。让我们将这个小限制设置为单个字节,只是为了说明。 (实际上这就是事情的运作方式,只是限制远大于一个字节。)

如果父 (Python) 进程尝试写入多个字节——例如,'go\n'proc.stdin,则第一个字节进入,然后第二个字节导致 Python 进程挂起,等待子进程读取第一个字节字节,清空管道。

同时,假设子进程决定打印一个友好的“Hello!Don't Panic!”问候。 H 进入其 stdout 管道,但 e 导致它挂起,等待其父级读取 H,清空 stdout 管道。

现在我们陷入困境:Python 进程处于睡眠状态,等待说完“go”,而子进程也处于睡眠状态,等待说完“Hello!Don't Panic!”。

subprocess.Popen 代码通过线程或选择/轮询避免了这个问题。当字节可以通过管道时,它们就会通过。当它们不能时,只有一个线程(而不是整个进程)必须休眠——或者,在选择/轮询的情况下,Python 进程同时等待“可以写入”或“数据可用”,写入进程的标准输入仅当有空间时,并且仅在数据准备好时才读取其标准输出和/或标准错误。一旦发送了所有标准输入数据(如果有)并且所有标准输出和/或标准错误数据都已累积,proc.communicate() 代码(实际上是处理多毛情况的_communicate)返回。

如果您想在两个不同的管道上同时读取stdoutstderr(不管任何stdin 重定向),您也需要避免死锁。这里的死锁场景不同——当你从stdout拉数据时子进程向stderr写了很长的东西,反之亦然——但它仍然存在。


演示

我承诺证明,未重定向的 Python subprocesses 写入底层标准输出,而不是 sys.stdout。所以,这里有一些代码:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
   print 'start show1'
   save = sys.stdout
   sys.stdout = StringIO()
   print 'sys.stdout being buffered'
   proc = subprocess.Popen(['echo', 'hello'])
   proc.wait()
   in_stdout = sys.stdout.getvalue()
   sys.stdout = save
   print 'in buffer:', in_stdout

def show2():
   print 'start show2'
   save = sys.stdout
   sys.stdout = open(os.devnull, 'w')
   print 'after redirect sys.stdout'
   proc = subprocess.Popen(['echo', 'hello'])
   proc.wait()
   sys.stdout = save

show1()
show2()

运行时:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

请注意,如果添加stdout=sys.stdout,第一个例程将失败,因为StringIO 对象没有fileno。如果您添加stdout=sys.stdout,第二个将省略hello,因为sys.stdout 已被重定向到os.devnull

(如果您重定向 Python 的 file-descriptor-1,子进程遵循该重定向。open(os.devnull, 'w') 调用会生成一个 fileno() 大于 2 的流。)

【讨论】:

  • 嗯。您的演示似乎最终显示了与索赔相反的情况。您正在将 Python 的 stdout 重新定向到缓冲区,但子进程 stdout 仍将进入控制台。这有什么用?我错过了什么吗?
  • @GuySirton:演示显示子进程标准输出(当未明确指向 sys.stdout 时)进入 Python 的标准输出,而不是 Python 程序的 (sys.) 标准输出。我承认这是一个……奇怪的区别。有没有更好的表达方式?
  • 很高兴知道,但我们真的想在这里捕获子进程输出,所以更改 sys.stdout 很酷,但我认为对我们没有帮助。良好的观察必须使用 select()、poll 或线程之类的通信。
  • @SamirAguiar:我不知道有什么好的简短摘要,但它非常简单:在 POSIX 操作系统级别,“stdout”只是“文件描述符 #1”。当你打开一个文件时,你会得到 next available fd,通常从 3 开始(因为 0、1 和 2 是 stdin、stdout、stderr)。如果你然后设置 Python 的 sys.stdout 来写入它——例如,从你最近的 open 操作到 fd 5——然后 fork 和 exec,你 exec 的东西将写入 its fd#1。除非你特别安排,否则他们的 fd1 就是你的 fd1,不再是你的 sys.stdout。
【解决方案3】:

我们也可以使用默认的文件迭代器来读取标准输出,而不是使用带有 readline() 的 iter 构造。

import subprocess
import sys
process = subprocess.Popen(your_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in process.stdout:
    sys.stdout.write(line)

【讨论】:

  • 这里最优雅的答案!
  • 此方案不实时显示。它等到该过程完成并立即显示所有输出。在 Viktor Kerkez 的解决方案中,如果“your_command”渐进式显示,则输出渐进式,只要“your_command”不时刷新标准输出(因为管道)。
  • @Nir 因为它还没有上线。
  • 此解决方案迭代默认描述符,因此它只会在输出中的行更新时更新。对于基于字符的更新,您需要迭代 read() 方法,如 Viktor 的解决方案中所示。但这对我的用例来说太过分了。
  • 相当实时,无需等待进程退出。非常感谢
【解决方案4】:

除了所有这些答案之外,一种简单的方法也可以如下:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)

while process.stdout.readable():
    line = process.stdout.readline()

    if not line:
        break

    print(line.strip())

只要可读,就循环通过可读流,如果结果为空,则停止。

这里的关键是readline()只要有输出就返回一行(末尾有\n),如果真的在末尾则为空。

希望这对某人有所帮助。

【讨论】:

    【解决方案5】:

    如果您能够使用第三方库,您也许可以使用sarge 之类的东西(披露:我是它的维护者)。该库允许对子进程的输出流进行非阻塞访问 - 它位于 subprocess 模块之上。

    【讨论】:

    • 在 sarge 上做得很好,顺便说一句。这确实解决了 OP 的要求,但对于该用例可能有点笨拙。
    • 如果您建议使用一种工具,请至少显示一个针对这种情况的使用示例。
    【解决方案6】:

    解决方案 1:实时同时记录 stdoutstderr

    一个简单的解决方案,将标准输出和标准错误同时实时逐行记录到日志文件中。

    import subprocess as sp
    from concurrent.futures import ThreadPoolExecutor
    
    
    def log_popen_pipe(p, stdfile):
    
        with open("mylog.txt", "w") as f:
    
            while p.poll() is None:
                f.write(stdfile.readline())
                f.flush()
    
            # Write the rest from the buffer
            f.write(stdfile.read())
    
    
    with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
    
        with ThreadPoolExecutor(2) as pool:
            r1 = pool.submit(log_popen_pipe, p, p.stdout)
            r2 = pool.submit(log_popen_pipe, p, p.stderr)
            r1.result()
            r2.result()
    

    解决方案 2:一个函数 read_popen_pipes(),允许您同时实时迭代两个管道 (stdout/stderr)

    import subprocess as sp
    from queue import Queue, Empty
    from concurrent.futures import ThreadPoolExecutor
    
    
    def enqueue_output(file, queue):
        for line in iter(file.readline, ''):
            queue.put(line)
        file.close()
    
    
    def read_popen_pipes(p):
    
        with ThreadPoolExecutor(2) as pool:
            q_stdout, q_stderr = Queue(), Queue()
    
            pool.submit(enqueue_output, p.stdout, q_stdout)
            pool.submit(enqueue_output, p.stderr, q_stderr)
    
            while True:
    
                if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                    break
    
                out_line = err_line = ''
    
                try:
                    out_line = q_stdout.get_nowait()
                    err_line = q_stderr.get_nowait()
                except Empty:
                    pass
    
                yield (out_line, err_line)
    
    # The function in use:
    
    with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
    
        for out_line, err_line in read_popen_pipes(p):
            print(out_line, end='')
            print(err_line, end='')
    
        p.poll()
    
    

    【讨论】:

    • 感谢您的“read_popen_pipes”。它就像一个魅力,即使对于像我这样的 Python 线程新手也很容易使用。其他人注意:“return p.poll()”假设代码在函数内部运行。要使其作为独立示例运行,只需将“return p.poll()”替换为“sys.exit(p.poll())”另外,将“my_cmd”替换为 [“ls”] 或任何您想要的命令运行。
    • @DoomGoober 感谢您的客气话。我已经按照您的建议修复了代码。我离开了sys.exit,以使示例尽可能简单。
    【解决方案7】:

    与之前的答案类似,但以下解决方案适用于我在 Windows 上使用 Python3 提供一种实时打印和登录的常用方法 (getting-realtime-output-using-python):

    def print_and_log(command, logFile):
        with open(logFile, 'wb') as f:
            command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
    
            while True:
                output = command.stdout.readline()
                if not output and command.poll() is not None:
                    f.close()
                    break
                if output:
                    f.write(output)
                    print(str(output.strip(), 'utf-8'), flush=True)
            return command.poll()
    

    【讨论】:

    • 如果我也想最后返回标准输出我会修改什么?
    【解决方案8】:

    如果您只需要输出将在控制台上可见,对我来说最简单的解决方案是将以下参数传递给 Popen

    with Popen(cmd, stdout=sys.stdout, stderr=sys.stderr) as proc:
    

    这将使用你的 python 脚本 stdio 文件句柄

    【讨论】:

      【解决方案9】:

      一个好的但“重量级”的解决方案是使用 Twisted - 见底部。

      如果你愿意只使用标准输出,那么这些方面的东西应该可以工作:

      import subprocess
      import sys
      popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
      while not popenobj.poll():
         stdoutdata = popenobj.stdout.readline()
         if stdoutdata:
            sys.stdout.write(stdoutdata)
         else:
            break
      print "Return code", popenobj.returncode
      

      (如果你使用 read() 它会尝试读取整个“文件”,这是没有用的,我们真正可以在这里使用的是读取管道中所有数据的东西)

      也可以尝试使用线程来解决这个问题,例如:

      import subprocess
      import sys
      import threading
      
      popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)
      
      def stdoutprocess(o):
         while True:
            stdoutdata = o.stdout.readline()
            if stdoutdata:
               sys.stdout.write(stdoutdata)
            else:
               break
      
      t = threading.Thread(target=stdoutprocess, args=(popenobj,))
      t.start()
      popenobj.wait()
      t.join()
      print "Return code", popenobj.returncode
      

      现在我们可以通过两个线程潜在地添加 stderr。

      请注意,子流程文档不鼓励直接使用这些文件,并建议使用communicate()(主要关注死锁,我认为这不是上面的问题),解决方案有点笨拙,所以看起来真的像 subprocess 模块不能胜任这项工作(另请参阅:http://www.python.org/dev/peps/pep-3145/),我们需要看看其他内容。

      更复杂的解决方案是使用Twisted,如下所示:https://twistedmatrix.com/documents/11.1.0/core/howto/process.html

      使用Twisted 执行此操作的方式是使用reactor.spawnprocess() 创建进程并提供一个ProcessProtocol,然后异步处理输出。 Twisted 示例 Python 代码在这里:https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py

      【讨论】:

      • 谢谢!我刚刚尝试过这样的事情(基于@PauloAlmeida 的评论,但是我对 subprocess.Popen 的调用被阻塞了——即它只有在返回时才进入 while 循环......
      • 这不是正在发生的事情。它立即进入while循环,然后阻塞read()调用,直到子进程退出并且父进程在管道上收到EOF
      • @Alp 很有趣!就是这样。
      • 是的,我发布得太快了。它实际上不能正常工作,也不容易修复。回到绘图桌。
      • @zhermes:所以 read() 的问题是它会尝试读取整个输出直到 EOF 没有用处。 readline() 有帮助,并且可能是您所需要的(虽然很长的行也可能是一个问题)。您还需要注意启动过程中的缓冲...
      【解决方案10】:

      基于以上所有我建议稍微修改版本(python3):

      • while 循环调用 readline(建议的 iter 解决方案对我来说似乎永远阻塞 - Python 3、Windows 7)
      • 结构化,因此在轮询返回 not-None 后不需要重复处理读取数据
      • stderr 通过管道传输到 stdout,以便读取两个输出输出
      • 添加了获取 cmd 退出值的代码。

      代码:

      import subprocess
      proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT, universal_newlines=True)
      while True:
          rd = proc.stdout.readline()
          print(rd, end='')  # and whatever you want to do...
          if not rd:  # EOF
              returncode = proc.poll()
              if returncode is not None:
                  break
              time.sleep(0.1)  # cmd closed stdout, but not exited yet
      
      # You may want to check on ReturnCode here
      

      【讨论】:

        【解决方案11】:

        看起来行缓冲输出对您有用,在这种情况下,可能适合以下内容。 (警告:它未经测试。)这只会实时提供子进程的标准输出。如果您想同时拥有 stderr 和 stdout 实时,则必须使用 select 做一些更复杂的事情。

        proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        while proc.poll() is None:
            line = proc.stdout.readline()
            print line
            log_file.write(line + '\n')
        # Might still be data on stdout at this point.  Grab any
        # remainder.
        for line in proc.stdout.read().split('\n'):
            print line
            log_file.write(line + '\n')
        # Do whatever you want with proc.stderr here...
        

        【讨论】:

          【解决方案12】:

          为什么不直接将stdout 设置为sys.stdout?如果你还需要输出到日志,那么你可以简单地重写 f 的 write 方法。

          import sys
          import subprocess
          
          class SuperFile(open.__class__):
          
              def write(self, data):
                  sys.stdout.write(data)
                  super(SuperFile, self).write(data)
          
          f = SuperFile("log.txt","w+")       
          process = subprocess.Popen(command, stdout=f, stderr=f)
          

          【讨论】:

          • 那行不通:子进程模块分叉并将stdout 文件描述符设置为传递的文件对象的文件描述符。 write-method 永远不会被调用(至少 subprocess 对 stderr 是这样做的,我猜对 stdout 也是如此)。
          【解决方案13】:

          我尝试的所有上述解决方案要么未能分离 stderr 和 stdout 输出(多个管道),要么在操作系统管道缓冲区已满时永远阻塞,当您运行的命令输出太快时会发生这种情况(有一个警告这在子进程的python poll() 手册上)。我发现唯一可靠的方法是通过 select,但这是一个仅限 posix 的解决方案:

          import subprocess
          import sys
          import os
          import select
          # returns command exit status, stdout text, stderr text
          # rtoutput: show realtime output while running
          def run_script(cmd,rtoutput=0):
              p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
              poller = select.poll()
              poller.register(p.stdout, select.POLLIN)
              poller.register(p.stderr, select.POLLIN)
          
              coutput=''
              cerror=''
              fdhup={}
              fdhup[p.stdout.fileno()]=0
              fdhup[p.stderr.fileno()]=0
              while sum(fdhup.values()) < len(fdhup):
                  try:
                      r = poller.poll(1)
                  except select.error, err:
                      if err.args[0] != EINTR:
                          raise
                      r=[]
                  for fd, flags in r:
                      if flags & (select.POLLIN | select.POLLPRI):
                          c = os.read(fd, 1024)
                          if rtoutput:
                              sys.stdout.write(c)
                              sys.stdout.flush()
                          if fd == p.stderr.fileno():
                              cerror+=c
                          else:
                              coutput+=c
                      else:
                          fdhup[fd]=1
              return p.poll(), coutput.strip(), cerror.strip()
          

          【讨论】:

          • 另一种选择是在每个管道中分离一个线程。每个线程都可以在管道上执行阻塞 I/O,而不会阻塞其他线程。但这引入了它自己的一系列问题。所有方法都有烦恼,您只需选择您认为最不烦人的方法。 :-)
          • 对我不起作用TypeError: can only concatenate str (not "bytes") to str -Python 3.8.5
          【解决方案14】:

          我认为subprocess.communicate 方法有点误导:它实际上填充了您在subprocess.Popen 中指定的stdoutstderr

          然而,从subprocess.PIPE 中读取您可以提供给subprocess.Popenstdoutstderr 参数最终会填满操作系统管道缓冲区并死锁您的应用程序(特别是如果您有多个必须使用subprocess 的进程/线程)。

          我建议的解决方案是为 stdoutstderr 提供文件 - 并读取文件的内容,而不是从死锁 PIPE 中读取。这些文件可以是tempfile.NamedTemporaryFile() - 当它们被subprocess.communicate 写入时,也可以访问以进行读取。

          以下是示例用法:

                  try:
                      with ProcessRunner(('python', 'task.py'), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                          for out in process_runner:
                              print(out)
                  catch ProcessError as e:
                      print(e.error_message)
                      raise
          

          这是准备好使用的源代码,我可以提供尽可能多的 cmets 来解释它的作用:

          如果您使用的是 python 2,请确保首先从 pypi 安装最新版本的 subprocess32 包。

          
          import os
          import sys
          import threading
          import time
          import tempfile
          import logging
          
          if os.name == 'posix' and sys.version_info[0] < 3:
              # Support python 2
              import subprocess32 as subprocess
          else:
              # Get latest and greatest from python 3
              import subprocess
          
          logger = logging.getLogger(__name__)
          
          
          class ProcessError(Exception):
              """Base exception for errors related to running the process"""
          
          
          class ProcessTimeout(ProcessError):
              """Error that will be raised when the process execution will exceed a timeout"""
          
          
          class ProcessRunner(object):
              def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
                  """
                  Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
                  Process Runner. This is a class that should be used as a context manager - and that provides an iterator
                  for reading captured output from subprocess.communicate in near realtime.
          
                  Example usage:
          
          
                  try:
                      with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                          for out in process_runner:
                              print(out)
                  catch ProcessError as e:
                      print(e.error_message)
                      raise
          
                  :param args: same as subprocess.Popen
                  :param env: same as subprocess.Popen
                  :param timeout: same as subprocess.communicate
                  :param bufsize: same as subprocess.Popen
                  :param seconds_to_wait: time to wait between each readline from the temporary file
                  :param kwargs: same as subprocess.Popen
                  """
                  self._seconds_to_wait = seconds_to_wait
                  self._process_has_timed_out = False
                  self._timeout = timeout
                  self._process_done = False
                  self._std_file_handle = tempfile.NamedTemporaryFile()
                  self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
                                                   stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
                  self._thread = threading.Thread(target=self._run_process)
                  self._thread.daemon = True
          
              def __enter__(self):
                  self._thread.start()
                  return self
          
              def __exit__(self, exc_type, exc_val, exc_tb):
                  self._thread.join()
                  self._std_file_handle.close()
          
              def __iter__(self):
                  # read all output from stdout file that subprocess.communicate fills
                  with open(self._std_file_handle.name, 'r') as stdout:
                      # while process is alive, keep reading data
                      while not self._process_done:
                          out = stdout.readline()
                          out_without_trailing_whitespaces = out.rstrip()
                          if out_without_trailing_whitespaces:
                              # yield stdout data without trailing \n
                              yield out_without_trailing_whitespaces
                          else:
                              # if there is nothing to read, then please wait a tiny little bit
                              time.sleep(self._seconds_to_wait)
          
                      # this is a hack: terraform seems to write to buffer after process has finished
                      out = stdout.read()
                      if out:
                          yield out
          
                  if self._process_has_timed_out:
                      raise ProcessTimeout('Process has timed out')
          
                  if self._process.returncode != 0:
                      raise ProcessError('Process has failed')
          
              def _run_process(self):
                  try:
                      # Start gathering information (stdout and stderr) from the opened process
                      self._process.communicate(timeout=self._timeout)
                      # Graceful termination of the opened process
                      self._process.terminate()
                  except subprocess.TimeoutExpired:
                      self._process_has_timed_out = True
                      # Force termination of the opened process
                      self._process.kill()
          
                  self._process_done = True
          
              @property
              def return_code(self):
                  return self._process.returncode
          
          
          
          

          【讨论】:

            【解决方案15】:

            这是我在我的一个项目中使用的一个类。它将子进程的输出重定向到日志。起初我尝试简单地覆盖写方法,但这不起作用,因为子进程永远不会调用它(重定向发生在文件描述符级别)。所以我使用我自己的管道,类似于它在子进程模块中的完成方式。这具有将所有日志记录/打印逻辑封装在适配器中的优点,您可以简单地将记录器的实例传递给Popensubprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))

            class LogAdapter(threading.Thread):
            
                def __init__(self, logname, level = logging.INFO):
                    super().__init__()
                    self.log = logging.getLogger(logname)
                    self.readpipe, self.writepipe = os.pipe()
            
                    logFunctions = {
                        logging.DEBUG: self.log.debug,
                        logging.INFO: self.log.info,
                        logging.WARN: self.log.warn,
                        logging.ERROR: self.log.warn,
                    }
            
                    try:
                        self.logFunction = logFunctions[level]
                    except KeyError:
                        self.logFunction = self.log.info
            
                def fileno(self):
                    #when fileno is called this indicates the subprocess is about to fork => start thread
                    self.start()
                    return self.writepipe
            
                def finished(self):
                   """If the write-filedescriptor is not closed this thread will
                   prevent the whole program from exiting. You can use this method
                   to clean up after the subprocess has terminated."""
                   os.close(self.writepipe)
            
                def run(self):
                    inputFile = os.fdopen(self.readpipe)
            
                    while True:
                        line = inputFile.readline()
            
                        if len(line) == 0:
                            #no new data was added
                            break
            
                        self.logFunction(line.strip())
            

            如果您不需要记录而只是想使用print(),您显然可以删除大部分代码并保持类更短。您还可以通过__enter____exit__ 方法对其进行扩展,并在__exit__ 中调用finished,以便您可以轻松地将其用作上下文。

            【讨论】:

              【解决方案16】:

              没有一个 Pythonic 解决方案对我有用。 原来proc.stdout.read() 或类似的可能会永远阻塞。

              因此,我像这样使用tee

              subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')
              

              如果您已经在使用shell=True,此解决方案会很方便。

              ${PIPESTATUS} 捕获整个命令链的成功状态(仅在 Bash 中可用)。 如果我省略了&amp;&amp; exit ${PIPESTATUS},那么这将始终返回零,因为tee 永远不会失败。

              unbuffer 可能需要立即将每一行打印到终端中,而不是等待太久直到“管道缓冲区”被填满。 但是,unbuffer 会吞掉 assert (SIG Abort) 的退出状态...

              2&gt;&amp;1 还将 stderror 记录到文件中。

              【讨论】:

                【解决方案17】:

                我找到了一个非常复杂的问题的简单解决方案。

                1. stdout 和 stderr 都需要流式传输。
                2. 两者都需要是非阻塞的:当没有输出时和输出太多时。
                3. 不想使用线程或多处理,也不愿意使用 pexpect。

                这个解决方案使用了我发现的一个要点here

                import subprocess as sbp
                import fcntl
                import os
                
                def non_block_read(output):
                    fd = output.fileno()
                    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
                    try:
                        return output.readline()
                    except:
                        return ""
                
                with sbp.Popen('find / -name fdsfjdlsjf',
                                shell=True,
                                universal_newlines=True,
                                encoding='utf-8',
                                bufsize=1,
                                stdout=sbp.PIPE,
                                stderr=sbp.PIPE) as p:
                    while True:
                        out = non_block_read(p.stdout)
                        err = non_block_read(p.stderr)
                        if out:
                            print(out, end='')
                        if err:
                            print('E: ' + err, end='')
                        if p.poll() is not None:
                            break
                

                【讨论】:

                • 像魅力一样工作
                【解决方案18】:
                import os
                
                def execute(cmd, callback):
                    for line in iter(os.popen(cmd).readline, ''): 
                            callback(line[:-1])
                
                execute('ls -a', print)
                

                【讨论】:

                  猜你喜欢
                  • 2020-10-19
                  • 2021-12-10
                  • 2022-01-22
                  • 2011-09-30
                  • 1970-01-01
                  • 1970-01-01
                  • 1970-01-01
                  • 1970-01-01
                  相关资源
                  最近更新 更多