【问题标题】:GC overhead limit exceeded with large RDD[MatrixEntry] in Apache SparkApache Spark 中的大型 RDD [MatrixEntry] 超出了 GC 开销限制
【发布时间】:2015-05-15 03:51:05
【问题描述】:

我有一个 csv 文件存储了维度为 6,365x214 的用户项的数据,我正在使用 org.apache.spark.mllib.linalg.distributed.CoordinateMatrixcolumnSimilarities() 来查找用户与用户的相似性。

我的代码如下所示:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.{RowMatrix, 
MatrixEntry, CoordinateMatrix}
import org.apache.spark.rdd.RDD

def rddToCoordinateMatrix(input_rdd: RDD[String]) : CoordinateMatrix = {

    // Convert RDD[String] to RDD[Tuple3]
    val coo_matrix_input: RDD[Tuple3[Long,Long,Double]] = input_rdd.map(
        line => line.split(',').toList
    ).map{
            e => (e(0).toLong, e(1).toLong, e(2).toDouble)
    }

    // Convert RDD[Tuple3] to RDD[MatrixEntry]
    val coo_matrix_matrixEntry: RDD[MatrixEntry] = coo_matrix_input.map(e => MatrixEntry(e._1, e._2, e._3))

    // Convert RDD[MatrixEntry] to CoordinateMatrix
    val coo_matrix: CoordinateMatrix  = new CoordinateMatrix(coo_matrix_matrixEntry)

    return coo_matrix
}

// Read CSV File to RDD[String]
val input_rdd: RDD[String] = sc.textFile("user_item.csv")

// Read RDD[String] to CoordinateMatrix
val coo_matrix = rddToCoordinateMatrix(input_rdd)

// Transpose CoordinateMatrix
val coo_matrix_trans = coo_matrix.transpose()

// Convert CoordinateMatrix to RowMatrix
val mat: RowMatrix = coo_matrix_trans.toRowMatrix()

// Compute similar columns perfectly, with brute force
// Return CoordinateMatrix
val simsPerfect: CoordinateMatrix = mat.columnSimilarities()

// CoordinateMatrix to RDD[MatrixEntry]
val simsPerfect_entries = simsPerfect.entries

simsPerfect_entries.count()

// Write results to file
val results_rdd = simsPerfect_entries.map(line => line.i+","+line.j+","+line.value)

results_rdd.saveAsTextFile("similarity-output")

// Close the REPL terminal
System.exit(0)

并且,当我在 spark-shell 上运行此脚本时 在运行代码行 simsPerfect_entries.count() 后出现以下错误:

java.lang.OutOfMemoryError: GC overhead limit exceeded

更新:

我尝试了很多其他人已经给出的解决方案,但我没有成功。

1 通过增加每个执行程序进程使用的内存量spark.executor.memory=1g

2 通过减少用于驱动程序进程的内核数量 spark.driver.cores=1

建议我解决此问题的方法。

【问题讨论】:

  • "我尝试了很多其他人已经给出的解决方案,但我没有成功。"你应该列出哪些,这样我们就可以避免多余的答案。
  • Akshay 好像我也面临同样的问题这是问题http://stackoverflow.com/q/37958522/1662775。我尝试增加驱动程序内存,但没有成功。

标签: scala apache-spark garbage-collection


【解决方案1】:

在您真正实现之前,所有 Spark 转换都是惰性的。当您定义 RDD-to-RDD 数据操作时,Spark 只是将操作链接在一起,而不执行实际计算。因此,当您调用simsPerfect_entries.count() 时,会执行一系列操作并获得您的号码。

错误 GC overhead limit exceeded 表示 JVM 垃圾收集器活动如此之高,以至于您的代码的执行已停止。由于以下原因,GC 活动可能如此之高:

  • 您生产了太多的小物件并立即丢弃它们。看起来你不是。
  • 您的数据不适合您的 JVM 堆。就像您尝试将 2GB 文本文件加载到 RAM 中,但只有 1GB 的 JVM 堆。看起来是你的情况。

要解决此问题,请尝试增加 JVM 堆的数量:

  • 您的工作程序节点(如果您有分布式 Spark 设置)。
  • 您的 spark-shell 应用程序。

【讨论】:

  • 感谢您的回答。我通过在 spark-shell 中附加一个用于增加 driver-memory 的额外标志来解决我的问题,默认情况下它是 1g。所以。我把它增加到 4g$ spark-shell --driver-memory 4g
  • 如何解决你的原因 1. 你生产了太多的小物件并立即丢弃。这种情况下的解决方案是什么?
  • 当您考虑分配+丢弃的小对象太多时,最好首先测量它(因为它的机会非常低)。您可以使用示例和所有线索创建一个单独的问题来描述您的案例。
猜你喜欢
  • 2022-01-01
  • 2021-02-04
  • 2018-03-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-05-21
  • 2015-10-30
  • 1970-01-01
相关资源
最近更新 更多