【发布时间】:2014-02-07 09:42:41
【问题描述】:
我的项目中有 2 个生产者和两个消费者,它们通过 node.js 中的 amqp 模块使用 rabbitmq。
为消费者建立连接的代码如下所示:
function init_consumers( ) {
console.log( 'mq: consumers connection established. Starting to init stuff..' );
global.queue.consumers.connection = con_c;
var fq_name = global.queue.fq_name;
var aq_name = global.queue.aq_name;
var q_opts = { durable:true };
var subscr_opt = { ack: true, prefetchCount: 1 };
var fq = con_c.queue( fq_name, q_opts, function() {
console.log( 'mq: consumer f queue prepared. ');
global.queue.consumers.fq = fq;
fq.subscribe( subscr_opt, function(msg) {
global.controllers.fc.ParseF( msg, function() { global.queue.consumers.fq.shift(); } );
});
});
var aq = con_c.queue( aq_name, q_opts, function() {
console.log( 'mq: consumer a queue prepared. ');
global.queue.consumers.aq = aq;
aq.subscribe( subscr_opt, function(msg) {
global.controllers.fc.ParseAndSaveToDB( msg, function() { global.queue.consumers.aq.shift(); } );
});
});
}
// connect and init
var con_c = amqp.createConnection( { url: global.queue.consumers.host }, global.queue.con_extra_options );
con_c.on( 'ready', init_consumers );
con_c.on( 'error', function(e) {
console.log( 'consumer error', e );
con_c.end();
if( typeof global.queue.consumers.connection !== 'undefined' ) { global.queue.consumers.connection.end(); }
});
连接额外选项是:
con_extra_options: { reconnect: true, // Enable reconnection
reconnectBackoffStrategy: 'linear',
reconnectBackoffTime: 5000, // Try reconnect 5 seconds
},
在 rabbitmq 管理控制台中启动项目后,我清楚地看到有 2 个通道建立了 1 个连接。很好。
然后,它工作得很好,直到发生错误(可能与某些断开连接有关),在控制台中给我这个输出:
consumer error { [Error: read ETIMEDOUT] code: 'ETIMEDOUT', errno: 'ETIMEDOUT', syscall: 'read' }
问题是,当发生此错误时,amqp 会尝试重新连接到队列服务器,但似乎即使使用我上面的代码,连接仍然保持打开状态。结果经过几个小时的工作,我打开了 10 个连接,打开了大约 50 个频道,剩下的唯一事情就是重新启动项目。
所以,我有两个问题:
- 为什么会出现这种错误?
- 我应该实施哪些修改来阻止连接和通道随时间增长?
【问题讨论】:
标签: javascript node.js rabbitmq amqp