【问题标题】:Node Kafka - Duplicate messages are coming节点卡夫卡 - 重复的消息来了
【发布时间】:2019-02-22 22:37:21
【问题描述】:

我在使用消费者组消费 Kafka 消息时收到重复消息。

我正在使用这个 Nodejs 库。 https://www.npmjs.com/package/kafka-node

我的消费者代码如下

const config = require( '../../configs' );
const kafka = require( 'kafka-node' );

var options = {
    id: 'consumer1',
    kafkaHost: config.kafka.prod.kafka_host, //multiple kafka hosts (comma separated)
    groupId: "test-group2",
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'earliest'
};

var consumerGroup = new kafka.ConsumerGroup( options, 'my-replicated-topic3' );

consumerGroup.on( 'message', function ( message ) {
    console.log( message );
} );

我得到了低于结果。

{ topic: 'my-replicated-topic3',
  value: '{"meta":{"topic":"my-replicated-topic3","added_at":"2019-02-22T09:25:54.708Z","server":"cron"},"data":"1550827554708 ::: Totam quis qui. Sit dolore laboriosam odio. Facilis porro et quam repellat pariatur. Ad voluptatem quidem."}',
  offset: 8941,
  partition: 0,
  highWaterOffset: 8966,
  key: null }
  ---
  ---
  ---
  ---
  { topic: 'my-replicated-topic3',
  value: '{"meta":{"topic":"my-replicated-topic3","added_at":"2019-02-22T09:25:54.708Z","server":"cron"},"data":"1550827554708 ::: Totam quis qui. Sit dolore laboriosam odio. Facilis porro et quam repellat pariatur. Ad voluptatem quidem."}',
  offset: 8941,
  partition: 0,
  highWaterOffset: 8970,
  key: null }

您可以看到每隔几条记录就​​会重复相同的消息。这里 offset 的消息是相同的,但 highWaterOffset 是不同的所有重复的消息。

请提出解决此问题的方法。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    您是否重新实例化您的消费者?您的消费者如何提交其抵消?

    默认情况下,您的消费者每 5 秒自动提交一次偏移量。您的图书馆就是这种情况。

    如果您在提交偏移量之前重新实例化您的消费者,它将从上次提交的偏移量重新启动。

    【讨论】:

    • 嗨,我既没有重置也没有提交偏移量,上面发布的代码是使用数据的完整代码。我通过在终端中运行代码开始使用数据,我可以立即看到重复的问题。
    猜你喜欢
    • 2016-03-06
    • 1970-01-01
    • 2019-08-06
    • 2018-12-01
    • 2017-06-12
    • 1970-01-01
    • 2016-11-03
    • 2021-01-26
    • 1970-01-01
    相关资源
    最近更新 更多