[Question Title]: Spark 2.0 - Flatten JSON file to a CSV
[Posted]: 2017-07-28 08:02:55
[Question]:

I'm trying to convert a JSON file into a flattened CSV file. Here is what I've tried:

from pyspark.sql.functions import *

dummy = spark.read.json('dummy-3.json')
qualify = dummy.select("user_id", "rec_id", "uut", "hash", explode("qualify").alias("qualify"))
qualify.show()

Output:

+-------+------+---+------+--------------------+
|user_id|rec_id|uut|  hash|             qualify|
+-------+------+---+------+--------------------+
|      1|     2| 12|abc123|[cab321,test-1,of...|
|      1|     2| 12|abc123|[cab123,test-2,of...|
+-------+------+---+------+--------------------+

I can't figure out how to correctly manipulate the qualify column in Spark SQL to return the right values.

Sample JSON (commas and the closing bracket of the qualify array restored):

{
  "user_id": 1,
  "rec_id": 2,
  "uut": 12,
  "hash": "abc123",
  "qualify": [{
    "offer": "offer-1",
    "name": "test-1",
    "hash": "cab321",
    "qualified": false,
    "rules": [{
      "name": "name of rule 1",
      "approved": true,
      "details": {}
    },
    {
      "name": "name of rule 2",
      "approved": false,
      "details": {}
    }]
  },{
    "offer": "offer-2",
    "name": "test-2",
    "hash": "cab123",
    "qualified": true,
    "rules": [{
      "name": "name of rule 1",
      "approved": true,
      "details": {}
    },
    {
      "name": "name of rule 2",
      "approved": false,
      "details": {}
    }]
  }]
}

JSON schema:

root
 |-- hash: string (nullable = true)
 |-- qualify: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- hash: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- offer: string (nullable = true)
 |    |    |-- qualified: boolean (nullable = true)
 |    |    |-- rules: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- approved: boolean (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |-- rec_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- uut: long (nullable = true)

I tried converting the DataFrame to an RDD and writing a map function to return the values, but I don't think that's a good approach. Am I wrong?

Has anyone worked on a similar problem?

Thanks for your help.

[Comments]:

  • Have you tried putting qualified.* instead of explode in your select query?

Tags: json csv apache-spark pyspark


[Solution 1]:
import pyspark.sql.functions as f

qualify = dummy.withColumn('qualify', f.explode(dummy['qualify']))
result = qualify.withColumn('qualify_name', qualify['qualify']['name'])

You can reach into a StructType() with either a.b or a['b'] notation.

[Comments]:

[Solution 2]:

Solution

I used the explode function, creating a new DataFrame for each explode.

    df2 = df.select(col("userId").alias("user_id"),\
                    col("recommendationId").alias("rec_id"),\
                    col("utsId").alias("uts_id"),\
                    col("gitHash").alias("git_hash"), \
                    from_unixtime(col("createdAt")).alias("created"), \
                    explode("qualifyResults").alias("qualify"))
    
    df3 = df2.select("user_id",\
                     "rec_id",\
                     "uts_id",\
                     "git_hash",\
                     "created",\
                     col("qualify.offerId").alias("qualify_offer"),\
                     col("qualify.qualifyName").alias("qualify_name"),\
                     col("qualify.qualifyHash").alias("qualify_hash"),\
                     col("qualify.qualified").alias("qualify_qualified"),\
                     explode("qualify.rulesResult").alias("rules"))
    
    # removed the details
    df4 = df3.select("user_id",\
                     "rec_id",\
                     "uts_id",\
                     "git_hash",\
                     "created",\
                     "qualify_offer",\
                     "qualify_name",\
                     "qualify_hash",\
                     "qualify_qualified",\
                     col("rules.name").alias("rule_name"),\
                     col("rules.approved").alias("rule_approved"),\
                     col("rules.details").alias("rule_details"))
    

With this approach, I was able to get the CSV shape I wanted.

[Comments]:
