【问题标题】:Spark unionAll multiple dataframesSpark unionAll 多个数据帧
【发布时间】:2016-10-03 09:59:40
【问题描述】:

对于一组数据帧

val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")

把他们都联合起来

df1.unionAll(df2).unionAll(df3)

对于任意数量的数据帧是否有更优雅和可扩展的方式来执行此操作,例如来自

Seq(df1, df2, df3) 

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    最简单的解决方案是reduceunion(Spark unionAll):

    val dfs = Seq(df1, df2, df3)
    dfs.reduce(_ union _)
    

    这是相对简洁的,不应该从堆外存储中移动数据但是用每个联合扩展血统需要非线性时间来执行计划分析。如果您尝试合并大量DataFrames,可能会出现什么问题。

    您也可以转换为RDDs 并使用SparkContext.union

    dfs match {
      case h :: Nil => Some(h)
      case h :: _   => Some(h.sqlContext.createDataFrame(
                         h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
                         h.schema
                       ))
      case Nil  => None
    }
    

    它使 lineage short 分析成本较低,但效率低于直接合并 DataFrames

    【讨论】:

    • 感谢所有这些方法!
    • 这在 scala 中很简单吗?会是什么?
    • pySpark 中的等效代码如何?
    • 有很多(比如说,超过 20 个)DataFrame 的性能如何?
    • 对大量 DF 的性能也很好奇
    【解决方案2】:

    对于 pyspark,您可以执行以下操作:

    from functools import reduce
    from pyspark.sql import DataFrame
    
    dfs = [df1,df2,df3]
    df = reduce(DataFrame.unionAll, dfs)
    

    数据帧中列的顺序应该相同以使其正常工作也毫无价值。如果您没有正确的列顺序,这可能会默默地给出意想不到的结果!!

    如果您使用的是 pyspark 2.3 或更高版本,则可以使用 unionByName,这样您就不必对列重新排序。

    【讨论】:

    • 请记住粗体部分。
    • 使用 Python 的 reduce 意味着这些操作不会并行发生.. 对吗?
    【解决方案3】:

    Under the Hood spark 扁平化联合表达式。所以当Union做线性的时候需要更长的时间。

    最好的解决方案是 spark 有一个支持多个 DataFrame 的联合函数。

    但下面的代码可能会在一定程度上加快多个 DataFrames(或 DataSets)的联合。

      def union[T : ClassTag](datasets : TraversableOnce[Dataset[T]]) : Dataset[T] = {
          binaryReduce[Dataset[T]](datasets, _.union(_))
      }
      def binaryReduce[T : ClassTag](ts : TraversableOnce[T], op: (T, T) => T) : T = {
          if (ts.isEmpty) {
             throw new IllegalArgumentException
          }
          var array = ts toArray
          var size = array.size
          while(size > 1) {
             val newSize = (size + 1) / 2
             for (i <- 0 until newSize) {
                 val index = i*2
                 val index2 = index + 1
                 if (index2 >= size) {
                    array(i) = array(index)  // last remaining
                 } else {
                    array(i) = op(array(index), array(index2))
                 }
             }
             size = newSize
         }
         array(0)
     }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-06-23
      • 1970-01-01
      • 2018-09-26
      • 2018-01-15
      • 1970-01-01
      • 1970-01-01
      • 2023-03-25
      • 2016-09-15
      相关资源
      最近更新 更多