【问题标题】:How to create non-blocking continuous reading from `stdin`?如何从`stdin`创建非阻塞连续读取?
【发布时间】:2015-07-24 22:09:39
【问题描述】:

我有一个进程,它是这样创建的:

p = subprocess.Popen(args   = './myapp',
                     stdin  = subprocess.PIPE,
                     stdout = subprocess.PIPE,
                     universal_newlines=True)

稍后,我正在尝试写信给pstdin

p.stdin.write('my message\n')

myapp 进程具有以下设置:

q = queue.Queue()
def get_input():
    for line in iter(sys.stdin.readline, ''):
        q.put(line)
    sys.stdin.close()

threading.Thread(name   = 'input-getter',
                 target = get_input).start()

它正在尝试连续读取新行,如下所示:

try:
    print('input:', q.get_nowait())
except Empty:
    print('no input')

不幸的是,子进程从未收到我的任何消息。当然,当我使用时:

p.communicate('my message\n')

子进程收到消息,但正如预期的那样,communicate 方法关闭了pstdin,因此不再进行通信。

【问题讨论】:

  • 如果你不想结束进程,那么你不应该使用communicate(它只发送那个数据然后等待进程终止);而是直接写信给p.stdin
  • stdin.flush() ?那么使用async_subprocess 之类的模块呢?
  • @InbarRose 已经尝试过了.. 不走运..
  • 这个实现怎么样? code.activestate.com/recipes/…
  • @InbarRose 很快就会尝试,但我希望有比这更简单的解决方案..

标签: python python-3.x subprocess stdin nonblocking


【解决方案1】:
p = subprocess.Popen(args   = './myapp',
                     stdin  = subprocess.PIPE,
                     stdout = subprocess.PIPE,
                     universal_newlines=True)

while p.poll() is None:
    data = p.stdout.readline()

这将在进程退出之前创建对进程的非阻塞读取。 但是,这里有一些注意事项需要注意。例如,如果您也使用管道stderr,但不从中读取。那么您很可能会填充一两个缓冲区,并且无论如何您都会挂起程序。因此,请务必确保在手动执行操作时清除所有缓冲区 I/O。

如果可能的话,更好的选择是使用select.epoll(),这仅在unix系统上可用,但会给你带来更好的性能和错误处理:)

epoll = select.epoll()
epoll.register(p.stdout.fileno(), select.EPOLLHUP) # Use select.EPOLLIN for stdin.

for fileno, event in epoll.poll(1):
    if fileno == p.stdout.fileno():
        # ... Do something ...

注意:请记住,每当一个进程需要输入时,它通常会通过stdout 指示这一点,因此您仍将使用select.epoll 注册STDOUT 以检查“等待输入”。您可以注册select.EPOLLIN 以检查是否提供了输入,但我几乎看不出重点,因为请记住,您选择输入到您应该已经知道的过程中的是“正在发生”。

检查进程是否需要输入

您可以使用select.epoll 来检查进程是否正在等待输入,而不会阻塞您的应用程序执行上面的示例。但是还有更好的选择。

Pexpect 是一个非常出色的库,例如可以与 SSH 一起使用。

它的工作方式与子流程略有不同,但可能是一个不错的选择。

让 subprocess.popen 与 SSH 一起工作

如果这是您所追求的,我将重定向到另一个问题+答案(因为 SSH 将以受保护的方式生成 stdin

Python + SSH Password auth (no external libraries or public/private keys)?

【讨论】:

  • 首先:感谢您的回答。第二:你只提到了stdout,这对stdin也有效,因为我的问题是专门寻找那个?
  • @PeterVaro 由于标准输入是用户控制的(也就是你输入的东西),它本质上已经是非阻塞的。但是看到您的流程可能需要输入,是的 select 也适用(我实际上已经介绍过这个,但我使用了 stdout 结合 select.EPOLLIN 这是我的错误,EPOLLIN 用于 stdinEPOLLHUP是给stdout的。但我已经更新了我的答案,可能适合你的需要。
  • 为什么你认为p.stdout.read() 是非阻塞的?它直到 EOF 才会返回。
  • @J.F.Sebastian 正确,我使用了read() 而不是readline(),这是我的错误。这就是为什么我还没有完全准备好从头顶编写代码:)
  • 为什么你认为p.stdout.readline() 是非阻塞的?直到换行符或 EOF 才会返回。此外,p.poll() 在这里是不必要的:for line in p.stdout: 在 Python 3 上工作得很好。
【解决方案2】:

我认为您可能只是没有看到正在发生的事情的输出。这是一个似乎适用于我的盒子的完整示例,除非我完全误解了你想要的东西。我所做的主要更改是将stdoutp 设置为sys.stdout 而不是subprocess.PIPE。也许我误解了你的问题的主旨,那一点很关键......

这是完整的代码和输出:

在发送(测试)过程中(我将其命名为 test_comms.py)。我目前在 Windows 上,所以请原谅.bat

import time
import subprocess
import sys

# Note I'm sending stdout to sys.stdout for observation purposes
p = subprocess.Popen(args = 'myapp.bat',
                     stdin  = subprocess.PIPE,
                     stdout = sys.stdout,
                     universal_newlines=True)

#Send 10 messages to the process's stdin, 1 second apart                    
for i in range(10):
    time.sleep(1)
    p.stdin.write('my message\n')

myapp.bat 很简单:

echo "In the bat cave (script)"
python myapp.py

myapp.py 包含(使用 Queue 而不是 queue - 当前环境 Python 2):

import Queue
from Queue import Empty
import threading
import sys
import time

def get_input():
    print("Started the listening thread")
    for line in iter(sys.stdin.readline, ''):
        print("line arrived to put on the queue\n")
        q.put(line)
    sys.stdin.close()

print("Hi, I'm here via popen")    
q = Queue.Queue()

threading.Thread(name   = 'input-getter',
                 target = get_input).start()

print("stdin listener Thread created and started")

# Read off the queue - note it's being filled asynchronously based on 
# When it receives messages.  I set the read interval below to 2 seconds
# to illustrate the queue filling and emptying.
while True:
    time.sleep(2)
    try:
        print('Queue size is',q.qsize())
        print('input:', q.get_nowait())
    except Empty:
        print('no input')

print("Past my end of code...")

输出:

D:\>comms_test.py

D:\>echo "In the bat cave (script)"
"In the bat cave (script)"

D:\>python myapp.py
Hi, I'm here via popen
Started the listening threadstdin listener Thread created and started

line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 2)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 3)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 4)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 5)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue


D:\>('Queue size is', 6)
('input:', 'my message\n')
('Queue size is', 5)
('input:', 'my message\n')
('Queue size is', 4)
('input:', 'my message\n')
('Queue size is', 3)
('input:', 'my message\n')
('Queue size is', 2)
('input:', 'my message\n')
('Queue size is', 1)
('input:', 'my message\n')
('Queue size is', 0)
no input
('Queue size is', 0)
no input
('Queue size is', 0)
no input

【讨论】:

  • 除非在 Python 脚本中重新分配 sys.stdout 然后完全省略 stdout 参数应该具有相同的效果。
  • Python 3 中有几个缓冲问题;我会使用print('my message', file=p.stdin, flush=True) 而不是p.stdin.write('my message\n')。传递明确的bufsize=1
  • 除非必须在当前没有输入的情况下每 2 秒打印一次“无输入”,否则我会使用简单的 for line in sys.stdin: print('input: ' + line, end='') 而不是线程、队列等。
  • 感谢 cmets 的帮助 - 我正在移动,但稍后会编辑。 TBH,我对用例并不是 100% 清楚,并且取决于任何 OP cmets 可能必须进行大量修改。我也会在 Python 3 上进行测试
【解决方案3】:

为了一切正常,您必须刷新主进程 (p.stdout) 和子进程 (sys.stdout) 中的输出。

communicate 两者都刷新:

  • 关闭时会刷新p.stdin
  • 它等待sys.stdout 输出被刷新(就在退出之前)

工作示例main.py

import subprocess,time
import sys
p = subprocess.Popen(args   = ['python3', './myapp.py'],
                     stdin  = subprocess.PIPE,
                     stdout = subprocess.PIPE,
                     universal_newlines=True)

time.sleep(0.5)
p.stdin.write('my message\n')
p.stdin.flush()
#print("ici")
for i,l in  enumerate(iter(p.stdout.readline, ''),start=1):

    print("main:received:",i,repr(l))
    if i == 6:
        break
    print("mainprocess:send:other message n°{}".format(i))
    p.stdin.write("other message n°{}\n".format(i))
    p.stdin.flush()

print("main:waiting for subprocess")
p.stdin.close()    
p.wait()

myapp.py 的示例 导入队列、线程、系统、时间、rpdb

q = queue.Queue()
def get_input():
    for line in iter(sys.stdin.readline, ''):
        q.put(line)
    sys.stdin.close()

threading.Thread(name   = 'input-getter',
                 target = get_input).start()
for i in range(6):
    try:
        l= q.get_nowait()
        print('myapp:input:', l,end="")
        sys.stdout.flush()

    except queue.Empty:
        print("myapp:no input")
        sys.stdout.flush()    
        time.sleep(1)

结果:

main:received: 1 'myapp:no input\n'
mainprocess:send:other message n°1
main:received: 2 'myapp:input: my message\n'
mainprocess:send:other message n°2
main:received: 3 'myapp:input: other message n°1\n'
mainprocess:send:other message n°3
main:received: 4 'myapp:no input\n'
mainprocess:send:other message n°4
main:received: 5 'myapp:input: other message n°2\n'
mainprocess:send:other message n°5
main:received: 6 'myapp:input: other message n°3\n'
main:waiting for subprocess

【讨论】:

    【解决方案4】:

    为了调查您的程序,我编写了自己的“不断将内容流式传输到 cat 并捕获它返回的内容”程序。我没有实现它的子流程方面,但希望结构是相似的。

    你的程序这行很奇怪...

    for line in iter(sys.stdin.readline, ''):
        q.put(line)
    sys.stdin.close()
    

    看起来很像

    for line in stdin:
        q.put(line)
    

    请注意,当管道关闭时循环将结束,之后无需重新关闭。

    如果您需要持续异步读取标准输入,您应该能够在下面的代码中构造一个与child_reader 几乎相同的读取线程。只需将child.stdout 替换为stdin

    import subprocess
    import threading
    import random
    
    # We may need to guard this?
    child = subprocess.Popen('cat', stdout=subprocess.PIPE, stdin=subprocess.PIPE)
    
    # Continuously print what the process outputs...
    def print_child():
        for line in child.stdout:
            print(line)
    
    child_reader = threading.Thread(target = print_child)
    child_reader.start()
    
    for i in range(10000):
        chars = 'ABC\n'
        child.stdin.write(random.choice(chars).encode())
    
    # Send EOF.
    # This kills the cat.
    child.stdin.close()
    
    # I don't think order matters here?
    child.wait()
    child_reader.join()
    

    【讨论】:

    • 1.你是对的,在 Python 3 中不需要 iter(..) for line in stdin 按原样工作。 2、“以后不用重新关闭。”是错误的。您需要它来避免依赖垃圾收集来处理相应的文件描述符(注意:不要混淆父进程和子进程中的管道——它们是连接的,但每个进程都有自己的集合)。
    • 好的,我可以理解清理文件描述符,但是sys.stdin.close()?
    • 我的意思是subprocess' pipes,例如您的代码中的child.stdout。我同意,在大多数情况下,关闭 sys.stdin 是没有意义的。
    【解决方案5】:

    我编写了一个程序,它...基本上所有涉及异步 IO 的事情。它在线程上读取输入,在线程上输出,创建进程,并在线程上与该进程通信。

    我不确定您的程序需要完成什么,但希望这段代码能够完成它。

    # Asynchronous cat program!
    
    # Asynchronously read stdin
    # Pump the results into a threadsafe queue
    # Asynchronously feed the contents to cat
    # Then catch the output from cat and print it
    # Thread all the things
    
    import subprocess
    import threading
    import queue
    import sys
    
    my_queue = queue.Queue()
    
    # Input!
    def input_method():
        for line in sys.stdin: # End on EOF
            if line == 'STOP\n': # Also end on STOP
                break
            my_queue.put(line)
    input_thread = threading.Thread(target=input_method)
    input_thread.start()
    
    print ('Input thread started')
    
    
    # Subprocess!
    cat_process = subprocess.Popen('cat', stdout=subprocess.PIPE, stdin=subprocess.PIPE)
    
    print ('cat process started')
    
    queue_alive = True
    # Continuously dump the queue into cat
    def queue_dump_method():
        while queue_alive:
            try:
                line = my_queue.get(timeout=2)
                cat_process.stdin.write(line.encode())
                cat_process.stdin.flush() # For some reason, we have to manually flush
                my_queue.task_done() # Needed?
            except queue.Empty:
                pass
    queue_dump_thread = threading.Thread(target = queue_dump_method)
    queue_dump_thread.start()
    
    print ('Queue dump thread started')
    
    # Output!
    def output_method():
        for line in cat_process.stdout:
            print(line)
    output_thread = threading.Thread(target=output_method)
    output_thread.start()
    
    print ('Output thread started')
    
    
    # input_thread will die when we type STOP
    input_thread.join()
    print ('Input thread joined')
    
    # Now we wait for the queue to finish processing
    my_queue.join()
    print ('Queue empty')
    
    queue_alive = False
    queue_dump_thread.join()
    print ("Queue dump thread joined")
    
    # Send EOF to cat
    cat_process.stdin.close()
    
    # This kills the cat
    cat_process.wait()
    print ('cat process done')
    
    # And make sure we're done outputting
    output_thread.join()
    print ('Output thread joined')
    

    【讨论】:

    • 附言。这个程序显然很傻,我怀疑你需要异步执行 all 这个 IO。
    猜你喜欢
    • 1970-01-01
    • 2011-06-25
    • 2020-10-27
    • 2021-12-13
    • 1970-01-01
    • 2012-03-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多