【发布时间】:2015-11-27 21:30:58
【问题描述】:
(部分问题是 docs that say "undocumented" on parallelize 让我阅读一些并不总是适用的示例)
我正在尝试通过执行我们拥有的 Java 类的 N 个操作来创建 RDD 长度 N = 10^6,如果需要,我可以让该类实现 Serializable 或任何 Function。我没有预先固定长度的数据集,我正在尝试创建一个。试图弄清楚是创建一个长度为 N 的虚拟数组来并行化,还是传递一个运行 N 次的函数。
不确定哪种方法有效/更好,如果我开始使用定义明确的数据集(如文档中的单词),我在 Spark 中看到,这些单词的长度/计数已经定义,我只是并行化一些地图或过滤器对该数据进行一些操作。
在我的情况下,我认为它是不同的,尝试并行创建一个包含 10^6 个元素的 RDD...
描述:
在使用 Spark 1.5.1 的 Java 8 中,我们有一个 Java 方法 doDrop(),它接受一个 PipeLinkageData 并返回一个 DropResult。
我想我可以使用 map() 或 flatMap() 来调用一对多的函数,我试图做这样的事情in another question that never quite worked:
JavaRDD<DropResult> simCountRDD = spark.parallelize(makeRange(1,getSimCount())).map(new Function<Integer, DropResult>()
{
public DropResult call(Integer i) {
return pld.doDrop();
}
});
这样想更正确?
// pld is of type PipeLinkageData, it's already initialized
// parallelize wants a collection passed into first param
List<PipeLinkageData> pldListofOne = new ArrayList();
// make an ArrayList of one
pldListofOne.add(pld);
int howMany = 1000000;
JavaRDD<DropResult> nSizedRDD = spark.parallelize(pldListofOne).flatMap(new FlatMapFunction<PipeLinkageData, DropResult>()
{
public Iterable<DropResult> call(PipeLinkageData pld) {
List<DropResult> returnList = new ArrayList();
// is Spark good at spreading a for loop like this?
for ( int i = 0; i < howMany ; i++ ){
returnList.add(pld.doDrop());
}
// EDIT changed from returnRDD to returnList
return returnList;
}
});
另一个问题:JavaRDD 在这里正确吗?我可以看到需要调用 FlatMapFunction 但我不需要 FlatMappedRDD?而且由于我从不尝试将一组数组或列表展平为单个数组或列表,我真的需要展平任何东西吗?
【问题讨论】:
-
能解释一下这里的逻辑到底是什么吗?我看起来不像
pld.doDrop()依赖于任何东西......是否有任何副作用?您的第二次尝试将返回一个空 RDD,因为flatMap没有任何内容。第一个看起来好一点,10^6 小到可以分发。不过,它可以在集群上处理。在旁注中,命名列表*RDD有点令人困惑:) -
非常感谢! pld 是一系列 20,000 到 80,000 个遗传标记,每次调用 doDrop 都会模拟另一个基因型进行统计比较。 (我认为基因型是正确的词,我不是遗传学家,只是这里的一个低级勤工俭学的学生)
-
我认为第二个示例,给定一个单元素 ArrayList,将平面映射为简单地使函数调用运行 N 次,给我们一个 n 长度的 RDD?那么第一种方法是正确的方法吗?
-
link Per this question about the first approach,我永远无法让它发挥作用,因此这里有关于第二种方法的问题:)
-
我错过了
pldListofOne.add(pld),但它仍然不是要走的路,因为它是完全顺序的,并将所有数据放在一个分区上。当您说您无法使第一个版本正常工作时,您能更准确吗? MCVE 可能很有用...
标签: java lambda parallel-processing apache-spark