【问题标题】:Serialization issues in Spark StreamingSpark Streaming 中的序列化问题
【发布时间】:2017-02-04 04:42:38
【问题描述】:

我对 Spark 如何处理后台数据感到非常困惑。例如,当我运行流式作业并应用 foreachRDD 时,行为取决于变量是从外部范围捕获还是在内部初始化。

val sparkConf = new SparkConf()
dStream.foreachRDD(rdd => {
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})

在这种情况下,我得到一个异常:

java.io.NotSerializableException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData 的对象可能作为 RDD 操作关闭的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。已强制执行此操作以避免 Spark 任务因不必要的对象而膨胀。

但如果我将sparkConf 移动到里面,一切似乎都很好:

dStream.foreachRDD(rdd => {
    val sparkConf = rdd.sparkContext.getConf
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})

这对我来说看起来很奇怪,因为我认为foreachRDD 在驱动程序节点上运行,所以我没想到会有任何区别。

现在,如果我将 SQL 会话和配置都移到 foreachRDD 之外,它会再次正常工作:

val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
    val df = spark.read.json(rdd)
    ...
})

Spark 文档中的 snippet 建议使用以前的版本(在 foreachRDD 中创建配置和 SQL 上下文),这对我来说似乎效率较低:如果可以只创建一次,为什么要为每个批次创建它们?

有人可以解释为什么抛出异常以及创建 SQL 上下文的正确方法是什么?

【问题讨论】:

  • 我以为foreachRDD运行在驱动节点上传递给foreachRDD的方法运行在worker上,而不是驱动上。
  • @YuvalItzchakov 我不这么认为,因为foreachRDD 在整个 RDD 上运行,而不是在该 RDD 的分区或元素上运行。并且文档明确表示它在驱动程序节点上运行:spark.apache.org/docs/latest/…
  • 你说得对,我的措辞不正确。我的意思是说,传递给rdd 的委托(即您想要对数据帧执行的任何操作)将在工作节点上运行。
  • @lizarisk :您不能在工作节点上运行的转换或操作操作中使用任何未序列化的类。
  • @Shankar 我在这里看不到任何在工作节点上运行的东西

标签: apache-spark apache-spark-sql spark-streaming apache-spark-ml


【解决方案1】:

ForeachRDD 顾名思义,在流中运行 foreach rdd 为什么要在每个 rdd 上重新创建 spark 上下文? 正确的做法是最后一种:

val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
    val df = spark.read.json(rdd)
    ...
})

【讨论】:

    【解决方案2】:

    val spark = SparkSession.builder.config(sparkConf).getOrCreate() 不会创建另一个 SparkSession。只有一个存在。 在worker,只需从job 获取。

    【讨论】:

      【解决方案3】:

      在第一种方法中,您尝试为每个不正确的分区实例化 spark 会话对象。

      正如其他人的回答,使用第三种方法。但是如果你需要使用第一种方法,那么你可以使用如下 -

      val sparkConf = new SparkConf()
      dStream.foreachRDD(rdd => {
          lazy val spark = SparkSession.builder.config(sparkConf).getOrCreate()
          ...
      })
      

      这里延迟评估将有助于避免多次实例化 Spark 会话,从而避免序列化问题。

      我希望这会有所帮助。

      【讨论】:

        猜你喜欢
        • 2017-04-02
        • 2018-07-05
        • 2017-07-02
        • 1970-01-01
        • 1970-01-01
        • 2016-10-09
        • 1970-01-01
        • 2017-04-20
        相关资源
        最近更新 更多