[Posted]: 2015-12-15 01:52:44
[Question]:
I am trying to model a genetics problem we are working on, building it up step by step. I can successfully run the PiAverage example from the Spark examples. That example "throws darts" at a circle (10^6 of them in our case) and counts the number that "land in the circle" to estimate pi.
Now suppose I want to repeat that process 1000 times (in parallel) and average all of those estimates. I am trying to see the best approach: would that take two calls to parallelize? Nested calls? Is there a way to chain map or reduce calls together? I can't see one.
I'd like to know whether the idea below is wise. I thought of tracking the resulting estimates with an accumulator. jsc is my SparkContext, and the full code for a single run is at the end of the question. Thanks for any input!
Accumulator<Double> accum = jsc.accumulator(0.0);
// make a list 1000 long to pass to parallelize (no for loops in Spark, right?)
List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);
// pass this "dummy list" to parallelize, which then calls a pieceOfPI
// method to produce each individual estimate, accumulating the estimates.
// pieceOfPI would contain a parallelize call too, with the individual
// test from the code at the end
jsc.parallelize(numberOfEstimates).foreach(new VoidFunction<Integer>() {
    public void call(Integer i) {
        accum.add(pieceOfPI(jsc, numList, slices, HOW_MANY_ESTIMATES));
    }
});
// get the total of the Pi estimates and print their average
double totalPi = accum.value();
// output the average of averages
System.out.println("The average of " + HOW_MANY_ESTIMATES + " estimates of Pi is "
        + totalPi / HOW_MANY_ESTIMATES);
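As a side note, the accumulate-then-average arithmetic in the sketch above can be tried out locally, with a `java.util.concurrent.atomic.DoubleAdder` standing in for the Spark `Accumulator` and a parallel stream standing in for `parallelize(...).foreach(...)`. The per-estimate value is a hard-coded placeholder here (there is no real `pieceOfPI`), and this says nothing about whether the nested parallelize inside `pieceOfPI` is legal, which is the open question:

```java
import java.util.concurrent.atomic.DoubleAdder;
import java.util.stream.IntStream;

public class AccumulatorSketch {
    public static void main(String[] args) {
        final int HOW_MANY_ESTIMATES = 1000;

        // stands in for: Accumulator<Double> accum = jsc.accumulator(0.0);
        DoubleAdder accum = new DoubleAdder();

        // stands in for: jsc.parallelize(numberOfEstimates).foreach(...);
        // each task would add one full Pi estimate from pieceOfPI here
        IntStream.range(0, HOW_MANY_ESTIMATES).parallel()
                .forEach(i -> accum.add(3.0));  // placeholder estimate

        System.out.println("The average of " + HOW_MANY_ESTIMATES
                + " estimates of Pi is " + accum.sum() / HOW_MANY_ESTIMATES);
    }
}
```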
Nothing I've seen on SO seems to answer this particular question; I've done several searches, but I don't see how to do this without "parallelizing the parallelize." Is that a bad idea?
(And yes, I realize that mathematically I could just do more estimates and effectively get the same result :) I'm trying to build the structure my boss wants. Thanks again!)
In case it helps, here is my entire single-test program, without the accumulator I'm testing. The core of it would become pieceOfPI():
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
public class PiAverage implements Serializable {

    public static void main(String[] args) {
        PiAverage pa = new PiAverage();
        pa.go();
    }

    public void go() {
        // should be a parameter, like all these finals should be
        // int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
        final int SLICES = 16;

        // how many "darts" are thrown at the circle to get one single Pi estimate
        final int HOW_MANY_DARTS = 1000000;

        // how many "dartboards" to collect to average the Pi estimate,
        // which we hope converges on the real Pi
        final int HOW_MANY_ESTIMATES = 1000;

        SparkConf sparkConf = new SparkConf().setAppName("PiAverage")
                .setMaster("local[4]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // set up a "dummy" ArrayList of size HOW_MANY_DARTS -- how many darts to throw
        List<Integer> throwsList = new ArrayList<Integer>(HOW_MANY_DARTS);
        for (int i = 0; i < HOW_MANY_DARTS; i++) {
            throwsList.add(i);
        }

        // set up a "dummy" ArrayList of size HOW_MANY_ESTIMATES
        List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);
        for (int i = 0; i < HOW_MANY_ESTIMATES; i++) {
            numberOfEstimates.add(i);
        }

        JavaRDD<Integer> dataSet = jsc.parallelize(throwsList, SLICES);

        long totalPi = dataSet.filter(new Function<Integer, Boolean>() {
            public Boolean call(Integer i) {
                double x = Math.random();
                double y = Math.random();
                return x * x + y * y < 1;
            }
        }).count();
        System.out.println("The Pi estimate from " + HOW_MANY_DARTS + " darts is "
                + 4 * totalPi / (double) HOW_MANY_DARTS);

        jsc.stop();
        jsc.close();
    }
}
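For what it's worth, here is a minimal local sketch of the alternative I'm weighing: flatten the two levels into one job, where each element of a single parallelized list computes one complete estimate on its own, as plain Java inside the task, with no nested SparkContext. The `estimatePi` helper below is hypothetical, and a `java.util.stream` pipeline stands in for the `jsc.parallelize(...).map(...).reduce(...)` chain:

```java
import java.util.Random;
import java.util.stream.IntStream;

public class FlatPiAverage {
    // One complete Pi estimate from `darts` Monte Carlo throws.
    // This is the work each Spark task would do locally -- plain Java,
    // no nested parallelize() and no SparkContext inside the closure.
    static double estimatePi(int darts, long seed) {
        Random rng = new Random(seed);
        int hits = 0;
        for (int i = 0; i < darts; i++) {
            double x = rng.nextDouble();
            double y = rng.nextDouble();
            if (x * x + y * y < 1) {
                hits++;
            }
        }
        return 4.0 * hits / darts;
    }

    public static void main(String[] args) {
        final int HOW_MANY_DARTS = 100000;
        final int HOW_MANY_ESTIMATES = 100;

        // stands in for:
        // jsc.parallelize(estimateIndices, SLICES)
        //    .map(i -> estimatePi(HOW_MANY_DARTS, i))
        //    .reduce((a, b) -> a + b);
        double totalPi = IntStream.range(0, HOW_MANY_ESTIMATES)
                .parallel()
                .mapToDouble(i -> estimatePi(HOW_MANY_DARTS, i))
                .sum();

        System.out.println("The average of " + HOW_MANY_ESTIMATES
                + " estimates of Pi is " + totalPi / HOW_MANY_ESTIMATES);
    }
}
```

The seed per estimate is just the estimate's index, so each task draws an independent dart sequence without sharing a random-number generator across threads.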
[Comments]:
- Just a bit of background on this question: my boss looked at the structure of creating an RDD and then assigning the output of its map function to a new variable. He wondered why the "extra RDD" is needed, since map produces another RDD. That is probably a separate question, but it motivated my question about whether a series of maps can be chained while iterating different numbers of times, like a `for i do { for j do }` loop.
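On the comment above: chained transformations do not materialize an "extra" dataset at each step. A rough local analogy is `java.util.stream`, whose intermediate `map` is lazy much like an RDD's `map` only records lineage until an action runs:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyMapChain {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();

        // Two chained maps, analogous to rdd.map(f).map(g)
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .map(i -> { calls.incrementAndGet(); return i * 10; })
                .map(i -> i + 1);

        // Nothing has executed yet -- map() only records the transformation
        System.out.println("calls before terminal op: " + calls.get()); // 0

        // The terminal operation (like a Spark action) runs the whole chain
        List<Integer> out = pipeline.collect(Collectors.toList());
        System.out.println(out);                                        // [11, 21, 31]
        System.out.println("calls after terminal op: " + calls.get());  // 3
    }
}
```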
标签: java scala parallel-processing apache-spark nested-loops