【问题标题】:How to read a nested collection in Spark如何在 Spark 中读取嵌套集合
【发布时间】:2015-07-12 12:57:36
【问题描述】:

我有一张镶木地板,其中一列是

, 数组>

可以在 Hive 中使用 LATERAL VIEW 语法对该表运行查询。

如何将这个表读入 RDD,更重要的是如何在 Spark 中过滤、映射等嵌套集合?

在 Spark 文档中找不到对此的任何引用。提前感谢您提供任何信息!

ps。我觉得在桌面上提供一些统计数据可能会有所帮助。 主表中的列数~600。行数~200m。 嵌套集合中的“列”数 ~10。嵌套集合中的平均记录数 ~35。

【问题讨论】:

    标签: apache-spark apache-spark-sql nested parquet lateral-join


    【解决方案1】:

    在嵌套集合的情况下没有魔法。 Spark 将以相同的方式处理 RDD[(String, String)]RDD[(String, Seq[String])]

    不过,从 Parquet 文件中读取此类嵌套集合可能会很棘手。

    我们以spark-shell(1.3.1)为例:

    scala> import sqlContext.implicits._
    import sqlContext.implicits._
    
    scala> case class Inner(a: String, b: String)
    defined class Inner
    
    scala> case class Outer(key: String, inners: Seq[Inner])
    defined class Outer
    

    编写拼花文件:

    scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
    outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
    
    scala> outers.toDF.saveAsParquetFile("outers.parquet")
    

    读取拼花文件:

    scala> import org.apache.spark.sql.catalyst.expressions.Row
    import org.apache.spark.sql.catalyst.expressions.Row
    
    scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
    dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   
    
    scala> val outers = dataFrame.map { row =>
         |   val key = row.getString(0)
         |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
         |   Outer(key, inners)
         | }
    outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
    

    重要的部分是row.getAs[Seq[Row]](1)struct 的嵌套序列的内部表示是 ArrayBuffer[Row],您可以使用它的任何超类型来代替 Seq[Row]1 是外行中的列索引。我在这里使用了getAs 的方法,但在最新版本的 Spark 中有替代方法。见Row trait的源码。

    现在您有了RDD[Outer],您可以应用任何想要的转换或操作。

    // Filter the outers
    outers.filter(_.inners.nonEmpty)
    
    // Filter the inners
    outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
    

    请注意,我们仅使用 spark-SQL 库来读取 parquet 文件。例如,您可以直接在 DataFrame 上仅选择所需的列,然后再将其映射到 RDD。

    dataFrame.select('col1, 'col2).map { row => ... }
    

    【讨论】:

    • 感谢 Lomig 的详细回复。我已将其标记为正确响应。虽然我们还没有到 Spark 1.3,但计划本月升级。是否可以在 Spark 1.2 中不使用数据框 API?你能告诉我 getAs[Seq[Row]](1) 是如何工作的吗?索引 [1] 是包含嵌套数组的列的位置,对吗?
    • 查看我的编辑。对于 Spark 1.2,您可以使用完全相同的代码将 Row 转换为您的案例类。旧版本读取parquet文件的语法请参考官方文档,非常接近。
    • 知道了。非常感谢。 github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/…GetSeq[Row](1) 也可以吗?
    • 不客气。是的,getSeq[Row] 将是一个替代方案。不过,我不确定此方法在 Spark 1.2 中是否可用。我让你检查一下。
    • 我今天在 user@spark.apache.org 看到了一个帖子,上面提到 Spark SQL 直接支持 LATERAL VIEW 语法。一旦我们在 Spark 1.3 上,将尝试两种方式; (等待CDH 5.4.1发布后才能升级)
    【解决方案2】:

    我将给出一个基于 Python 的答案,因为这就是我正在使用的。我认为 Scala 也有类似的东西。

    根据Python API docs,在 Spark 1.4.0 中添加了 explode 函数来处理 DataFrames 中的嵌套数组。

    创建一个测试数据框:

    from pyspark.sql import Row
    
    df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
    df.show()
    
    ## +-+--------------------+
    ## |a|             intlist|
    ## +-+--------------------+
    ## |1|ArrayBuffer(1, 2, 3)|
    ## |2|ArrayBuffer(4, 5, 6)|
    ## +-+--------------------+
    

    使用explode 将列表列展平:

    from pyspark.sql.functions import explode
    
    df.select(df.a, explode(df.intlist)).show()
    
    ## +-+---+
    ## |a|_c0|
    ## +-+---+
    ## |1|  1|
    ## |1|  2|
    ## |1|  3|
    ## |2|  4|
    ## |2|  5|
    ## |2|  6|
    ## +-+---+
    

    【讨论】:

    • 谢谢 dnlbrky。它看起来比 Scala 更易于阅读。我肯定会尝试你的 python 示例。不过,一旦 Cloudera 发布 CDH 5.5,我们可能不会在今年年底之前拥有 Spark 1.4 :-) 希望到那时拥有 Spark 1.5。
    • explode操作成本高,你能想到其他办法吗?
    【解决方案3】:

    另一种方法是使用这样的模式匹配:

    val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
      case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
        case List(a:String, b: String) => (a, b)
      }).toList
    })
    

    您可以直接在 Row 上进行模式匹配,但由于某些原因可能会失败。

    【讨论】:

      【解决方案4】:

      以上答案都是很好的答案,从不同的角度来解决这个问题; Spark SQL 也是访问嵌套数据的非常有用的方法。

      这里是如何在SQL中直接使用explode()来查询嵌套集合的例子。

      SELECT hholdid, tsp.person_seq_no 
      FROM (  SELECT hholdid, explode(tsp_ids) as tsp 
              FROM disc_mrt.unified_fact uf
           )
      

      tsp_ids 是一个嵌套的结构,它有很多属性,包括我在上面的外部查询中选择的 person_seq_no。

      以上内容已在 Spark 2.0 中进行了测试。我做了一个小测试,它在 Spark 1.6 中不起作用。这个问题是在 Spark 2 不存在时被问到的,所以这个答案很好地添加到了处理嵌套结构的可用选项列表中。

      还可以查看遵循 JIRA 的 Hive 兼容方式来使用 LATERAL VIEW OUTER 语法查询嵌套数据,因为 Spark 2.2 还支持 OUTER 爆炸(例如,当嵌套集合为空时,但您仍然希望拥有属性来自父记录):

      在explode() 上值得注意的未解决JIRA 以进行SQL 访问:

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-10-26
        • 1970-01-01
        • 2018-07-17
        • 2021-05-10
        • 1970-01-01
        • 2018-12-23
        • 2021-06-18
        相关资源
        最近更新 更多