【问题标题】:Does Spark know the partitioning key of a DataFrame?Spark 是否知道 DataFrame 的分区键?
【发布时间】:2018-11-05 18:39:13
【问题描述】:

我想知道 Spark 是否知道 parquet 文件的分区键并使用此信息来避免洗牌。

上下文:

运行 Spark 2.0.1 运行本地 SparkSession。我有一个 csv 数据集,我将其保存为磁盘上的 parquet 文件,如下所示:

val df0 = spark
  .read
  .format("csv")
  .option("header", true)
  .option("delimiter", ";")
  .option("inferSchema", false)
  .load("SomeFile.csv"))


val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)

df.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .option("inferSchema", false)
  .save("SomeFile.parquet")

我正在按列 numerocarte 创建 42 个分区。这应该将多个numerocarte 分组到同一个分区。我不想在write 时间做 partitionBy("numerocarte") 因为我不希望每张卡有一个分区。这将是数以百万计的人。

之后,我在另一个脚本中阅读了这个SomeFile.parquet parquet 文件并对其进行了一些操作。特别是我在它上面运行了一个window function,分区是在parquet文件被重新分区的同一列上完成的。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df2 = spark.read
  .format("parquet")
  .option("header", true)
  .option("inferSchema", false)
  .load("SomeFile.parquet")

val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))

df2.withColumn("NewColumnName",
      sum(col("dollars").over(w))

read 之后,我可以看到 repartition 按预期工作,DataFrame df2 有 42 个分区,每个分区都有不同的卡。

问题:

  1. Spark 是否知道数据帧 df2 由列 numerocarte 分区?
  2. 如果它知道,则窗口函数中不会有随机播放。是吗?
  3. 如果不知道,它将在窗口函数中进行随机播放。是吗?
  4. 如果它不知道,我如何告诉 Spark 数据已经被右列分区了?
  5. 如何查看DataFrame 的分区键?有这个命令吗?我知道如何检查分区数,但如何查看分区键?
  6. 当我在每个步骤之后打印文件中的分区数时,read 之后有 42 个分区,withColumn 之后有 200 个分区,这表明 Spark 重新分区了我的 DataFrame
  7. 如果我有两个使用同一列重新分区的不同表,连接会使用该信息吗?

【问题讨论】:

  • 要检查分区器数据帧有什么,您应该查看底层 RDD。 df.rdd.partitioner。如果两个 dfs 具有相同的分区器,则可能没有 shuffle。您可以拨打df.explain查看是否会有随机播放。要检查分区数,请致电df.rdd.partitions.length。有关分区的更完整说明,请参阅jaceklaskowski.gitbooks.io/mastering-apache-spark/…

标签: apache-spark partitioning window-functions


【解决方案1】:

Spark 是否知道数据帧 df2 由列 numerocarte 分区?

它没有。

如果它不知道,我如何告诉 Spark 数据已经被右列分区了?

你没有。仅仅因为您保存了已洗牌的数据,并不意味着它将加载相同的拆分。

如何查看DataFrame的分区键?

加载数据后没有分区键,但您可以在queryExecution 中查看Partitioner


在实践中:

  • 如果要支持按键的高效下推,请使用DataFrameWriterpartitionBy 方法。
  • 如果您希望对连接优化提供有限支持,请使用 bucketBy 与元存储和持久表。

有关详细示例,请参阅How to define partitioning of DataFrame?

【讨论】:

  • 保存在 Spark Warehouse 中也保存了 Metastore 中的元数据,如分区、订单等 :) 但这只是一个小补充
  • @T.Gawęda 但是之前的操作没有元数据,是吗?请注意,OP 使用repartition
  • 我必须检查,但repartition 应该明确添加分区信息。尽管如此,这是更“常见”的补充,与问题没有太大联系;)只是为了准确。已经投票了:)
  • 从执行计划看来,在partitionBy列相同,orderBy列相同的情况下,rangeBetween/rowsBetween不同的两个窗口函数只进行一次重新分区。
  • 这篇文章和答案是我在 Spark 上读到的最好的文章。
【解决方案2】:

我正在回答我自己的问题以供将来参考什么有效。

根据@user8371915 的建议,bucketBy 有效!

我正在保存我的 DataFrame df:

df.write
  .bucketBy(250, "userid")
  .saveAsTable("myNewTable")

那么当我需要加载这个表时:

val df2 = spark.sql("SELECT * FROM myNewTable")

val w = Window.partitionBy("userid")

val df3 = df2.withColumn("newColumnName", sum(col("someColumn")).over(w)
df3.explain

我确认当我在由userid 分区的df2 上执行窗口功能时,没有随机播放!谢谢@user8371915!

我在调查时学到的一些东西

  • myNewTable 看起来像一个普通的 parquet 文件,但它不是。您可以使用spark.read.format("parquet").load("path/to/myNewTable") 正常读取它,但是以这种方式创建的DataFrame 不会保留原始分区!您必须使用spark.sql select 才能正确分区DataFrame
  • 您可以使用spark.sql("describe formatted myNewTable").collect.foreach(println) 查看表格内部。这将告诉您哪些列用于分桶以及有多少个桶。
  • 利用分区的窗口函数和连接通常也需要排序。您可以在写入时使用.sortBy() 对存储桶中的数据进行排序,排序也将保留在配置单元表中。 df.write.bucketBy(250, "userid").sortBy("somColumnName").saveAsTable("myNewTable")
  • 在本地模式下工作时,表 myNewTable 保存到我本地 Scala SBT 项目中的 spark-warehouse 文件夹中。通过spark-submit使用mesos以集群模式保存时,保存到hive仓库。对我来说,它位于/user/hive/warehouse
  • 在执行spark-submit 时,您需要在SparkSession 中添加两个选项:.config("hive.metastore.uris", "thrift://addres-to-your-master:9083").enableHiveSupport()。否则您创建的配置单元表将不可见。
  • 如果要将表保存到特定数据库,请在分桶前执行spark.sql("USE your database")

2018 年 5 月 2 日更新

我在使用 spark 分桶和创建 Hive 表时遇到了一些问题。请参考Why is Spark saveAsTable with bucketBy creating thousands of files? 中的问题、回复和cmets

【讨论】:

  • 优秀的帖子。我理解这一点,但我想知道如果使用嵌套的相关子查询会进行哪些优化。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-01-15
  • 2016-03-06
  • 2021-05-28
  • 2018-01-19
  • 1970-01-01
  • 2019-03-02
相关资源
最近更新 更多