【问题标题】:apache spark - creating RDD from Iterable from groupByKey resultsapache spark - 从 groupByKey 结果的 Iterable 创建 RDD
【发布时间】:2015-08-31 16:37:51
【问题描述】:

我正在尝试根据给定的 PairRDD 创建新的 RDD。我有一个带有几个键的 PairRDD,但每个键都有很大(大约 100k)的值。我想以某种方式重新分区,将每个 Iterable<v> 放入 RDD[v] 以便我可以进一步有效地对这些值应用 map、reduce、sortBy 等。我感觉到 flatMapValues 是我的朋友,但想与其他火花一起检查。这是用于实时火花应用程序。我已经尝试过 collect() 并计算应用服务器内存中的所有度量,但试图对其进行改进。 这是我尝试的(伪)

class ComputeMetrices{

    transient JavaSparkContext sparkContext;

    /**
     * This method compute 3 measures: 2 percentiles of different values and 1 histogram 
     * @param javaPairRdd
     * @return
     */
    public Map<String, MetricsSummary> computeMetrices(JavaPairRDD<String, InputData> javaPairRdd) {

      JavaPairRDD<String, MetricsSummary> rdd = javaPairRdd.groupByKey(10).mapValues(itr => {

      MetricsSummary ms = new MetricsSummary();

      List<Double> list1 
      List<Double> list2

      itr.foreach{ list1.add(itr._2.height); list2.add(itr._2.weight)}
       //Here I want to convert above lists into RDD 
      JavaRDD<V> javaRdd1 = sparContext.parallelize(list1) //null pointer ; probably at sparkContext
      JavaRDD<V> javaRdd2 = sparContext.parallelize(list2)
      JavaPairRDD1 javaPairRdd1 = javaRdd1.sortBy.zipWithIndex()
      JavaPairRDD2 javaPairRdd2 = javaRdd2.sortBy.zipWithIndex()
      //Above two PairRDD will be used further to find Percentile values for range of (0..100)
      //Not writing percentile algo for sake of brevity
      double[] percentile1 = //computed from javaPairRdd1
      double[] percentile2 = //computed from javaPairRdd2
      ms.percentile1(percentile1)
      ms.percentile2(percentile2)
      //compute histogram
      JavaDoubleRDD dRdd = sparkContext.parallelizeDoubles(list1)
      long[] hist = dRdd.histogram(10)
      ms.histo(hist)
      return ms
      })
      return rdd.collectAsMap
    }
}

我想从 groupByKey 结果中的 Iterable 创建 RDD,以便我可以使用进一步的 spark 转换。

【问题讨论】:

  • 你能举个例子吗?
  • @VijayInnamuri 你的意思是不是我已经发布的其他例子?我的问题是我找不到任何方法从现有 RDD 或在转换期间从 Iterable 创建 RDD
  • 请张贴输入输入数据结构的示例和您希望您的课程产生的结果示例。
  • 已编辑示例。我正在尝试从一个 RDD 计算多个度量值。正如你看到的那样,我正在尝试创建多个 RDD,以便我可以以更分布式的方式计算这些度量,而不是仅在一个节点上。

标签: apache-spark spark-streaming rdd


【解决方案1】:

sparContext 为 null 的原因是您的 mapValues 中的代码是在工作人员上执行的 - 在工作人员上没有可用的 sparContext,它仅在驱动程序上可用。

如果我理解您的代码,我可以告诉您,如果您希望 mapValues 生成排序和索引对,则无需创建。

请记住,该代码的结果如下所示:

RDD(String, V) ->groupByKey-> RDD(String, List(V)) 
->mapValues-> RDD(String, List(Int,V))

key1, List((0,V1), (0,V2)
key1, List((0,V1), (0,V2)

mapValues 独立应用于分组列表中的每个 V。所以 counter 永远是 0。

如果你想用 K 转换从单个 RDD 中发射多个 RDD,List(V) 比 flatMapValues 会帮助你。仍然存在问题 - 流式操作在新 rdd 上的效率如何 - map 和 reduce 肯定会起作用,但 sortBy 将取决于您的窗口大小。

RDD(K, List(V)) -> flatMapValues(x=>x) -> RDD((K, V1), (K, V2) ... )

【讨论】:

  • 如果我错了,请纠正我,但我在 mapValues 之前做了 grouBy。所以 mapValues 适用于每个 Iterable(v) 而不是每个 v。第二件事是我拥有的键数远远少于集群上的节点数,所以我希望使用 RDD 分配它的值,然后执行任务会更快。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-02-08
  • 1970-01-01
  • 1970-01-01
  • 2014-12-18
  • 2014-05-13
  • 1970-01-01
  • 2020-09-22
相关资源
最近更新 更多