【问题标题】:How to process WebSocket messages in parallel using Django Channels?如何使用 Django Channels 并行处理 WebSocket 消息?
【发布时间】:2021-12-06 09:49:51
【问题描述】:

我们刚开始使用 Django Channels,并且正在努力解决以下用例:

我们的应用会在短时间内收到来自单个客户端(另一台服务器)的多个请求。创建每个响应需要很长时间。向客户端发送响应的顺序无关紧要

我们希望保持一个开放的 WebSocket 连接,以减少从同一个客户端发送多个请求和响应的连接开销。

Django Channels 似乎严格按顺序处理同一个 WebSocket 连接上的消息,并且在前一帧得到响应之前不会开始处理下一帧

考虑以下示例:

示例

服务器端

import asyncio
from channels.generic.websocket import AsyncWebsocketConsumer

class QuestionConsumer(AsyncWebsocketConsumer):
    async def websocket_connect(self, event):
        await self.accept()

    async def complicated_answer(self, question):
        await asyncio.sleep(3)
        return {
             "What is the Answer to Life, The Universe and Everything?": "42",
             "Why?": "Because.",
        }.get(question, "Don't know")


    async def receive(self, text_data=None, bytes_data=None):
        # while awaiting below, we should start processing the next WS frame
        answer = await self.complicated_answer(text_data)
        await self.send(answer)

asgi.py:

from django.urls import re_path
from channels.routing import ProtocolTypeRouter, URLRouter
application = ProtocolTypeRouter(
    {"websocket": URLRouter([
        re_path(r"^questions", QuestionConsumer.as_asgi(), name="questions",)
    ]}
  )
)

客户端

import asyncio
import websockets
from time import time

async def main():
    async with websockets.connect("ws://0.0.0.0:8000/questions") as ws:
        tasks = []
        for m in [
                "What is the Answer to Life, The Universe and Everything?",
                "Why?"
        ]:
            tasks.append(ws.send(m))
        # send all requests (without waiting for response)
        time_before = time()
        await asyncio.gather(*tasks)
        # wait for responses
        for t in tasks:
            print(await ws.recv())
            print("{:.1f} seconds since first request".format(time() - time_before))
asyncio.get_event_loop().run_until_complete(main())

结果

实际

42
3.0 seconds since first request
Because.
6.0 seconds since first request

希望

42
3.0 seconds since first request
Because.
3.0 seconds since first request

换句话说,我们希望事件循环在异步任务之间切换,不仅适用于多个消费者,而且适用于同一消费者处理的所有任务这是可能的还是有我们忽略的解决方法?您是否使用过 Django Channels 来应对类似的挑战?您是如何解决的?

【问题讨论】:

    标签: python websocket django-channels


    【解决方案1】:

    消费者的receive函数为每个传入的WebSocket消息顺序调用,当到达第一个接收的await时,第二个消息没有调用接收方法,因此将上下文切换到第二个co - 例程还不可能。我找不到这个来源,但我猜这是 ASGI 协议本身的一部分。对于许多用例,可能需要严格按照接收顺序处理 WebSocket 消息。

    异步处理消息的解决方案是不从receive方法发送响应,而是从通过loop.create_task调度的协程发送响应。

    调度生成响应的长时间运行的协程允许receive 完成,并让下一个receive 开始。一旦第二条消息的响应生成被调度,两个协程就会被调度,解释器可以切换上下文来异步执行它们。

    对于问题中的示例,这是我找到的解决方案:

    class QuestionConsumer(AsyncWebsocketConsumer):
    
        async def complicated_answer(self, question):
            await asyncio.sleep(3)
            answer = {
                 "What is the Answer to Life, The Universe and Everything?": "42",
                 "Why?": "Because.",
            }.get(question, "Don't know")
            # instead of returning the answer, send it directly to client as a response
            await self.send(answer)
    
    
        async def receive(self, text_data=None, bytes_data=None):
            # instead of awaiting, schedule the coroutine
            loop = asyncio.get_running_loop()
            loop.create_task(
                self.complicated_answer(text_data)
            )
    

    这个改变的消费者的输出与问题给出的期望输出相匹配。请注意,响应可能会乱序返回,并且客户端负责将请求与响应匹配。

    请注意,对于 Python 版本 get_event_loop 而不是 get_running_loop

    【讨论】:

      猜你喜欢
      • 2020-06-04
      • 1970-01-01
      • 2012-02-29
      • 1970-01-01
      • 2019-02-26
      • 2011-07-27
      • 2021-07-14
      • 2021-07-24
      • 2021-05-29
      相关资源
      最近更新 更多