【问题标题】:Spark nested JSON aggregationSpark 嵌套 JSON 聚合
【发布时间】:2016-02-22 20:52:50
【问题描述】:

我有以下架构:

root
 |-- Id: string (nullable = true)
 |-- Desc: string (nullable = true)
 |-- Measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- time: string (nullable = true)
 |    |    |-- metric: string (nullable = true)
 |    |    |-- value: string (nullable = true)

在我的分析中,我想保持嵌套结构不变,但想向 DataFrame 添加列,其中包含 Measurements 中的元素数,某些列的最小/最大/平均值,特别是value 用于 metric 的某些值,例如'temperature'.

在 SQLContext 中,我可以简单地使用 sqlContext.sql("SELECT Id, SIZE(Measurements) AS num_entries FROM df" 来获取大小,但我想知道是否有一种优雅的方式(在 Scala 中)来做我想做的事情,即无需创建必须的新 DataFrame基于Id重新加入?

【问题讨论】:

    标签: json scala apache-spark


    【解决方案1】:

    这里没有通用的方法。使用内置函数 (size) 可以轻松提取简单的指标,例如 array 中的元素数量。

    case class Measurement(temperature: Double, speed: Double)
    
    val df = sc.parallelize(Seq(
      (1L, Array(Measurement(0.5, 10.0), Measurement(6.2, 3.7))),
      (2L, Array(Measurement(22.0, 5.0)))
    )).toDF("id", "measurements")
    
    df.select($"*", size($"measurements")).show
    
    // +---+--------------------+------------------+
    // | id|        measurements|size(measurements)|
    // +---+--------------------+------------------+
    // |  1|[[0.5,10.0], [6.2...|                 2|
    // |  2|        [[22.0,5.0]]|                 1|
    // +---+--------------------+------------------+
    

    更复杂的事情需要exploding:

    val expanded = df.withColumn("measurement",explode($"measurements"))
    val withStats = expanded
     .groupBy($"id")
     .agg(
       avg($"measurement.temperature").alias("avg_temp"),
       avg($"measurement.speed").alias("avg_speed"),
       first($"measurements")) // This assumes a single row per ID!
    
    withStats.show
    // +---+--------+---------+---------------------+
    // | id|avg_temp|avg_speed|first(measurements)()|
    // +---+--------+---------+---------------------+
    // |  1|    3.35|     6.85| [[0.5,10.0], [6.2...|
    // |  2|    22.0|      5.0|         [[22.0,5.0]]|
    // +---+--------+---------+---------------------+
    

    或 UDF(您希望在 PySpark 中避免的事情):

    def my_mean(c: String) = udf((xs: Seq[Row]) => 
       Try(xs.map(_.getAs[Double](c)).sum / xs.size).toOption
    )
    
    val withAvgTemp = df.withColumn(
      "avg_temperature", my_mean("temperature")($"measurements"))
    
    withAvgTemp.show
    // +---+--------------------+---------------+
    // | id|        measurements|avg_temperature|
    // +---+--------------------+---------------+
    // |  1|[[0.5,10.0], [6.2...|           3.35|
    // |  2|        [[22.0,5.0]]|           22.0|
    // +---+--------------------+---------------+
    

    您也可以尝试 Spark DataSets,但这些仍远未稳定。

    一般来说,嵌套结构主要用于导入(和可选的导出),否则这些是第二类对象。

    注意 (Spark :

    如果您使用旧版本的 Spark,您可以将上述部分与 selectExpr 一起使用(它将需要 HiveContext):

    df.selectExpr("id", "size(measurements) AS n")
    df.selectExpr("id", "explode(measurements) AS measurement")
    

    【讨论】:

    • 非常感谢!我的问题似乎仅限于版本:我被 1.4.1 卡住了,并且 size 函数从 1.5.0 开始就可用了......真是太糟糕了!
    • 总有selectExpr :)
    【解决方案2】:
    import org.apache.spark.sql.functions._
    df.select(df("id"), size(df("Measurements"))).collect
    

    以上应该可以。更多内置函数请关注https://spark.apache.org/docs/1.5.0/api/java/org/apache/spark/sql/functions.html

    【讨论】:

      猜你喜欢
      • 2017-05-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-19
      • 1970-01-01
      • 1970-01-01
      • 2017-08-13
      相关资源
      最近更新 更多