【发布时间】:2021-04-08 02:24:59
【问题描述】:
我有一个节点 js 服务,它使用来自 Kafka 的消息并通过转换逻辑的各个步骤对其进行处理。在处理过程中,服务使用 Redis 和 mongo 进行存储和缓存。最后,它通过 UDP 数据包将转换后的消息发送到另一个目的地。
在启动时,它开始在一段时间后从 Kafka 消费消息,它崩溃并出现未处理的错误:ERR_CANNOT_SEND 无法发送数据(见下图)。 重新启动应用程序可以暂时解决问题。 我最初认为这可能与通过 UDP 套接字进行转发有关,但消费者可以访问转发目的地!
如果能提供任何帮助,我将不胜感激。我有点卡在这里。
消费者代码:
const readFromKafka = ({host, topic, source}, transformationService) => {
const logger = createChildLogger(`kafka-consumer-${topic}`);
const options = {
// connect directly to kafka broker (instantiates a KafkaClient)
kafkaHost: host,
groupId: `${topic}-group`,
protocol: ['roundrobin'], // and so on the other kafka config.
};
logger.info(`starting kafka consumer on ${host} for ${topic}`);
const consumer = new ConsumerGroup(options, [topic]);
consumer.on('error', (err) => logger.error(err));
consumer.on('message', async ({value, offset}) => {
logger.info(`recieved ${topic}`, value);
if (value) {
const final = await transformationService([
JSON.parse(Buffer.from(value, 'binary').toString()),
]);
logger.info('Message recieved', {instanceID: final[0].instanceId, trace: final[1]});
} else {
logger.error(`invalid message: ${topic} ${value}`);
}
return;
});
consumer.on('rebalanced', () => {
logger.info('cosumer is rebalancing');
});
return consumer;
};
消费者服务启动及错误处理代码:
//init is the async function used to initialise the cache and other config and components.
const init = async() =>{
//initialize cache, configs.
}
//startConsumer is the async function that connects to Kafka,
//and add a callback for the onMessage listener which processes the message through the transformation service.
const startConsumer = async ({ ...config}) => {
//calls to fetch info like topic, transformationService etc.
//readFromKafka function defn pasted above
readFromKafka( {topicConfig}, transformationService);
};
init()
.then(startConsumer)
.catch((err) => {
logger.error(err);
});
通过 UDP 套接字转发代码。 以下代码间歇性地抛出未处理的错误,因为这似乎适用于前几千条消息,然后突然崩溃
const udpSender = (msg, destinations) => {
return Object.values(destinations)
.map(({id, host, port}) => {
return new Promise((resolve) => {
dgram.createSocket('udp4').send(msg, 0, msg.length, port, host, (err) => {
resolve({
id,
timestamp: Date.now(),
logs: err || 'Sent succesfully',
});
});
});
});
};
【问题讨论】:
-
这里完全是在黑暗中拍摄,但这是比赛条件的问题吗?请参阅此 github 评论了解我的想法:github.com/nodejs/help/issues/2484#issuecomment-590944091 我对套接字初始化知之甚少,但听起来好像在确保套接字正在侦听之前尝试在套接字上发送至少会导致您的真正问题被隐藏。
-
我不确定评论是否对我的情况有帮助,因为我为每个目的地打开一个单独的套接字以将味精发送到目的地,所以有可能在某个时刻,可能会创建大约 100 个单独的套接字,但每个套接字将绑定到一个随机端口(因为我没有调用
socket.bind())。 -
此外,应该调用一个错误回调,然后通过错误记录解决承诺。如果我是正确的?
-
更大的堆栈跟踪作为文本会更有帮助,但是如果您的错误回调正常工作,那么您在堆栈跟踪中看到的
SOCKET_CANNOT_SEND应该是记录的内容。我的意思是,如果我阅读了我正确链接的评论,当您在未绑定的套接字上调用socket.send时,Node 将为您隐式处理绑定。但是,如果该绑定/发送错误,无论出于何种原因,隐式绑定处理程序在内部都会混淆SOCKET_CANNOT_SEND错误背后的真正错误。再说一次,如果我没看错评论的话。 -
您可能希望将您的 UDP 代码更改为类似
var mySock = dgram.createSocket('udp4'); mySock.bind(port, host, () => { mySock.send(msg, 0, msg.length, (err) => { resolve({ id, timestamp: Date.now(), logs: err || 'Sent succesfully', }); }); });的内容,这可能会将您的错误消息更改为更有帮助且不那么模棱两可的内容,这正是我所希望的
标签: node.js apache-kafka udp kafka-consumer-api dgrams