【问题标题】:Consume RabbitMq using Pika and push using Socket.io使用 Pika 使用 RabbitMq 并使用 Socket.io 推送
【发布时间】:2021-10-10 07:07:40
【问题描述】:

我正在构建一个使用pika 接收来自rabbitmq 的消息的服务。并使用socket.io向客户端推送消息。

socket.io 服务器和 pika 服务器都阻塞了主线程。
这对于 celeryflaskDjango 也是一样的。

解决这个问题并在同一上下文中运行它们的正确方法是什么?

谢谢, 谢伊

【问题讨论】:

  • 设置 2 个彼此独立运行的脚本是不行的吗?

标签: python socket.io rabbitmq celery pika


【解决方案1】:

您可以使用Pub/Sub模型,在另一个线程中启动消费过程,注册想要从队列中接收的用户并向订阅的用户发送数据。

import json
import pika
import gevent
from flask import Flask
from flask_sockets import Sockets

connection_url = 'localhost'
channel_queue = 'test'

class PubSubListener(threading.Thread):
    def __init__(self, queue_name):
        threading.Thread.__init__(self)

        self.clients = []
        self.queue_name = queue_name

        connection = pika.BlockingConnection(pika.ConnectionParameters(connection_url))
        self.channel = connection.channel()
        self.channel.queue_declare(queue=self.queue_name)

        threading.Thread(target=self.channel.basic_consume(queue=self.queue_name,
            auto_ack=True,
            on_message_callback=self._callback))

    def run(self):
        self.channel.start_consuming()

    def publish(self, body):
        self.channel.basic_publish(exchange='',
            routing_key=self.queue_name,
            body=body)

    def subscribe(self, client):
        self.clients.append(client)

    def _callback(self, channel, method, properties, body):
        time.sleep(0.001)
        message = json.loads(body)
        print(message)
        self.send(message)

    def send(self, data):
        for client in self.clients:
            try:
                client.send(data)
            except Exception:
                self.clients.remove(client)

pslistener = PubSubListener(channel_queue)

app = Flask(__name__)
sockets = Sockets(app)

@sockets.route('/echo')
def echo_socket(ws):
    pslistener.subscribe(ws)

    while not ws.closed:
        gevent.sleep(0.1)

@app.route('/')
def hello():
    return 'Hello World!'


if __name__ == "__main__":
    from gevent import pywsgi
    from geventwebsocket.handler import WebSocketHandler

    pslistener.start()

    print("Started")
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-08-26
    • 1970-01-01
    • 2011-07-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多