【问题标题】:apache spark groupBy pivot functionapache spark groupBy 枢轴函数
【发布时间】:2016-12-27 17:21:15
【问题描述】:

我是 spark 新手并使用 spark 1.6.1。我正在使用数据透视函数基于整数值创建一个新列。假设我有一个这样的 csv 文件:

year,winds
1990,50
1990,55
1990,58
1991,45
1991,42
1991,58

我正在像这样加载 csv 文件:

var df =sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("data/sample.csv")

我想聚合过滤那些大于 55 的风的风 colmnn,以便我得到一个像这样的输出文件:

year, majorwinds
1990,2
1991,1

我正在使用以下代码: val df2=df.groupBy("major").pivot("winds").agg(>55)->"count")

但我得到了这个错误

error: expected but integer literal found

这里的正确语法是什么?提前致谢

【问题讨论】:

  • 你应该展示你期望的输出
  • 好的,谢谢。我刚刚更新了我的问题
  • 我很确定你想过滤所有低于 55 的风,然后按年份分组并计数。
  • agg(>55)->"count") 你这里少了一个括号,应该是agg((>55)->"count")

标签: scala csv apache-spark spark-dataframe


【解决方案1】:

在你的情况下,如果你只想要这样的输出:

+----+----------+
|year|majorwinds|
+----+----------+
|1990|         2|
|1991|         1|
+----+----------+

不必使用pivot

您可以使用filtergroupBycount 来达到此目的:

df.filter($"winds" >= 55)
  .groupBy($"year")
  .count()
  .withColumnRenamed("count", "majorwinds")
  .show()

【讨论】:

    【解决方案2】:

    使用这个通用函数来做枢轴

    def transpose(sqlCxt: SQLContext, df: DataFrame, compositeId: Vector[String], pair: (String, String), distinctCols: Array[Any]): DataFrame = {
        val rdd = df.map { row => (compositeId.collect { case id => row.getAs(id).asInstanceOf[Any] }, scala.collection.mutable.Map(row.getAs(pair._1).asInstanceOf[Any] -> row.getAs(pair._2).asInstanceOf[Any])) }
        val pairRdd = rdd.reduceByKey(_ ++ _)
        val rowRdd = pairRdd.map(r => dynamicRow(r, distinctCols))
        sqlCxt.createDataFrame(rowRdd, getSchema(compositeId ++ distinctCols))
      }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-01-19
      • 1970-01-01
      • 2017-08-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多