这里没有通用的方法。使用内置函数 (size) 可以轻松提取简单的指标,例如 array 中的元素数量。
case class Measurement(temperature: Double, speed: Double)
val df = sc.parallelize(Seq(
(1L, Array(Measurement(0.5, 10.0), Measurement(6.2, 3.7))),
(2L, Array(Measurement(22.0, 5.0)))
)).toDF("id", "measurements")
df.select($"*", size($"measurements")).show
// +---+--------------------+------------------+
// | id| measurements|size(measurements)|
// +---+--------------------+------------------+
// | 1|[[0.5,10.0], [6.2...| 2|
// | 2| [[22.0,5.0]]| 1|
// +---+--------------------+------------------+
更复杂的事情需要exploding:
val expanded = df.withColumn("measurement",explode($"measurements"))
val withStats = expanded
.groupBy($"id")
.agg(
avg($"measurement.temperature").alias("avg_temp"),
avg($"measurement.speed").alias("avg_speed"),
first($"measurements")) // This assumes a single row per ID!
withStats.show
// +---+--------+---------+---------------------+
// | id|avg_temp|avg_speed|first(measurements)()|
// +---+--------+---------+---------------------+
// | 1| 3.35| 6.85| [[0.5,10.0], [6.2...|
// | 2| 22.0| 5.0| [[22.0,5.0]]|
// +---+--------+---------+---------------------+
或 UDF(您希望在 PySpark 中避免的事情):
def my_mean(c: String) = udf((xs: Seq[Row]) =>
Try(xs.map(_.getAs[Double](c)).sum / xs.size).toOption
)
val withAvgTemp = df.withColumn(
"avg_temperature", my_mean("temperature")($"measurements"))
withAvgTemp.show
// +---+--------------------+---------------+
// | id| measurements|avg_temperature|
// +---+--------------------+---------------+
// | 1|[[0.5,10.0], [6.2...| 3.35|
// | 2| [[22.0,5.0]]| 22.0|
// +---+--------------------+---------------+
您也可以尝试 Spark DataSets,但这些仍远未稳定。
一般来说,嵌套结构主要用于导入(和可选的导出),否则这些是第二类对象。
注意 (Spark :
如果您使用旧版本的 Spark,您可以将上述部分与 selectExpr 一起使用(它将需要 HiveContext):
df.selectExpr("id", "size(measurements) AS n")
df.selectExpr("id", "explode(measurements) AS measurement")