【问题标题】:django.channels async consumer does not appear to execute asynchronouslydjango.channels 异步使用者似乎没有异步执行
【发布时间】:2018-07-30 00:32:26
【问题描述】:

我已将 django.channels 添加到 django 项目中,以支持通过 websockets 通知用户进度的长时间运行的进程。

除了长时间运行的进程的实现似乎没有异步响应之外,一切似乎都运行良好。

为了测试,我创建了一个AsyncConsumer,它可以识别两种类型的消息“run”和“isBusy”。

“运行”消息处理程序设置一个“忙碌标志”发送回一个“进程正在运行”消息,等待 异步 20 秒重置“忙碌标志”,然后发回一个“进程”完整的消息'

“isBusy”消息返回一条带有忙碌标志状态的消息。

我的期望是,如果我发送一条运行消息,我将立即收到一条“进程正在运行”消息,20 秒后我将收到一条“进程完成”消息。 这按预期工作。

我还希望,如果我发送“isBusy”消息,我将立即收到带有标志状态的响应。

观察到的行为如下:

  • (从客户端)发送消息“运行”
  • 立即收到一条消息“正在运行,请稍候”
  • (从客户端)发送消息“isBusy”
  • 消息到达服务器端的 Web 套接字侦听器
  • 在运行处理程序完成之前什么都不会发生
  • 客户端收到“已完成运行”消息
  • 紧随其后的是“process isBusy:False”消息

这里是 Channel 监听器的实现:

class BackgroundConsoleConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.busy = False

    async def run(self, message):
        print("run got message", message)
        self.busy = True
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"consoleResponse",
                    "text":"running please wait"
                })
        await asyncio.sleep(20)
        self.busy = False
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"consoleResponse",
                    "text": "finished running"
                })

    async def isBusy(self,message):
        print('isBusy got message', message)
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"consoleResponse",
                    "text":  "process isBusy:{0}".format(self.busy)
                })

在路由文件中设置通道如下:

application = ProtocolTypeRouter({
    "websocket": AuthMiddlewareStack(
        URLRouter([
            url("^console/$", ConsoleConsumer),
        ])

    ),
    "channel": ChannelNameRouter({
        "background-console":BackgroundConsoleConsumer,
    }),
})

我与一名工作人员一起运行频道(通过 ./manage.py runworker )。

实验是在 django 测试服务器上完成的(通过 runserver)。

任何关于为什么频道消费者似乎没有异步工作的想法将不胜感激。

【问题讨论】:

  • 你能告诉我们run的定义吗? documentation 似乎没有提到它,所以我认为它是在您的代码中定义的而不是继承的。
  • 您好,我说的是runisBusy这两个函数,它们就是上面代码BackgroundConsoleConsumer中显示的函数。我还提到与一名工作人员一起运行频道...也就是说,我从控制台开始频道的进程:./manage.py runworker background-console 其中background-console 是与频道关联的名称(上面描述中的第二个脚本)
  • 消息(run 和 isBusy)通过AsyncJsonWebsocketConsumer 到达BackgroundConsoleConsumerAsyncJsonWebsocketConsumer 侦听来自连接客户端的字符串,然后将消息发送到background-console 通道。因此,在收到套接字消息后,我只需执行以下操作: await self.channel_layer.send('background-console', { 'type': 'run', 'data': { 'some-data': 1} })
  • 抱歉,我指的是run呼叫站点。问题可能是 run 正在等待而不是在后台启动,这导致 isBusy 等待完成。也许在某些时候你应该使用loop.create_task 而不是await。这只是一个猜测,因为我不熟悉渠道架构。
  • 干得好!请把它写下来作为答案,它可能对其他人非常有用。请注意,您可能希望使用loop.create_task(或新的asyncio.create_task)而不是asyncio.ensure_future(如explained by Guido)。

标签: django python-asyncio django-channels


【解决方案1】:

经过一番挖掘,这里是问题和解决方法。

通道将发送给它的消息添加到 asyncio.Queue 并按顺序处理它们。

释放协程控制(通过asyncio.sleep() 或类似的东西)是不够的,必须在消费者收到新消息之前完成对消息处理程序的处理。

这是对前面示例的修复,其行为符合预期(即在处理 run 长时间运行的任务时响应 isBusy 消息)

感谢@user4815162342 的建议。

class BackgroundConsoleConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.busy = False

    async def run(self, message):
        loop = asyncio.get_event_loop()
        loop.create_task(self.longRunning())

    async def longRunning(self):
        self.busy = True
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"the.type",
                    "text": json.dumps({'message': "running please wait", 'author': 'background console process'})
                })
        print('before sleeping')
        await asyncio.sleep(20)
        print('after sleeping')
        self.busy = False
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"the.type",
                    "text": json.dumps({'message': "finished running", 'author': 'background console process'})
                })

    async def isBusy(self,message):
        print('isBusy got message', message)
        await self.channel_layer.group_send('consoleChannel',{
                    "type":"the.type",
                    "text":  json.dumps({'message': "process isBusy:{0}".format(self.busy),
                                         'author': 'background console process'})
                })

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-19
相关资源
最近更新 更多