【发布时间】:2019-02-22 22:37:21
【问题描述】:
我在使用消费者组消费 Kafka 消息时收到重复消息。
我正在使用这个 Nodejs 库。 https://www.npmjs.com/package/kafka-node
我的消费者代码如下
const config = require( '../../configs' );
const kafka = require( 'kafka-node' );
var options = {
id: 'consumer1',
kafkaHost: config.kafka.prod.kafka_host, //multiple kafka hosts (comma separated)
groupId: "test-group2",
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'earliest'
};
var consumerGroup = new kafka.ConsumerGroup( options, 'my-replicated-topic3' );
consumerGroup.on( 'message', function ( message ) {
console.log( message );
} );
我得到了低于结果。
{ topic: 'my-replicated-topic3',
value: '{"meta":{"topic":"my-replicated-topic3","added_at":"2019-02-22T09:25:54.708Z","server":"cron"},"data":"1550827554708 ::: Totam quis qui. Sit dolore laboriosam odio. Facilis porro et quam repellat pariatur. Ad voluptatem quidem."}',
offset: 8941,
partition: 0,
highWaterOffset: 8966,
key: null }
---
---
---
---
{ topic: 'my-replicated-topic3',
value: '{"meta":{"topic":"my-replicated-topic3","added_at":"2019-02-22T09:25:54.708Z","server":"cron"},"data":"1550827554708 ::: Totam quis qui. Sit dolore laboriosam odio. Facilis porro et quam repellat pariatur. Ad voluptatem quidem."}',
offset: 8941,
partition: 0,
highWaterOffset: 8970,
key: null }
您可以看到每隔几条记录就会重复相同的消息。这里 offset 的消息是相同的,但 highWaterOffset 是不同的所有重复的消息。
请提出解决此问题的方法。
【问题讨论】:
标签: apache-kafka kafka-consumer-api