【问题标题】:How to convert RDD[Array[String]] to RDD[(Int, HashMap[String, List])]?如何将 RDD[Array[String]] 转换为 RDD[(Int, HashMap[String, List])]?
【发布时间】:2018-11-26 18:32:11
【问题描述】:

我有输入数据:

time, id, counter, value
00.2,  1 , c1     ,  0.2
00.2,  1 , c2     ,  0.3
00.2,  1 , c1     ,  0.1

我希望为每个 id 创建一个结构来存储计数器和值。在考虑了向量并拒绝它们之后,我得出了这个结论:

(id, Hashmap( (counter1, List(Values)), (Counter2, List(Values)) ))
(1, HashMap( (c1,List(0.2, 0.1)), (c2,List(0.3)))

问题是我无法在映射转换中转换为 Hashmap,另外我不知道是否可以通过计数器减少映射中的列表。

有人知道吗?

我的代码是:

val data = inputRdd
          .map(y => (y(1).toInt, mutable.HashMap(y(2), List(y(3).toDouble)))).reduceByKey(_++_)
  }

【问题讨论】:

  • 为什么 RDD API 不是 DataFrame/Dataset?!

标签: scala list apache-spark hashmap


【解决方案1】:

在我的脑海中,未经测试:

import collection.mutable.HashMap

inputRdd
  .map{ case Array(t, id, c, v) => (id.toInt, (c, v)) }
  .aggregateByKey(HashMap.empty[String, List[String]])(
    { case (m, (c, v)) => { m(c) ::= v; m } },
    { case (m1, m2) => { for ((k, v) <- m2) m1(k) ::= v ; m1 } }
  )

【讨论】:

    【解决方案2】:

    这是一种方法:

    val rdd = sc.parallelize(Seq(
      ("00.2", 1, "c1", 0.2),
      ("00.2", 1, "c2", 0.3),
      ("00.2", 1, "c1", 0.1)
    ))
    
    rdd.
      map{ case (t, i, c, v) => (i, (c, v)) }.
      groupByKey.mapValues(
        _.groupBy(_._1).mapValues(_.map(_._2)).map(identity)
      ).
      collect
    // res1: Array[(Int, scala.collection.immutable.Map[String,Iterable[Double]])] = Array(
    //   (1,Map(c1 -> List(0.2, 0.1), c2 -> List(0.3)))
    // )
    

    请注意,最后的map(identity) 是对SO answer 中建议的Map#mapValues not serializable problem 的补救措施。

    【讨论】:

    • 为什么要考虑用case 替换下划线?这会提高可读性和理解力吗?
    • @Jacek Laskowski,为了可读性,我通常更喜欢使用带有更有意义变量的 case,而不是元组访问器(尤其是当元组大小大于 2 和/或转换涉及分组/位置改组时)。
    【解决方案3】:

    如果,正如你所提到的,拥有inputRdd

    //inputRdd: org.apache.spark.rdd.RDD[Array[String]] = ParallelCollectionRDD[0] at parallelize at ....
    

    然后,分组值上的简单 groupByfoldLeft 应该可以帮助您获得最终所需的结果

    val resultRdd = inputRdd.groupBy(_(1))
                              .mapValues(x => x
                                .foldLeft(Map.empty[String, List[String]]){(a, b) => {
                                  if(a.keySet.contains(b(2))){
                                    val c = a ++ Map(b(2) -> (a(b(2)) ++ List(b(3))))
                                    c
                                  }
                                  else{
                                    val c = a ++ Map(b(2) -> List(b(3)))
                                    c
                                  }
                                }}
                              )
    //resultRdd: org.apache.spark.rdd.RDD[(String, scala.collection.immutable.Map[String,List[String]])] = MapPartitionsRDD[3] at mapValues at ...
    //(1,Map(c1 -> List(0.2, 0.1), c2 -> List(0.3)))
    

    RDD[(String, scala.collection.immutable.Map[String,List[String]])] 更改为RDD[(Int, HashMap[String,List[String]])] 只是强制转换,我希望你这样做会更容易

    希望回答对你有帮助

    【讨论】:

    • 你为什么考虑用mutable.MapgetOrElseUpdate替换a.keySet.contains(b(2))
    • @JacekLaskowski 感谢您的回复,如果键存在,getOrElseUpdate 将返回我的值,否则值是新操作,但场景就像在存在键时添加新列表。我只是对您的信息如何有助于改进答案感到困惑?
    猜你喜欢
    • 1970-01-01
    • 2015-12-11
    • 1970-01-01
    • 2018-07-06
    • 1970-01-01
    • 2020-08-17
    • 2018-04-05
    • 1970-01-01
    • 2018-05-15
    相关资源
    最近更新 更多