【发布时间】:2017-02-04 04:42:38
【问题描述】:
我对 Spark 如何处理后台数据感到非常困惑。例如,当我运行流式作业并应用 foreachRDD 时,行为取决于变量是从外部范围捕获还是在内部初始化。
val sparkConf = new SparkConf()
dStream.foreachRDD(rdd => {
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
...
})
在这种情况下,我得到一个异常:
java.io.NotSerializableException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData 的对象可能作为 RDD 操作关闭的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。已强制执行此操作以避免 Spark 任务因不必要的对象而膨胀。
但如果我将sparkConf 移动到里面,一切似乎都很好:
dStream.foreachRDD(rdd => {
val sparkConf = rdd.sparkContext.getConf
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
...
})
这对我来说看起来很奇怪,因为我认为foreachRDD 在驱动程序节点上运行,所以我没想到会有任何区别。
现在,如果我将 SQL 会话和配置都移到 foreachRDD 之外,它会再次正常工作:
val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
val df = spark.read.json(rdd)
...
})
Spark 文档中的 snippet 建议使用以前的版本(在 foreachRDD 中创建配置和 SQL 上下文),这对我来说似乎效率较低:如果可以只创建一次,为什么要为每个批次创建它们?
有人可以解释为什么抛出异常以及创建 SQL 上下文的正确方法是什么?
【问题讨论】:
-
我以为foreachRDD运行在驱动节点上传递给
foreachRDD的方法运行在worker上,而不是驱动上。 -
@YuvalItzchakov 我不这么认为,因为
foreachRDD在整个 RDD 上运行,而不是在该 RDD 的分区或元素上运行。并且文档明确表示它在驱动程序节点上运行:spark.apache.org/docs/latest/… -
你说得对,我的措辞不正确。我的意思是说,传递给
rdd的委托(即您想要对数据帧执行的任何操作)将在工作节点上运行。 -
@lizarisk :您不能在工作节点上运行的转换或操作操作中使用任何未序列化的类。
-
@Shankar 我在这里看不到任何在工作节点上运行的东西
标签: apache-spark apache-spark-sql spark-streaming apache-spark-ml