【发布时间】:2016-09-01 12:48:54
【问题描述】:
我使用以下代码向我的 Kafka 发送了一条消息:
def getHealthSink(kafkaHosts: String, zkHosts: String) = {
val kafkaHealth: Subscriber[String] = kafka.publish(ProducerProperties(
brokerList = kafkaHosts,
topic = "health_check",
encoder = new StringEncoder()
))
Sink.fromSubscriber(kafkaHealth).runWith(Source.single("test"))
}
val kafkaHealth = getHealthSink(kafkaHosts, zkHosts)
我收到以下错误消息:
错误 kafka.utils.Utils$ 获取主题的主题元数据 [Set(health_check)] 来自经纪人 [ArrayBuffer(id:0,host:****,port:9092)] 失败 kafka.common.KafkaException:获取主题的主题元数据 [Set(health_check)] 来自经纪人 [ArrayBuffer(id:0,host:****,port:9092)] 失败
您知道可能是什么问题吗?
【问题讨论】:
标签: apache-kafka