【问题标题】:Java Kafka consumer group failing to consume a few messagesJava Kafka 消费者组未能消费几条消息
【发布时间】:2018-06-04 23:17:57
【问题描述】:

注意到一个问题,其中 Kafka 消费者组(在 java 中实现)始终丢失来自代理的一些消息。作为调试的第一行,通过 kafka 控制台消费者,我可以看到代理中可用的那些消息。

Kafka 代理版本:0.10.1.0

Kafka 客户端版本:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.1</version>
</dependency>

Kafka 消费者配置:

Properties props = new Properties();
props.put("bootstrap.servers","broker1,broker2,broker3");
props.put("group.id", "myGroupIdForDemo");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("heartbeat.interval.ms", "25000"); 
props.put("session.timeout.ms", "30000"); 
props.put("max.poll.interval.ms", "300000");
props.put("max.poll.records", "1");
props.put("zookeeper.session.timeout.ms", "120000");
props.put("zookeeper.sync.time.ms", "10000");
props.put("auto.commit.enable", "false");
props.put("auto.commit.interval.ms", "60000");
props.put("auto.offset.reset", "earliest");
props.put("consumer.timeout.ms", "-1");
props.put("rebalance.max.retries", "20");
props.put("rebalance.backoff.ms", "6000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

编辑 - 添加更多信息

我想补充一些更多信息: 共有6个分区。但是,对于具有相同消费者组 ID 的主题,消费者总数为 40。我明白 34 位消费者无所事事。

但是,我想了解的方面是,如果消费者未能发送心跳到经纪人认为已死亡并重新分配分区的程度,那么空闲的消费者是否有机会消费消息?这个消息未被消费的问题总是只在某些分区中被注意到。我的意思是消息无法从同一分区传递/使用。

感谢任何帮助。谢谢。

【问题讨论】:

  • 是否错过第一条/最后一条/随机消息?
  • 是否有任何其他进程/线程使用myGroupIdForDemo 运行?如果您为其分配一个随机值,该行为是否仍然存在?
  • @Natalia,这是随机消息。
  • @SzymonBiliński,是的,因为它是一个消费者组,所以其他一些具有相同组 id 的消费者也在运行。
  • @thomas 更准确地说:在这个“调试场景”中,您确定消息不是由使用相同消费者组 ID 运行的不同消费者处理的吗?

标签: java apache-kafka


【解决方案1】:

a) 即使在 Kafka 中也可能不存在消息 - 在这种情况下,请检查消息大小是否不超过 kafka 代理配置中允许的最大消息大小。

b) 如果您的消费者连接到 Kafka 实例 1 并且 2-d 实例未连接,您可能会错过来自 2-d kafka 的消息:因此,请在消费者连接字符串中指定所有代理。

3)如果kafka上存在消息并且你连接了,你可能无法反序列化消息,所以,尝试另一个反序列化器,可能不是字符串,而是字节数组,看看会发生什么,消息会被消费吗?如果是,则转换为字符串是有问题的。

4) 消息可能被另一个工作的消费者“窃取”,在相同的组 ID 下工作,请选择唯一的组 ID。

5) 你用什么记录器来查看消费的消息?你不怀疑这是一个记录器问题吗?

6) 可能是您在消费所有消息之前杀死/停止消费者?

7) 可能是您消费了,但由于消费者内存限制而失败?我可以增加 -Xmx。 (堆大小)

【讨论】:

  • 嗨@Vladimir,这七点都不成立。但是,我怀疑当分区重新分配发生时(由于消费者之一延迟向代理发送心跳),然后发生了一些我需要了解的事情。
  • 在所描述的情况下,消费者可以断开连接,因此会发生重新平衡。您需要防止消费者断开连接。我怀疑您的消费者 heratbeat 失败是有原因的:某些消息只会杀死您的消费者。可能是消息大小,可能是反序列化器错误,等等..
  • 感谢您的回复。没有错误,但是处理某些消息需要时间;处理是 I/O 绑定操作,即。在数据库中查找,与其他服务交互等,这导致了重新平衡。你认为如果我可以发送心跳以避免重新平衡会解决这个问题吗?
  • 我认为设置很长的 session.timeout.ms 可能会对你有所帮助
猜你喜欢
  • 1970-01-01
  • 2017-09-23
  • 1970-01-01
  • 2017-10-19
  • 2020-08-11
  • 2022-06-15
  • 1970-01-01
  • 1970-01-01
  • 2021-06-06
相关资源
最近更新 更多