【问题标题】:python Tornado handler IO blocks entire server networkingpython Tornado 处理程序 IO 阻止整个服务器网络
【发布时间】:2024-05-20 20:40:01
【问题描述】:

我正在使用 tornado 实现 REST api,并希望它是非阻塞的。

目前,与问题相关的代码如下:

class ReprsHandler(web.RequestHandler):
    async def get(self, name):

        db = await dbf.create_handler()

        if 'id' in list(self.request.query_arguments.keys()):
            db_future = asyncio.ensure_future(db.get_repr(name, self.get_query_argument('id')))
        else:
            db_future = asyncio.ensure_future(db.get_reprs(name))

        result = await db_future
        response = result.toSerializedStream()

        self.set_status(HTTPStatus.OK)
        self.write(response)
        self.set_header('Content-Type', 'text/plain')
        self.finish()


class App(object):
    def __init__(self, loop):
        self.server_app = web.Application(
            handlers=[
                (r"/api/v1/([a-zA-Z0-9_-]+)/reprs", ReprsHandler),
            ]
        )

def main():
    AsyncIOMainLoop().install()
    loop = asyncio.get_event_loop()
    app = App(loop)
    server = tornado.httpserver.HTTPServer(app.server_app, max_body_size=config['max_upload_size'], max_buffer_size=config['max_upload_size'])
    server.bind(config['server_port'])
    server.start()

    loop.run_forever()

代码比较简单,但是数据量比较大,所以大概需要3~4分钟才能全部发送完。

我希望处理程序的逻辑和网络 IO 都是非阻塞的,但它会在发送数据作为响应时阻塞服务器网络。逻辑很好。他们不会阻止其他请求。

详情:

  • 此代码在 docker 上运行,ubuntu 16.04,使用 python 3.5 实现。
  • 服务器正在使用 nginx 作为端口代理。

可能是什么问题?我不知道是什么造成了这个问题。

【问题讨论】:

  • result.toSerializedStream() 是做什么的?既然您提到“数据非常大”,我假设 result 变量是数据并且它很大。因此,如果.toSerializedStream() 对大数据执行任何 CPU 密集型任务,那么这可能是阻塞的原因。
  • 该方法使用pickle转储生成数据对象的序列化字符串,但逻辑的时间成本没有网络成本那么大。不过,我可以对此进行一些性能改进。谢谢。

标签: python python-3.x nginx tornado


【解决方案1】:

好的。这个问题太傻了。

我希望这个非阻塞 API 能够作为并行网络工作,这样整个网络就不会相互中断。这不是龙卷风的设计初衷。它显然是非阻塞的,但仍然是单线程的。

【讨论】:

    【解决方案2】:

    既然你提到result.toSerializedStream() 腌制数据。所以,是的,你说得对,阻塞是因为网络 io。

    为避免这种情况,您可以在中发送数据,并在每个self.write() 之后调用self.flush()。调用flush 会将响应写入网络。由于你可以awaitflush,直到数据写入网络套接字,协程将暂停,服务器不会阻塞。这允许其他处理程序异步运行。

    代码示例:

    async def get(self, name):
        ...
        response = result.toSerializedStream()
    
        chunk_size = 1024 * 1024 * 10 # 10 MiB
    
        start_byte = 0
        while True:
            chunk = response[start_byte : start_byte + chunk_size]
            if not chunk:
                break
            self.write(chunk)
            await self.flush() # wait while data is flushed to network
    
            start_byte += chunk_size # move start_byte forward
    

    重要:

    这里需要注意的重要一点是self.flush() 非常快。如果您将小数据刷新到网络,await 延迟非常小,以至于协程会不停地运行,从而阻塞服务器。

    在上面的示例代码中,我将chunk_size 设置为10 MiB,但如果您的计算机速度很快,await 的延迟会非常非常小,并且循环可能会一直运行直到整个数据被发送。

    我鼓励您根据需要增加或减少 chunk_size 的值。


    进一步的改进建议:

    所有数据都在内存中。现在您的处理程序是异步的并且不会阻塞,如果另一个请求来自ReprsHandler,这将导致更多的数据存储在内存中。如果越来越多的请求进来,那么你就可以知道会发生什么。

    为避免这种情况,您可以将其转储到文件中,而不是腌制内存中的数据。然后在您的处理程序中,只需 open 该文件并分块读取并发送它。

    【讨论】:

    • 很好的建议。对此,我真的非常感激。立即着手处理。
    最近更新 更多