【问题标题】:Apache Spark java heap space error during matrix multiplication矩阵乘法期间的Apache Spark java堆空间错误
【发布时间】:2017-06-29 23:53:17
【问题描述】:

我正在使用 Spark 2.0.1 和两个工作人员(每个工作人员一个执行程序),每个工作人员 20Gb。并运行以下代码:

JavaRDD<MatrixEntry> entries = ...; // filing the data
CoordinateMatrix cmatrix = new CoordinateMatrix(entries.rdd());
BlockMatrix matrix = cmatrix.toBlockMatrix(100, 1000);
BlockMatrix cooc = matrix.transpose().multiply(matrix);

我的矩阵包含 10 000 000 个非空单元格(每个单元格等于 1.0),并且大约有。 3000 列。没有那么大的数据。但是在乘法过程中,我总是得到:

17/01/24 08:03:10 WARN TaskMemoryManager: leak 1322.6 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@649e7019
17/01/24 08:03:10 ERROR Executor: Exception in task 1.0 in stage 57.0 (TID 83664)
java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.mllib.linalg.DenseMatrix$.zeros(Matrices.scala:453)
        at org.apache.spark.mllib.linalg.Matrix$class.multiply(Matrices.scala:101)
        at org.apache.spark.mllib.linalg.SparseMatrix.multiply(Matrices.scala:565)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:483)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:480)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:480)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:479)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at org.apache.spark.util.collection.CompactBuffer.flatMap(CompactBuffer.scala:30)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:479)
        at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:478)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

现在我什至尝试每个执行程序只使用一个核心。 可能是什么问题?以及如何调试它并找到根本原因?谢谢。

更新:失败阶段的详细信息:

org.apache.spark.rdd.RDD.flatMap(RDD.scala:374)
org.apache.spark.mllib.linalg.distributed.BlockMatrix.multiply(BlockMatrix.scala:478)
MyClass.generate(SimilarityGenerator.java:57)
MyClass.main(GenerateSimilarity.java:54)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

【问题讨论】:

  • 这是 300 亿双倍,或仅用于矩阵的约 240 GB RAM。然后是向量和结果所需的内存。 OOME 似乎很清楚。你有多少物理内存?有多少分配给 JVM?解决这个问题的最好方法是使用像 VisualVM 这样的分析器来查看各代人在做什么。您使用的是 JDK 8 吗?它不再有烫发。
  • 每个worker挂载2Tb硬盘,executor-memory为20Gb。是的,我使用的是 JDK 8。如果 spark 在仅限控制台的集群上运行,我可以使用 VisualVM 吗?另外,Spark 是否应该将一些分区交换到 HDD 并仅将 RAM 用于少数分区?
  • 32Gb 中的 20Gb 可用,但其余部分应该由 Cassandra 使用。
  • 硬盘与JVM无关。让我们再次计算一下:300 亿个双精度值,每个双精度值 8 个字节,意味着大约 240 GB 仅用于保存您的矩阵。如果该数学是正确的,则 240 > 20。因此 OOME。我错过了什么?
  • 我正在使用 Apache Spark,希望它足够聪明,不会将所有内容都保存在内存中。矩阵应该被分成块(默认为 1024x1024),并且在任何给定的时间只有其中一些应该在内存中。

标签: java apache-spark


【解决方案1】:

似乎稀疏矩阵乘法并没有以我认为的方式实现。即使在几乎所有单元格中都为零,Spark 也会自然地将块矩阵相乘。我们实现了自己的乘法。这是 Scala 代码(也是从某个地方复制的):

def multiply(left: CoordinateMatrix, right: CoordinateMatrix): CoordinateMatrix = {
  val leftEntries = left.entries.map({ case MatrixEntry(i, j, v) => (j, (i, v)) })
  val rightEntries = right.entries.map({ case MatrixEntry(j, k, w) => (j, (k, w)) })

  val productEntries = leftEntries
    .join(rightEntries)
    .map({ case (_, ((i, v), (k, w))) => ((i, k), (v * w)) })
    .reduceByKey(_ + _)
    .map({ case ((i, k), sum) => MatrixEntry(i, k, sum) })

  new CoordinateMatrix(productEntries)
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-06
    • 2016-04-18
    • 1970-01-01
    • 2017-07-18
    • 1970-01-01
    相关资源
    最近更新 更多