在嵌套集合的情况下没有魔法。 Spark 将以相同的方式处理 RDD[(String, String)] 和 RDD[(String, Seq[String])]。
不过,从 Parquet 文件中读取此类嵌套集合可能会很棘手。
我们以spark-shell(1.3.1)为例:
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
编写拼花文件:
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")
读取拼花文件:
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]
scala> val outers = dataFrame.map { row =>
| val key = row.getString(0)
| val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
| Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
重要的部分是row.getAs[Seq[Row]](1)。 struct 的嵌套序列的内部表示是 ArrayBuffer[Row],您可以使用它的任何超类型来代替 Seq[Row]。 1 是外行中的列索引。我在这里使用了getAs 的方法,但在最新版本的 Spark 中有替代方法。见Row trait的源码。
现在您有了RDD[Outer],您可以应用任何想要的转换或操作。
// Filter the outers
outers.filter(_.inners.nonEmpty)
// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
请注意,我们仅使用 spark-SQL 库来读取 parquet 文件。例如,您可以直接在 DataFrame 上仅选择所需的列,然后再将其映射到 RDD。
dataFrame.select('col1, 'col2).map { row => ... }