【发布时间】:2015-12-10 18:35:00
【问题描述】:
我一直在到处寻找在 Node.js 中使用 RabbitMQ 的 headers exchange 示例。如果有人能指出我正确的方向,那就太好了。这是我目前所拥有的:
发布者方法(创建发布者)
RabbitMQ.prototype.publisher = function(exchange, type) {
console.log('New publisher, exchange: '+exchange+', type: '+type);
amqp.then(function(conn) {
conn.createConfirmChannel().then(function(ch) {
publishers[exchange] = {};
publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
publishers[exchange].ch = ch;
});
},function(err){
console.error("[AMQP]", err.message);
return setTimeout(function(){
self.connect(URI);
}, 1000);
}).then(null, console.log);
};
发布方法
RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
try {
publishers[exchange].assert.then(function(){
publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) {
if (err) {
console.error("[AMQP] publish", err);
offlinePubQueue.push([exchange, routingKey, content]);
publishers[exchange].ch.connection.close();
}
});
});
} catch (e) {
console.error("[AMQP] publish", e.message);
offlinePubQueue.push([exchange, routingKey, content]);
}
};
消费者方法(创建消费者)
RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) {
amqp.then(function(conn) {
conn.createChannel().then(function(ch) {
var ok = ch.assertExchange(exchange, type, {durable: true});
ok.then(function() {
ch.assertQueue('', {exclusive: true});
});
ok = ok.then(function(qok) {
var queue = qok.queue;
ch.bindQueue(queue,exchange,routingKey)
});
ok = ok.then(function(queue) {
ch.consume(queue, function(msg){
cb(msg,ch);
}, {noAck: false});
});
ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});
});
}).then(null, console.warn);
};
上面的例子在topics 上运行良好,但我不确定如何转换到headers。我很确定我需要改变我的绑定方法,但找不到任何关于如何准确完成此操作的示例。
任何帮助将不胜感激!
【问题讨论】:
标签: javascript node.js express rabbitmq amqp