【问题标题】:Apache Kafka: Time Out Exception w/ Spark StreamingApache Kafka:带有/ Spark Streaming 的超时异常
【发布时间】:2017-01-11 19:23:10
【问题描述】:

背景: 我在 Scala 中使用带有 Kafka 的 Spark Streaming 程序。我的目的是读取一个文件到 Kafka 并将这些消息发布到 Spark Streaming 应用程序以进行一些分析。

问题 但是,当我将文件通过管道传输到 Kafka 并启动我的 Streaming 应用程序以收听特定主题时,我会在我的 Kafka 生产者控制台上看到这些错误消息。

用于读取文件的命令:

C:\Kafka\bin\windows>kafka-console-producer --broker-list localhost:9092 --topic mytopic2 < C:\somefile.csv

错误:

[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0

我在我的 Windows 机器上本地运行这个应用程序,Kafka 服务器也在我的机器上本地运行。

Spark 应用程序如下所示:

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")        
val topics = List("mytopic2").toSet
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)     

lines.foreachRDD((rdd, time) => {
// do something
}

我不确定关于 Kafka/Spark 的错误究竟意味着什么。

我们将不胜感激。

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming kafka-consumer-api kafka-producer-api


    【解决方案1】:

    该错误与 Spark/Spark Streaming 无关。 您的 Kafka 设置似乎有问题。

    当 Zookeeper 设置出现问题时,通常会发生超时错误。您是否正确配置了动物园管理员? 确保正确设置。另外,请先尝试运行 Kafka 附带的简单 Kafka 生产者和消费者脚本。

    【讨论】:

    • 您好 Manav,感谢您的回复。我按照这里给出的所有说明来配置 Zookeeper 并设置 Kafka 服务器。我再次配置了整个设置。两个控制台上的生产者/消费者测试工作正常,但是当我使用 Spark 流应用程序尝试它时,它给了我同样的错误。链接:[链接]youtube.com/watch?v=OJKesEpO6ok&feature=youtu.be
    • 谢谢,马纳夫。我想我开始深入研究 Kafka 而不是 Spark/Streaming 的工作是因为你的提示!
    【解决方案2】:

    这毕竟是 Kafka 的一个问题。我怀疑这与我下载的用于 Spark Streaming 的 Kafka 版本有关,而与 Kafka 设置本身的关系不大。

    我已经为 Spark Streaming 1.6.2 下载了 Kafka 0.10.0.x -> 这是我收到 TimeOut 错误的时候。 我找到了这个链接:https://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#linking,它声明:“Kafka:Spark Streaming 1.6.2 is compatible with Kafka 0.8.2.1.”。

    所以,当我下载 0.8.2.1 时,它运行良好 - 我不再收到“超时错误”。

    【讨论】:

      猜你喜欢
      • 2020-10-29
      • 1970-01-01
      • 2017-09-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-12-09
      • 1970-01-01
      相关资源
      最近更新 更多