【问题标题】:python interactive subprocess communicatepython交互式子进程通信
【发布时间】:2021-09-29 20:06:35
【问题描述】:

我正在尝试学习如何编写交互式子进程通信。

我需要不断地从stdout 读取并写入stdin,下面是我的代码,它有点“有效”,但我不确定我是否做得对(这是非常黑的代码)

假设我有一个名为 app.py 的脚本,如下所示

import logging
import random

def app():
    number1 = random.randint(1,100)
    number2 = random.randint(200,500)
    logging.info("number1: %s, number2: %s", number1, number2)
    ans = input("enter sum of {} and {}: ".format(number1, number2))
    logging.info("received answer: %s", ans)
    try:
        if int(ans) != number1+number2:
            raise ValueError
        logging.info("{} is the correct answer".format(ans))
    except (ValueError,TypeError):
        logging.info("{} is incorrect answer".format(ans))

def main():
    logging.basicConfig(level=logging.DEBUG, filename='log.log')
    for x in range(10):
        app()

if __name__ == '__main__':
    main()

与上面的脚本(app.py)交互我有一些非常丑陋的代码

import queue
import time
import threading
import subprocess
import os
import pty
import re

class ReadStdout(object):
    def __init__(self):
        self.queue = queue.Queue()
        self._buffer_ = []

    def timer(self, timeout=0.1):
        buffer_size = 0
        while True:
            if len(self._buffer_) > buffer_size:
                buffer_size = len(self._buffer_)
            time.sleep(timeout)
            if len(self._buffer_) == buffer_size and buffer_size!=0:
                self.queue.put(''.join(self._buffer_))
                self._buffer_ = []
                buffer_size = 0

    def read(self, fd):
        while True:
            self._buffer_.append(fd.read(1))

    def run(self):
        timer = threading.Thread(target=self.timer)
        timer.start()
        master, slave = pty.openpty()
        p = subprocess.Popen(['python', 'app.py'], stdout=slave, stderr=slave, stdin=subprocess.PIPE, close_fds=True)
        stdout = os.fdopen(master)
        read_thread = threading.Thread(target=self.read, args=(stdout,))
        read_thread.start()
        while True:
            if self.queue.empty():
                time.sleep(0.1)
                continue
            msg = self.queue.get()
            digits = (re.findall('(\d+)', msg))
            ans = (int(digits[0])+int(digits[1]))
            print("got message: {} result: {}".format(msg, ans))
            p.stdin.write(b"%d\n" %ans)
            p.stdin.flush()

if __name__ == '__main__':
    x = ReadStdout()
    x.run()

我觉得我的做法不对。与另一个脚本交互的正确方法是什么(我需要标准输出,而不仅仅是盲写到标准输入)

谢谢

【问题讨论】:

  • 不一样。我知道如何将标准输入发送到进程。但我需要先读取标准输出,然后根据结果写入标准输入。上面的线程不用担心标准输出,它只是在标准输入中盲目输入。
  • 我也可以阅读标准输出,但我不确定我是否正确阅读。因为 readline() 将不起作用,因为 input() 示例不发送换行符。我可以阅读(大小),但我不知道这是我需要阅读的“大小”。假设我真的不知道“input()”字符串的大小。
  • 这是 Python 3,对吧?您的语法都没有明确表明它,但我认为您依赖于 app.pyio 模块的微妙之处...

标签: python subprocess


【解决方案1】:

此代码将与您的 app.py 一起使用,因此您可以从中获取交互的基本逻辑。另外,我建议您查看pexpect 模块。在任何情况下 - 您必须知道运行程序会发生什么以及它的输入行如何结束。或者您可以在读取行时实现一些超时,因此如果某些事情不符合预期,则可能会引发异常。

import subprocess
from functools import partial

child = subprocess.Popen(['app.py'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)

# iterate while child is not terminated
while child.poll() is None:
    line = ''
    # read stdout character by character until a colon appears
    for c in iter(partial(child.stdout.read, 1), ''):
        if c == ':':
            break
        line += c
    if "enter sum" in line:
        numbers = filter(str.isdigit, line.split())
        numbers = list(map(int, numbers))
        child.stdin.write("{0}\n".format(sum(numbers)))

【讨论】:

  • 所以如果我不知道会发生什么,唯一的办法就是超时。没有更好的内置方式来使用它。感谢您的帮助。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-09-27
  • 2017-09-19
  • 2019-04-24
  • 1970-01-01
  • 2013-01-05
相关资源
最近更新 更多