【问题标题】:Is there a way with kafka-node to request a received messages after a crash?kafka-node 有没有办法在崩溃后请求接收到的消息?
【发布时间】:2018-04-14 03:27:40
【问题描述】:

我一直在看码头,现在想知道这是否甚至是 kafka 或 node-kafka 的一个功能。

所以我的消费者收到来自 kafka 的消息,当我处理该消息时,我喜欢将其标记为好像在完成处理之前我没有收到它。这样,如果服务器在处理过程中崩溃,我可以重新连接并接收我的应用程序在崩溃之前收到的消息。

高级示例。

  1. Kafka 消息是一个用户 ID;
  2. 收到后,我会从数据库中获取用户信息。
  3. 我向用户发送电子邮件。
  4. 发送电子邮件的时间。我将其记录到数据库中。
  5. 我将消息标记为完成。 (我不确定这一步是否可行)

这样,如果在第五步之前有任何步骤崩溃,当服务器备份时,它会再次收到失败的消息。

我正在使用 node.js LTS 8.10.0 和 npm 包 kafka-node@2.4.1

【问题讨论】:

    标签: javascript node.js apache-kafka kafka-consumer-api


    【解决方案1】:

    我将功劳归功于 void,因为他的回答帮助我在 kafka-node 中找到了我需要的东西。这只是 node.js 的答案/示例。

    const kafkaConfig = config.get('kafka');
    const kafkaClient = new kafka.KafkaClient({kafkaHost: kafkaConfig.host});
    const consumer = new kafka.Consumer(kafkaClient, [{topic: kafkaConfig.topic}], {autoCommit: false});
    const offset = new kafka.Offset(kafkaClient);
    
    // very import to retrieve the offset
    offset.fetchLatestOffsets([kafkaConfig.topic], (error, offsets) => {
        const latestOffset = offsets[kafkaConfig.topic][0];
        consumer.setOffset(kafkaConfig.topic, 0, latestOffset);
    });
    
    consumer.on('message', message => {
        // do stuff
        Log.insert({message})
            .then(() => {
                // commit the success so that kafka will update the offset
                consumer.commit((err, data) => console.log(err, data));
            })
    });
    

    【讨论】:

      【解决方案2】:

      执行此操作的方法是将上次读取偏移量保存到您喜欢的数据库中。这样,您可以使用此偏移量在您下次重新启动时启动您的消费者。

      Kafka 在> 0.8 版本中提供了一种内置机制来执行此操作。 Kafka 将消费者组读取的最新偏移存储到名为consumer_offsets 的内部主题中。大多数客户端库都有自动提交最后处理的偏移量的规定,或者您可以选择禁用自动提交并在整个处理完成后自己提交偏移量。

      【讨论】:

      • 因此使用 consumer_offsets,我无需数据库即可获得最新的偏移量。谢谢
      猜你喜欢
      • 2016-05-04
      • 2021-12-30
      • 2020-06-04
      • 1970-01-01
      • 1970-01-01
      • 2019-09-03
      • 2021-04-17
      • 2020-01-10
      • 1970-01-01
      相关资源
      最近更新 更多