【问题标题】:Flatten Json in Pyspark在 Pyspark 中展平 Json
【发布时间】:2021-04-08 23:24:31
【问题描述】:
my_data=[
    {'stationCode': 'NB001',
       'summaries': [{'period': {'year': 2017}, 'rainfall': 449},
        {'period': {'year': 2018}, 'rainfall': 352.4},
        {'period': {'year': 2019}, 'rainfall': 253.2},
        {'period': {'year': 2020}, 'rainfall': 283},
        {'period': {'year': 2021}, 'rainfall': 104.2}]},
    {'stationCode': 'NA003',
       'summaries': [{'period': {'year': 2019}, 'rainfall': 58.2},
        {'period': {'year': 2020}, 'rainfall': 628.2},
        {'period': {'year': 2021}, 'rainfall': 120}]}]

在 Pandas 中我可以:

import pandas as pd
from pandas import json_normalize
pd.concat([json_normalize(entry, 'summaries', 'stationCode') 
                     for entry in my_data])

这会给我下表:

    rainfall  period.year stationCode
0     449.0         2017       NB001
1     352.4         2018       NB001
2     253.2         2019       NB001
3     283.0         2020       NB001
4     104.2         2021       NB001
0      58.2         2019       NA003
1     628.2         2020       NA003
2     120.0         2021       NA003

这可以在pyspark的一行代码中实现吗?

我已经尝试了下面的代码,它给了我相同的结果。不过太长了,有没有办法缩短呢?;

df=sc.parallelize(my_data)
df1=spark.read.json(df)


  df1.select("stationCode","summaries.period.year","summaries.rainfall").display()
  df1 = df1.withColumn("year_rainfall", F.arrays_zip("year", "rainfall"))
           .withColumn("year_rainfall", F.explode("year_rainfall"))
           .select("stationCode", 
               F.col("year_rainfall.rainfall").alias("Rainfall"), 
               F.col("year_rainfall.year").alias("Year"))
  df1.display(20, False)

向 pyspark 介绍自己,因此我们将不胜感激一些解释或良好的信息来源

【问题讨论】:

    标签: json python-3.x pyspark databricks


    【解决方案1】:

    你所拥有的对我来说看起来不错并且可读。但是你也可以直接压缩和分解:

    out = (df1.select("stationCode", 
          F.explode(F.arrays_zip(*["summaries.period.year","summaries.rainfall"])))
    .select("stationCode",F.col("col")['0'].alias("year"),F.col("col")['1'].alias("rainfall")))
    

    out.show()
    
    +-----------+----+--------+
    |stationCode|year|rainfall|
    +-----------+----+--------+
    |      NB001|2017|   449.0|
    |      NB001|2018|   352.4|
    |      NB001|2019|   253.2|
    |      NB001|2020|   283.0|
    |      NB001|2021|   104.2|
    |      NA003|2019|    58.2|
    |      NA003|2020|   628.2|
    |      NA003|2021|   120.0|
    +-----------+----+--------+
    

    【讨论】:

      【解决方案2】:

      考虑一个包含以下数据的示例 json 文件。

      {
         "Name": "TestName",
         "Date": "2021-04-09",
         "Readings": [
            {
              "Id": 1,
              "Reading": 5.678,
              "datetime": "2021-04-09 00:00:00"
           },
           {
              "Id": 2,
              "Reading": 3.692,
              "datetime": "2020-04-09 00:00:00"
           }
        ]
      }
      

      定义一个我们可以强制读取数据的架构。

      from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
      
      data_schema = StructType(fields=[
         StructField('Name', StringType(), False),
         StructField('Date', StringType(), True),
         StructField(
             'Readings', ArrayType(
                StructType([
                   StructField('Id', IntegerType(), False),
                   StructField('Reading', DoubleType(), True),
                   StructField('datetime', StringType(), True)
                ])
             )
          )
      ])
      

      现在我们可以使用我们的架构来读取目录中的 JSON 文件

      data_df = spark.read.json('/mnt/data/' + '*.json', schema=data_schema)
      

      我们想要嵌套在“Readings”中的数据,这样我们就可以使用explode 来获取这些子列。

      from pyspark.sql.functions import explode
      
      data_df = data_df.select(
          "Name",
          explode("Readings").alias("ReadingsExplode")
      ).select("Name", "ReadingsExplode.*")
      
      data_df.show()
      

      这应该提供带有扁平数据框的所需输出。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-12-29
        • 2021-04-01
        • 1970-01-01
        • 2020-05-15
        • 2022-10-12
        相关资源
        最近更新 更多