【发布时间】:2015-06-03 21:18:05
【问题描述】:
我一直在尝试开始使用 Kafka。我已经按照快速入门设置了一个 0.8.2 代理。使用单个节点代理,我可以运行 bin 文件夹中的脚本以成功生成和使用消息。
然后我尝试编写一些 Scala 代码来使用新的 Producer API 来生成消息。然而,消费者身上似乎没有出现任何东西。我修改了生产者代码以等待代理的元数据响应。这会阻塞,然后在 60 秒后超时。
除了使用新的 Producer API 之外,我能看到的唯一区别是我之前的所有测试都是在安装了 Kafka 的 VM 内运行的。我的代码在主机上运行并连接到 Kafka。我将一些代码用于获取主题元数据以确认与 Kafka 的连接。正确打印主题元数据。我还检查了主题设置为复制因子 1。我在 Kafka 日志中也看不到任何内容。
有人知道是什么导致 Kafka 无法提交消息吗?
下面是生产者代码:
val props = new HashMap[String, Object]()
props.put("bootstrap.servers", "10.1.0.180:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("test","The Message")
val partions = producer.partitionsFor("test2").asScala.toList
partions.foreach((p:PartitionInfo) => println(s"${p.partition} ${p.replicas.length}"));
val offest = producer.send(record).get().offset()
println(s"Offset $offest")
【问题讨论】:
标签: apache-kafka