【Title】: PySpark: Expand the json to new columns
【Posted】: 2020-06-22 11:31:24
【Question】:

I have data of the following form:

+---+-----+-------------------------------------------------------------+
| id|point|data                                                         |
+---+-----+-------------------------------------------------------------+
|dfb|    6|[{"key1":"124", "key2": "345"},{"key3":"324", "key1":"wfe"}] |
|bgd|    7|[{"key3":"324", "key1":"wfe"},{"key1":"777", "key2":"888"}]  |
|34d|    6|[{"key1":"111", "key4": "788", "key2":"dfef"}]               |
+---+-----+-------------------------------------------------------------+

I want to transform it into:

+---+-----+----+
| id|point|key1|
+---+-----+----+
|dfb|    6| 124|
|bgd|    7| 777|
|34d|    6| 111|
+---+-----+----+

Each row holds a list of JSON objects that may share common keys, but I want to extract the value of key1 from the JSON object that also contains key2.

This is easy to do in plain Python.

In pyspark, I have seen solutions based on a fixed schema (How to split a list to multiple columns in Pyspark?), but how can I achieve this when there is no fixed schema?
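For reference, the "easy in plain Python" version mentioned above might look like this (a sketch assuming each `data` cell is a JSON string like the rows in the table; the function name is illustrative):

```python
import json

def extract_key1(data_json):
    """Return key1 from the first dict in the list that also contains key2."""
    for d in json.loads(data_json):
        if "key2" in d and "key1" in d:
            return d["key1"]
    return None

row = '[{"key1":"124", "key2": "345"},{"key3":"324", "key1":"wfe"}]'
print(extract_key1(row))  # -> 124
```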

【Discussion】:

Tags: python apache-spark pyspark apache-spark-sql pyspark-dataframes


    【Solution 1】:

    Another approach, involving the `filter` and `transform` higher-order functions (Spark 2.4+), can be:

    import pyspark.sql.functions as F
    from pyspark.sql.types import ArrayType, MapType, StringType

    schema = ArrayType(MapType(StringType(), StringType()))

    (df.withColumn("data", F.from_json(F.col("data"), schema))
       .withColumn("Key1", F.expr('''transform(filter(data, x ->
           array_contains(map_keys(x), "key2")), y -> y["key1"])''')[0])).show()
    

    +---+-----+--------------------+----+
    | id|point|                data|Key1|
    +---+-----+--------------------+----+
    |dfb|    6|[[key1 -> 124, ke...| 124|
    |bgd|    7|[[key3 -> 324, ke...| 777|
    |34d|    6|[[key1 -> 111, ke...| 111|
    +---+-----+--------------------+----+
    

    【Discussion】:

      【Solution 2】:

      Check the code below.

      from pyspark.sql import functions as F
      from pyspark.sql.types import *
      
      df.show()
      +---+-----+---------------------------------------------------------+
      |id |point|data                                                     |
      +---+-----+---------------------------------------------------------+
      |dfb|6    |[{"key1":"124","key2":"345"},{"key3":"324","key1":"wfe"}]|
      |bgd|7    |[{"key3":"324","key1":"wfe"},{"key1":"777","key2":"888"}]|
      |34d|6    |[{"key1":"111","key4":"788","key2":"dfef"}]              |
      +---+-----+---------------------------------------------------------+
      
      schema = ArrayType(MapType(StringType(),StringType()))
      
      (df.withColumn("data", F.explode(F.from_json(F.col("data"), schema)))
         .withColumn("data", F.when(F.col("data")["key1"].cast("long").isNotNull(),
                                    F.col("data")["key1"]))
         .filter(F.col("data").isNotNull())
         .show())
      
      +---+-----+----+
      | id|point|data|
      +---+-----+----+
      |dfb|    6| 124|
      |bgd|    7| 777|
      |34d|    6| 111|
      +---+-----+----+
      

      【Discussion】:
