【问题标题】:Spark bucketing read performanceSpark 分桶读取性能
【发布时间】:2018-06-27 23:07:04
【问题描述】:

Spark 版本 - 2.2.1。

我创建了一个有 64 个桶的桶表,我正在执行一个聚合函数 select t1.ifa,count(*) from $tblName t1 where t1.date_ = '2018-01-01' group by ifa 。我可以看到 Spark UI 中有 64 个任务,它们仅使用 20 个执行器(每个执行器有 16 个内核)。有没有办法可以扩展任务的数量,或者这就是分桶查询应该运行的方式(运行内核的数量)作为桶的数量)?

这里是创建表:

sql("""CREATE TABLE level_1 (
 bundle string,
  date_ date,
 hour SMALLINT)
 USING ORC
 PARTITIONED BY (date_ , hour )
 CLUSTERED BY (ifa)
 SORTED BY (ifa)
 INTO 64 BUCKETS
 LOCATION 'XXX'""")

这是查询:

sql(s"select t1.ifa,count(*) from $tblName t1 where t1.date_ = '2018-01-01' group by ifa").show

【问题讨论】:

    标签: apache-spark spark-dataframe apache-spark-dataset apache-spark-2.2


    【解决方案1】:

    使用分桶,任务数 == 桶数,因此您应该了解您需要/想要使用的核心/任务数,然后将其设置为桶数。

    【讨论】:

    • 这是否意味着分桶不适用于随时间增长的数据集?因为当附加到这样的数据集时,桶的数量(以及每个日期分区文件夹中的文件)保持不变,因此从这个数据集(表)中读取不会扩展。..
    【解决方案2】:

    num of task = num of buckets 可能是 Spark 中分桶的最重要和讨论不足的方面。存储桶(默认情况下)在历史上仅用于创建可以优化大型连接的“预混洗”数据帧。当您读取分桶表时,每个存储桶的所有文件或文件都由单个 spark 执行程序读取(读取数据时 30 个桶 = 30 个 spark 任务),这将允许该表连接到另一个存储在同一桶上的表 #的列。我发现这种行为很烦人,就像上面提到的用户可能会增长的表格一样。

    您现在可能会问自己,我为什么以及何时想要存储桶,以及我的实际数据何时会随着时间的推移以完全相同的方式增长? (老实说,您可能按日期对大数据进行了分区)根据我的经验,您可能没有很好的用例来以默认的 spark 方式存储表。但是所有的东西都不会因为铲斗而丢失!

    输入“bucket-pruning”。 Bucket pruning 仅在您存储 ONE 列时有效,但自 SparkSQL 和 Dataframes 出现以来,它可能是您在 Spark 中最好的朋友。它允许 Spark 根据查询中的某些过滤器确定表中的哪些文件包含特定值,这可以大大减少 spark 物理读取的文件数量,从而实现非常高效和快速的查询。 (我将 2 小时以上的查询减少到 2 分钟和 1/100 的 Spark 工作人员)。但您可能并不在意,因为任务桶数问题意味着如果每个桶、每个分区的文件过多,您的表将永远不会“扩展”。

    进入 Spark 3.2.0。当您禁用基于存储桶的读取时,将允许存储桶修剪保持活动状态,从而允许您通过存储桶修剪/扫描分配火花读取。我还有一个技巧可以用 spark

    val table = "ex_db.ex_tbl"
    val target_partition = "2021-01-01"
    val bucket_target = "valuex"
    val bucket_col = "bucket_col"
    val partition_col = "date"
    
    import org.apache.spark.sql.functions.{col, lit}
    import org.apache.spark.sql.execution.FileSourceScanExec
    import org.apache.spark.sql.execution.datasources.{FileScanRDD,FilePartition}
    
    
    val df = spark.table(tablename).where((col(partition_col)===lit(target_partition)) && (col(bucket_col)===lit(bucket_target)))
    val sparkplan = df.queryExecution.executedPlan
    val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
    val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
    val bucket_files = for
    
    { FilePartition(bucketId, files) <- rdd.filePartitions f <- files }
    yield s"$f".replaceAll("path: ", "").split(",")(0)
    val format = bucket_files(0).split("
    .").last
    val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files:_*).where(col(bucket_col) === lit(bucket_target))
    

    【讨论】:

      猜你喜欢
      • 2020-05-17
      • 2016-01-05
      • 1970-01-01
      • 2015-08-21
      • 1970-01-01
      • 2020-01-02
      • 2017-11-19
      • 2015-10-29
      • 1970-01-01
      相关资源
      最近更新 更多