【问题标题】:Spark flatMap/reduce: How to scale and avoid OutOfMemory?Spark flatMap/reduce:如何扩展和避免 OutOfMemory?
【发布时间】:2016-11-13 16:58:52
【问题描述】:

我正在将一些 map-reduce 代码迁移到 Spark 中,并且在构造 Iterable 以在函数中返回时遇到问题。 在 MR 代码中,我有一个按键分组的 reduce 函数,然后(使用 multipleOutputs)将迭代这些值并使用 write(在多个输出中,但这并不重要)这样的代码(简化):

reduce(Key key, Iterable<Text> values) {
    // ... some code
    for (Text xml: values) {
        multipleOutputs.write(key, val, directory);
    }
}

但是,在 Spark 中,我已将 map 和 reduce 转换为以下序列: mapToPair -> groupByKey -> flatMap 正如推荐的......在某些书中。

mapToPair 基本上是通过 functionMap 添加一个 Key,它基于记录上的一些值为该记录创建一个 Key。有时键可能具有非常高的基数。

JavaPairRDD<Key, String> rddPaired = inputRDD.mapToPair(new PairFunction<String, Key, String>() { 
    public Tuple2<Key, String> call(String value) {
        //... 
        return functionMap.call(value);
    }
});

rddPaired 被应用了一个 RDD.groupByKey() 来让 RDD 为 flatMap 函数提供数据:

JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.groupByKey();

一旦分组,一个 flatMap 调用来执行 reduce。这里,操作是一个转换:

public Iterable<String> call (Tuple2<Key, Iterable<String>> keyValue) {
    // some code...
    List<String> out = new ArrayList<String>();
    if (someConditionOnKey) { 
        // do a logic
        Grouper grouper = new Grouper();
        for (String xml : keyValue._2()) {
            // group in a separate class
            grouper.add(xml);
        }
        // operation is now performed on the whole group
        out.add(operation(grouper));
    } else {
        for (String xml : keyValue._2()) {
            out.add(operation(xml));
        }
        return out;
    }
}

它可以正常工作...使用没有太多记录的键。实际上,当具有大量值的键在 reduce 上输入“else”时,它会被 OutOfMemory 中断。

注意:我已经包含了“if”部分来解释我要产生的逻辑,但是进入“else”时会发生故障......因为当数据进入“else”时,通常意味着会有由于数据的性质,还有更多的价值。

很明显,必须将所有分组值保留在“out”列表中,如果键有数百万条记录,它将无法扩展,因为它会将它们保留在内存中。我已经到了 OOM 发生的地步(是的,它是在执行上面要求内存的“操作”时 - 并且没有给出。虽然这不是一个非常昂贵的内存操作)。

有什么方法可以避免这种情况以扩大规模?要么通过使用其他指令复制行为以更具可扩展性的方式达到相同的输出,要么能够将值交给 Spark 进行合并(就像我过去对 MR 所做的那样)......

【问题讨论】:

  • 你应该发布完整的火花代码。 GroupByKey 也可能是内存瓶颈。
  • 谢谢,更新了描述。但是,我可以看到问题出现在 flatMap 代码中,因为:a)如果工作流刚刚到达 groupByKey 并且没有 reduce ,则不会出现错误 - 尽管我认为它们可以组合。 b) 在返回值之前必须将它们保存在内存中,显然它的可扩展性不高。无论如何 groupByKey 可能会影响,我似乎很清楚我提到的也是一个问题。

标签: java hadoop apache-spark mapreduce flatmap


【解决方案1】:

flatMap 操作中做条件是低效的。您应该检查外部条件以创建 2 个不同的 RDD 并分别处理它们。

rddPaired.cache();

// groupFilterFunc will filter which items need grouping
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.filter(groupFilterFunc).groupByKey();
// processGroupedValuesFunction should call `operation` on group of all values with the same key and return the result
rddGrouped.mapValues(processGroupedValuesFunction);

// nogroupFilterFunc will filter which items don't need grouping
JavaPairRDD<Key, Iterable<String>> rddNoGrouped = rddPaired.filter(nogroupFilterFunc);
// processNoGroupedValuesFunction2 should call `operation` on a single value and return the result
rddNoGrouped.mapValues(processNoGroupedValuesFunction2);

【讨论】:

  • 是的,我想这是一种可能性,而且你很紧张。可能原来的 MR 代码毕竟是错误的部分!但是,即使我这样做了,那么分组部分将依赖于一个没有太多值的键。那么问题就来了:有没有办法以一种内存高效的方式执行该操作(即不必将所有值都保存在内存中)?
  • 这真的取决于你想要做什么。如果你的原始MR代码可以使用combiner进行部分聚合,那么你可以尝试在Spark中使用reduceByKeyaggregateByKey来获得类似的效果。正确使用时,部分聚合会大大减少内存使用量。
猜你喜欢
  • 2020-06-24
  • 2016-12-31
  • 1970-01-01
  • 1970-01-01
  • 2023-03-07
  • 2015-03-11
  • 2012-07-08
  • 2023-01-13
相关资源
最近更新 更多