【问题标题】:Store aggregate value of a PySpark dataframe column into a variable将 PySpark 数据框列的聚合值存储到变量中
【发布时间】:2022-03-17 22:58:17
【问题描述】:

我在这里使用 PySpark 数据框。 “test1”是我的 PySpark 数据框,event_date 是 TimestampType。因此,当我尝试获取不同的 event_date 计数时,结果是一个整数变量,但是当我尝试获取同一列的最大值时,结果是一个数据帧。我想了解哪些操作会导致数据框和变量。我也想知道如何将事件日期的最大值存储为变量

产生整数类型的代码:

loop_cnt=test1.select('event_date').distinct().count()
type(loop_cnt)

产生数据框类型的代码:

last_processed_dt=test1.select([max('event_date')])
type(last_processed_dt)

已编辑以添加可重现的示例:

schema = StructType([StructField("event_date", TimestampType(), True)])

df = sqlContext.createDataFrame([(datetime(2015, 8, 10, 2, 44, 15),),(datetime(2015, 8, 10, 3, 44, 15),)], schema)

返回数据框的代码:

last_processed_dt=df.select([max('event_date')])
type(last_processed_dt)

返回变量的代码:

loop_cnt=df.select('event_date').distinct().count()
type(loop_cnt) 

【问题讨论】:

  • 你应该展示一个可重现的例子!

标签: apache-spark pyspark


【解决方案1】:

您不能直接访问数据框中的值。 Dataframe 返回一个 Row 对象。相反,Dataframe 为您提供了将其转换为 python 字典的选项。通过以下示例,我将计算平均字数:

wordsDF = sqlContext.createDataFrame([('cat',), ('elephant',), ('rat',), ('rat',), ('cat', )], ['word'])
wordCountsDF = wordsDF.groupBy(wordsDF['word']).count()
wordCountsDF.show()

这里是字数统计结果:

+--------+-----+
|    word|count|
+--------+-----+
|     cat|    2|
|     rat|    2|
|elephant|    1|
+--------+-----+

现在我计算计数列应用 collect() 操作的平均值。记住 collect() 返回一个列表。这里列表只包含一个元素。

averageCount = wordCountsDF.groupBy().avg('count').collect()

结果看起来像这样。

[Row(avg(count)=1.6666666666666667)]

您无法使用某些 python 变量直接访问平均值。您必须将其转换为字典才能访问它。

results={}
for i in averageCount:
  results.update(i.asDict())
print results

我们的最终结果如下所示:

{'avg(count)': 1.6666666666666667}

最后,您可以使用以下方法访问平均值:

print results['avg(count)']

1.66666666667

【讨论】:

    【解决方案2】:

    我很确定 df.select([max('event_date')]) 返回一个 DataFrame,因为在该列中可能有不止一行具有最大值。在您的特定用例中,该列中可能没有两行具有相同的值,但很容易想象这样一种情况,即多行可以具有相同的最大值 event_date

    df.select('event_date').distinct().count() 返回一个整数,因为它告诉您该特定列中有多少不同的值。它不会告诉你哪个值最大。

    如果您希望代码获取最大值event_date 并将其存储为变量,请尝试以下max_date = df.select([max('event_date')]).distinct().collect()

    【讨论】:

    • 我尝试使用 max_date = df.select([max('event_date')]).distinct().collect() 并且 max_date 对象的类型是一个列表
    • 列表是什么样的?
    • 当我打印列表时,我得到 [Row(max(event_date)=datetime.datetime(2015, 8, 10, 3, 44, 15))]
    • 这只是一个 Row 对象。我不确定,但尝试索引它。像max_date[0][0] 这样的东西。索引直到找到datetime 对象。
    【解决方案3】:

    使用collect()

    import pyspark.sql.functions as sf
    
    
    distinct_count = df.agg(sf.countDistinct('column_name')).collect()[0][0]
    

    使用first()

    import pyspark.sql.functions as sf
    
    
    distinct_count = df.agg(sf.countDistinct('column_name')).first()[0]
    

    【讨论】:

      【解决方案4】:
      last_processed_dt=df.select([max('event_date')])
      

      要获得最大日期,我们应该尝试类似

      last_processed_dt=df.select([max('event_date').alias("max_date")]).collect()[0]
      last_processed_dt["max_date"]
      

      基于 sujit 的例子。我们实际上可以在不迭代/循环的情况下打印值 [Row(avg(count)=1.6666666666666667)] 通过提供averageCount[0][0]。

      注意:我们没有通过循环,因为它只会返回一个值。

      【讨论】:

        【解决方案5】:

        试试这个

        loop_cnt=test1.select('event_date').distinct().count()
        var = loop_cnt.collect()[0]
        

        希望对你有帮助

        【讨论】:

          【解决方案6】:
          trainDF.fillna({'Age':trainDF.select('Age').agg(avg('Age')).collect()[0][0]})
          

          【讨论】:

          • 您能否为此添加一些解释以使其成为真正的答案?
          【解决方案7】:

          您可以尝试访问 collect() 函数。 从 spark 3.0 开始,您可以执行以下操作:

          loop_cnt=test1.select('event_date').distinct().count().collect()[0][0]
          print(loop_cnt)
          

          【讨论】:

            猜你喜欢
            • 2020-11-27
            • 1970-01-01
            • 2019-12-16
            • 1970-01-01
            • 2020-12-01
            • 1970-01-01
            • 2018-02-17
            • 1970-01-01
            • 2021-04-27
            相关资源
            最近更新 更多