【问题标题】:How to consume from an async Queue in a non-blocking way?如何以非阻塞方式从异步队列中消费?
【发布时间】:2017-12-10 14:23:00
【问题描述】:

我正在尝试使用 tornado、websockets 和 asyncio.Queue 在 Web 应用程序中创建类似终端的功能

我现在被困在实现类似 input 的函数中,协程暂停执行并等待用户输入内容

我最初的设计是为实现sendreceivenext 方法的每个用户创建一个会话对象

  • send 方法用于向用户发送消息
  • receive 接收用户消息并将其重定向到处理程序的方法
  • next 方法暂停处理程序的执行,直到下一条用户消息

会话类:

import asyncio

class Session:
    def __init__(self, ws_handler):
        self.ws_handler = ws_handler
        self.cbs = list()
        self.q = asyncio.Queue()
        self.waiting = False

    async def consume(self):
        return await self.q.get()

    async def next(self):
        self.waiting = True
        return await self.consume()

    def send(self, response):
        self.ws_handler.write_message(response.bytes())

    async def receive(self, msg):
        if self.waiting:
            await self.q.put(msg)
            self.waiting = False
            return
        await views.authenticate(self, msg)
        for cb in self.cbs:
            print('calling', cb.__name__)
            await cb(self, msg)

    def register(self, *callbackss):
        self.cbs += list(*callbackss)

因为够笨,我以为我可以这样使用它:

async def handle_input(some_message):
    session.send("Please enter your name")
    name = await session.next()
    # do some stuff

这实际上是在 handle_input 按预期暂停但当然整个服务器被永久阻止的方式。

我的问题是如何正确使用àsyncio.Queue 或任何其他策略来实现input 之类的功能

我正在使用 python 3.6 和 tornado 3.5.2

【问题讨论】:

    标签: python tornado python-3.6 python-asyncio


    【解决方案1】:

    更新: asynctio.Queue 不适用于 Tornado 的事件循环。这就是为什么你得到那个 blocking 行为的原因。见the issue我打开了这个。

    你现在有两个选择:

    1. 使用tornado.queues.Queue 而不是asyncio.Queue
    2. 或者使用 asyncio 的事件循环代替 Tornado 的事件循环。见docs

    【讨论】:

      猜你喜欢
      • 2014-10-16
      • 1970-01-01
      • 2018-04-06
      • 1970-01-01
      • 2014-03-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-28
      相关资源
      最近更新 更多