【问题标题】:Async IO - reading char from input blocks output异步 IO - 从输入块输出中读取字符
【发布时间】:2023-12-22 18:12:01
【问题描述】:

注意:这个例子是在 linux 终端模拟器上测试的,由于使用了termios(我不知道它是否是跨平台的),它可能无法在其他操作系统的终端上正常运行。


我一直在尝试制作“异步”python 提示符。我的意思是,当用户从提示中输入输入时,他们也可以接收消息,而无需取消输入。

下面是使用asyncio.Queue 和一些termios 标志的实现(对于复杂性,我提前抱歉,我试图尽可能减少它):

import sys, termios, os
import asyncio

def readline(prompt: str = "Input: "):
    # termios stuff to: disable automatic echo so that, when a character is typed, it is not immediately printed on screen
    #                   read a single character from stdin without pressing <Enter> to finish
    fd = sys.stdin.fileno()
    orig_termios = termios.tcgetattr(fd)
    new_termios = termios.tcgetattr(fd)
    new_termios[3] &= ~(termios.ICANON | termios.ECHO)

    # set to new termios
    termios.tcsetattr(fd, termios.TCSADRAIN, new_termios)

    async def terminput(queue: asyncio.Queue):
        """Get terminal input and send it to the queue."""
        while True:
            ch = sys.stdin.read(1) # read a single char (works because of the termios config)

            if ch == "\n":
                await queue.put(("finish", None)) # `None` here because we won't use the second argument
                await asyncio.sleep(0) # strange workaround so the queues actually work
                continue

            await queue.put(("input", ch))
            await asyncio.sleep(0) # strange workaround so the queues actually work

    async def timedsender(queue: asyncio.Queue):
        """Every 0.5 seconds, send a message to the queue."""
        while True:
            await queue.put(("message", "I'm a message!"))
            await asyncio.sleep(0.5)
    
    async def receiver(queue: asyncio.Queue):
        """Handle the receiving of messages and input characters."""
        # Late decision that I might be able to fix easily - I had to use a list to push characters into on a earlier version of the code. It can be a string now, though.
        input_buffer = []

        sys.stdout.write(prompt)
        sys.stdout.flush()

        def clear_line():
            """Clear the current line.

            There might be an escape code that does this already. Eh, anyways...
            """
            sys.stdout.write("\r")
            sys.stdout.write(" " * os.get_terminal_size().columns)
            sys.stdout.write("\r")
            sys.stdout.flush()

        def redraw_input_buffer():
            """Redraw the input buffer.

            Shows the prompt and what has been typed until now.
            """
            sys.stdout.write(prompt + "".join(input_buffer))
            sys.stdout.flush()

        while True:
            # So, lemme explain what this format is.
            # Each item sent on the queue should be a tuple.
            # The first element is what should be done with the content (such as show message, add to input buffer), and the second element is the content itself.
            kind, content = await queue.get()

            if kind == "message":
                clear_line()
                sys.stdout.write(f"Message -- {content}\n")
                sys.stdout.flush()
                redraw_input_buffer()
            elif kind == "input":
                sys.stdout.write(content)
                sys.stdout.flush()
                input_buffer += content
            elif kind == "finish":
                sys.stdout.write("\n")

                sys.stdout.write(f"INPUT FINISHED :: {repr(''.join(input_buffer))}\n")
                sys.stdout.flush()
                
                input_buffer.clear()
                redraw_input_buffer()
                # continue reading more input lines...
            else:
                raise ValueError(f"Unknown kind: {repr(kind)}")

            queue.task_done()
    
    async def main():
        queue = asyncio.Queue()
    
        senders = [terminput(queue), timedsender(queue)]
        recv = receiver(queue)
        await asyncio.gather(*senders, recv)
    
        await queue.join()
        recv.cancel()

    try:
        asyncio.run(main())
    finally:
        # reset to original termios
        termios.tcsetattr(fd, termios.TCSADRAIN, orig_termios)

readline()

这里的主要问题是只有在输入字符时才会读取队列,即使这样,如果我没有等待足够的时间来读取下一个字符,例如asyncio.sleep(0.1),通常只有一个在此期间收到消息。

我不确定问题是队列还是 stdin-stdout 机制的某些内部工作原理(可能在 stdin 被阻塞时我无法写入 stdout)。

【问题讨论】:

    标签: python linux asynchronous stdout stdin


    【解决方案1】:

    刚刚想出了解决这个问题的方法——设置输入字符的最大等待时间。

    readline()的顶部:

    def readline(prompt: str = "Input: "):
        fd = sys.stdin.fileno()
        orig_termios = termios.tcgetattr(fd)
        new_termios = termios.tcgetattr(fd)
        new_termios[3] &= ~(termios.ICANON | termios.ECHO)
        
        # the following lines were added:
        new_termios[6][termios.VMIN] = 0 # minimal amount of characters to
        new_termios[6][termios.VTIME] = 1 # a max wait time of 1/10 second
    

    当直接在 C 上使用它时,超时返回的字符将是代码 170,但在这里似乎甚至没有发生(来自 Python 的读取操作可能已经忽略它们)。

    【讨论】:

      最近更新 更多