【问题标题】:Consume RabbitMq using Pika and push using Socket.io使用 Pika 使用 RabbitMq 并使用 Socket.io 推送
【发布时间】:2021-10-10 07:07:40
【问题描述】:
我正在构建一个使用pika 接收来自rabbitmq 的消息的服务。并使用socket.io向客户端推送消息。
socket.io 服务器和 pika 服务器都阻塞了主线程。
这对于 celery 和 flask 或 Django 也是一样的。
解决这个问题并在同一上下文中运行它们的正确方法是什么?
谢谢,
谢伊
【问题讨论】:
标签:
python
socket.io
rabbitmq
celery
pika
【解决方案1】:
您可以使用Pub/Sub模型,在另一个线程中启动消费过程,注册想要从队列中接收的用户并向订阅的用户发送数据。
import json
import pika
import gevent
from flask import Flask
from flask_sockets import Sockets
connection_url = 'localhost'
channel_queue = 'test'
class PubSubListener(threading.Thread):
def __init__(self, queue_name):
threading.Thread.__init__(self)
self.clients = []
self.queue_name = queue_name
connection = pika.BlockingConnection(pika.ConnectionParameters(connection_url))
self.channel = connection.channel()
self.channel.queue_declare(queue=self.queue_name)
threading.Thread(target=self.channel.basic_consume(queue=self.queue_name,
auto_ack=True,
on_message_callback=self._callback))
def run(self):
self.channel.start_consuming()
def publish(self, body):
self.channel.basic_publish(exchange='',
routing_key=self.queue_name,
body=body)
def subscribe(self, client):
self.clients.append(client)
def _callback(self, channel, method, properties, body):
time.sleep(0.001)
message = json.loads(body)
print(message)
self.send(message)
def send(self, data):
for client in self.clients:
try:
client.send(data)
except Exception:
self.clients.remove(client)
pslistener = PubSubListener(channel_queue)
app = Flask(__name__)
sockets = Sockets(app)
@sockets.route('/echo')
def echo_socket(ws):
pslistener.subscribe(ws)
while not ws.closed:
gevent.sleep(0.1)
@app.route('/')
def hello():
return 'Hello World!'
if __name__ == "__main__":
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
pslistener.start()
print("Started")
server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
server.serve_forever()