[Title]: Convert multiple list columns to a JSON array column in a dataframe in PySpark
[Posted]: 2021-04-06 05:39:27
[Question]:

I have a dataframe with multiple list columns, and I want to combine them into a single JSON array column.

I used the following logic, but it doesn't work. Any ideas?

def test(test1,test2):
    d = {'data': [{'marks': a, 'grades': t} for a, t in zip(test1, test2)]}
    return d

The UDF is defined with an array type as below, and I tried calling it with the columns, but that doesn't work either. Any ideas?

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType

arrayToMapUDF = udf(test, ArrayType(StringType()))

df.withColumn("jsonarraycolumn", arrayToMapUDF(col("col"), col("col2")))
marks                      grades
[100, 150, 200, 300, 400]  [0.01, 0.02, 0.03, 0.04, 0.05]

The required transformation is as follows:

marks                      grades                          Json-array-column
[100, 150, 200, 300, 400]  [0.01, 0.02, 0.03, 0.04, 0.05]  {attribute: [{marks: 100, grades: 0.01},
                                                                        {marks: 150, grades: 0.02},
                                                                        {marks: 200, grades: 0.03},
                                                                        ...]}
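As a plain-Python sanity check of the structure being asked for (no Spark involved; the `attribute` key and the `marks`/`grades` names are taken from the example above), the zipping logic would be:

```python
import json

marks = [100, 150, 200, 300, 400]
grades = [0.01, 0.02, 0.03, 0.04, 0.05]

# Pair each mark with its grade, then serialize the whole structure once.
records = [{"marks": m, "grades": g} for m, g in zip(marks, grades)]
payload = json.dumps({"attribute": records})
print(payload)
```

This is the per-row computation the UDF needs to perform; the Spark-specific part of the question is only about declaring the UDF's return type correctly.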

[Comments]:

    Tags: json apache-spark pyspark apache-spark-sql


    [Solution 1]:

    You should declare the UDF with StringType, since it returns a JSON string, not an array of strings. You can also use json.dumps to convert the dict to a JSON string. (The example below uses amount/discount column names in place of marks/grades.)

    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    import json
    
    def test(test1,test2):
        d = [{'amount': a, 'discount': t} for a, t in zip(test1, test2)]
        return json.dumps(d)
    
    arrayToMapUDF = F.udf(test, StringType())
    
    df2 = df.withColumn("jsonarraycolumn", arrayToMapUDF(F.col("amount"), F.col("discount")))
    
    df2.show(truncate=False)
    +-------------------------------+------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |amount                         |discount                      |jsonarraycolumn                                                                                                                                                                      |
    +-------------------------------+------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |[1000, 15000, 2000, 3000, 4000]|[0.01, 0.02, 0.03, 0.04, 0.05]|[{"amount": 1000, "discount": 0.01}, {"amount": 15000, "discount": 0.02}, {"amount": 2000, "discount": 0.03}, {"amount": 3000, "discount": 0.04}, {"amount": 4000, "discount": 0.05}]|
    +-------------------------------+------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    

    If you don't want the quotes (note that the result is then no longer valid JSON):

    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    import json
    
    def test(test1,test2):
        d = [{'amount': a, 'discount': t} for a, t in zip(test1, test2)]
        return json.dumps(d).replace('"', '')
    
    arrayToMapUDF = F.udf(test, StringType())
    
    df2 = df.withColumn("jsonarraycolumn", arrayToMapUDF(F.col("amount"), F.col("discount")))
    
    df2.show(truncate=False)
    +-------------------------------+------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |amount                         |discount                      |jsonarraycolumn                                                                                                                                                  |
    +-------------------------------+------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |[1000, 15000, 2000, 3000, 4000]|[0.01, 0.02, 0.03, 0.04, 0.05]|[{amount: 1000, discount: 0.01}, {amount: 15000, discount: 0.02}, {amount: 2000, discount: 0.03}, {amount: 3000, discount: 0.04}, {amount: 4000, discount: 0.05}]|
    +-------------------------------+------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
    

    If you want a column with a true JSON-like (array-of-struct) type:

    from pyspark.sql.types import ArrayType, StructType, StructField, StringType
    
    def test(test1, test2):
        d = [{'amount': a, 'discount': t} for a, t in zip(test1, test2)]
        return d
    
    arrayToMapUDF = F.udf(test, 
        ArrayType(
            StructType([
                StructField('amount', StringType()), 
                StructField('discount', StringType())
            ])
        )
    )
    
    df2 = df.withColumn("jsonarraycolumn", arrayToMapUDF(F.col("amount"), F.col("discount")))
    
    df2.show(truncate=False)
    +-------------------------------+------------------------------+-----------------------------------------------------------------------+
    |amount                         |discount                      |jsonarraycolumn                                                        |
    +-------------------------------+------------------------------+-----------------------------------------------------------------------+
    |[1000, 15000, 2000, 3000, 4000]|[0.01, 0.02, 0.03, 0.04, 0.05]|[[1000, 0.01], [15000, 0.02], [2000, 0.03], [3000, 0.04], [4000, 0.05]]|
    +-------------------------------+------------------------------+-----------------------------------------------------------------------+
    
    df2.printSchema()
    root
     |-- amount: array (nullable = false)
     |    |-- element: integer (containsNull = false)
     |-- discount: array (nullable = false)
     |    |-- element: double (containsNull = false)
     |-- jsonarraycolumn: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- amount: string (nullable = true)
     |    |    |-- discount: string (nullable = true)
    

    [Comments]:

    • Maybe I confused you; the given solution is fine. But when I write it back to the database as "attributes": "{\"attributes\": [{\"amount\": \"8000000\", \"discount\": \"0.01\"}, an extra backslash character is added and the attribute name is duplicated (because we wrap it in JSON again), and that is the data I want to save to the database. I tried this but had no luck: def zip(xs, ys): return [{'amount': a, 'discount': t} for a, t in zip(xs, ys)]; arrayToMapUDF = udf(zip, StructType([StructField('marks', IntegerType()), StructField('marks1', DecimalType())]))
    • Can we build an array of key-value pairs without using json.dumps?
    • @mike def zip(xs, ys): return [{'amount': a, 'discount': t} for a, t in zip(xs, ys)]; arrayToMapUDF = udf(zip, StructType([StructField('marks', IntegerType()), StructField('marks1', DecimalType())]))
    • jsoncolumn [{"amount": 1000, "discount": 0.01}, {"amount": 15000, "discount": 0.02}, {"amount": 2000, "discount": 0.03}, {"amount": 3000, "discount": 0.04}, {"amount": 4000, "discount": 0.05}]
    [Solution 2]:

    To avoid a udf, you can use higher-order functions:

    import pyspark.sql.functions as f
    
    transform_expr = "TRANSFORM(arrays_zip(amount, discount), value -> value)"
    df = df.withColumn('jsonarraycolumn', f.to_json(f.expr(transform_expr)))
    
    df.show(truncate=False)
    

    Output:

    +-------------------------------+------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |amount                         |discount                      |jsonarraycolumn                                                                                                                                                             |
    +-------------------------------+------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |[1000, 15000, 2000, 3000, 4000]|[0.01, 0.02, 0.03, 0.04, 0.05]|[{"amount":1000.0,"discount":0.01},{"amount":15000.0,"discount":0.02},{"amount":2000.0,"discount":0.03},{"amount":3000.0,"discount":0.04},{"amount":4000.0,"discount":0.05}]|
    +-------------------------------+------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    

    [Comments]:

    • When I run the code I get an error: Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.expr. : org.apache.spark.sql.catalyst.parser.ParseException: extraneous input '->' expecting {')', ','} (line 1, pos 43)
    • What is your Spark version?
    • Spark 2.4.5
    • @john check my edit, I found out how to make it run on Spark 2.4.5
    • @Kafels TypeError: 'str' object is not callable. Traceback (most recent call last): ... 12 .withColumn("id", col("supplierName")) ---> 14 volumediscountDf = volumediscountDf.withColumn('jsonarraycolumn', to_json(expr(expr))) TypeError: 'str' object is not callable