【Question title】: Migrating a Quart project with websockets from asyncio to trio
【Posted】: 2021-07-19 18:27:35
【Question】:

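I'm trying to convert my asyncio project to trio. I understand that I have to use memory channels instead of queues, but for some reason I don't get the result I expect.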

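My main problem is that when I run two clients, the first one is not notified when the second one leaves (broadcasting the 'part' message from the server raises an error). Another problem is that sometimes a client exits immediately when opening the websocket. With asyncio, everything works fine.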

Here is the stack trace I get when the second client disconnects:

[2021-07-30 18:39:51,899] ERROR in app: Exception on websocket /ws
Traceback (most recent call last):
  File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 175, in handle_websocket
    return await self.full_dispatch_websocket(websocket_context)
  File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 197, in full_dispatch_websocket
    result = await self.handle_user_exception(error)
  File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 166, in handle_user_exception
    raise error
  File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 195, in full_dispatch_websocket
    result = await self.dispatch_websocket(websocket_context)
  File "/tmp/debug/venv/lib/python3.9/site-packages/quart/app.py", line 1651, in dispatch_websocket
    return await self.ensure_async(handler)(**websocket_.view_args)
  File "/tmp/debug/server.py", line 103, in wsocket
    nursery.start_soon(receiving, u)
  File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 815, in __aexit__
    raise combined_error_from_nursery
trio.MultiError: Cancelled(), Cancelled(), Cancelled()

Details of embedded exception 1:

  Traceback (most recent call last):
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
      raise Cancelled._create()
  trio.Cancelled: Cancelled

Details of embedded exception 2:

  Traceback (most recent call last):
    File "/tmp/debug/server.py", line 68, in receiving
      data = await websocket.receive_json()
    File "/tmp/debug/venv/lib/python3.9/site-packages/quart/wrappers/websocket.py", line 68, in receive_json
      data = await self.receive()
    File "/tmp/debug/venv/lib/python3.9/site-packages/quart/wrappers/websocket.py", line 57, in receive
      return await self._receive()
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_channel.py", line 314, in receive
      return await trio.lowlevel.wait_task_rescheduled(abort_fn)
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
      return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
    File "/tmp/debug/venv/lib/python3.9/site-packages/outcome/_impl.py", line 138, in unwrap
      raise captured_error
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
      raise Cancelled._create()
  trio.Cancelled: Cancelled

Details of embedded exception 3:

  Traceback (most recent call last):
    File "/tmp/debug/server.py", line 54, in sending
      data = await u.queue_recv.receive()
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_channel.py", line 314, in receive
      return await trio.lowlevel.wait_task_rescheduled(abort_fn)
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
      return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
    File "/tmp/debug/venv/lib/python3.9/site-packages/outcome/_impl.py", line 138, in unwrap
      raise captured_error
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
      raise Cancelled._create()
  trio.Cancelled: Cancelled

  During handling of the above exception, another exception occurred:

  Traceback (most recent call last):
    File "/tmp/debug/server.py", line 63, in sending
      await broadcast({'type': 'part', 'data': u.name})
    File "/tmp/debug/server.py", line 75, in broadcast
      await user.queue_send.send(message)
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_channel.py", line 159, in send
      await trio.lowlevel.checkpoint_if_cancelled()
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 2361, in checkpoint_if_cancelled
      await _core.checkpoint()
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 2339, in checkpoint
      await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED)
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
      return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
    File "/tmp/debug/venv/lib/python3.9/site-packages/outcome/_impl.py", line 138, in unwrap
      raise captured_error
    File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
      raise Cancelled._create()
  trio.Cancelled: Cancelled

Here is the code (set TRIO to False to use asyncio):

server.py

#!/usr/bin/env python
from quart import Quart, websocket, request, jsonify, json
from quart_trio import QuartTrio
from functools import wraps
import uuid
import trio
import asyncio
from quart_auth import AuthUser, AuthManager, login_user, _AuthSerializer

TRIO = True

if TRIO:
    app = QuartTrio(__name__)
else:
    app = Quart(__name__)
app.secret_key = '**changeme**'

authorized_users = set()

class User(AuthUser):
    @staticmethod
    def current():
        token = websocket.cookies['QUART_AUTH']
        serializer = _AuthSerializer('**changeme**', 'quart auth salt')
        user_id = serializer.loads(token)
        for u in authorized_users:
            if u.auth_id == user_id:
                return u
        return None

    def __init__(self, auth_id):
        super().__init__(auth_id)
        self.name = None
        self.queue = None # asyncio
        self.queue_send = None #trio
        self.queue_recv = None #trio
        self.connected = False
        self.websockets = set()    

    def to_dict(self):
        return {
            'id': self.auth_id,
            'name': self.name
        }

auth_manager = AuthManager()
auth_manager.user_class = User

async def sending(u: User):
    await broadcast({'type': 'join', 'data': u.name})
    try:
        while True:
            if TRIO:
                data = await u.queue_recv.receive()
            else:
                data = await u.queue.get()
            for s in u.websockets:
                await s.send_json(data)
    finally:
        u.websockets.remove(websocket._get_current_object())
        if len(u.websockets) == 0:
            u.connected = False
            await broadcast({'type': 'part', 'data': u.name})


async def receiving(u: User):
    while True:
        data = await websocket.receive_json()
        if data['type'] == 'msg':
            await broadcast({'type': 'msg', 'user': u.name, 'data': data['data']})

async def broadcast(message):
    for user in [u for u in authorized_users if u.connected]:
        if TRIO:
            await user.queue_send.send(message)
        else:
            await user.queue.put(message)

@app.route('/api/v1/auth', methods=['POST'])
async def auth_login():
    data = await request.json
    user_id = str(uuid.uuid4())[:8]
    u = User(user_id)
    u.name = data['login'] or 'Anonymous'+user_id
    if TRIO:
        u.queue_send, u.queue_recv = trio.open_memory_channel(float('inf'))
    else:
        u.queue = asyncio.Queue()
    login_user(u, True)
    authorized_users.add(u)
    return jsonify({'id': user_id, 'name': u.name}), 200

@app.websocket('/ws')
async def wsocket():
    u = User.current()
    if u is None:
        return
    u.websockets.add(websocket._get_current_object())
    u.connected = True
    if TRIO:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sending, u)
            nursery.start_soon(receiving, u)
    else:
        producer = asyncio.create_task(sending(u))
        consumer = asyncio.create_task(receiving(u))
        await asyncio.gather(producer, consumer)


auth_manager.init_app(app)

if __name__ == "__main__":
    app.run(host='localhost', port=8080)

client.py

#!/usr/bin/env python

import asks
import trio
import trio_websocket
import json

asks.init(trio)

class User:
    def __init__(self, name: str="") -> None:
        self.name = name

class Client(User):
    def __init__(self) -> None:
        super(Client, self).__init__()
        self.web_url = 'http://localhost:8080/api/v1'
        self.ws_url = 'ws://localhost:8080/ws'
        self.ws = None
        self.nursery = None
        self.cookiejar = {}
    
    async def send(self, msg: dict) -> None:
        if self.ws is not None:
            await self.ws.send_message(json.dumps(msg))

    async def reader(self, websocket) -> None:
        while True:
            try:
                message_raw = await websocket.get_message()
                msg = json.loads(message_raw)
                if msg['type'] == 'msg':
                    print(f"<{msg['user']}> {msg['data']}")
                elif msg['type'] == 'join':
                    print(f"* {msg['data']} joined")
                elif msg['type'] == 'part':
                    print(f"* {msg['data']} left")
            except trio_websocket.ConnectionClosed:
                break

    async def login(self) -> None:
        rlogin = await asks.post(self.web_url + '/auth', json={'login': self.name, 'password': 'password'})
        for c in rlogin.cookies:
            if c.name == 'QUART_AUTH':
                self.cookiejar = {'QUART_AUTH': c.value}

    async def connect(self) -> None:
        await self.login()
        async with trio_websocket.open_websocket_url(self.ws_url, extra_headers=[('Cookie', 'QUART_AUTH'+'='+self.cookiejar['QUART_AUTH'])]) as websocket:
            self.ws = websocket
            await self.send({'type': 'msg', 'data': 'hello'})
            async with trio.open_nursery() as nursery:
                self.nursery = nursery
                nursery.start_soon(self.reader, websocket)

    def run(self) -> None:
        trio.run(self.connect)

c = Client()
c.name = 'clientA'
c.run()

EDIT: I tested with anyio: anyio+trio behaves the same, and anyio+asyncio also reproduces the problem (without any exception). So I guess it comes from the queue replacement.

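【Comments】:

  • Is this the exception you're getting? hypercorn.utils.UnexpectedMessage: Unexpected message type, websocket.accept given the state ASGIWebsocketState.CONNECTED When I try to run your demo under Trio, I get that exception as soon as I run the client. From the stack trace, it looks like Quart-Trio/Hypercorn tries to accept the websocket twice. It looks to me like a bug in Quart-Trio or in Quart itself. Can you post your stack trace?
  • To help me diagnose this, could you try the following: run pip install git+https://gitlab.com/jaytuck/quart.git@bd2c3cf4e40ab27d3188cde9dd5f1d548468fc5e to install my possible fix. The change I made is here: gitlab.com/jaytuck/quart/-/commit/… From my investigation, it looks like the way the code flows under Trio makes it try to accept the same websocket twice. I'm not sure this is the right fix, but if it solves your problem we can submit it to @pgjones.
  • @JayTuckey Thanks, your commit fixes one of my problems: 80% of the time I was getting the same error as you. My main problem remains: when running two clients, one does not see the other disconnect.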

Tags: python websocket quart python-trio


【Answer 1】:

OK @tibs, I think I've found the problem. It comes down to the way Trio handles cancellation. For the full documentation, read:

https://trio.readthedocs.io/en/stable/reference-core.html#cancellation-and-timeouts

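To explain what is happening here: when a user disconnects, Quart-Trio raises a Cancelled exception in every coroutine that is running or waiting under that websocket. For a websocket user, there are currently two places that are waiting: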

async def sending(u: User):
    await broadcast({'type': 'join', 'data': u.name})
    try:
        while True:
            if TRIO:
                data = await u.queue_recv.receive()  # <--- code is waiting here; Cancelled is raised here
            else:
                data = await u.queue.get()
            for s in u.websockets:
                await s.send_json(data)
    finally:
        u.websockets.remove(websocket._get_current_object())
        if len(u.websockets) == 0:
            u.connected = False
            await broadcast({'type': 'part', 'data': u.name})

async def receiving(u: User):
    while True:
        data = await websocket.receive_json()  # <--- code is waiting here; Cancelled is raised here
        if data['type'] == 'msg':
            await broadcast({'type': 'msg', 'user': u.name, 'data': data['data']})

OK, so what happens from here? In the sending() function, we move down to the finally block, which starts executing, but then we call another awaitable function:

    finally:
        u.websockets.remove(websocket._get_current_object())
        if len(u.websockets) == 0:
            u.connected = False
            await broadcast({'type': 'part', 'data': u.name})  # <--- we call an awaitable here

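From the Trio docs:

Cancellations in Trio are "level triggered", meaning that once a block has been cancelled, all cancellable operations in that block will keep raising Cancelled.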

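So when await broadcast(...) is called, it is immediately Cancelled, unlike in asyncio, which behaves differently. This explains why your 'part' message is never sent. In Trio, if you want to do some cleanup work while being cancelled, you should open a new cancel scope and shield it from cancellation, like this: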

async def sending(u: User):
    await broadcast({'type': 'join', 'data': u.name})
    try:
        while True:
            if TRIO:
                data = await u.queue_recv.receive()  # <--- code is waiting here; Cancelled is raised here
            else:
                data = await u.queue.get()
            for s in u.websockets:
                await s.send_json(data)
    finally:
        u.websockets.remove(websocket._get_current_object())
        if len(u.websockets) == 0:
            u.connected = False
            with trio.move_on_after(5) as leaving_cancel_scope:
                # Shield from the cancellation for 5s to run the broadcast of leaving
                leaving_cancel_scope.shield = True
                await broadcast({'type': 'part', 'data': u.name})

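Alternatively, you can start the broadcast coroutine on the app nursery. Be aware that if broadcast(...) crashes, it will crash the whole running app unless you put a try/except inside the broadcast(...) function: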

async def sending(u: User):
    await broadcast({'type': 'join', 'data': u.name})
    try:
        while True:
            if TRIO:
                data = await u.queue_recv.receive()
            else:
                data = await u.queue.get()
            for s in u.websockets:
                await s.send_json(data)
    finally:
        u.websockets.remove(websocket._get_current_object())
        if len(u.websockets) == 0:
            u.connected = False
            app.nursery.start_soon(broadcast, {'type': 'part', 'data': u.name})

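After this, you will still get Cancelled exceptions flowing up into your websocket function, so you may want to catch them there. Note that you need to catch BaseException to catch them, for example: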

@app.websocket('/ws')
async def wsocket():
    u = User.current()
    if u is None:
        return
    u.websockets.add(websocket._get_current_object())
    u.connected = True
    if TRIO:
        try:
            async with trio.open_nursery() as nursery:
                nursery.start_soon(sending, u)
                nursery.start_soon(receiving, u)
        except BaseException as e:
            print(f'websocket funcs crashed with exception: {e}')

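This is because Trio does not let you silently drop exceptions: you either catch them or crash. I hope this is enough to get you started on fixing the problems you're having.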
