【问题标题】:Spark RDD, how to generate JavaRDD of length N?Spark RDD,如何生成长度为N的JavaRDD?
【发布时间】:2015-11-27 21:30:58
【问题描述】:

(部分问题是 docs that say "undocumented" on parallelize 让我阅读一些并不总是适用的示例)

我正在尝试通过执行我们拥有的 Java 类的 N 个操作来创建 RDD 长度 N = 10^6,如果需要,我可以让该类实现 Serializable 或任何 Function。我没有预先固定长度的数据集,我正在尝试创建一个。试图弄清楚是创建一个长度为 N 的虚拟数组来并行化,还是传递一个运行 N 次的函数。

不确定哪种方法有效/更好,如果我开始使用定义明确的数据集(如文档中的单词),我在 Spark 中看到,这些单词的长度/计数已经定义,我只是并行化一些地图或过滤器对该数据进行一些操作。

在我的情况下,我认为它是不同的,尝试并行创建一个包含 10^6 个元素的 RDD...

描述:

在使用 Spark 1.5.1 的 Java 8 中,我们有一个 Java 方法 doDrop(),它接受一个 PipeLinkageData 并返回一个 DropResult。

我想我可以使用 map() 或 flatMap() 来调用一对多的函数,我试图做这样的事情in another question that never quite worked:

JavaRDD<DropResult> simCountRDD = spark.parallelize(makeRange(1,getSimCount())).map(new Function<Integer, DropResult>()
    {
      public DropResult call(Integer i) { 
         return pld.doDrop(); 
      }
    });

这样想更正确?

    // pld is of type PipeLinkageData, it's already initialized

    // parallelize wants a collection passed into first param
    List<PipeLinkageData> pldListofOne = new ArrayList();

    // make an ArrayList of one
    pldListofOne.add(pld);

    int howMany = 1000000;

    JavaRDD<DropResult> nSizedRDD = spark.parallelize(pldListofOne).flatMap(new FlatMapFunction<PipeLinkageData, DropResult>() 
        { 
            public Iterable<DropResult> call(PipeLinkageData pld) {

                List<DropResult> returnList = new ArrayList();

                // is Spark good at spreading a for loop like this?
                for ( int i = 0; i < howMany ; i++ ){
                    returnList.add(pld.doDrop());  
                }

                // EDIT changed from returnRDD to returnList
                return returnList;
            }

            }); 

另一个问题:JavaRDD 在这里正确吗?我可以看到需要调用 FlatMapFunction 但我不需要 FlatMappedRDD?而且由于我从不尝试将一组数组或列表展平为单个数组或列表,我真的需要展平任何东西吗?

【问题讨论】:

  • 能解释一下这里的逻辑到底是什么吗?我看起来不像 pld.doDrop() 依赖于任何东西......是否有任何副作用?您的第二次尝试将返回一个空 RDD,因为 flatMap 没有任何内容。第一个看起来好一点,10^6 小到可以分发。不过,它可以在集群上处理。在旁注中,命名列表*RDD 有点令人困惑:)
  • 非常感谢! pld 是一系列 20,000 到 80,000 个遗传标记,每次调用 doDrop 都会模拟另一个基因型进行统计比较。 (我认为基因型是正确的词,我不是遗传学家,只是这里的一个低级勤工俭学的学生)
  • 我认为第二个示例,给定一个单元素 ArrayList,将平面映射为简单地使函数调用运行 N 次,给我们一个 n 长度的 RDD?那么第一种方法是正确的方法吗?
  • link Per this question about the first approach,我永远无法让它发挥作用,因此这里有关于第二种方法的问题:)
  • 我错过了pldListofOne.add(pld),但它仍然不是要走的路,因为它是完全顺序的,并将所有数据放在一个分区上。当您说您无法使第一个版本正常工作时,您能更准确吗? MCVE 可能很有用...

标签: java lambda parallel-processing apache-spark


【解决方案1】:
  1. 第一种方法应该与DropResult 一样工作,并且可以序列化PipeLinkageData,并且其内部逻辑没有问题(例如取决于共享状态)。

  2. 当前形式的第二种方法没有意义。将在单个分区上处理单个记录。这意味着整个过程将是完全顺序的,如果数据不适合单个工作内存,则可能会崩溃。增加元素数量应该可以解决问题,但与第一种方法相比并没有改善

  3. 最后你可以初始化一个空的RDD,然后使用mapPartititionsFlatMapFunction替换为几乎相同的MapPartitionsFunction,并为每个分区生成所需数量的对象。

【讨论】:

  • 太棒了,谢谢,您对这个项目有很大的影响!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-01-08
  • 2015-12-04
  • 1970-01-01
  • 2015-03-05
  • 1970-01-01
  • 2017-09-29
  • 1970-01-01
相关资源
最近更新 更多