【发布时间】:2015-08-31 16:37:51
【问题描述】:
我正在尝试根据给定的 PairRDD 创建新的 RDD。我有一个带有几个键的 PairRDD,但每个键都有很大(大约 100k)的值。我想以某种方式重新分区,将每个 Iterable<v> 放入 RDD[v] 以便我可以进一步有效地对这些值应用 map、reduce、sortBy 等。我感觉到 flatMapValues 是我的朋友,但想与其他火花一起检查。这是用于实时火花应用程序。我已经尝试过 collect() 并计算应用服务器内存中的所有度量,但试图对其进行改进。
这是我尝试的(伪)
class ComputeMetrices{
transient JavaSparkContext sparkContext;
/**
* This method compute 3 measures: 2 percentiles of different values and 1 histogram
* @param javaPairRdd
* @return
*/
public Map<String, MetricsSummary> computeMetrices(JavaPairRDD<String, InputData> javaPairRdd) {
JavaPairRDD<String, MetricsSummary> rdd = javaPairRdd.groupByKey(10).mapValues(itr => {
MetricsSummary ms = new MetricsSummary();
List<Double> list1
List<Double> list2
itr.foreach{ list1.add(itr._2.height); list2.add(itr._2.weight)}
//Here I want to convert above lists into RDD
JavaRDD<V> javaRdd1 = sparContext.parallelize(list1) //null pointer ; probably at sparkContext
JavaRDD<V> javaRdd2 = sparContext.parallelize(list2)
JavaPairRDD1 javaPairRdd1 = javaRdd1.sortBy.zipWithIndex()
JavaPairRDD2 javaPairRdd2 = javaRdd2.sortBy.zipWithIndex()
//Above two PairRDD will be used further to find Percentile values for range of (0..100)
//Not writing percentile algo for sake of brevity
double[] percentile1 = //computed from javaPairRdd1
double[] percentile2 = //computed from javaPairRdd2
ms.percentile1(percentile1)
ms.percentile2(percentile2)
//compute histogram
JavaDoubleRDD dRdd = sparkContext.parallelizeDoubles(list1)
long[] hist = dRdd.histogram(10)
ms.histo(hist)
return ms
})
return rdd.collectAsMap
}
}
我想从 groupByKey 结果中的 Iterable 创建 RDD,以便我可以使用进一步的 spark 转换。
【问题讨论】:
-
你能举个例子吗?
-
@VijayInnamuri 你的意思是不是我已经发布的其他例子?我的问题是我找不到任何方法从现有 RDD 或在转换期间从 Iterable 创建 RDD
-
请张贴输入输入数据结构的示例和您希望您的课程产生的结果示例。
-
已编辑示例。我正在尝试从一个 RDD 计算多个度量值。正如你看到的那样,我正在尝试创建多个 RDD,以便我可以以更分布式的方式计算这些度量,而不是仅在一个节点上。
标签: apache-spark spark-streaming rdd