【问题标题】:Call async functions in blocking context in Tornado在 Tornado 的阻塞上下文中调用异步函数
【发布时间】:2020-10-25 03:22:45
【问题描述】:

我想在 Tornado 框架中实现一个基于 Web 套接字的服务。当用户关闭 Web 套接字时,我想通知其他用户。但是,on_close 显然是一个阻塞函数,而我的 _broadcast(str) -> None 函数是异步的。

我怎么能调用这个函数呢?

from tornado import websocket

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SocketHandler(websocket.WebSocketHandler):
    async def open(self, *args, conns, **kwargs):
        logger.info(f"Opened a new connection to client {id(self)}")
        self._conns = conns

    async def on_message(self, message):
        logger.info(f"Client {id(self)} sent message: {message}")
        await self._broadcast(message)

    def on_close(self):
        logger.info(f"Client {id(self)} has left the scene")
        self._conns.remove(self)
        self._broadcast("something")  # TODO

    async def _broadcast(self, msg): 
        for conn in self._conns: 
            try:
                await conn.write_message(msg)
            except websocket.WebSocketClosedError:
                pass


app = web.Application([
    (r'/ws', SocketHandler)
])

if __name__ == '__main__':
    app.listen(9000)
    ioloop.IOLoop.instance().start()

【问题讨论】:

    标签: python async-await tornado


    【解决方案1】:

    您正在寻找的简单解决方案是在调用协程时使用asyncio.create_task

    def on_close(self):
        logger.info(f"Client {id(self)} has left the scene")
        self._conns.remove(self)
        asyncio.create_task(self._broadcast("something"))
    

    (此函数的旧 Tornado 版本是 tornado.gen.convert_yielded,但现在 Tornado 和 asyncio 已集成,没有理由不将 asyncio 版本用于原生协程)

    但是对于这个特殊的问题,在您的_broadcast 函数中使用await 并不理想。等待write_message 用于提供流量控制,但create_taskawait 提供的背压没有任何用处。 (write_message 是相当不寻常的,因为它完全支持在有和没有await 的情况下调用它)。事实上,它对错误的事情施加了背压——一个缓慢的连接会减慢向其后的所有其他连接发出的通知。

    所以在这种情况下,我建议将 _broadcast 设为常规同步函数:

    def _broadcast(self, msg): 
        for conn in self._conns: 
            try:
                conn.write_message(msg)
            except websocket.WebSocketClosedError:
                pass
    

    如果您想更好地控制内存使用(通过await write_message 提供的流量控制),您将需要一个更复杂的解决方案,可能涉及每个连接的有界队列(在on_close 中,使用put_nowait 将消息添加到每个连接的队列中,然后有一个任务从队列中读取并使用await write_message 写入消息)

    【讨论】:

    • 我明白了。感谢您的见解。因此,@acushner 提出的解决方案实际上是这种情况下更好的解决方案。 - 只有一件事,我不明白你所说的“背压”到底是什么意思
    • 背压是指您减慢流(数据或事件)的源速度以匹配目标可以使用它的速度。一般来说,这是一个很好的原则,但不清楚它是否适用于这种情况——你不能减慢客户离开现场的速度;这只是发生的事情。
    • 我明白了,这是有道理的。谢谢你的解释。
    【解决方案2】:

    我认为涉及使用 asyncio.Queue 的解决方案应该适合您。

    我做了一个小班作为模型来测试这一点:

    import asyncio
    import time
    
    
    class Thing:
        on_close_q = asyncio.Queue()
    
        def __init__(self):
            self.conns = range(3)
    
        def on_close(self, id):
            time.sleep(id)
            print(f'closing {id}')
            self.on_close_q.put_nowait((self, id))
    
        async def notify(self, msg):
            print('in notify')
            for conn in range(3):
                print(f'notifying {conn} {msg}')
    
    
    async def monitor_on_close():
        print('monitoring')
        while True:
            instance, id = await Thing.on_close_q.get()
            await instance.notify(f'{id} is closed')
    

    从那里,您需要在从龙卷风获得的 ioloop 中运行 monitor_on_close。我从未使用过龙卷风,但我认为在您的 __main__ 块中添加这样的内容应该可以:

        ioloop.IOLoop.current().add_callback(monitor_on_close) 
    

    【讨论】:

    • 谢谢,这绝对是一个有趣的方法。 on_close_q 应该是一个实例成员,因为我显然可以有多个 Things(每个连接一个),而且我不太喜欢将 on_close 的代码拆分到多个位置。因此,我希望像ioloop.IOLoop.current().run(notify) 这样的东西,我通过互联网搜索没有找到任何东西。
    • 没有理由成为实例变量,它将在传递到队列中的实例上调用。我敢肯定那里有更好的解决方案,但是如果您找不到更好的解决方案,这应该可以解决问题。
    • 好吧,你是对的。谢谢。如果没有更好的解决方案,我会接受。
    猜你喜欢
    • 2012-10-14
    • 1970-01-01
    • 2014-07-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多