【问题标题】:Kafka Consumer - receiving messages Inconsistently卡夫卡消费者 - 接收消息不一致
【发布时间】:2016-10-26 04:07:36
【问题描述】:

我可以在命令行上针对 Kafka 位置安装发送和接收消息。我还可以通过 Java 代码发送消息。这些消息显示在 Kafka 命令提示符中。我还有一个用于 Kafka 消费者的 Java 代码。代码昨天收到消息。然而,今天早上它没有收到任何消息。代码没有改变。我想知道属性配置是否不太正确。这是我的配置:

制作人:

bootstrap.servers - localhost:9092
group.id - test
key.serializer - StringSerializer.class.getName()
value.serializer - StringSerializer.class.getName()

ProducerRecord 设置为

ProducerRecord<String, String>("test", "mykey",  "myvalue")

消费者:

zookeeper.connect - "localhost:2181"
group.id - "test"
zookeeper.session.timeout.ms - 500
zookeeper.sync.time.ms - 250
auto.commit.interval.ms - 1000
key.deserializer - org.apache.kafka.common.serialization.StringDeserializer
value.deserializer - org.apache.kafka.common.serialization.StringDeserializer

对于 Java 代码:

   Map<String, Integer> topicCount = new HashMap<>();
   topicCount.put("test", 1);

   Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer
            .createMessageStreams(topicCount);
   List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

缺少什么?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    可能会发生很多事情。

    首先,您的消费者的 ZooKeeper 会话超时非常低,这意味着消费者可能会由于垃圾收集暂停而经历许多“软故障”。发生这种情况时,消费者组将重新平衡,这可以暂停消费。如果这种情况发生得非常频繁,消费者可能会进入一种从不消费消息的状态,因为它一直在重新平衡。我建议将 ZooKeeper 会话超时时间增加到 30 秒,看看这是否能解决问题。如果是这样,您可以尝试将其设置得更低。

    其次,您能否确认正在为“测试”主题生成新消息?您的消费者只会使用尚未提交的新消息。该主题可能没有任何新消息。

    第三,同一消费者组中是否有其他消费者可以处理消息?如果一个消费者经常遇到软故障,其他消费者将被分配到它的分区。

    最后,您使用的是最终将被删除的“旧”消费者。如果可能,我建议转移到 Kafka 0.9 中提供的“新”消费者 (KafkaConsumer.java)。虽然我不能保证这会解决你的问题。

    希望这会有所帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-07-24
      • 1970-01-01
      • 2021-01-26
      • 2018-12-06
      • 1970-01-01
      • 2019-07-03
      • 2018-05-05
      相关资源
      最近更新 更多