【问题标题】:Timeout WebSocket connections in aiohttpaiohttp 中的超时 WebSocket 连接
【发布时间】:2018-04-19 14:47:07
【问题描述】:

我的 WebSocket 服务器实现是对外开放的,但是客户端需要在连接建立后发送一个身份验证消息,否则服务器应该关闭连接。

如何在 aiohttp 中实现它?看来,我需要做以下事情:

  1. 为每个套接字连接创建一个on_open 方法:我找不到创建此类事件的方法(类似于 Tornado 中的 on_open)。

  2. 创建一个计时器:可以使用主事件循环的 asyncio 的 sleepcall_back 方法。但是我找不到将 WebSocketResponse 发送到回调函数的方法:

    await asyncio.sleep(10, timer, loop=request.app.loop)

  3. 如果未通过身份验证则关闭连接

这是我之前使用 Tornado 时的情况:

def open(self, *args, **kwargs):
    self.timeout = ioloop.IOLoop.instance().add_timeout(
        datetime.timedelta(seconds=60),
        self._close_on_timeout
    )

def remove_timeout_timer(self):
    ioloop.IOLoop.instance().remove_timeout(self.timeout)
    self.timeout = None

def on_message(self, message):
    if message = 'AUTHENTICATE':
        self.authenticated = True
        self.remove_timeout_timer

def _close_on_timeout(self):
    if not self.authenticated:
        if self.ws_connection:
            self.close()

这是我使用 aiohttp 设置计时器的内容:

async def ensure_client_logged(ws):
    await asyncio.sleep(3)  # wait 3 seconds
    await ws.send_str('hello')

async def ws_handler(request):
    ws = web.WebSocketResponse()

    asyncio.ensure_future(ensure_client_logged(ws), loop=request.app.loop)

但代码以阻塞方式运行,这意味着服务器在休眠时变得无响应。

有人可以指点我正确的方向吗?

【问题讨论】:

  • async with async_timeout.timeout(3): await ws.receive()

标签: python-3.x python-asyncio aiohttp


【解决方案1】:

您需要确定身份验证程序的最后期限。 asyncio.wait_for 是一种方便的方法:

async def ws_handler(request):
    loop = asyncio.get_event_loop()
    ws = web.WebSocketResponse()
    loop.create_task(handle_client(ws))

async def handle_client(ws):
    try:
        authenticated = await asyncio.wait_for(_authenticate(ws), 10)
    except asyncio.TimeoutError:
        authenticated = False
    if not authenticated:
        ws.close()
        return
    # continue talking to the client

async def _authenticate(ws):
    # implement authentication here, without worrying about
    # timeout - the coroutine will be automatically canceled
    # once the timeout elapses
    ...
    return True  # if successfully authenticated

【讨论】:

  • 但是如何传递 ws_handler?与 Tornado 一样,处理程序是 NOT 类。我将编辑我的答案以向您展示一个示例。
  • @bman 谢谢。以及身份验证是如何发生的,将self.authenticated 设置为true 的代码在哪里?
  • 我还没有实现那部分。但它将在应用程序中使用 defaultdict 属性。像这样:docs.aiohttp.org/en/stable/…
  • 欣赏更新的答案。这仍然不是我想要完成的。除非用户发送验证消息,否则连接应仅保持打开 10 秒。 (请看龙卷风的例子)
  • @bman 不幸的是我不使用 Tornado,所以我不了解它的细节。但是,修改后的答案应该完全做到这一点 - 如果用户在 10 秒内未进行身份验证,则连接将关闭(在 if not self.authenticated: ... 下的部分)。
【解决方案2】:

这是为未来用户带来好处的完整示例:

from aiohttp import web
import asyncio

async def wait_for_authentication(ws, app):
    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT and msg.data == 'AUTHENTICATE':  # Implement your own authentication
            await ws.send_str('WELCOME')
            return True
        else:
            await ws.send_str('NOT AUTHENTICATED')


async def authenticate(ws, app) -> bool:
    try:
        authenticated = await asyncio.wait_for(wait_for_authentication(ws, app), 5)
    except asyncio.TimeoutError:
        authenticated = False

    if not authenticated:
        await ws.send_str('The AUTHENTICATE command was not received. Closing the connection...')
        await ws.close()
        return False


async def ws_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    await request.app.loop.create_task(authenticate(ws, request.app))

    async for msg in ws:
        if msg.type != web.WSMsgType.TEXT:
            continue

        await ws.send_str(msg.data)

def init():
    app = web.Application()

    app.router.add_get('/', ws_handler)

    return app

web.run_app(init())

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-05-04
    • 1970-01-01
    • 2014-05-15
    • 1970-01-01
    • 1970-01-01
    • 2017-12-29
    • 1970-01-01
    相关资源
    最近更新 更多