【问题标题】:Pyspark: explode json in column to multiple columnsPyspark:将列中的json分解为多列
【发布时间】:2018-12-06 18:58:08
【问题描述】:

数据是这样的——

+-----------+-----------+-----------------------------+
|         id|      point|                         data|
+-----------------------------------------------------+
|        abc|          6|{"key1":"124", "key2": "345"}|
|        dfl|          7|{"key1":"777", "key2": "888"}|
|        4bd|          6|{"key1":"111", "key2": "788"}|

我正在尝试将其分解为以下格式。

+-----------+-----------+-----------+-----------+
|         id|      point|       key1|       key2|
+------------------------------------------------
|        abc|          6|        124|        345|
|        dfl|          7|        777|        888|
|        4bd|          6|        111|        788|

explode 函数将数据框分解为多行。但这不是理想的解决方案。

注意:此解决方案无法回答我的问题。 PySpark "explode" dict in column

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql


    【解决方案1】:

    只要您使用的是 Spark 2.1 或更高版本,pyspark.sql.functions.from_json 应该会得到您想要的结果,但您需要先定义所需的 schema

    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StructField, StringType
    
    schema = StructType(
        [
            StructField('key1', StringType(), True),
            StructField('key2', StringType(), True)
        ]
    )
    
    df.withColumn("data", from_json("data", schema))\
        .select(col('id'), col('point'), col('data.*'))\
        .show()
    

    这应该给你

    +---+-----+----+----+
    | id|point|key1|key2|
    +---+-----+----+----+
    |abc|    6| 124| 345|
    |df1|    7| 777| 888|
    |4bd|    6| 111| 788|
    +---+-----+----+----+
    

    【讨论】:

    • 您应该能够使用以下内容从数据字段中提取 JSON 的架构...schema = spark.read.json(df.rdd.map(lambda row: row.data)).schema
    • 有没有办法在不提供架构的情况下做到这一点?在火花流作业的上下文中,上述模式提取不是一个选项@SimonPeacock,写下完整的模式是.. 混乱(至少可以说)而且非常不灵活,因为我希望出现额外的字段而不必适应和重新启动整个流式传输作业
    • 使用df.schema 获取模式并且不要忘记使用所有数据类型作为StringType() 否则它可能会为其他数据类型以及字符串类型提供null
    • 如果您想选择所有其余的 DF 列并扩展 json 列,请使用df2 = df.select("*", col("data.*"))
    【解决方案2】:

    正如@pault 所建议的,数据字段是string 字段。由于行中的 JSON 字符串中的键是相同的(即 'key1'、'key2'),您还可以使用 json_tuple()(根据文档,此功能是版本 1.6 中的新功能)

    from pyspark.sql import functions as F
    
    df.select('id', 'point', F.json_tuple('data', 'key1', 'key2').alias('key1', 'key2')).show()
    

    下面是我的原始帖子:如果原始表来自df.show(truncate=False),则很可能是错误,因此data字段不是python数据结构.

    由于您已将数据分解为行,我假设列 data 是 Python 数据结构而不是字符串:

    from pyspark.sql import functions as F
    
    df.select('id', 'point', F.col('data').getItem('key1').alias('key1'), F.col('data')['key2'].alias('key2')).show()
    

    【讨论】:

    • 我认为这在这种情况下不起作用 - 您需要一个 MapType() 列才能使用 getItem() 但它看起来像是一个字符串。
    • OP提到的结果已经被分解成多行,这听起来不是一个字符串字段。
    • “explode 函数将数据框分解为多行。” 听起来 OP 是在陈述一个事实,而不是他们尝试过的。此外,如果它是 MapType(),则不会像帖子中显示的那样显示。
    • 谢谢,我想你可能是对的。但我认为当 JSON 字符串中的键是常量时,它会简单得多。
    • 这行得通。我不知道 json_tuple - 这比定义架构要容易得多。
    【解决方案3】:

    这适用于我的用例

    data1 = spark.read.parquet(path)
    json_schema = spark.read.json(data1.rdd.map(lambda row: row.json_col)).schema
    data2 = data1.withColumn("data", from_json("json_col", json_schema))
    col1 = data2.columns
    col1.remove("data")
    col2 = data2.select("data.*").columns
    append_str ="data."
    col3 = [append_str + val for val in col2]
    col_list = col1 + col3
    data3 = data2.select(*col_list).drop("json_col")
    

    【讨论】:

      【解决方案4】:

      正如@jxc 所提到的,json_tuple 应该可以正常工作,如果您无法事先定义架构并且您只需要处理单级 json 字符串。我认为它更直接且更易于使用。奇怪的是,我之前没有发现其他人提到过这个功能。

      在我的用例中,原始数据框架构:StructType(List(StructField(a,StringType,true))),json 字符串列显示为:

      +---------------------------------------+
      |a                                      |
      +---------------------------------------+
      |{"k1": "v1", "k2": "2", "k3": {"m": 1}}|
      |{"k1": "v11", "k3": "v33"}             |
      |{"k1": "v13", "k2": "23"}              |
      +---------------------------------------+
      

      使用json_tuple 将 json 字段扩展为新列:

      from pyspark.sql import functions as F
      
      df = df.select(F.col('a'), 
          F.json_tuple(F.col('a'), 'k1', 'k2', 'k3') \
          .alias('k1', 'k2', 'k3'))
      
      df.schema
      df.show(truncate=False)
      

      文档并没有说太多,但至少在我的用例中,json_tuple 提取的新列是StringType,它只提取单个深度的 JSON 字符串。

      StructType(List(StructField(k1,StringType,true),StructField(k2,StringType,true),StructField(k3,StringType,true)))
      
      +---------------------------------------+---+----+-------+
      |a                                      |k1 |k2  |k3     |
      +---------------------------------------+---+----+-------+
      |{"k1": "v1", "k2": "2", "k3": {"m": 1}}|v1 |2   |{"m":1}|
      |{"k1": "v11", "k3": "v33"}             |v11|null|v33    |
      |{"k1": "v13", "k2": "23"}              |v13|23  |null   |
      +---------------------------------------+---+----+-------+
      

      【讨论】:

        【解决方案5】:

        所有学分都归功于 Shrikant Prabhu

        你可以简单地使用 SQL

        SELECT id, point, data.*
        FROM original_table
        

        这样,新表的架构会在数据发生变化时进行调整,您无需在管道中执行任何操作。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2021-06-30
          • 1970-01-01
          • 1970-01-01
          • 2023-01-03
          • 1970-01-01
          • 2021-01-12
          • 2020-11-24
          相关资源
          最近更新 更多