【发布时间】:2016-11-13 16:58:52
【问题描述】:
我正在将一些 map-reduce 代码迁移到 Spark 中,并且在构造 Iterable 以在函数中返回时遇到问题。 在 MR 代码中,我有一个按键分组的 reduce 函数,然后(使用 multipleOutputs)将迭代这些值并使用 write(在多个输出中,但这并不重要)这样的代码(简化):
reduce(Key key, Iterable<Text> values) {
// ... some code
for (Text xml: values) {
multipleOutputs.write(key, val, directory);
}
}
但是,在 Spark 中,我已将 map 和 reduce 转换为以下序列: mapToPair -> groupByKey -> flatMap 正如推荐的......在某些书中。
mapToPair 基本上是通过 functionMap 添加一个 Key,它基于记录上的一些值为该记录创建一个 Key。有时键可能具有非常高的基数。
JavaPairRDD<Key, String> rddPaired = inputRDD.mapToPair(new PairFunction<String, Key, String>() {
public Tuple2<Key, String> call(String value) {
//...
return functionMap.call(value);
}
});
rddPaired 被应用了一个 RDD.groupByKey() 来让 RDD 为 flatMap 函数提供数据:
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.groupByKey();
一旦分组,一个 flatMap 调用来执行 reduce。这里,操作是一个转换:
public Iterable<String> call (Tuple2<Key, Iterable<String>> keyValue) {
// some code...
List<String> out = new ArrayList<String>();
if (someConditionOnKey) {
// do a logic
Grouper grouper = new Grouper();
for (String xml : keyValue._2()) {
// group in a separate class
grouper.add(xml);
}
// operation is now performed on the whole group
out.add(operation(grouper));
} else {
for (String xml : keyValue._2()) {
out.add(operation(xml));
}
return out;
}
}
它可以正常工作...使用没有太多记录的键。实际上,当具有大量值的键在 reduce 上输入“else”时,它会被 OutOfMemory 中断。
注意:我已经包含了“if”部分来解释我要产生的逻辑,但是进入“else”时会发生故障......因为当数据进入“else”时,通常意味着会有由于数据的性质,还有更多的价值。
很明显,必须将所有分组值保留在“out”列表中,如果键有数百万条记录,它将无法扩展,因为它会将它们保留在内存中。我已经到了 OOM 发生的地步(是的,它是在执行上面要求内存的“操作”时 - 并且没有给出。虽然这不是一个非常昂贵的内存操作)。
有什么方法可以避免这种情况以扩大规模?要么通过使用其他指令复制行为以更具可扩展性的方式达到相同的输出,要么能够将值交给 Spark 进行合并(就像我过去对 MR 所做的那样)......
【问题讨论】:
-
你应该发布完整的火花代码。
GroupByKey也可能是内存瓶颈。 -
谢谢,更新了描述。但是,我可以看到问题出现在 flatMap 代码中,因为:a)如果工作流刚刚到达 groupByKey 并且没有 reduce ,则不会出现错误 - 尽管我认为它们可以组合。 b) 在返回值之前必须将它们保存在内存中,显然它的可扩展性不高。无论如何 groupByKey 可能会影响,我似乎很清楚我提到的也是一个问题。
标签: java hadoop apache-spark mapreduce flatmap