【问题标题】:Iterate across columns in spark dataframe and calculate min max value遍历火花数据框中的列并计算最小值最大值
【发布时间】:2017-07-18 16:02:06
【问题描述】:

我想在我的 Spark 程序中遍历数据框的列并计算最小值和最大值。 我是 Spark 和 scala 的新手,一旦在数据帧中获取列,就无法迭代列。

我尝试运行以下代码,但需要将列号传递给它,问题是如何从数据框中获取它并动态传递它并将结果存储在集合中。

val parquetRDD = spark.read.parquet("filename.parquet")

parquetRDD.collect.foreach ({ i => parquetRDD_subset.agg(max(parquetRDD(parquetRDD.columns(2))), min(parquetRDD(parquetRDD.columns(2)))).show()})

感谢您对此的任何帮助。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-streaming spark-dataframe


    【解决方案1】:

    您不应该对行或记录进行迭代。你应该使用聚合函数

    import org.apache.spark.sql.functions._
    val df = spark.read.parquet("filename.parquet")
    val aggCol = col(df.columns(2))
    df.agg(min(aggCol), max(aggCol)).show()
    

    首先,当您执行 spark.read.parquet 时,您正在读取数据帧。 接下来,我们使用 col 函数定义我们想要处理的列。 col 函数将列名转换为列。您可以改用 df("name") ,其中 name 是列的名称。

    agg 函数采用聚合列,因此 min 和 max 是聚合函数,它们采用一列并返回具有聚合值的列。

    更新

    根据 cmets,目标是为所有列设置最小值和最大值。因此,您可以这样做:

    val minColumns = df.columns.map(name => min(col(name)))
    val maxColumns = df.columns.map(name => max(col(name)))
    val allMinMax = minColumns ++ maxColumns
    df.agg(allMinMax.head, allMinMax.tail: _*).show()
    

    你也可以这样做:

    df.describe().show()
    

    它为您提供所有列的统计信息,包括 min、max、avg、count 和 stddev

    【讨论】:

    • 感谢 Assaf 的回复。但是在 val aggCol = col(df.columns(2)) 中,我们实际上并没有手动传递列号(在这种情况下为 2 )。有没有一种方法可以动态传递它,这样我就可以在循环中逐个遍历列并生成最小值最大值。谢谢你。
    • 感谢阿萨夫!它确实有帮助,但可以将其写在一个循环中,这样我就不需要手动传递列名。当我在我的问题中说迭代时,我的意思是逐列循环。在下面的示例中,我们有三列,我希望动态选择每一列,计算其最小最大值,而不必手动传递列名。假设 col1、col2、col3 col1 col2 col3 的任何随机值集
    猜你喜欢
    • 1970-01-01
    • 2018-07-13
    • 1970-01-01
    • 2019-06-13
    • 2015-09-12
    • 2021-12-18
    • 2020-04-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多