【问题标题】:How to delay consuming message in Kafka nodejs?如何在 Kafka nodejs 中延迟消费消息?
【发布时间】:2016-05-31 12:14:37
【问题描述】:

我正在使用 NodeJS 来消费来自 Kafka 的消息,收到消息后,我会将其带到 Elasticsearch 中创建索引。这是我的一段代码:

kafkaConsumer.on('message', function (message) {
    elasticClient.index({
        index: 'test',
        type: 'sample',
        body: message
    }, function (error, response) {
        if (error) {

            // Stop consuming message here

            console.log(error);
        }
        console.log(response);
    });
});

我想确保在继续消费下一条消息之前必须成功创建索引,因为我不希望任何消息丢失。

【问题讨论】:

    标签: node.js elasticsearch apache-kafka elasticjs


    【解决方案1】:

    试试看consumer.pause()/resume().
    pause() 暂停消费者
    resume() 恢复消费者

    【讨论】:

    • 你能证明我有一段完整的代码来实现这个吗? .我在 if(error) 之后添加了 consumer.pause() 但它不起作用。
    【解决方案2】:

    您可以暂停您的消费者,使其不会订阅和获取新消息。一旦您从 elasticClient 获得响应,您就可以恢复您的 kafka 消费者。

    kafkaConsumer.pause();
    kafkaConsumer.resume();
    

    根据您的设置使用它。

    【讨论】:

    • 你能证明我有一段完整的代码来实现这个吗? .我在 if(error) 之后添加了 consumer.pause() 但它不起作用。
    • 我认为您将不得不在错误块之外编写您的代码。一旦消息被消费,您就可以创建索引并暂停您的消费者,直到获得响应。此后,恢复您的消费者。你可以尝试这样的事情:consumer.pause(); if(response) { consumer.resume();} 如果您尝试使用适当的回调为它们编写函数,则可以尝试使用标志。
    猜你喜欢
    • 2018-06-08
    • 1970-01-01
    • 1970-01-01
    • 2020-04-02
    • 2022-08-08
    • 1970-01-01
    • 2012-01-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多