【问题标题】:How a django channel websocket middleware can receive messagedjango 通道 websocket 中间件如何接收消息
【发布时间】:2020-05-07 03:26:40
【问题描述】:

中间件如何读取所有 websocket 消息?
据我了解,django-channel 中间件就像https://github.com/django/channels/blob/2a98606c1e0600cbae71ae1f02f31aae5d01f82d/channels/middleware.py

    async def coroutine_call(self, inner_instance, scope, receive, send):
        """
        ASGI coroutine; where we can resolve items in the scope
        (but you can't modify it at the top level here!)
        """
        await inner_instance(receive, send)

我知道如果我调用await receive() 而不是await inner_instance(receive, send),我会收到一条 websocket 消息,但在这种情况下 websocket 处理程序将不再工作。

coroutine_call 怎样才能接收到 websocket 消息,同时将其转发到下一个 websocket 中间件或处理程序?

【问题讨论】:

    标签: django websocket django-channels


    【解决方案1】:

    为了让中间件拦截消息,它需要拦截receivesend 队列。

    这里是一个复杂的中间件示例https://github.com/hishnash/channelsmultiplexer/blob/master/channelsmultiplexer/demultiplexer.py

    一个更简单的版本是:

    
    
    class _InterceptionMiddleware:
        def __init__(self, application_cls, scope):
            self.application = application_cls(scope)
    
    
        async def __call__(self, receive, send):
            self.downstream_send = send
    
            # create a Queue for the upstream consumer to read messages from
            self.upstream_receive_queue = asyncio.Queue()
    
            # create a Queue for the upstream consumer to write messages to
            self.upstream_send_queue = asyncio.Queue()
    
            # pipe messages being sent to the upstream consumer to your interceptor method
            receiver = await_many_dispatch([receive], self.my_receive_interceptor_method)
    
            # pipe messages being sent buy the upstream consumer to your interceptor method
            sender = await_many_dispatch(
                [self.upstream_send_queue.get],
                self.my_send_interceptor_method
            )
    
            # set up an asyncio task to handle these pipes
            receiver_task = asyncio.create_task(receiver)
            sender_task = asyncio.create_task(sender)
    
            # create an asyncio task for the upstream consumer
            upstream_task = asyncio.create_task(
                # pass the `get` and `put` methods of your upstream send and receive queues
                self.application(self.upstream_receive_queue.get, self.upstream_send_queue.put)
            )
    
            # await it all
            done, pending = await asyncio.wait(
                [upstream_task, receiver_task, sender_task],
                # if any of them fail stop
                return_when=asyncio.FIRST_COMPLETED
            )
            for task in [upstream_task, receiver_task, sender_task]:
                if not task.dont():
                    # we need to cancel this task.
                    task.cancel()
                    try:
                        await task
                    except CancelledError:
                        # we expect this error
                        pass
    
    
        async def my_receive_interceptor_method(self, msg):
            # your interception code
            await self.upstream_receive_queue.put(msg)
    
    
        async def my_send_interceptor_method(self, msg):
            # your interception code
            await self.downstream_send(msg)
    
    
    def InterceptionMiddleware(application_cls):
        return functools.partial(_InterceptionMiddleware, application_cls)
    
    

    【讨论】:

      猜你喜欢
      • 2021-08-26
      • 1970-01-01
      • 2019-06-04
      • 2022-12-18
      • 2017-09-30
      • 1970-01-01
      • 1970-01-01
      • 2022-11-10
      • 2019-01-14
      相关资源
      最近更新 更多