【发布时间】:2018-06-04 23:17:57
【问题描述】:
注意到一个问题,其中 Kafka 消费者组(在 java 中实现)始终丢失来自代理的一些消息。作为调试的第一行,通过 kafka 控制台消费者,我可以看到代理中可用的那些消息。
Kafka 代理版本:0.10.1.0
Kafka 客户端版本:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
</dependency>
Kafka 消费者配置:
Properties props = new Properties();
props.put("bootstrap.servers","broker1,broker2,broker3");
props.put("group.id", "myGroupIdForDemo");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("heartbeat.interval.ms", "25000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.interval.ms", "300000");
props.put("max.poll.records", "1");
props.put("zookeeper.session.timeout.ms", "120000");
props.put("zookeeper.sync.time.ms", "10000");
props.put("auto.commit.enable", "false");
props.put("auto.commit.interval.ms", "60000");
props.put("auto.offset.reset", "earliest");
props.put("consumer.timeout.ms", "-1");
props.put("rebalance.max.retries", "20");
props.put("rebalance.backoff.ms", "6000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
编辑 - 添加更多信息
我想补充一些更多信息: 共有6个分区。但是,对于具有相同消费者组 ID 的主题,消费者总数为 40。我明白 34 位消费者无所事事。
但是,我想了解的方面是,如果消费者未能发送心跳到经纪人认为已死亡并重新分配分区的程度,那么空闲的消费者是否有机会消费消息?这个消息未被消费的问题总是只在某些分区中被注意到。我的意思是消息无法从同一分区传递/使用。
感谢任何帮助。谢谢。
【问题讨论】:
-
是否错过第一条/最后一条/随机消息?
-
是否有任何其他进程/线程使用
myGroupIdForDemo运行?如果您为其分配一个随机值,该行为是否仍然存在? -
@Natalia,这是随机消息。
-
@SzymonBiliński,是的,因为它是一个消费者组,所以其他一些具有相同组 id 的消费者也在运行。
-
@thomas 更准确地说:在这个“调试场景”中,您确定消息不是由使用相同消费者组 ID 运行的不同消费者处理的吗?
标签: java apache-kafka