【发布时间】:2018-09-06 16:10:31
【问题描述】:
这是我使用 Spark Structured Streaming 从 Kafka 读取数据的代码,
//ss:SparkSession is defined before.
import ss.implicits._
val df = ss
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_server)
.option("subscribe", topic_input)
.option("startingOffsets", "latest")
.option("kafkaConsumer.pollTimeoutMs", "5000")
.option("failOnDataLoss", "false")
.load()
这是错误代码,
Caused by: java.util.concurrent.TimeoutException: Cannot fetch record xxxx for offset in 5000 milliseconds
如果我将 5000 放大到 10000,仍然会发生此错误。 我用谷歌搜索了这个 qquestion。似乎没有太多关于这个问题的相关信息。
这是 sbt 文件中与此问题相关的部分。
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
【问题讨论】:
-
有同样的问题。有任何更新/进展吗?
-
我试过@Norman Bai 建议的方法。这个错误发生的频率稍微降低了一点。但有时它仍然会发生:-(。也许,我认为尝试 Norman Bai 推荐的方法很好。
-
谢谢。我也会尝试这个提议,以防它再次发生并让你知道。
-
好的。如果你有任何进展,你可以在这里列出。谢谢。 :-)
标签: apache-spark apache-kafka spark-structured-streaming