【问题标题】:Python queue linking object running asyncio coroutines with main thread inputPython队列链接对象运行异步协程与主线程输入
【发布时间】:2015-03-29 01:22:05
【问题描述】:

我有一个脚本正在运行,主线程从标准输入获取输入,然后使用队列将其传递给子线程。在子线程中,我使用 asyncio 协程在套接字上启动侦听器并等待连接。建立连接后,我现在可以从主线程通过侦听器发送数据。

这一切似乎都运行良好,但由于 asyncio.BaseEventLoop 不是线程安全的,我会遇到问题吗?

这是我尝试解决将python的cmd模块等阻塞库与asyncio一起使用的问题。

我的代码如下。

import sys
import asyncio
from time import sleep
from threading import Thread
from queue import Queue

stdin_q = Queue()

clients = {} # task -> (reader, writer)

def client_connected_handler(client_reader, client_writer):
    # Start a new asyncio.Task to handle this specific client connection
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_reader, client_writer)

    def client_done(task):
        # When the tasks that handles the specific client connection is done
        del clients[task]

    # Add the client_done callback to be run when the future becomes done
    task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    # Handle the requests for a specific client with a line oriented protocol
    while True:

        cmd = yield from get_input()
        client_writer.write(cmd.encode())

        data = yield from client_reader.read(1024)

        print(data.decode(),end="",flush=True)

@asyncio.coroutine
def get_input():
  while True:
    try:
      return stdin_q.get()
    except:
      pass



class Control:

    def start(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.loop = asyncio.get_event_loop()

        server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
        self.loop.run_forever()
        self.stop()

    def stop(self):    
        self.loop.stop()
        self.loop.close()

def fire_control():
    con = Control()
    con.start()

if __name__ == "__main__":

    stdin_q.put("\n")
    t = Thread(target=fire_control)
    t.start()
    sleep(2)
    _cmd = ""
    while _cmd.lower() != "exit":
        _cmd = input("")
        if _cmd == "":
          _cmd = "\r\n"

        stdin_q.put(_cmd)      

【问题讨论】:

    标签: python multithreading thread-safety python-asyncio


    【解决方案1】:

    这不会完全正确,因为对stdin_q.get() 的调用会阻塞您的事件循环。这意味着如果您的服务器有多个客户端,那么所有客户端都将被首先到达stdin_q.get() 的任何一个完全阻止,直到您将数据发送到队列中。解决此问题的最简单方法是使用BaseEvent.loop.run_in_executor 在后台ThreadPoolExecutor 中运行stdin_q.get,这样您就可以在不阻塞事件循环的情况下等待它:

    @asyncio.coroutine
    def get_input():
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(None, stdin_q.get))  # None == use default executor.
    

    编辑(16 年 1 月 27 日):

    有一个名为janus 的库,它提供了一个异步友好、线程安全的队列实现。

    使用该库,您的代码将如下所示(我省略了未更改的部分):

    ...
    import janus
    
    loop = asyncio.new_event_loop()
    stdin_q = janus.Queue(loop=loop)
    ...
    
    @asyncio.coroutine
    def get_input():
      loop = asyncio.get_event_loop()
      return (yield from stdin_q.async_q.get())
    
    class Control:
    
        def start(self):
            asyncio.set_event_loop(loop)
            self.loop = asyncio.get_event_loop()
    
            server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
            self.loop.run_forever()
            self.stop()
    
        def stop(self):    
            self.loop.stop()
            self.loop.close()
    
    ...
    
    if __name__ == "__main__":
    
        stdin_q.sync_q.put("\n")
        t = Thread(target=runner)
        t.start()
        sleep(2)
        _cmd = ""
        while _cmd.lower() != "exit":
            _cmd = input("")
            if _cmd == "":
              _cmd = "\r\n"
    
            stdin_q.sync_q.put(_cmd)
    

    【讨论】:

    • 有趣。在 asyncio 方面,您拥有丰富的知识。因此,如果我想将数据发送到特定连接,是否只需要为每个连接创建一个队列以在主线程和子线程之间进行通信?
    • 没错,但您也可以只使用asyncio.Queue,我认为它可以让您使用await stdin_q.get()
    • @dalanmiller asyncio.Queue 不是线程安全的,因此不能用于在线程之间传递值。但是,自从我写了这个答案后,我就知道了janus,它提供了一个异步友好、线程安全的队列实现。
    • 啊太棒了。出于好奇,为什么在这种情况下使用线程协程?这里有性能优势吗?
    • @dalanmiller 我认为 OP 这样做是为了让他们可以在一个线程中从标准输入读取输入,并将其馈送到运行事件循环的另一个线程。另一种方法是以非阻塞、异步友好的方式从标准输入读取。这两种方法都没有性能优势,如果您不能/不想为某事使用非阻塞 I/O,则使用线程有时会更方便。
    猜你喜欢
    • 1970-01-01
    • 2013-09-10
    • 2016-05-09
    • 2015-12-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多