【问题标题】:How to use reduceByKey to add a value into a Set in Scala Spark?如何使用 reduceByKey 将值添加到 Scala Spark 中的 Set 中?
【发布时间】:2015-07-22 08:01:41
【问题描述】:

在我将我的 RDD 映射到之后

((_id_1, section_id_1), (_id_1, section_id_2), (_id_2, section_3), (_id_2, section_4))

我想reduceByKey

((_id_1, Set(section_id_1, section_id_2), (_id_2, Set(section_3, section_4)))
val collectionReduce = collection_filtered.map(item => {
      val extras = item._2.get("extras")
      var section_id = ""
      var extras_id = ""
      if (extras != null) {
        val extras_parse = extras.asInstanceOf[BSONObject]
        section_id = extras_parse.get("guid").toString
        extras_id = extras_parse.get("id").toString
      }
      (extras_id, Set {section_id})
    }).groupByKey().collect()

我的输出是

((_id_1, (Set(section_1), Set(section_2))), (_id_2, (Set(section_3), Set(section_4))))

我该如何解决这个问题?

【问题讨论】:

    标签: scala mapreduce apache-spark


    【解决方案1】:

    您可以通过简单地使用++ 组合列表来使用reduceByKey

    val rdd = sc.parallelize((1, Set("A")) :: (2, Set("B")) :: (2, Set("C")) :: Nil)
    val reducedRdd = rdd.reduceByKey(_ ++ _)
    reducedRdd.collect()
    // Array((1,Set(A)), (2,Set(B, C)))
    

    在你的情况下:

    collection_filtered.map(item => {
      // ...
      (extras_id, Set(section_id))
    }).reduceByKey(_ ++ _).collect()
    

    【讨论】:

      【解决方案2】:

      这是groupByKey/mapValues的替代方案

      val rdd = sc.parallelize(List(("_id_1", "section_id_1"), ("_id_1", "section_id_2"), ("_id_2", "section_3"), ("_id_2", "section_4")))
      
      rdd.groupByKey().mapValues( v => v.toSet).foreach(println)
      

      这是使用combineByKey 的另一种选择(推荐超过groupByKey):

      rdd.combineByKey(
              (value: String) => Set(value),
              (x: Set[String], value: String) => x + value ,
              (x: Set[String], y:     Set[String]) => (x ++ y)
          ).foreach(println)
      

      【讨论】:

      • 使用上面的 reduceByKey 和你的方式有什么不同?哪一个更好?谢谢。
      • groupByKey 在这里更简单,因为您不聚合(“reduce”)具有相同键的两个键值对之间的任何信息:您只需连接值在一起。使用 reduce 的实现将完成相同的工作,只是版本稍难阅读。
      • 你应该更喜欢reduceByKey而不是groupByKey,看看这个Spark gitbook的解释。
      • @Peter Neyens:感谢链接
      猜你喜欢
      • 2023-03-11
      • 2014-07-19
      • 2016-08-26
      • 1970-01-01
      • 2017-02-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多