【问题标题】:In Apache Spark, can I easily repeat/nest a SparkContext.parallelize?在 Apache Spark 中,我可以轻松地重复/嵌套 SparkContext.parallelize 吗?
【发布时间】:2015-12-15 01:52:44
【问题描述】:

我正在尝试对我们试图解决的遗传学问题进行建模,并逐步建立它。我可以从 Spark 示例中成功运行 PiAverage 示例。该示例在一个圆圈(在我们的例子中为 10^6)“投掷飞镖”并计算“落在圆圈中”的数字以估计 PI

假设我想重复该过程 1000 次(并行)并平均所有这些估计值。我试图看到最好的方法,似乎会有两个调用来并行化?嵌套调用?有没有办法将 map 或 reduce 调用链接在一起?我看不到。

我想知道以下想法的智慧。我想过使用累加器跟踪结果估计。 jsc 是我的 SparkContext,单次运行的完整代码在问题的末尾,感谢您的任何输入!

Accumulator<Double> accum = jsc.accumulator(0.0);

// make a list 1000 long to pass to parallelize (no for loops in Spark, right?)
List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);

// pass this "dummy list" to parallelize, which then 
// calls a pieceOfPI method to produce each individual estimate  
// accumulating the estimates. PieceOfPI would contain a 
// parallelize call too with the individual test in the code at the end
jsc.parallelize(numberOfEstimates).foreach(accum.add(pieceOfPI(jsc, numList, slices, HOW_MANY_ESTIMATES)));

// get the value of the total of PI estimates and print their average
double totalPi = accum.value();

// output the average of averages
System.out.println("The average of " + HOW_MANY_ESTIMATES + " estimates of Pi is " + totalPi / HOW_MANY_ESTIMATES);

我在 SO 上看到的似乎不是一个矩阵或其他答案,所以给出了这个特定问题的答案,我已经进行了几次搜索,但我没有看到如何在没有“并行化并行化”的情况下做到这一点。这是个坏主意吗?

(是的,我意识到在数学上我可以做更多的估计并有效地得到相同的结果 :) 尝试构建我老板想要的结构,再次感谢!

如果有帮助,我已将我的整个单一测试程序放在这里,没有我正在测试的累加器。其核心将变为 PieceOfPI():

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.Accumulable;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;

public class PiAverage implements Serializable {

public static void main(String[] args) {

    PiAverage pa = new PiAverage();
    pa.go();

}

public void go() {

    // should make a parameter like all these finals should be
    // int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
    final int SLICES = 16;

    // how many "darts" are thrown at the circle to get one single Pi estimate
    final int HOW_MANY_DARTS = 1000000;

    // how many "dartboards" to collect to average the Pi estimate, which we hope converges on the real Pi
    final int HOW_MANY_ESTIMATES = 1000;

    SparkConf sparkConf = new SparkConf().setAppName("PiAverage")
        .setMaster("local[4]");

    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // setup "dummy" ArrayList of size HOW_MANY_DARTS -- how many darts to throw
    List<Integer> throwsList = new ArrayList<Integer>(HOW_MANY_DARTS);
    for (int i = 0; i < HOW_MANY_DARTS; i++) {
        throwsList.add(i);
    }

    // setup "dummy" ArrayList of size HOW_MANY_ESTIMATES
    List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);
    for (int i = 0; i < HOW_MANY_ESTIMATES; i++) {
        numberOfEstimates.add(i);
    }

    JavaRDD<Integer> dataSet = jsc.parallelize(throwsList, SLICES);

    long totalPi = dataSet.filter(new Function<Integer, Boolean>() {
        public Boolean call(Integer i) {
            double x = Math.random();
            double y = Math.random();
            if (x * x + y * y < 1) {
                return true;
            } else
                return false;
        }
    }).count();

    System.out.println(
            "The average of " + HOW_MANY_DARTS + " estimates of Pi is " + 4 * totalPi / (double)HOW_MANY_DARTS);

    jsc.stop();
    jsc.close();
}
}

【问题讨论】:

  • 只是关于这个问题的一点背景知识,我的老板查看了创建 RDD 的结构,然后分配给它的 map 函数的输出。他想知道为什么需要“额外的 RDD”,因为 map 会生成额外的 RDD。这可能是一个单独的问题,但激发了我关于是否可以链接一系列地图的问题,但迭代不同的次数,例如 for i do {for j do} 循环

标签: java scala parallel-processing apache-spark nested-loops


【解决方案1】:

让我从你的“背景问题”开始。 mapjoingroupBy等转换操作分为两类;那些需要将数据洗牌作为来自所有分区的输入的那些,以及那些不需要的。像groupByjoin 这样的操作需要一个shuffle,因为你需要将来自所有RDD 分区的所有记录与相同的键组合在一起(想想SQL JOINGROUP BY 操作是如何工作的)。另一方面,mapflatMapfilter 等不需要洗牌,因为该操作在上一步的分区的输入上运行良好。它们一次处理单个记录,而不是具有匹配键的一组记录。因此,不需要改组。

这个背景对于理解“额外地图”没有显着开销是必要的。诸如mapflatMap 等一系列操作被“压缩”在一起形成一个“阶段”(当您在 Spark Web 控制台中查看作业的详细信息时会显示该阶段),因此只有一个 RDD 被物化,舞台结束时的那个。

关于你的第一个问题。我不会为此使用蓄电池。它们适用于“边带”数据,例如计算您解析的错误行数。在此示例中,您可以使用累加器来计算有多少 (x,y) 对在 1 的半径内与在半径外,例如。

Spark 发行版中的JavaPiSpark 示例几乎与它所能得到的一样好。你应该研究它为什么有效。这是大数据系统的正确数据流模型。您可以使用“聚合器”。在Javadocs 中,单击“索引”并查看aggaggregateaggregateByKey 函数。但是,它们在这里不再可以理解,也没有必要。它们提供了比mapreduce 更大的灵活性,因此值得了解

您的代码的问题在于,您实际上是在试图告诉 Spark 该做什么,而不是表达您的意图并让 Spark 为您优化它的方式。

最后,我建议你购买并学习 O'Reilly 的“Learning Spark”。它很好地解释了内部细节,例如登台,它还显示了许多您可以使用的示例代码。

【讨论】:

  • 非常有用,每一个字!我有那本书,每次我把头放在里面,我都会学到一些惊人的东西。谢谢你让我在这么多层次上直截了当。我认为您的回答涉及很多层面,很多 Spark 新手都会觉得它很有帮助。
  • 所以在再次查看Learning Spark这本书之后,我仍然非常喜欢你的回答,但我不知道如何完成这两个基本的事情。我可能会尝试发布其他更通用的问题?这两个问题是 1) 鉴于我没有固定长度的数据集开始,并且需要告诉 Spark 做 x 次 (x=10^6) 的事情,创建一个虚拟数组并传递它似乎很奇怪并行化,但这就是我到目前为止所看到的。我应该编辑问题还是发布单独的问题?
  • 和“嵌套”问题:2)如果我想将 b() 作为“内循环”运行 10^6 次,然后在 b() 的结果上运行 a() 作为“外循环”我做嵌套并行调用吗? (是的,你可能会撞到墙上,但我仍然没有在那本书中看到最好的方法,抱歉)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2010-10-27
  • 2019-06-21
  • 1970-01-01
  • 2011-06-19
  • 2012-01-16
  • 2011-06-20
  • 1970-01-01
相关资源
最近更新 更多