【发布时间】:2015-02-12 06:01:59
【问题描述】:
我正在尝试实现之前在 Spark 中运行良好的 Hadoop Map/Reduce 作业。 Spark 应用定义如下:
val data = spark.textFile(file, 2).cache()
val result = data
.map(//some pre-processing)
.map(docWeightPar => (docWeightPar(0),docWeightPar(1))))
.flatMap(line => MyFunctions.combine(line))
.reduceByKey( _ + _)
MyFunctions.combine 在哪里
def combine(tuples: Array[(String, String)]): IndexedSeq[(String,Double)] =
for (i <- 0 to tuples.length - 2;
j <- 1 to tuples.length - 1
) yield (toKey(tuples(i)._1,tuples(j)._1),tuples(i)._2.toDouble * tuples(j)._2.toDouble)
如果用于输入的列表很大,那么combine 函数会生成大量映射键,并且这是引发异常的地方。
在 Hadoop Map Reduce 设置中,我没有遇到问题,因为这是 combine 函数产生的点,也是 Hadoop 将映射对写入磁盘的点。 Spark 似乎将所有内容都保留在内存中,直到它以 java.lang.OutOfMemoryError: GC overhead limit exceeded 爆炸。
我可能在做一些非常基本的错误,但我找不到任何关于如何解决这个问题的指示,我想知道如何避免这种情况。由于我是 Scala 和 Spark 的菜鸟,我不确定问题是来自一个还是另一个,或者两者兼而有之。我目前正在尝试在我自己的笔记本电脑上运行这个程序,它适用于tuples 数组的长度不是很长的输入。提前致谢。
【问题讨论】:
-
“数据”已经是 RDD 了吗?
-
我刚刚编辑了代码以显示我如何加载数据。
-
好的,我继续回答,因为已经假设它实际上是(如您所示)一个 RDD
-
很高兴看到您的帖子末尾包含最终解决方案,谢谢!
-
最终的解决方案是我用接受的答案标记的那个:)
标签: scala apache-spark