【问题标题】:Headers exchange example using RabbitMQ in Node.js在 Node.js 中使用 RabbitMQ 的标头交换示例
【发布时间】:2015-12-10 18:35:00
【问题描述】:

我一直在到处寻找在 Node.js 中使用 RabbitMQheaders exchange 示例。如果有人能指出我正确的方向,那就太好了。这是我目前所拥有的:

发布者方法(创建发布者)

RabbitMQ.prototype.publisher = function(exchange, type) {
 console.log('New publisher, exchange: '+exchange+', type: '+type);
 amqp.then(function(conn) {
    conn.createConfirmChannel().then(function(ch) {
        publishers[exchange] = {};
        publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
        publishers[exchange].ch = ch;
    });
 },function(err){
    console.error("[AMQP]", err.message);
    return setTimeout(function(){
        self.connect(URI);
    }, 1000);
 }).then(null, console.log);
};

发布方法

RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
 try {    
    publishers[exchange].assert.then(function(){
        publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) {
            if (err) {
                console.error("[AMQP] publish", err);
                offlinePubQueue.push([exchange, routingKey, content]);
                publishers[exchange].ch.connection.close();
            }
        });
    });
 } catch (e) {                                                                                                                               
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
 }
};

消费者方法(创建消费者)

RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) {
 amqp.then(function(conn) {
  conn.createChannel().then(function(ch) {

    var ok = ch.assertExchange(exchange, type, {durable: true});

    ok.then(function() {
      ch.assertQueue('', {exclusive: true});
    });

    ok = ok.then(function(qok) {
      var queue = qok.queue;
      ch.bindQueue(queue,exchange,routingKey)
    });

    ok = ok.then(function(queue) {
      ch.consume(queue, function(msg){
            cb(msg,ch);
      }, {noAck: false});
    });

    ok.then(function() {
      console.log(' [*] Waiting for logs. To exit press CTRL+C.');
    });

  });
 }).then(null, console.warn);
};

上面的例子在topics 上运行良好,但我不确定如何转换到headers。我很确定我需要改变我的绑定方法,但找不到任何关于如何准确完成此操作的示例。

任何帮助将不胜感激!

【问题讨论】:

    标签: javascript node.js express rabbitmq amqp


    【解决方案1】:

    我偶然发现了这个问题,正在寻找与 amqplib 相同的答案。不幸的是,像你一样,我找到了所有可用的documentationlacking。在查看了源代码,稍微阅读了协议,并尝试了一些组合之后,这终于为我完成了。

    ...
    let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
    chan.publish(XCHANGE, '', Buffer.from(output), opts);
    ...
    

    ...
    let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
    chan.bindQueue(q.queue, XCHANGE, '', opts);
    ...
    

    完整的工作代码如下。下面的身份验证信息是伪造的,因此您必须使用自己的身份验证信息。我也在使用 ES6、nodejs 6.5 版和 amqplib。给您的标头提供 x- 前缀和/或使用保留字作为标头名称可能存在问题,但我不太确定(我必须查看 RabbitMQ 源代码)。

    emit.js:

    #!/usr/bin/env node
    
    const XCHANGE = 'headers-exchange';
    
    const Q      = require('q');
    const Broker = require('amqplib');
    
    let scope = 'anonymous';
    
    process.on('uncaughtException', (exception) => {
        console.error(`"::ERROR:: Uncaught exception ${exception}`);
    });
    
    process.argv.slice(2).forEach((arg) =>
    {
        scope = arg;
        console.info('[*] Scope now set to ' + scope);
    });
    
    Q.spawn(function*()
    {
        let conn = yield Broker.connect('amqp://root:root@localhost');
        let chan = yield conn.createChannel();
    
        chan.assertExchange(XCHANGE, 'headers', { durable: false });
    
        for(let count=0;; count=++count%3)
        {
            let output = (new Date()).toString();
            let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
            chan.publish(XCHANGE, '', Buffer.from(output), opts);
            console.log(`[x] Published item "${output}" to <${XCHANGE} : ${JSON.stringify(opts)}>`);
    
            yield Q.delay(500);
        }
    });
    

    receive.js:

    #!/usr/bin/env node
    
    const Q      = require('q');
    const Broker = require('amqplib');
    const uuid   = require('node-uuid');
    const Rx     = require('rx');
    
    Rx.Node = require('rx-node');
    
    const XCHANGE = 'headers-exchange';
    const WORKER_ID = uuid.v4();
    const WORKER_SHORT_ID = WORKER_ID.substr(0, 4);
    
    Q.spawn(function*() {
        let conn = yield Broker.connect('amqp://root:root@localhost');
        let chan = yield conn.createChannel();
    
        chan.assertExchange(XCHANGE, 'headers', { durable: false });
    
        let q = yield chan.assertQueue('', { exclusive: true });
        let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
    
        chan.bindQueue(q.queue, XCHANGE, '', opts);
        console.info('[*] Binding with ' + JSON.stringify(opts));
    
        console.log(`[*] Subscriber ${WORKER_ID} (${WORKER_SHORT_ID}) is online!`);
    
        chan.consume(q.queue, (msg) =>
        {
            console.info(`[x](${WORKER_SHORT_ID}) Received pub "${msg.content.toString()}"`);
            chan.ack(msg);
        });
    });
    

    【讨论】:

      猜你喜欢
      • 2015-05-03
      • 2017-10-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-06-27
      • 1970-01-01
      相关资源
      最近更新 更多