【发布时间】:2026-01-03 11:20:02
【问题描述】:
我正在使用 rabbitMQ 和 socket.io 编写浏览器通知。除了一种情况,我的配置工作正常。
当我使用用户登录我的系统时,它会创建一个通知-UID-userid 队列(现在 queueName 由查询 oaraeter 发送,我将尽快实施更复杂的方法解决问题)
如果我在另一个浏览器上使用另一个用户登录,它会创建另一个队列通知-UID-seconduserid。
如果我注销其中一个用户,队列将消失(因为它不持久)。
问题是,当我在另一个会话上刷新或加载另一个页面时,即使未发送参数队列名,它也会重新创建第二个队列。
server.js
var amqp = require('amqp');
var app = require('express')();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var rabbitMqConnection = null;
var _queue = null;
var _consumerTag = null;
io.use(function (socket, next) {
var handshakeData = socket.handshake;
// Here i will implement token verification
console.log(socket.handshake.query.queueName);
next();
});
// Gets the connection event form client
io.sockets.on('connection', function (socket) {
var queueName = socket.handshake.query.queueName;
console.log("Socket Connected");
// Connects to rabbiMq
rabbitMqConnection = amqp.createConnection({host: 'localhost', reconnect: false});
// Update our stored tag when it changes
rabbitMqConnection.on('tag.change', function (event) {
if (_consumerTag === event.oldConsumerTag) {
_consumerTag = event.consumerTag;
// Consider unsubscribing from the old tag just in case it lingers
_queue.unsubscribe(event.oldConsumerTag);
}
});
// Listen for ready event
rabbitMqConnection.on('ready', function () {
console.log('Connected to rabbitMQ');
// Listen to the queue
rabbitMqConnection.queue(queueName, {
closeChannelOnUnsubscribe: true,
durable: false,
autoClose: true
},
function (queue) {
console.log('Connected to ' + queueName);
_queue = queue;
// Bind to the exchange
queue.bind('users.direct', queueName);
queue.subscribe({ack: false, prefetchCount: 1}, function (message, headers, deliveryInfo, ack) {
console.log("Received a message from route " + deliveryInfo.routingKey);
socket.emit('notification', message);
//ack.acknowledge();
}).addCallback(function (res) {
// Hold on to the consumer tag so we can unsubscribe later
_consumerTag = res.consumerTag;
});
});
});
// Listen for disconnection
socket.on('disconnect', function () {
_queue.unsubscribe(_consumerTag);
rabbitMqConnection.disconnect();
console.log("Socket Disconnected");
});
});
http.listen(8080);
client.js
var io = require('socket.io-client');
$(document).ready(function () {
var socket = io('http://myserver.it:8080/', {
query: { queueName: 'notification-UID-' + UID},
'sync disconnect on unload': true,
});
socket.on('notification', function (data) {
console.log(data);
});
})
有什么想法吗?
【问题讨论】:
标签: javascript socket.io rabbitmq node-amqplib