【问题标题】:Closing amqp promise connection after publishing?发布后关闭 amqp promise 连接?
【发布时间】:2018-01-08 15:22:41
【问题描述】:

我正在研究如何在发布消息后关闭基于 Promise 的连接。

我试图推断我的发送者和接收者的共享代码,所以我有一个这样的连接文件:

connector.js

const amqp = require('amqplib');
class Connector {
  constructor(RabbitMQUrl) {
    this.rabbitMQUrl = RabbitMQUrl;
  }

  connect() {
    return amqp.connect(this.rabbitMQUrl)
      .then((connection) => {
        this.connection = connection;
        process.once('SIGINT', () => {
          this.connection.close();
        });
        return this.connection.createChannel();
      })
      .catch( (err) => {
        console.error('Errrored here');
        console.error(err);
      });
  }
}

module.exports = new Connector(
  `amqp://${process.env.AMQP_HOST}:5672`
);

然后我的发布者/发送者看起来像这样:

publisher.js

const connector = require('./connector');

class Publisher {
  constructor(exchange, exchangeType) {
    this.exchange = exchange;
    this.exchangeType = exchangeType;
    this.durabilityOptions = {
      durable: true,
      autoDelete: false,
    };
  }

  publish(msg) {
    connector.connect()
      .then( (channel) => {
        let ok = channel.assertExchange(
          this.exchange,
          this.exchangeType,
          this.durabilityOptions
        );
        return ok
          .then( () => {
            channel.publish(this.exchange, '', Buffer.from(msg));
            return channel.close();
          })
          .catch( (err) => {
            console.error(err);
          });
      });
  }
}

module.exports = new Publisher(
  process.env.AMQP_EXCHANGE,
  process.env.AMQP_TOPIC
);

但如前所述,在致电publish() 后,我无法弄清楚如何关闭我的连接。

【问题讨论】:

    标签: node.js amqp node-amqp node-amqplib


    【解决方案1】:

    您可以向连接器添加一个 close() 函数:

      close() {
          if (this.connection) {
              console.log('Connector: Closing connection..');
              this.connection.close();
          }
      }
    

    出版商:

    class Publisher {
      constructor(exchange, exchangeType) {
        this.exchange = exchange;
        this.exchangeType = exchangeType;
        this.durabilityOptions = {
          durable: true,
          autoDelete: false,
        };
      }
    
      connect() {
          return connector.connect().then( (channel) => {
             console.log('Connecting..');
             return channel.assertExchange(
              this.exchange,
              this.exchangeType,
              this.durabilityOptions
            ).then (() => {  
                this.channel = channel;
                return Promise.resolve();
            }).catch( (err) => {
                console.error(err);
            });;
          });
      }
    
      disconnect() {
          return this.channel.close().then( () => { return connector.close();});
      }
    
      publish(msg) {
          this.channel.publish(this.exchange, '', Buffer.from(msg));      
      };
    
    }
    

    Test.js

    'use strict'
    
    const connector = require('./connector');
    const publisher = require('./publisher');
    
    
    publisher.connect().then(() => { 
        publisher.publish('message');
        publisher.publish('message2');
        publisher.disconnect();
    });
    

    【讨论】:

    • hmmm 这样做会给我带来错误,例如:Unhandled rejection IllegalOperationError: Connection closing
    • 是的,如果你连续两次调用publish(),你会得到错误,因为响应是一个promise,并且当你调用时通道正在关闭。你必须做类似 publish('message').then(() => { publish('message2');});
    • 我实际上建议您在发布者中打开连接和通道,并且仅在完成后关闭,而不是在每次 publish() 调用之后关闭。
    • 啊,是的,我正试图为发布者和发件人干掉我的代码,因为有这么多共享代码。
    猜你喜欢
    • 1970-01-01
    • 2017-01-17
    • 2021-03-26
    • 1970-01-01
    • 1970-01-01
    • 2017-11-05
    • 1970-01-01
    • 2022-06-26
    • 2023-04-10
    相关资源
    最近更新 更多