【问题标题】:Spark Scala: Querying same table multiple timesSpark Scala:多次查询同一张表
【发布时间】:2019-02-21 07:17:36
【问题描述】:

我正在尝试从同一个表 (bigTable) 中查询多个列以生成一些聚合列(column1_sum、column2_sum、column3_count)。最后,我将所有列连接在一起形成一个表格。

代码如下

val t1 = bigTable
            .filter($"column10" === value1)
            .groupBy("key1","key2")
            .agg(sum("column1") as "column1_sum")

val t2 = bigTable
            .filter($"column11"===1)
            .filter($"column10" === value1)
            .groupBy("key1","key2")
            .agg(sum("column2") as "column2_sum")

val t3 = bigTable
            .filter($"column10" === value3)
            .groupBy("key1","key2")
            .agg(countDistinct("column3") as "column3_count")

tAll
            .join(t1,Seq("key1","key2"),"left_outer")
            .join(t2,Seq("key1","key2"),"left_outer")
            .join(t3,Seq("key1","key2"),"left_outer")

上述代码的问题

bigTable 是一个巨大的表(它包含数百万行)。因此,多次查询它效率不高。查询需要很长时间才能运行。

关于如何以更有效的方式实现相同输出的任何想法?有没有办法查询 bigTable 的次数较少?

非常感谢。

【问题讨论】:

  • 你可以cache中间RDDs/DataSets/DataFrames - 例如val intermediate = bigTable.filter($"column10" === value1).cache - 然后intermediate可以在构建t1和t2时重用 - 你可以unpersistintermediate当你完成它- 火花缓存将结果保存在分布式内存中以提高性能
  • 我想过,但在这种情况下,中间表也会很大。此外,每次我的过滤器也不同。关于我能做什么的任何其他想法?
  • 恐怕问题是您愿意为此投入多少资源。如果你构建一个有足够基础设施的 Spark 集群,那么巨大是完全相对的。
  • 是的,我同意。缓存也很有意义。我会尝试在任何有意义的地方这样做。谢谢@PJFanning

标签: scala performance apache-spark apache-spark-sql processing-efficiency


【解决方案1】:

我的代码的主要改进之一是查询一次 bigTable,而不是问题中提到的多次。

我正在尝试的一段代码(我的代码类似,这只是一个说明):

bigTable
    .filter($"column10" === value1)
    .groupBy("key1", "key2")
    .agg(
      sum("column1") as "column1_sum",
      sum("column2") as "column2_sum",
      countDistinct(when($"column11"===1, col("column3"))) as "column3_count"
)

【讨论】:

    【解决方案2】:

    最简单的改进是仅作为单个聚合执行,其中 predicated 被推入 CASE ... WHEN ... 块,并用近似等效替换 countDistinct

    tAll
      .groupBy("key1","key2")
      .agg(
        sum(
          when($"column10" === "value1", $"column1")
        ).as("column1_sum"),
        sum(
          when($"column10" === "value1" and $"column11" === 1, $"column2")
        ).as("column2_sum"),
        approx_count_distinct(
          when($"column10" === "value3", $"column3")
        ).as("column3_count"))
      .join(tAll, Seq("key1", "key2"), "right_outer"))
    

    根据使用的函数和有关数据分布的先验知识,您还可以尝试用具有类似CASE ... WHEN ... 逻辑的窗口函数替换聚合

    import org.apache.spark.sql.expressions.Window
    
    val w = Window
     .partitionBy("key1", "key2")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    tAll
      .withColumn(
        "column1_sum", 
        sum(when($"column10" === "value1", $"column1")).over(w))
     ...
    

    但它通常是一种不太稳定的方法。

    您还应该考虑使用分组列对bigTable 进行分桶:

    val n: Int = ???  // Number of buckets
    bigTable.write.bucketBy(n, "key1", "key2").saveAsTable("big_table_clustered")
    
    val bigTableClustered = spark.table("big_table_clustered")
    

    【讨论】:

    • 感谢@user6910411。我一定会尝试你提到的方法。我同意,在我的情况下,分桶会非常有帮助,它会使查询运行得更快。
    猜你喜欢
    • 2016-09-25
    • 2018-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-06-09
    • 2017-06-30
    • 2020-09-08
    • 1970-01-01
    相关资源
    最近更新 更多