【问题标题】:Non-blocking console input?非阻塞控制台输入?
【发布时间】:2011-01-25 09:34:24
【问题描述】:

我正在尝试用 Python 制作一个简单的 IRC 客户端(作为我学习语言时的一种项目)。

我有一个循环,用于接收和解析 IRC 服务器发送给我的内容,但如果我使用 raw_input 输入内容,它会停止循环,直到我输入内容(显然)。

如何在不停止循环的情况下输入内容?

(我认为我不需要发布代码,我只想输入一些内容而不会停止 while 1: 循环。)

我在 Windows 上。

【问题讨论】:

标签: python windows input


【解决方案1】:

对于 Windows,仅限控制台,使用 msvcrt 模块:

import msvcrt

num = 0
done = False
while not done:
    print(num)
    num += 1

    if msvcrt.kbhit():
        print "you pressed",msvcrt.getch(),"so now i will quit"
        done = True

对于 Linux,article 描述了以下解决方案,它需要 termios 模块:

import sys
import select
import tty
import termios

def isData():
    return select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], [])

old_settings = termios.tcgetattr(sys.stdin)
try:
    tty.setcbreak(sys.stdin.fileno())

    i = 0
    while 1:
        print(i)
        i += 1

        if isData():
            c = sys.stdin.read(1)
            if c == '\x1b':         # x1b is ESC
                break

finally:
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

对于跨平台,或者如果您还需要 GUI,您可以使用 Pygame:

import pygame
from pygame.locals import *

def display(str):
    text = font.render(str, True, (255, 255, 255), (159, 182, 205))
    textRect = text.get_rect()
    textRect.centerx = screen.get_rect().centerx
    textRect.centery = screen.get_rect().centery

    screen.blit(text, textRect)
    pygame.display.update()

pygame.init()
screen = pygame.display.set_mode( (640,480) )
pygame.display.set_caption('Python numbers')
screen.fill((159, 182, 205))

font = pygame.font.Font(None, 17)

num = 0
done = False
while not done:
    display( str(num) )
    num += 1

    pygame.event.pump()
    keys = pygame.key.get_pressed()
    if keys[K_ESCAPE]:
        done = True

【讨论】:

  • 我已经有了pygame,所以我会试试这个。谢谢。不过,还有其他人有更好的解决方案吗?我想让它成为一个控制台。
  • 当输入通过其他进程传输时,有没有办法使这项工作?
  • 注意:windows解决方案实际上并没有检查stdin中是否有任何东西。它检查是否按下了键盘上的键,因此不会处理管道输入。
  • 我一直在寻找一种正确的方法来中断我在 input() 上阻塞的线程,但是使用 msvcrt 和 getch,我没有得到正确的箭头键历史行为、退格等从输入()获取。有没有办法轻松保持控制台行为,但仍然能够在 input() 上中断线程?
  • Linux 解决方案似乎对我不起作用:termios.error: (25, 'Inappropriate ioctl for device') 在线 old_settings = termios.tcgetattr(sys.stdin)。有什么想法吗?
【解决方案2】:

这是我见过的最棒的solution1。粘贴在这里以防链接失效:

#!/usr/bin/env python
'''
A Python class implementing KBHIT, the standard keyboard-interrupt poller.
Works transparently on Windows and Posix (Linux, Mac OS X).  Doesn't work
with IDLE.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as 
published by the Free Software Foundation, either version 3 of the 
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

'''

import os

# Windows
if os.name == 'nt':
    import msvcrt

# Posix (Linux, OS X)
else:
    import sys
    import termios
    import atexit
    from select import select


class KBHit:

    def __init__(self):
        '''Creates a KBHit object that you can call to do various keyboard things.
        '''

        if os.name == 'nt':
            pass

        else:

            # Save the terminal settings
            self.fd = sys.stdin.fileno()
            self.new_term = termios.tcgetattr(self.fd)
            self.old_term = termios.tcgetattr(self.fd)

            # New terminal setting unbuffered
            self.new_term[3] = (self.new_term[3] & ~termios.ICANON & ~termios.ECHO)
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)

            # Support normal-terminal reset at exit
            atexit.register(self.set_normal_term)


    def set_normal_term(self):
        ''' Resets to normal terminal.  On Windows this is a no-op.
        '''

        if os.name == 'nt':
            pass

        else:
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)


    def getch(self):
        ''' Returns a keyboard character after kbhit() has been called.
            Should not be called in the same program as getarrow().
        '''

        s = ''

        if os.name == 'nt':
            return msvcrt.getch().decode('utf-8')

        else:
            return sys.stdin.read(1)


    def getarrow(self):
        ''' Returns an arrow-key code after kbhit() has been called. Codes are
        0 : up
        1 : right
        2 : down
        3 : left
        Should not be called in the same program as getch().
        '''

        if os.name == 'nt':
            msvcrt.getch() # skip 0xE0
            c = msvcrt.getch()
            vals = [72, 77, 80, 75]

        else:
            c = sys.stdin.read(3)[2]
            vals = [65, 67, 66, 68]

        return vals.index(ord(c.decode('utf-8')))


    def kbhit(self):
        ''' Returns True if keyboard character was hit, False otherwise.
        '''
        if os.name == 'nt':
            return msvcrt.kbhit()

        else:
            dr,dw,de = select([sys.stdin], [], [], 0)
            return dr != []


# Test    
if __name__ == "__main__":

    kb = KBHit()

    print('Hit any key, or ESC to exit')

    while True:

        if kb.kbhit():
            c = kb.getch()
            if ord(c) == 27: # ESC
                break
            print(c)

    kb.set_normal_term()

1Simon D. Levy 制作,是compilation of software 的一部分,他在Gnu Lesser General Public License 下编写和发布。

【讨论】:

    【解决方案3】:

    这是一个使用单独线程在 linux 和 windows 下运行的解决方案:

    import sys
    import threading
    import time
    import Queue
    
    def add_input(input_queue):
        while True:
            input_queue.put(sys.stdin.read(1))
    
    def foobar():
        input_queue = Queue.Queue()
    
        input_thread = threading.Thread(target=add_input, args=(input_queue,))
        input_thread.daemon = True
        input_thread.start()
    
        last_update = time.time()
        while True:
    
            if time.time()-last_update>0.5:
                sys.stdout.write(".")
                last_update = time.time()
    
            if not input_queue.empty():
                print "\ninput:", input_queue.get()
    
    foobar()
    

    【讨论】:

    • 似乎是这里唯一适用于 windows cmd-console 和 eclipse 的解决方案!
    • 您需要在sys.stdout.write(".") 之后添加一个sys.stdout.flush()
    • mac呢?
    • @ColorCodin 也应该可以工作,因为它基于 unix
    【解决方案4】:

    我最喜欢获得非阻塞输入的是在线程中使用 python input():

    import threading
    
    class KeyboardThread(threading.Thread):
    
        def __init__(self, input_cbk = None, name='keyboard-input-thread'):
            self.input_cbk = input_cbk
            super(KeyboardThread, self).__init__(name=name)
            self.start()
    
        def run(self):
            while True:
                self.input_cbk(input()) #waits to get input + Return
    
    showcounter = 0 #something to demonstrate the change
    
    def my_callback(inp):
        #evaluate the keyboard input
        print('You Entered:', inp, ' Counter is at:', showcounter)
    
    #start the Keyboard thread
    kthread = KeyboardThread(my_callback)
    
    while True:
        #the normal program executes without blocking. here just counting up
        showcounter += 1
    

    独立于操作系统,仅内部库,支持多字符输入

    【讨论】:

    • 这种方法完全符合我的需要。请参阅下面的答案,了解使用闭包的更简洁的方法。
    • 是的,我觉得这应该是正确的答案,它是最不凌乱和轻量级的。毕竟对于这么简单的需求,我不需要繁重的多线程解决方案!
    【解决方案5】:

    在 Linux 上,这里对 mizipzor 的代码进行了重构,使这变得更容易一些,以防您必须在多个地方使用此代码。

    import sys
    import select
    import tty
    import termios
    
    class NonBlockingConsole(object):
    
        def __enter__(self):
            self.old_settings = termios.tcgetattr(sys.stdin)
            tty.setcbreak(sys.stdin.fileno())
            return self
    
        def __exit__(self, type, value, traceback):
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)
    
    
        def get_data(self):
            if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
                return sys.stdin.read(1)
            return False
    

    以下是如何使用它:此代码将打印一个不断增长的计数器,直到您按下 ESC。

    with NonBlockingConsole() as nbc:
        i = 0
        while 1:
            print i
            i += 1
            if nbc.get_data() == '\x1b':  # x1b is ESC
                break
    

    【讨论】:

    • 使用 GNU/Linux:输入字符后仍然需要按回车,但它可以工作。至少它是非阻塞的,它主要返回普通字符(没有键码,当然除了特殊键,如转义或退格)。谢谢!
    【解决方案6】:

    我认为 curses 库可以提供帮助。

    import curses
    import datetime
    
    stdscr = curses.initscr()
    curses.noecho()
    stdscr.nodelay(1) # set getch() non-blocking
    
    stdscr.addstr(0,0,"Press \"p\" to show count, \"q\" to exit...")
    line = 1
    try:
        while 1:
            c = stdscr.getch()
            if c == ord('p'):
                stdscr.addstr(line,0,"Some text here")
                line += 1
            elif c == ord('q'): break
    
            """
            Do more things
            """
    
    finally:
        curses.endwin()
    

    【讨论】:

    • curses 不可移植。
    • curses 是完全可移植的。它实际上比 Python 本身更便携。我认为这是最好的答案,但这取决于您的用例。例如,curses 会在控制台可用之前清除控制台。
    • curses 控制屏幕,所有后续屏幕输出必须以自己的方式处理,print() 不再起作用。
    【解决方案7】:

    使用 python3.3 及更高版本,您可以使用此答案中提到的 asyncio 模块。 您必须重新考虑您的代码才能使用asyncioPrompt for user input using python asyncio.create_server instance

    【讨论】:

      【解决方案8】:

      我会照 Mickey Chan 所说的去做,但我会使用 unicurses 而不是普通的诅咒。 Unicurses 是通用的(适用于所有或至少几乎所有操作系统)

      【讨论】:

        【解决方案9】:

        由于我发现其中一个answers above 很有帮助,这里有一个类似方法的示例。此代码在输入时创建节拍器效果。

        不同之处在于这段代码使用闭包而不是类,这对我来说感觉更直接一些。此示例还包含一个标志以通过my_thread.stop = True 终止线程,但不使用全局变量。我通过(ab)使用 python 函数是对象的事实来做到这一点,因此可以进行猴子修补,甚至从它们自身内部。

        注意:停止线程应该小心。如果您的线程有需要某种清理过程的数据,或者如果线程产生了自己的线程,这种方法会毫不客气地杀死这些进程。

        # Begin metronome sound while accepting input.
        # After pressing enter, turn off the metronome sound.
        # Press enter again to restart the process.
        
        import threading
        import time
        import winsound  # Only on Windows
        
        beat_length = 1  # Metronome speed
        
        
        def beat_thread():
            beat_thread.stop = False  # Monkey-patched flag
            frequency, duration = 2500, 10
            def run():  # Closure
                while not beat_thread.stop:  # Run until flag is True
                    winsound.Beep(frequency, duration)
                    time.sleep(beat_length - duration/1000)
            threading.Thread(target=run).start()
        
        
        while True:
            beat_thread()
            input("Input with metronome. Enter to finish.\n")
            beat_thread.stop = True  # Flip monkey-patched flag
            input("Metronome paused. Enter to continue.\n\n")
        
        

        【讨论】:

          【解决方案10】:

          ....回到最初的问题...

          我也在学习 python,它花费了我很多文档和示例阅读和头疼...但我认为我找到了一个简单、简单、简短且兼容的解决方案...仅使用输入、列表和线程

          '''
          what i thought:
          - input() in another thread
          - that were filling a global strings list
          - strings are being popped in the main thread
          '''
          
          import threading
          
          consoleBuffer = []
          
          def consoleInput(myBuffer):
            while True:
              myBuffer.append(input())
           
          threading.Thread(target=consoleInput, args=(consoleBuffer,)).start() # start the thread
          
          import time # just to demonstrate non blocking parallel processing
          
          while True:
            time.sleep(2) # avoid 100% cpu
            print(time.time()) # just to demonstrate non blocking parallel processing
            while consoleBuffer:
              print(repr(consoleBuffer.pop(0)))
          

          直到这是我找到的最简单且兼容的方式,请注意默认情况下 stdin stdout 和 stderr 共享相同的终端,因此如果您在键入时在控制台上打印某些内容,则输入的“本地回显”可能看起来不一致,但是之后按下 enter 输入的字符串接收良好...如果您不希望/不喜欢这种行为,请找到一种方法来分隔输入/输出区域,如重定向,或尝试其他解决方案,如 curses、tkinter、pygame 等。

          奖励:ctrl-c 击键可以轻松处理

          try:
            # do whatever
          except KeyboardInterrupt:
            print('cancelled by user') or exit() # overload
          

          【讨论】:

            【解决方案11】:

            以下是上述解决方案之一的类包装器:

            #!/usr/bin/env python3
            
            import threading
            
            import queue
            
            class NonBlockingInput:
            
                def __init__(self, exit_condition):
                    self.exit_condition = exit_condition
                    self.input_queue = queue.Queue()
                    self.input_thread = threading.Thread(target=self.read_kbd_input, args=(), daemon=True)
                    self.input_thread.start()
            
                def read_kbd_input(self):
                    done_queueing_input = False
                    while not done_queueing_input:
                        console_input = input()
                        self.input_queue.put(console_input)
                        if console_input.strip() == self.exit_condition:
                            done_queueing_input = True
            
                def input_queued(self):
                    return_value = False
                    if self.input_queue.qsize() > 0:
                        return_value = True
                    return return_value
            
                def input_get(self):
                    return_value = ""
                    if self.input_queue.qsize() > 0:
                        return_value = self.input_queue.get()
                    return return_value
            
            if __name__ == '__main__':
            
                NON_BLOCK_INPUT = NonBlockingInput(exit_condition='quit')
            
                DONE_PROCESSING = False
                INPUT_STR = ""
                while not DONE_PROCESSING:
                    if NON_BLOCK_INPUT.input_queued():
                        INPUT_STR = NON_BLOCK_INPUT.input_get()
                        if INPUT_STR.strip() == "quit":
                            DONE_PROCESSING = True
                        else:
                            print("{}".format(INPUT_STR))
            

            【讨论】:

              【解决方案12】:

              如果您只想从循环中“退出”一次,您可以拦截 Ctrl-C 信号。

              这是跨平台的,非常简单!

              import signal
              import sys
              
              def signal_handler(sig, frame):
                  print('You pressed Ctrl+C!')
                  sys.exit(0)
              
              signal.signal(signal.SIGINT, signal_handler)
              while True:
                  # do your work here
              

              【讨论】:

                【解决方案13】:

                我下面的示例确实允许在 Windows(仅在 Windows 10 下测试)和 Linux 下从标准输入进行非阻塞读取,而无需外部依赖项或使用线程。它适用于复制粘贴的文本,它禁用 ECHO,因此它可以用于例如某种自定义 UI 并使用循环,因此很容易处理输入到其中的任何内容。

                考虑到上述情况,该示例适用于交互式 TTY,而不是管道输入。

                #!/usr/bin/env python3
                import sys
                
                if(sys.platform == "win32"):
                    import msvcrt
                    import ctypes
                    from ctypes import wintypes
                    kernel32 = ctypes.windll.kernel32
                    oldStdinMode = ctypes.wintypes.DWORD()
                    # Windows standard handle -10 refers to stdin
                    kernel32.GetConsoleMode(kernel32.GetStdHandle(-10), ctypes.byref(oldStdinMode))
                    # Disable ECHO and line-mode
                    # https://docs.microsoft.com/en-us/windows/console/setconsolemode
                    kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 0)
                else:
                    # POSIX uses termios
                    import select, termios, tty
                    oldStdinMode = termios.tcgetattr(sys.stdin)
                    _ = termios.tcgetattr(sys.stdin)
                    # Disable ECHO and line-mode
                    _[3] = _[3] & ~(termios.ECHO | termios.ICANON)
                    # Don't block on stdin.read()
                    _[6][termios.VMIN] = 0
                    _[6][termios.VTIME] = 0
                    termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, _)
                
                def readStdin():
                    if(sys.platform == "win32"):
                        return msvcrt.getwch() if(msvcrt.kbhit()) else ""
                    else:
                        return sys.stdin.read(1)
                
                def flushStdin():
                    if(sys.platform == "win32"):
                        kernel32.FlushConsoleInputBuffer(kernel32.GetStdHandle(-10))
                    else:
                        termios.tcflush(sys.stdin, termios.TCIFLUSH)
                
                try:
                    userInput = ""
                    print("Type something: ", end = "", flush = True)
                    flushStdin()
                    while 1:
                        peek = readStdin()
                        if(len(peek) > 0):
                            # Stop input on NUL, Ctrl+C, ESC, carriage return, newline, backspace, EOF, EOT
                            if(peek not in ["\0", "\3", "\x1b", "\r", "\n", "\b", "\x1a", "\4"]):
                                userInput += peek
                                # This is just to show the user what they typed.
                                # Can be skipped, if one doesn't need this.
                                sys.stdout.write(peek)
                                sys.stdout.flush()
                            else:
                                break
                    flushStdin()
                    print(f"\nuserInput length: {len(userInput)}, contents: \"{userInput}\"")
                finally:
                    if(sys.platform == "win32"):
                        kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), oldStdinMode)
                    else:
                        termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, oldStdinMode)
                

                【讨论】:

                  【解决方案14】:

                  我正在使用 Linux 编写一个程序,该程序具有更大的主循环,需要定期更新,但还需要以非阻塞方式读取字符。但是重置显示,也会丢失输入缓冲区。 这是我想出的解决方案。每次屏幕更新后,它都会将终端设置为非阻塞,等待主循环通过,然后解释标准输入。 之后,终端将重置为原始设置。

                  #!/usr/bin/python3
                  import sys, select, os, tty, termios, time
                  
                  i = 0
                  l = True
                  oldtty = termios.tcgetattr(sys.stdin)
                  stdin_no = sys.stdin.fileno()
                  
                  while l:
                      os.system('clear')
                      print("I'm doing stuff. Press a 'q' to stop me!")
                      print(i)
                      tty.setcbreak(stdin_no)
                      time.sleep(0.5)
                      if sys.stdin in select.select([sys.stdin], [], [], 0.0)[0]:
                          line = sys.stdin.read(1)
                          print (line, len(line))
                          
                          if "q" in line:
                              l = False
                          else: 
                              pass
                      termios.tcsetattr(stdin_no, termios.TCSADRAIN, oldtty)
                      i += 1
                  
                  
                  

                  【讨论】:

                    猜你喜欢
                    • 1970-01-01
                    • 2011-01-16
                    • 1970-01-01
                    相关资源
                    最近更新 更多