【发布时间】:2019-09-05 06:14:55
【问题描述】:
Accumulator 基本上是 spark 中的共享变量,由执行程序更新,但只能由驱动程序读取。
spark中的Collect()是将所有数据从executors中获取到驱动中。
所以,当我最终只在驱动程序中获取数据时。那么,当我们使用累加器或collect() 将大 RDD 转换为 LIST 时,性能有何不同?
使用累加器将数据帧转换为列表的代码
val queryOutput = spark.sql(query)
val acc = spark.sparkContext.collectionAccumulator[Map[String,Any]]("JsonCollector")
val jsonString = queryOutput.foreach(a=>acc.add(convertRowToJSON(a)))
acc.value.asScala.toList
def convertRowToJSON(row: Row): Map[String,Any] = {
val m = row.getValuesMap(row.schema.fieldNames)
println(m)
JSONObject(m).obj
}
使用 collect() 将数据框转换为列表的代码
val queryOutput = spark.sql(query)
queryOutput.toJSON.collectAsList()
【问题讨论】:
-
你可以添加你的累加器代码
-
我已经用代码更新了原始问题。
标签: apache-spark