【问题标题】:Node JS App crashes with ERR_SOCKET_CANNOT_SEND errorNode JS 应用程序因 ERR_SOCKET_CANNOT_SEND 错误而崩溃
【发布时间】:2021-04-08 02:24:59
【问题描述】:

我有一个节点 js 服务,它使用来自 Kafka 的消息并通过转换逻辑的各个步骤对其进行处理。在处理过程中,服务使用 Redis 和 mongo 进行存储和缓存。最后,它通过 UDP 数据包将转换后的消息发送到另一个目的地。

在启动时,它开始在一段时间后从 Kafka 消费消息,它崩溃并出现未处理的错误:ERR_CANNOT_SEND 无法发送数据(见下图)。 重新启动应用程序可以暂时解决问题。 我最初认为这可能与通过 UDP 套接字进行转发有关,但消费者可以访问转发目的地!

如果能提供任何帮助,我将不胜感激。我有点卡在这里。

消费者代码:

const readFromKafka =  ({host, topic, source}, transformationService) => {
    const logger = createChildLogger(`kafka-consumer-${topic}`);
    const options = {
        // connect directly to kafka broker (instantiates a KafkaClient)
        kafkaHost: host,
        groupId: `${topic}-group`,
        protocol: ['roundrobin'], // and so on the  other kafka config.
    };

    logger.info(`starting kafka consumer on ${host} for ${topic}`);
    const consumer = new ConsumerGroup(options, [topic]);
    consumer.on('error', (err) => logger.error(err));
    consumer.on('message', async ({value, offset}) => {
        logger.info(`recieved ${topic}`, value);
        if (value) {
            const final = await transformationService([
                JSON.parse(Buffer.from(value, 'binary').toString()),
            ]);
            logger.info('Message recieved', {instanceID: final[0].instanceId, trace: final[1]});
         
        } else {
            logger.error(`invalid message: ${topic} ${value}`);
        }
        return;
    });
    consumer.on('rebalanced', () => {
        logger.info('cosumer is rebalancing');
    });
    return consumer;
};

消费者服务启动及错误处理代码:

//init is the async function used to initialise the cache and other config and components.
const init = async() =>{
    //initialize cache, configs.
}

//startConsumer is the async function that connects to Kafka,
//and add a callback for the onMessage listener which processes the message through the transformation service.
const startConsumer = async ({ ...config}) => {
    //calls to fetch info like topic, transformationService etc.
   //readFromKafka function defn pasted above
    readFromKafka( {topicConfig}, transformationService);
};

init()
    .then(startConsumer)
    .catch((err) => {
        logger.error(err);
    });

通过 UDP 套接字转发代码。 以下代码间歇性地抛出未处理的错误,因为这似乎适用于前几千条消息,然后突然崩溃

const udpSender = (msg, destinations) => {
    return Object.values(destinations)
        .map(({id, host, port}) => {
            return new Promise((resolve) => {
                dgram.createSocket('udp4').send(msg, 0, msg.length, port, host, (err) => {
                    resolve({
                        id,
                        timestamp: Date.now(),
                        logs: err || 'Sent succesfully',
                    });
                });
            });
        });
};

【问题讨论】:

  • 这里完全是在黑暗中拍摄,但这是比赛条件的问题吗?请参阅此 github 评论了解我的想法:github.com/nodejs/help/issues/2484#issuecomment-590944091 我对套接字初始化知之甚少,但听起来好像在确保套接字正在侦听之前尝试在套接字上发送至少会导致您的真正问题被隐藏。
  • 我不确定评论是否对我的情况有帮助,因为我为每个目的地打开一个单独的套接字以将味精发送到目的地,所以有可能在某个时刻,可能会创建大约 100 个单独的套接字,但每个套接字将绑定到一个随机端口(因为我没有调用 socket.bind())。
  • 此外,应该调用一个错误回调,然后通过错误记录解决承诺。如果我是正确的?
  • 更大的堆栈跟踪作为文本会更有帮助,但是如果您的错误回调正常工作,那么您在堆栈跟踪中看到的SOCKET_CANNOT_SEND 应该是记录的内容。我的意思是,如果我阅读了我正确链接的评论,当您在未绑定的套接字上调用 socket.send 时,Node 将为您隐式处理绑定。但是,如果该绑定/发送错误,无论出于何种原因,隐式绑定处理程序在内部都会混淆SOCKET_CANNOT_SEND 错误背后的真正错误。再说一次,如果我没看错评论的话。
  • 您可能希望将您的 UDP 代码更改为类似 var mySock = dgram.createSocket('udp4'); mySock.bind(port, host, () => { mySock.send(msg, 0, msg.length, (err) => { resolve({ id, timestamp: Date.now(), logs: err || 'Sent succesfully', }); }); }); 的内容,这可能会将您的错误消息更改为更有帮助且不那么模棱两可的内容,这正是我所希望的

标签: node.js apache-kafka udp kafka-consumer-api dgrams


【解决方案1】:

根据我们的评论交流,我认为问题在于您的资源不足。

在您的应用程序的整个生命周期中,每次发送消息时,您都会打开一个全新的套接字。但是,您在发送该消息后没有进行任何清理,因此该套接字无限期地保持打开状态。然后,您打开的套接字继续堆积,消耗资源,直到您最终用完……某些东西。也许是内存,也许是端口,也许是其他东西,但最终你的应用程序会崩溃。

幸运的是,解决方案并不太复杂:只需重用现有的套接字。事实上,如果您愿意,您可以为整个应用程序重用一个套接字,因为在内部 socket.send 会为您处理排队,因此无需进行任何智能切换。但是,如果您想要更多的并发性,这里有一个循环队列的快速实现,我们预先创建了一个包含 10 个套接字的池,只要我们想发送消息,我们就可以从中获取:

const MAX_CONCURRENT_SOCKETS = 10;

var rrIndex = 0;

const rrSocketPool = (() => {
    var arr = [];
    for (let i = 0; i < MAX_CONCURRENT_SOCKETS; i++) {
        let sock = dgram.createSocket('udp4');
        arr.push(sock);
    }
    return arr;
})();

const udpSender = (msg, destinations) => {
    return Object.values(destinations)
        .map(({ id, host, port }) => {
            return new Promise((resolve) => {
                var sock = rrSocketPool[rrIndex];
                rrIndex = (rrIndex + 1) % MAX_CONCURRENT_SOCKETS;
                
                sock.send(msg, 0, msg.length, port, host, (err) => {
                    resolve({
                        id,
                        timestamp: Date.now(),
                        logs: err || 'Sent succesfully',
                    });
                });
            });
        });
};

请注意,由于某些原因,此实现仍然很幼稚,主要是因为套接字本身仍然没有错误处理,仅在其.send 方法上。您应该查看文档以获取有关捕获事件(例如 error 事件)的更多信息,特别是如果这是一个应该无限期运行的生产服务器,但基本上您在 .send 回调中放置的错误处理只会工作...如果在调用.send 时发生错误。如果在发送消息之间,当您的套接字空闲时,发生一些超出您控制范围的系统级错误并导致您的套接字中断,您的套接字可能会发出一个错误事件,该事件将无法处理(就像您当前的实现中发生的那样,与您在致命错误之前看到的间歇性错误)。到那时,它们现在可能永久无法使用,这意味着它们应该被替换/恢复或以其他方式处理(或者,像我一样,强制应用重新启动并收工:-))。

【讨论】:

  • 欣赏它,伙计。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-02-07
  • 2014-04-02
  • 1970-01-01
相关资源
最近更新 更多