【问题标题】:What is the performance difference between accumulator and collect() in Spark?Spark 中的 accumulator 和 collect() 之间的性能差异是什么?
【发布时间】:2019-09-05 06:14:55
【问题描述】:

Accumulator 基本上是 spark 中的共享变量,由执行程序更新,但只能由驱动程序读取。 spark中的Collect()是将所有数据从executors中获取到驱动中。

所以,当我最终只在驱动程序中获取数据时。那么,当我们使用累加器或collect() 将大 RDD 转换为 LIST 时,性能有何不同?

使用累加器将数据帧转换为列表的代码

val queryOutput = spark.sql(query)
val acc = spark.sparkContext.collectionAccumulator[Map[String,Any]]("JsonCollector")
val jsonString = queryOutput.foreach(a=>acc.add(convertRowToJSON(a)))
acc.value.asScala.toList


def convertRowToJSON(row: Row): Map[String,Any] = {
    val m = row.getValuesMap(row.schema.fieldNames)
    println(m)
    JSONObject(m).obj
  }

使用 collect() 将数据框转换为列表的代码

val queryOutput = spark.sql(query)
queryOutput.toJSON.collectAsList()

【问题讨论】:

  • 你可以添加你的累加器代码
  • 我已经用代码更新了原始问题。

标签: apache-spark


【解决方案1】:

将大RDD转换为LIST

这不是一个好主意。 collect 会将数据从所有执行程序移动到驱动程序内存。如果内存不足,则会抛出 Out Of Memory (OOM) 异常。如果您的数据适合单机内存,那么您可能不需要 spark。

Spark 原生支持数字类型的累加器,程序员可以添加对新类型的支持。它们可用于实现计数器(如在 MapReduce 中)或求和。累加器的 OUT 参数应该是可以原子读取(例如 Int、Long)或线程安全(例如同步集合)的类型,因为它将从其他线程读取。

CollectionAccumulator .value 返回 List(ArrayList 实现),如果 size 大于驱动内存,它会抛出 OOM。

【讨论】:

  • 我们可以使用累加器将数据从大型 RDD 转换为列表,就像我已经做过的那样。唯一我不明白 collect() 如何产生 OOM 问题但 accumulator 没有。
  • @KaranManchanda:你能把这个添加到问题中吗? IE。您与有效累加器一起使用的代码。这将使分析差异变得更容易。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-06-05
  • 2015-02-12
  • 1970-01-01
相关资源
最近更新 更多