【问题标题】:Spark (scala): groupby and aggregate list of values to one list based on index [duplicate]Spark(scala):groupby和聚合值列表到一个基于索引的列表[重复]
【发布时间】:2018-04-15 15:21:29
【问题描述】:

我有以下数据框:

root
 |-- visitor: string (nullable = true)
 |-- asset: array (nullable = true)
 |    |-- element: string (containsNull = true)

我正在尝试将共享相似索引(访问者)的值列表分组为原始列表类型(数组)的单个列表。

示例:

val rawData1 = Seq(("visitor1",Array("item1","item2","item3","item4")),("visitor2",Array("item1","item2","item3")))
val rawData2 = Seq(("visitor1",Array("item1","item2","item5")),("visitor2",Array("item4","item7")))
val df1 = spark.createDataFrame(rawData1).toDF("visitor","asset")
val df2 = spark.createDataFrame(rawData2).toDF("visitor","asset")
val dfJoined = df1.union(df2)
dfJoined.groupBy("visitor").agg(collect_list("asset"))

我得到的是:

visitor collect_list(asset)
visitor2    [WrappedArray(item1, item2, item3), WrappedArray(item4, item7)]
visitor1    [WrappedArray(item1, item2, item3, item4), WrappedArray(item1, item2, item5)]

但我不希望资产列中有两个子列表,我希望将两个列表的所有值组合为一个具有原始类型(数组)的列表。

谢谢!

【问题讨论】:

    标签: scala apache-spark spark-dataframe


    【解决方案1】:

    一种选择是在合并之前将df1df2explode 合并,然后进行聚合:

    (df1.withColumn("asset", explode($"asset"))
        .union(df2.withColumn("asset", explode($"asset")))
        .groupBy("visitor")
        .agg(collect_list("asset"))
    ).show(false)
    
    +--------+-------------------------------------------------+
    |visitor |collect_list(asset)                              |
    +--------+-------------------------------------------------+
    |visitor2|[item1, item2, item3, item4, item7]              |
    |visitor1|[item1, item2, item3, item4, item1, item2, item5]|
    +--------+-------------------------------------------------+
    

    【讨论】:

      猜你喜欢
      • 2019-02-09
      • 2018-09-28
      • 1970-01-01
      • 1970-01-01
      • 2017-03-10
      • 2017-06-07
      • 2018-03-02
      • 1970-01-01
      • 2020-11-05
      相关资源
      最近更新 更多