【问题标题】:Aggregate json data in pyspark在 pyspark 中聚合 json 数据
【发布时间】:2017-02-27 23:55:44
【问题描述】:

我是 pyspark 的新手,谁能帮我汇总这些数据。

我在这样的文本文件中有 json 数据

{"a":1 , "b":"abc", "c":"abc2", "d":"abc3" , "e":"1234"}
{"a":1 , "b":"abc2", "c":"abc", "d":"abc" ,"e":"1234"}
{"a":1 , "b":"abc", "c":"abc2", "d":"abc3","e":"123"}

我想在 'b'、'c'、'd' 列上聚合数据,同时在 'a' 列中添加值,然后添加一个新列 'unique_e',这将在基础上给出唯一的 'e' 列值列'e'。

这是我想要的最终输出

{"a":2 , "b":"abc", "c":"abc2", "d":"abc3" , "unique_e":"2"}
{"a":1 , "b":"abc2", "c":"abc", "d":"abc" , "unique_e":"1"}

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    试试这个:

    import pyspark.sql.functions as f
     df = spark.read.json(pathToFile)
     df2 = df.groupby("b","c","d").agg(f.countDistinct(df["e"]).alias("unique_e"), f.sum(df["a"]).alias("sum_a"))
    

    如果您想将其保存到单个 json 文件中,您可以这样做:

    df2.coalesce(1).write(pathToDir)
    

    json 将在一个看起来像这样的文件中:part-00000-e3421247-f8cd-4ecb-b8e6-fc26894f5282.json

    如果您没有定义 spark(如果您使用 pyspark shell 或正确配置您的 notebook 将被定义),如果您从外部脚本运行代码就是这种情况,您可以执行以下操作:

        spark = SparkSession.builder.master("local").appName("example").getOrCreate()
    

    创建它。

    【讨论】:

    • 嘿,这是一个愚蠢的问题,但你是在导入这个 spark 变量 'spark.read.json(pathToFile)' 像这样 'from pyspark.sql import SQLContext as spark' 并且请你写代码对于列 'a' ,我需要对列 'a' 进行求和运算,同时对 'b','c','d'columns 进行分组
    • 两者都添加了。 spark 是使用 pyspark 生成的 spark session 的默认名称。我添加了一个如何生成它的示例,但您真的应该查看编程指南以根据您的情况正确地完成它
    • 非常感谢您帮助我。上面的代码正在工作。你能推荐一些我可以通过的好的文档吗?:)
    • 嘿,最后想一想。如果我有一列说“f”,其值为“0”或“1”。如果该列的值为“1”,那么只有这些聚合应该执行,否则我想首先删除该行。
    • 然后在聚合之前过滤数据帧。在上面的例子中做 df.filter(df["f"] == 1)
    猜你喜欢
    • 1970-01-01
    • 2021-06-18
    • 1970-01-01
    • 2014-08-08
    • 2016-12-23
    • 1970-01-01
    • 1970-01-01
    • 2019-04-05
    • 1970-01-01
    相关资源
    最近更新 更多