【问题标题】:RabbitMQ Pika and Django Channels websocketRabbitMQ Pika 和 Django Channels websocket
【发布时间】:2020-05-07 07:17:51
【问题描述】:

我第一次使用 Django Channels 和 RabbitMQ pika。我正在尝试从 RabbitMQ 队列中消费。我正在使用 Django Channels AsyncConsumer 将其分组发送给在 websocket 中连接的每个人。

User type 1:可以创建任务

User type 2 :可以接受任务。

用例:当user type 1 创建任务时,它会在rabbitmq 中发布。当它从队列中被消耗时,它必须被 group-sent 发送到前端。当user type 2 接受任务时,user type 2 的其他实例无法接受相同的任务,我们再次从队列中消费并将队列中的下一个任务发送给每个人。

我已经使用sync_to_async 在不同的线程中创建了连接,我将它附加到回调函数的内存列表中。 每当有人接受时,我都会将其从列表中弹出并确认队列。

class AcceptTaskConsumer(AsyncConsumer):
    body = [] #IN MEMORY LIST 
    delivery = {} #To store ack delivery_tag 


    async def websocket_connect(self, event):
        print("AcceptTaskConsumer connected", event)
        AcceptTaskConsumer.get_task() #STARTS Queue listener in new thread
        self.room_group_name = "user_type_2"
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.send({
            "type": "websocket.accept"
        })

    async def websocket_receive(self, event):
        if event["text"] == "Hi": #If connecting first time
            if AcceptTaskConsumer.body:
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        "type": "message",
                        "text": AcceptTaskConsumer.body[0]["body"]
                    }
                )
            else:
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        "type": "message",
                        "text": "No New Tasks"
                    }
                )

        else: #When someone accepts a task-> ack and send next task in queue
            print(json.loads(event["text"])["id"])
            AcceptTaskConsumer.channel.basic_ack(delivery_tag=AcceptTaskConsumer.delivery[json.loads(event["text"])["id"]])
            AcceptTaskConsumer.delivery.pop(json.loads(event["text"])["id"])
            AcceptTaskConsumer.body.pop(0)
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    "type": "message",
                    "text": "No New Tasks"
                }
            )

            if AcceptTaskConsumer.body:
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        "type": "message",
                        "text": AcceptTaskConsumer.body[0]["body"]
                    }
                )

    async def message(self, event):
        await self.send({
            "type": "websocket.send",
            "text": event["text"]
        })

    @classmethod
    @sync_to_async
    def get_task(cls): #pika consumer
        cls.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        cls.channel = cls.connection.channel()

        cls.channel.queue_declare(queue='task_', arguments={"x-max-priority": 3})

        cls.channel.basic_consume(
            queue='task_', on_message_callback=AcceptTaskConsumer.callback, auto_ack=False)
        cls.channel.start_consuming()

    @classmethod
    def callback(cls, ch, method, properties, body):
        task_obj = {"body": json.dumps(body.decode("utf-8")),
                    "delivery_tag": method.delivery_tag}
        AcceptTaskConsumer.body.append(task_obj)
        AcceptTaskConsumer.delivery[json.loads(json.loads(task_obj["body"]))["id"]] = method.delivery_tag
        cls.channel.stop_consuming()

    async def websocket_disconnect(self, event):
        print(event)
        await self.send({
            "type": "websocket.close"
        })

        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

我很确定这不是正确的做法,因为它没有按预期工作

我经常遇到类似的错误。

  • 169 个渠道中的 39 个在群发中超出容量
  • pika.exceptions.StreamLostError:流连接丢失:BrokenPipeError(32, 'Broken pipe')

我也尝试像this answer 这样运行队列侦听器。没有任何工作。 任何有经验的人对此有任何想法吗?有没有更好的方法来解决这个问题?

【问题讨论】:

    标签: django rabbitmq django-channels pika


    【解决方案1】:

    您应该将 rabitMQ cosumering 逻辑移出 websocket 消费者。

    只要有一个运行 Rabbit Consumer 的 django command,该消费者可以从 RabbitMQ 获取消息,然后使用 send_group 将它们通过组发送到通道。

    如果您的 django 命令需要调用 send_group,请参阅 https://channels.readthedocs.io/en/latest/topics/channel_layers.html#using-outside-of-consumers

    from channels.layers import get_channel_layer
    
    channel_layer = get_channel_layer()
    
    async_to_sync(
        channel_layer.group_send
    )(
        "user_type_2",
        {"type": "message", "msg": 123}
    )
    

    然后在 websocket 消费者中,您应该订阅用户想要/有权获得的组。

    【讨论】:

    • 感谢发帖 如果我配置自定义命令,它会在不同的线程中运行吗?因为线程似乎是这里的问题。如果你知道更好的方法来做到这一点。我全神贯注
    • 另外,如果我们在不同的命令中运行它,我们如何将消息传递到 send_group?我们需要第三方脏店对吧?
    • (我已经更新了 awser 以包括如何从您的命令中发送到组)
    猜你喜欢
    • 2014-05-21
    • 2021-07-24
    • 2021-05-12
    • 2018-05-11
    • 2022-01-12
    • 2021-07-03
    • 2019-05-03
    • 2012-03-19
    • 1970-01-01
    相关资源
    最近更新 更多