【发布时间】:2016-05-31 12:14:37
【问题描述】:
我正在使用 NodeJS 来消费来自 Kafka 的消息,收到消息后,我会将其带到 Elasticsearch 中创建索引。这是我的一段代码:
kafkaConsumer.on('message', function (message) {
elasticClient.index({
index: 'test',
type: 'sample',
body: message
}, function (error, response) {
if (error) {
// Stop consuming message here
console.log(error);
}
console.log(response);
});
});
我想确保在继续消费下一条消息之前必须成功创建索引,因为我不希望任何消息丢失。
【问题讨论】:
标签: node.js elasticsearch apache-kafka elasticjs