【问题标题】:how to join 2 rdd's in spark scala如何在 spark scala 中加入 2 rdd
【发布时间】:2021-09-08 17:23:03
【问题描述】:

我有 2 个如下的 RDD

val rdd1 = spark.sparkContext.parallelize(Seq((123, List(("000000011119",20),("000000011120",30),("000000011121",50))),(234, List(("000000011119",20),("000000011120",30),("000000011121",50)))))
val rdd2 = spark.sparkContext.parallelize(Seq((123, List("000000011119","000000011120")),(234, List("000000011121","000000011120"))))

我想根据 rdd2 中的密钥对来执行 rdd1 中的值相加。

需要输出:

RDD[(123,50),(234,80)]

任何帮助将不胜感激。

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    实际上,这是对行的第一个元素和每个内容的第一个元素的连接。

    所以我会把它分解成多行并以这种方式加入

    val flat1 = rdd1.flatMap(r => r._2.map(e => ((r._1, e._1), e._2))) // looks like ((234,000000011119),20)
    val flat2 = rdd2.flatMap(r => r._2.map(e => ((r._1, e), true))) // looks like ((234,000000011121),true)
    
    val res =  flat1.join(flat2)
      .map(r => (r._1._1, r._2._1))  // looks like (123, 30)
      .reduceByKey(_ + _)  // total each key group
    

    .foreach(println) 的结果

    scala> :pas
    // Entering paste mode (ctrl-D to finish)
    
    flat1.join(flat2)
      .map(r => (r._1._1, r._2._1))  // looks like (123, 30)
      .reduceByKey(_ + _)  // total each key group
      .foreach(println)
    
    // Exiting paste mode, now interpreting.
    
    (123,50)
    (234,80)
    

    像往常一样,使用 Dataset 时这些东西要简单得多,所以这将是我对未来的建议。

    【讨论】:

      猜你喜欢
      • 2018-06-01
      • 2019-03-25
      • 2016-09-07
      • 2017-06-27
      • 2018-11-24
      • 2015-10-31
      • 2018-03-03
      • 2019-02-07
      • 2018-11-19
      相关资源
      最近更新 更多