[Posted]: 2017-01-11 19:23:10
[Problem Description]:
Background: I am using a Spark Streaming program with Kafka in Scala. My intent is to pipe a file into Kafka and have the Spark Streaming application consume those messages for some analysis.
Problem: However, when I pipe the file into Kafka and start my streaming application listening on that topic, I see the error messages below on my Kafka producer console.
Command used to read the file:
C:\Kafka\bin\windows>kafka-console-producer --broker-list localhost:9092 --topic mytopic2 < C:\somefile.csv
Errors:
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 with key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s) expired due to timeout while requesting metadata from brokers for mytopic2-0
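This TimeoutException is raised when the producer cannot fetch topic metadata from the broker before the batch expires, which usually points to the broker at localhost:9092 being unreachable or mis-advertised. Purely as a first sanity check (the object and method names here are mine, not from the original), a plain TCP probe can confirm whether anything is listening on that address:

```scala
import java.net.{InetSocketAddress, Socket}

object BrokerCheck {
  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  def isReachable(host: String, port: Int, timeoutMs: Int): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: Exception => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Probe the broker address used by the console producer above.
    println(s"localhost:9092 reachable: ${isReachable("localhost", 9092, 2000)}")
  }
}
```

If the probe fails, the producer cannot reach the broker at all; if it succeeds but the error persists, the broker's advertised host/port configuration is a likely suspect.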
I am running this application locally on my Windows machine, and the Kafka server is also running locally on the same machine.
The Spark application looks like this:
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = List("mytopic2").toSet
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
lines.foreachRDD((rdd, time) => {
  // do something
})
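The `// do something` body is elided in the original. Purely for illustration, since the messages are lines of a CSV file, the per-record analysis might start by parsing each message into fields; this is a hypothetical sketch (the function name and sample line are assumptions, not from the original), and it runs without Spark:

```scala
object CsvAnalysis {
  // Hypothetical per-message step: split one CSV-formatted Kafka
  // message value into trimmed fields.
  def parseLine(line: String): Array[String] =
    line.split(",").map(_.trim)

  def main(args: Array[String]): Unit = {
    val sample = "2016-09-04, mytopic2, 116"
    // Each element of the RDD in foreachRDD would be one such line.
    println(parseLine(sample).mkString("|"))
  }
}
```

Inside `foreachRDD`, the equivalent would be something like `rdd.map(parseLine)` followed by whatever aggregation the analysis needs.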
I am not sure what exactly this Kafka/Spark error means.
Any help would be greatly appreciated.
[Discussion]:
Tags: apache-spark apache-kafka spark-streaming kafka-consumer-api kafka-producer-api