【问题标题】:Using Spark Structured Streaming to Read Data From Kafka, Issue of Over-time is Always Occured使用Spark Structured Streaming从Kafka读取数据,总是出现超时问题
【发布时间】:2018-09-06 16:10:31
【问题描述】:

这是我使用 Spark Structured Streaming 从 Kafka 读取数据的代码,

//ss:SparkSession is defined before. 
import ss.implicits._
val df = ss
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_server)
  .option("subscribe", topic_input)
  .option("startingOffsets", "latest")
  .option("kafkaConsumer.pollTimeoutMs", "5000")
  .option("failOnDataLoss", "false")
  .load()

这是错误代码,

  Caused by: java.util.concurrent.TimeoutException: Cannot fetch record xxxx for offset in 5000 milliseconds

如果我将 5000 放大到 10000,仍然会发生此错误。 我用谷歌搜索了这个 qquestion。似乎没有太多关于这个问题的相关信息。

这是 sbt 文件中与此问题相关的部分。

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"

【问题讨论】:

  • 有同样的问题。有任何更新/进展吗?
  • 我试过@Norman Bai 建议的方法。这个错误发生的频率稍微降低了一点。但有时它仍然会发生:-(。也许,我认为尝试 Norman Bai 推荐的方法很好。
  • 谢谢。我也会尝试这个提议,以防它再次发生并让你知道。
  • 好的。如果你有任何进展,你可以在这里列出。谢谢。 :-)

标签: apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

我也遇到了这个错误。

我查看了KafkaSourceRDD的源码,一无所获。

我猜是kafka连接器有问题,因此我在“spark-sql-kafka-0-10_2.11”包中排除了kafka-client,并添加了一个新的依赖项,如下所示:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.3.0</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>kafka-clients</artifactId>
                <groupId>org.apache.kafka</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>

现在可以了。希望对您有所帮助。

我创建了一个 jira 问题来报告这个问题: https://issues.apache.org/jira/browse/SPARK-23829

2018 年 12 月 17 日更新:Spark 2.4 和 Kafka2.0 解决了这个问题。

【讨论】:

  • 谢谢,我试过你建议的这个方法。不幸的是,这个错误仍然发生。
  • @GaoYuan 你的 kafka 版本是什么?我建议使用与您的 kafka 相同的客户端版本。
  • 非常抱歉耽搁了。我使用的 kafka 版本是 0.11.0.0。根据您的建议,问题中的配置文件已更新。
  • 感谢您的帮助。其实我还在测试这个功能。
  • @GaoYuan 你是否使用了 0.11 而不是 0.10.2.1 的 kafka-clients 依赖项?
猜你喜欢
  • 2021-01-05
  • 1970-01-01
  • 1970-01-01
  • 2021-12-05
  • 1970-01-01
  • 1970-01-01
  • 2020-07-12
  • 2021-04-10
  • 2020-12-30
相关资源
最近更新 更多