【问题标题】:Kafka hangs when Committing Messages提交消息时卡夫卡挂起
【发布时间】:2015-06-03 21:18:05
【问题描述】:

我一直在尝试开始使用 Kafka。我已经按照快速入门设置了一个 0.8.2 代理。使用单个节点代理,我可以运行 bin 文件夹中的脚本以成功生成和使用消息。

然后我尝试编写一些 Scala 代码来使用新的 Producer API 来生成消息。然而,消费者身上似乎没有出现任何东西。我修改了生产者代码以等待代理的元数据响应。这会阻塞,然后在 60 秒后超时。

除了使用新的 Producer API 之外,我能看到的唯一区别是我之前的所有测试都是在安装了 Kafka 的 VM 内运行的。我的代码在主机上运行并连接到 Kafka。我将一些代码用于获取主题元数据以确认与 Kafka 的连接。正确打印主题元数据。我还检查了主题设置为复制因子 1。我在 Kafka 日志中也看不到任何内容。

有人知道是什么导致 Kafka 无法提交消息吗?

下面是生产者代码:

val props = new HashMap[String, Object]()
props.put("bootstrap.servers", "10.1.0.180:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

val record = new ProducerRecord[String, String]("test","The Message")

val partions = producer.partitionsFor("test2").asScala.toList

partions.foreach((p:PartitionInfo) => println(s"${p.partition} ${p.replicas.length}"));

val offest = producer.send(record).get().offset()

println(s"Offset $offest")

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    感谢您的回复。不幸的是,我已经尝试过了。我现在已经弄清楚了问题所在。

    我的本​​地机器无法解析虚拟机的主机名。我已将代码配置为通过运行良好的虚拟机 IP 地址进行连接。因此它可以获得主题元数据。不幸的是,它使用虚拟机主机名来通告分区领导者。因此,当将消息发布到队列时,它会超时联系领导节点。

    解决方法只是将虚拟机主机名添加到本地机器的主机文件中。

    【讨论】:

      【解决方案2】:

      你试过重启zookeeper和kafka broker吗?

      【讨论】:

        猜你喜欢
        • 2016-11-03
        • 1970-01-01
        • 2017-06-12
        • 1970-01-01
        • 1970-01-01
        • 2021-01-26
        • 1970-01-01
        • 2016-03-06
        • 1970-01-01
        相关资源
        最近更新 更多