【问题标题】:Combine value part of Tuple2 which is a map, into single map grouping by the key of Tuple2通过 Tuple2 的 key 将 Tuple2 的 value 部分作为一个 map 组合成单个 map 分组
【发布时间】:2020-12-18 04:42:00
【问题描述】:

我在 Scala 和 Spark 中这样做。

我有 DatasetTuple2Dataset[(String, Map[String, String])]

下面是Dataset 中的值示例。

(A, {1->100, 2->200, 3->100})
(B, {1->400, 4->300, 5->900})
(C, {6->100, 4->200, 5->100})
(B, {1->500, 9->300, 11->900})
(C, {7->100, 8->200, 5->800})

如果您注意到,元组的键或第一个元素可以重复。另外,同一个 Tuple 的 key 对应的 map 中可以有重复的 key(Tuple2 的第二部分)。

我想创建一个最终的Dataset[(String, Map[String, String])]。并且输出应该如下(来自上面的例子)。此外,地图的最后一个键的值会被保留(检查 B 和 C),并且之前针对 B 和 C 的相同键会被删除。

(A, {1->100, 2->200, 3->100})
(B, {4->300, 1->500, 9->300, 11->900, 5->900})
(C, {6->100, 4->200, 7->100, 8->200, 5->800})

如果需要任何说明,请告诉我。

【问题讨论】:

    标签: scala dataframe apache-spark dataset databricks


    【解决方案1】:

    使用数据框:

    val df = Seq(("A", Map(1 -> 100, 2 -> 200, 3 -> 100)),
        ("B", Map(1 -> 400, 4 -> 300, 5 -> 900)),
        ("C", Map(6 -> 100, 4 -> 200, 5 -> 100)),
        ("B", Map(1 -> 500, 9 -> 300, 11 -> 900)),
        ("C", Map(7 -> 100, 8 -> 200, 5 -> 800))).toDF("a", "b")
    
    val df2 = df.select('a, explode('b))
        .groupBy("a", "key")           //remove the duplicate keys
        .agg(last('value).as("value")) //and take the last value for duplicate keys
        .groupBy("a")
        .agg(map_from_arrays(collect_list('key), collect_list('value)).as("b"))
    df2.show()
    

    打印

    +---+---------------------------------------------------+
    |a  |b                                                  |
    +---+---------------------------------------------------+
    |B  |[5 -> 900, 9 -> 300, 1 -> 500, 4 -> 300, 11 -> 900]|
    |C  |[6 -> 100, 8 -> 200, 7 -> 100, 4 -> 200, 5 -> 800] |
    |A  |[3 -> 100, 1 -> 100, 2 -> 200]                     |
    +---+---------------------------------------------------+
    

    由于涉及到两个聚合,基于 rdd 的 answer 可能会更快

    【讨论】:

      【解决方案2】:

      通过使用rdd,

      val rdd = sc.parallelize(
          Seq(("A", Map(1->100, 2->200, 3->100)),
              ("B", Map(1->400, 4->300, 5->900)),
              ("C", Map(6->100, 4->200, 5->100)),
              ("B", Map(1->500, 9->300, 11->900)),
              ("C", Map(7->100, 8->200, 5->800)))
      )
      
      rdd.reduceByKey((a, b) => a ++ b).collect()
      
      // Array((A,Map(1 -> 100, 2 -> 200, 3 -> 100)), (B,Map(5 -> 900, 1 -> 500, 9 -> 300, 11 -> 900, 4 -> 300)), (C,Map(5 -> 800, 6 -> 100, 7 -> 100, 8 -> 200, 4 -> 200)))
      

      并使用数据框,

      val df = spark.createDataFrame(
          Seq(("A", Map(1->100, 2->200, 3->100)),
              ("B", Map(1->400, 4->300, 5->900)),
              ("C", Map(6->100, 4->200, 5->100)),
              ("B", Map(1->500, 9->300, 11->900)),
              ("C", Map(7->100, 8->200, 5->800)))
      ).toDF("key", "map")
      
      spark.conf.set("spark.sql.mapKeyDedupPolicy","LAST_WIN")
      
      df.withColumn("map", map_entries($"map"))
        .groupBy("key").agg(collect_list($"map").alias("map"))
        .withColumn("map", flatten($"map"))
        .withColumn("map", map_from_entries($"map")).show(false)
      
      +---+---------------------------------------------------+
      |key|map                                                |
      +---+---------------------------------------------------+
      |B  |[1 -> 500, 4 -> 300, 5 -> 900, 9 -> 300, 11 -> 900]|
      |C  |[6 -> 100, 4 -> 200, 5 -> 800, 7 -> 100, 8 -> 200] |
      |A  |[1 -> 100, 2 -> 200, 3 -> 100]                     |
      +---+---------------------------------------------------+
      

      【讨论】:

      • 谢谢。会试试这个。我可以使用 DF/DS 执行此操作吗?
      • @vijayinani 是的 Datasets 也有 groupByreduceValues 函数。
      猜你喜欢
      • 2014-12-19
      • 1970-01-01
      • 2018-01-29
      • 1970-01-01
      • 1970-01-01
      • 2020-10-04
      • 1970-01-01
      • 2021-03-17
      • 1970-01-01
      相关资源
      最近更新 更多