【问题标题】:How to write non-blocking, chunked RequestHandler in Tornado如何在 Tornado 中编写非阻塞、分块的 RequestHandler
【发布时间】:2020-08-18 03:57:56
【问题描述】:

这里有两个简单的RequestHandlers

class AsyncHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        while True:
            future = Future()
            global_futures.add(future)
            s = yield future
            self.write(s)
            self.flush()


class AsyncHandler2(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        for f in global_futures:
            f.set_result(str(dt.now()))
        global_futures.clear()
        self.write("OK")

第一个“订阅”流,第二个将消息传递给所有订阅者。

问题是我不能拥有超过一堆(在我的情况下是 5-6 个)订阅者。一旦我订阅超过允许的数量,对第二种方法的下一个请求就会挂起。

我认为这是由于第一个处理程序没有正确异步而发生的。那是因为我使用全局对象来存储订阅者列表吗?

如何同时打开更多流式传输请求,什么是逻辑限制?

【问题讨论】:

标签: python tornado concurrent.futures


【解决方案1】:

问题是 global_futures 在您迭代它时正在被修改:当 AsyncHandler.get 唤醒时,它会从一个 yield 运行到下一个,这意味着它会创建下一个 Future 并将其添加到在控制权返回AsyncHandler2 之前设置。这是未定义的,行为取决于迭代器在集合中的位置:有时新的未来被插入到迭代器“后面”并且一切都很好,有时它被插入到迭代器“前面”并且相同的消费者处理程序将被唤醒第二次(并插入其自身的第三个副本,可能在前面或后面......)。当您只有少数消费者时,您会经常遇到“幕后”案例,以至于事情会奏效,但如果消费者太多,它就变得极不可能完成。

解决方案是在迭代之前复制global_futures,而不是在最后清除它:

@gen.coroutine
def get(self);
    fs = list(global_futures)
    global_futures.clear()
    for f in fs:
        f.set_result(str(dt.now()))
    self.write("OK")

请注意,我认为这只是 Tornado 4.x 和更早版本中的问题。在 Tornado 5 中进行了更改,set_result 不再立即调用等待处理程序,因此不再有并发修改。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2023-03-22
  • 2013-11-12
  • 2021-08-28
  • 2011-08-05
  • 1970-01-01
  • 2013-01-26
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多