【问题标题】:Format a Dataframe into a nested json in spark scala在 spark scala 中将 Dataframe 格式化为嵌套的 json
【发布时间】:2020-10-02 13:33:08
【问题描述】:

我有一个如下的数据框 df_original

我想把它转换成如下的嵌套 json 格式

到目前为止,我已经完成了这项工作

val df_original =data.groupBy($"unique_id").agg(collect_set(struct($"acct_no",$"ciskey")).as("accounts"))
val data1 = data.groupBy($"unique_id").agg(collect_set(struct($"acct_no",$"ciskey")).as("accounts"))
val resultDf = df_original.join(data1, Seq("unique_id")).dropDuplicates()

生成下面的json

{
  "unique_id": "12345678",
  "transaction_status": "posted",
  "amount": "116.26",
  "category": "Family",
  "email_id": "abcd@gmail.com",
  "acct_no": "51663",
  "ciskey": "47626220",
  "accounts": [
    {
      "acct_no": "51663",
      "ciskey": "47626220"
    },
    {
      "acct_no": "51663",
      "ciskey": "47626221"
    }, 
    {
      "acct_no": "51663",
      "ciskey": "47626222"
    }

  ]
}

Please help me to move forward

【问题讨论】:

  • 首先请不要附上数据的图片。为消费者模拟测试数据非常困难

标签: json scala apache-spark apache-spark-sql


【解决方案1】:

另一种选择-

加载测试数据

  val data =
      """
        |transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey
        |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626220
        |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626221
        |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626222
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)

    df.show(false)
    df.printSchema()
    /**
      * +------------------+------+--------+--------------+---------+-------+--------+
      * |transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey  |
      * +------------------+------+--------+--------------+---------+-------+--------+
      * |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626220|
      * |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626221|
      * |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626222|
      * +------------------+------+--------+--------------+---------+-------+--------+
      *
      * root
      * |-- transaction_status: string (nullable = true)
      * |-- amount: double (nullable = true)
      * |-- category: string (nullable = true)
      * |-- email_id: string (nullable = true)
      * |-- unique_id: integer (nullable = true)
      * |-- acct_no: integer (nullable = true)
      * |-- ciskey: integer (nullable = true)
      */

创建所需的json

    val groupBy = df.columns.filter(_!="ciskey")
    df.groupBy(groupBy.map(col): _*).agg(collect_list($"ciskey").as("accounts"))
      .withColumn("ciskey", element_at($"accounts", 1) )
      .withColumn("customers", expr("TRANSFORM(accounts, " +
        "x -> named_struct('ciskey_no', x, 'ciskey_val', 'IND'))"))
      .withColumn("accounts",
        struct($"acct_no", $"customers"))
      .drop("customers")
      .toJSON
      .show(false)

    /**
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |value                                                                                                                                                                                                                                                                                                                          |
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |{"transaction_status":"posted","amount":116.26,"category":"Family","email_id":"abcd@gmail.com","unique_id":12345678,"acct_no":51663,"accounts":{"acct_no":51663,"customers":[{"ciskey_no":47626220,"ciskey_val":"IND"},{"ciskey_no":47626221,"ciskey_val":"IND"},{"ciskey_no":47626222,"ciskey_val":"IND"}]},"ciskey":47626220}|
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      */

Json-

{
    "transaction_status": "posted",
    "amount": 116.26,
    "category": "Family",
    "email_id": "abcd@gmail.com",
    "unique_id": 12345678,
    "acct_no": 51663,
    "accounts": {
        "acct_no": 51663,
        "customers": [{
            "ciskey_no": 47626220,
            "ciskey_val": "IND"
        }, {
            "ciskey_no": 47626221,
            "ciskey_val": "IND"
        }, {
            "ciskey_no": 47626222,
            "ciskey_val": "IND"
        }]
    },
    "ciskey": 47626220
}

【讨论】:

  • 感谢您的回复。我正在寻找这样的东西。似乎 "withColumn("customers", expr("TRANSFORM(accounts, " + "x -> named_struct('ciskey_no', x, 'ciskey_val', 'IND'))"))" 这是我无法弄清楚的部分。
  • 有没有不带变换功能的。我好玩的spark 2.3
  • 那么你需要创建UDF。同时,如果有助于使用 spark 2.3 获得相同的解决方案,请检查其他答案:)
【解决方案2】:

检查下面的代码。

scala> df.show(false)
+------------------+------+--------+--------------+---------+-------+--------+
|transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey  |
+------------------+------+--------+--------------+---------+-------+--------+
|posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626220|
|posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626221|
|posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626222|
+------------------+------+--------+--------------+---------+-------+--------+
scala> 

df
.groupBy($"unique_id")
.agg(
    collect_set(
        struct(
            $"transaction_status",
            $"amount",
            $"category",
            $"email_id",
            $"unique_id",
            $"acct_no"
        )).as("json_data"),
    first($"ciskey").as("ciskey"),
    first("acct_no").as("acct_no"),
    collect_list(struct($"ciskey")).as("customers")
)
.withColumn("json_data",explode($"json_data"))
.withColumn("accounts",struct($"acct_no",$"customers"))
.select($"json_data.*",$"ciskey",$"accounts")
.toJSON
.show(false)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                      |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"transaction_status":"posted","amount":116.26,"category":"Family","email_id":"abcd@gmail.com","unique_id":"12345678","acct_no":"51663","ciskey":"47626220","accounts":{"acct_no":"51663","customers":[{"ciskey":"47626220"},{"ciskey":"47626221"},{"ciskey":"47626222"}]}}|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

上面的代码生成如下数据,你可以在上面添加逻辑。

{
  "transaction_status": "posted",
  "amount": 116.26,
  "category": "Family",
  "email_id": "abcd@gmail.com",
  "unique_id": "12345678",
  "acct_no": "51663",
  "ciskey": "47626220",
  "accounts": {
    "acct_no": "51663",
    "customers": [
      {
        "ciskey": "47626220"
      },
      {
        "ciskey": "47626221"
      },
      {
        "ciskey": "47626222"
      }
    ]
  }
}

【讨论】:

  • 感谢您的回复。虽然这是一个非常好的解决方案,但我一直在寻找通用的东西,我不必传递所有列(我有很多列要处理)。还有另一个答案这对我有用。
猜你喜欢
  • 2019-09-26
  • 1970-01-01
  • 1970-01-01
  • 2020-09-02
  • 1970-01-01
  • 2021-12-29
  • 2018-10-31
  • 2019-07-07
  • 2019-05-07
相关资源
最近更新 更多