【问题标题】:Count key value that matches certain value in pyspark dataframe using udf使用 udf 计算与 pyspark 数据帧中的某个值匹配的键值
【发布时间】:2020-12-11 15:27:23
【问题描述】:

我有一个 pyspark 数据框,其中有一列值为字符串 json。如何计算与字典内列表中特定值匹配的值并作为列报告?我想通过使用 Python 函数和 pyspark udf 来做到这一点。

例如,下面是数据框,df:

+---------------------------------------------------------------------------+
|col                                                                        |
+---------------------------------------------------------------------------+
|{"field":{"list":[{"item":1,"upgrade":false},{"item":2,"upgrade":true}]}}
+----------------------------------------------------------------------------+
|{"field":{"list":[{"item":1,"upgrade":false},{"item":2,"upgrade":false}]}}
+--------------------------------------------------------------------------+

我想做什么:

def upgrade_false(doc):
    string = str(doc) 
    return string.count('"upgrade":false')

df2= df.withColumn('upgrade_false', (F.udf(lambda j: upgrade_false(json.loads(j)),t.StringType()))('col'))

但它不起作用。有人能解释一下可能出了什么问题吗?

理想的结果如下所示:

+---------------------------------------------------------------------------+----------------+
|col                                                                        | upgrade_false
+---------------------------------------------------------------------------+-----------------+
|{"field":{"list":[{"item":1,"upgrade":false},{"item":2,"upgrade":true}]}}   | 1
+----------------------------------------------------------------------------+----------------+
|{"field":{"list":[{"item":1,"upgrade":false},{"item":2,"upgrade":false}]}}  | 2
+----------------------------------------------------------------------------+-----------------+

【问题讨论】:

    标签: python apache-spark dictionary pyspark apache-spark-sql


    【解决方案1】:

    json.loads 将您的字符串更改为 'upgrade': False 而不是 "upgrade":false,因此您无法获得任何匹配项。

    >>> str(json.loads('{"field":{"list":[{"item":1,"upgrade":false},{"item":2,"upgrade":true}]}}'))
    "{'field': {'list': [{'item': 1, 'upgrade': False}, {'item': 2, 'upgrade': True}]}}"
    

    试试下面的 udf,它会计算正确的字符串:

    df2 = df.withColumn(
        'upgrade_false',
        F.udf(lambda j: str(json.loads(j)).count("'upgrade': False"))('col')
    )
    

    【讨论】:

    • 奇怪的是我仍然得到零计数
    • @kihhfeue 你能显示str(json.loads(df.select('col').collect()[0][0])) 的结果吗?
    • 我这样做了,火花作业运行了很长时间,仍然没有完成......
    • 哦,抱歉,我不应该使用 collect。用take(1)替换collect()怎么样?
    • @kihhfeue 你说你得到零计数。你能显示显示零计数的数据框的行吗?请通过编辑问题而不是在 cmets 中放置它们
    猜你喜欢
    • 2021-03-22
    • 2019-05-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-07
    • 2021-04-25
    • 2023-01-27
    相关资源
    最近更新 更多