【问题标题】:Pyspark: wrong results for calculating min and avg after groupbyPyspark:在 groupby 之后计算 min 和 avg 的错误结果
【发布时间】:2021-03-11 01:29:31
【问题描述】:

我有一个 Spark 数据框,其中包含 id (date_from) 和 price 列。示例:

id          date_from   price
10000012    2021-08-12  19283.334
10000012    2021-05-16  4400.0
10000012    2021-06-08  5718.69
10000012    2021-07-09  15283.333
10000012    2021-07-02  9087.5
10000012    2021-07-04  15283.333
10000012    2021-06-22  9061.111
10000012    2021-06-26  9076.667
10000012    2021-06-27  9080.77
10000012    2021-07-10  15283.333
10000012    2021-08-14  19283.334
10000012    2021-05-09  4400.0
10000012    2021-05-12  4400.0
10000012    2021-06-17  9065.64
10000012    2021-05-19  4400.0
10000166    2021-05-06  5801.4287
10000166    2021-04-01  4954.375
10000166    2021-04-22  5173.7856
10000166    2021-06-27  12655.429
10000166    2021-02-23  5167.5

我想计算最低价格和平均价格。为此我已经尝试过:

groupBy_id = ["id"]
aggregate = ["price"]
funs = [min, mean]
exprs = [f(col(c)) for f in funs for c in aggregate]
df = df.groupby(*groupBy_id).agg(*exprs)

还有:

df = df.groupby("id").agg(min("price").alias("min(norm_price)"),avg("price").alias("avg(norm_price)"))

但某些min(norm_price) 值大于avg(norm_price) 值。 输出:

id,min(norm_price),avg(norm_price)
10000012,11150.0,10287.276085889778
10000166,10370.761904761903,6082.360302835207
10000185,5054.642857142857,5424.533834586466
10000421,3990.0,3990.0

我做错了什么?

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    您需要确保 norm_price 是双精度类型,而不是字符串类型。否则min 将返回最小字符串,而不是最小数字。

    df = df.withColumn('price', col('price').cast('double'))
    df = df.groupby(*groupBy_id).agg(*exprs)
    

    【讨论】:

      【解决方案2】:

      我做了一些相当简单的事情:

      from pyspark.sql.types import (
           StringType,
           StructField,
           StructType,
           FloatType
      )
      from pyspark.sql import functions as F
      
      schema = StructType([
           StructField('id', StringType(), True),
           StructField('date', StringType(), True),
           StructField('price', FloatType(), True)
      ])
      df = spark.read.csv(
              "price.csv",
              header='true',
              schema=schema
      )
      df.groupBy("id").agg(F.avg('price'), F.min('price')).show()
      

      这给了我想要的结果:

      【讨论】:

        猜你喜欢
        • 2021-11-11
        • 2019-01-08
        • 1970-01-01
        • 2018-03-07
        • 1970-01-01
        • 2019-01-07
        • 2020-01-19
        • 2022-11-30
        • 1970-01-01
        相关资源
        最近更新 更多