【问题标题】:How to resolve Apache Spark StackOverflowError after multiple unions多个联合后如何解决 Apache Spark StackOverflowError
【发布时间】:2019-11-30 06:33:05
【问题描述】:

我有一个 Spark Scala 程序,它使用 REST API 批量获取数据,一旦检索到所有数据,我就会对它们进行操作。

当前计划:

  • 对于每个批次,创建 RDD 并将其与之前的 RDD 合并 使用之前的 API 调用 rdd.union(currentRdd) 创建。

  • 在最终的 RDD 上运行

重现问题的简单程序:

    def main(args: Array[String]) = {
     val conf = new SparkConf().setAppName("Union test").setMaster("local[1]")
     val sc = new SparkContext(conf)
     val limit = 1000;
     var rdd = sc.emptyRDD[Int]
     for (x <- 1 to limit) {
       val currentRdd = sc.parallelize(x to x + 3)
       rdd = rdd.union(currentRdd)
     }
     println(rdd.sum())
   }

问题: - 当批次数很高时,程序会抛出 StackOverflowError : Exception in thread "main" java.lang.StackOverflowError at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply

我假设,当批次数量增加时,RDD 依赖图变得非常复杂并引发错误。

解决此问题的最佳方法是什么?

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    已经有SparkContext.union 知道如何正确计算多个RDDs 中的union

    val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
    val rdd = sc.union(rdds)
    

    或者,您可以尝试使用this 辅助函数来避免创建unions 的长链:

    val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
    val rdd = balancedReduce(rdds)(_ union _)
    

    它应该起作用的原因与链接答案中的基本相同:unions 的O(n) 链破坏了堆栈,O(log(n))-high 二叉树 unions 没有。

    【讨论】:

      猜你喜欢
      • 2013-11-15
      • 2015-12-21
      • 1970-01-01
      • 1970-01-01
      • 2019-12-05
      • 1970-01-01
      • 2019-10-10
      • 1970-01-01
      • 2018-10-08
      相关资源
      最近更新 更多