【问题标题】:Merge Spark dataframe rows based on key column in Scala基于 Scala 中的键列合并 Spark 数据帧行
【发布时间】:2018-07-18 07:06:44
【问题描述】:

我有一个包含 2 列的流式数据框。一个表示为字符串的键列和一个对象列,它是一个包含一个对象元素的数组。我希望能够使用相同的键合并 Dataframe 中的记录或行,以便合并的记录形成一个对象数组。

Dataframe

----------------------------------------------------------------
|key    | objects                                              |
----------------------------------------------------------------
|abc    | [{"name": "file", "type": "sample", "code": "123"}]  |
|abc    | [{"name": "image", "type": "sample", "code": "456"}] |
|xyz    | [{"name": "doc", "type": "sample", "code": "707"}]   |
----------------------------------------------------------------


Merged Dataframe

-------------------------------------------------------------------------
|key   |  objects                                                        |
-------------------------------------------------------------------------
|abc   |    [{"name": "file", "type": "sample", "code": "123"}, {"name": 
            "image", "type": "sample", "code": "456"}]                   |
|xyz   |   [{"name": "doc", "type": "sample", "code": "707"}]            |
--------------------------------------------------------------------------

执行此操作的一个选项是将其转换为 PairedRDD 并应用 reduceByKey 函数,但如果可能,我更喜欢使用 Dataframes 执行此操作,因为它会更优化。有什么方法可以在不影响性能的情况下使用 Dataframes 做到这一点?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-streaming


    【解决方案1】:

    假设列 objects 是单个 JSON 字符串的数组,以下是如何将 objectskey 合并:

    import org.apache.spark.sql.functions._
    
    case class Obj(name: String, `type`: String, code: String)
    
    val df = Seq(
        ("abc", Obj("file", "sample", "123")),
        ("abc", Obj("image", "sample", "456")),
        ("xyz", Obj("doc", "sample", "707"))
      ).
      toDF("key", "object").
      select($"key", array(to_json($"object")).as("objects"))
    
    df.show(false)
    // +---+-----------------------------------------------+
    // |key|objects                                        |
    // +---+-----------------------------------------------+
    // |abc|[{"name":"file","type":"sample","code":"123"}] |
    // |abc|[{"name":"image","type":"sample","code":"456"}]|
    // |xyz|[{"name":"doc","type":"sample","code":"707"}]  |
    // +---+-----------------------------------------------+
    
    df.groupBy($"key").agg(collect_list($"objects"(0)).as("objects")).
      show(false)
    // +---+---------------------------------------------------------------------------------------------+
    // |key|objects                                                                                      |
    // +---+---------------------------------------------------------------------------------------------+
    // |xyz|[{"name":"doc","type":"sample","code":"707"}]                                                |
    // |abc|[{"name":"file","type":"sample","code":"123"}, {"name":"image","type":"sample","code":"456"}]|
    // +---+---------------------------------------------------------------------------------------------+
    

    【讨论】:

    • 谢谢利奥!这对我来说是一个小的编辑。 (collect_list($"objects"(0)).as("objects")) 失败,出现“org.apache.spark.sql.AnalysisException: Field name should be String Literal, but it's 0;”。我在没有指定索引的情况下让它工作,因为无论如何对象列将只有一个元素并且不需要索引。
    • @bytecode,不确定为什么$"objects"(0) 在您的环境中不起作用。我建议你试试$"objects".getItem(0)。整个 Array 列上的collect_list 将生成 WrappedArray 的嵌套数组,而不是预期结果数据集中显示的 JSON 字符串数组。
    • 在 $"objects" 上显式调用 getItem 有效。不知道为什么速记不起作用。
    • 顺便说一句,没有聚合函数有没有办法做到这一点?
    • 由于您的要求是聚合每个 keyobjects 数组的元素,我看不出有什么方法可以不应用某些聚合函数。
    猜你喜欢
    • 1970-01-01
    • 2020-05-01
    • 1970-01-01
    • 2018-10-19
    • 1970-01-01
    • 1970-01-01
    • 2019-05-26
    • 2020-10-08
    • 2018-04-06
    相关资源
    最近更新 更多