【问题标题】:Write spark dataframe to single parquet file将火花数据帧写入单个镶木地板文件
【发布时间】:2019-02-11 20:45:45
【问题描述】:

我正在尝试做一些非常简单的事情,但我遇到了一些非常愚蠢的挣扎。我认为这一定与对 spark 在做什么的根本误解有关。我将不胜感激任何帮助或解释。

我有一个非常大的(~3 TB,~300MM 行,25k 个分区)表,在 s3 中保存为 parquet,我想给某人一个小样本作为单个 parquet 文件。不幸的是,这需要很长时间才能完成,我不明白为什么。我尝试了以下方法:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")

然后当它不起作用时,我尝试了这个,我认为应该是相同的,但我不确定。 (为了调试,我添加了print。)

tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")

当我观看 Yarn UI 时,两个打印语句write 都使用 25k 映射器。 count 用了 3 分钟,show 用了 25 分钟,write 用了大约 40 分钟,尽管它最终确实写了我正在寻找的单个文件表。

在我看来,第一行应该取前 500 行并将它们合并到一个分区,然后其他行应该发生得非常快(在单个映射器/减速器上)。谁能看到我在这里做错了什么?有人告诉我,也许我应该使用sample 而不是limit,但据我所知limit 应该更快。对吗?

提前感谢您的任何想法!

【问题讨论】:

  • 你能签入计划“LIMIT 500”被推送到表吗?
  • @Karthick spark.sql 和数据帧(例如.limit(500))都是由同一个引擎优化的,所以应该不是问题吗?
  • 我不关心 spark.sql 和同一引擎优化的数据帧,没有理由读取花费太长时间才能获得 500 条记录。 spark ui 运行时的计划中的快速点将获得一些方向(应用限制,只有 500 条记录是从表中流出的内容,以及上面的 numpartition 是什么 - 我期望它为“1”的并行度参数)是什么我的想法。

标签: apache-spark pyspark pyspark-sql


【解决方案1】:

我将首先处理 print 函数问题,因为它是理解 spark 的基础。然后limitsample。然后repartition vs coalesce

print 函数以这种方式花费这么长时间的原因是因为coalesce 是一种惰性转换。 spark 中的大多数转换都是惰性的,并且在调用 action 之前不会被评估。

动作是做一些事情的事情,并且(大多数情况下)返回一个新的数据框作为结果。喜欢countshow。它们返回一个数字和一些数据,而 coalesce 返回一个具有 1 个分区的数据帧(有点,见下文)。

发生的情况是,每次对 tiny 数据帧调用操作时,您都在重新运行 sql 查询和 coalesce 调用。这就是他们每次调用都使用 25k 映射器的原因。

为了节省时间,请将.cache() 方法添加到第一行(无论如何,对于您的print 代码)。

然后,数据框转换实际上是在您的第一行执行的,结果会保存在 Spark 节点的内存中。

这不会对第一行的初始查询时间产生任何影响,但至少您不会再运行该查询 2 次,因为结果已被缓存,然后操作可以使用该缓存结果。

要将其从内存中删除,请使用.unpersist() 方法。

现在是您尝试执行的实际查询...

这实际上取决于您的数据是如何分区的。如,它是否在特定字段等上进行了分区...

您在问题中提到了它,但 sample 可能是正确的方法。

这是为什么?

limit 必须搜索 first 行中的 500 行。除非您的数据按行号(或某种递增的 id)分区,否则前 500 行可以存储在 25k 分区中的任何一个中。

所以 spark 必须搜索所有这些,直到找到所有正确的值。不仅如此,它还必须执行一个额外的步骤来对数据进行排序以获得正确的顺序。

sample 只抓取 500 个随机值。由于没有涉及数据的顺序/排序,并且不必在特定分区中搜索特定行,因此操作起来要容易得多。

虽然limit 可以更快,但它也有它的,呃,限制。我通常只将它用于非常小的子集,如 10/20 行。

现在进行分区......

我认为coalesce 的问题是它实际上 改变了分区。现在我不确定这一点,所以一小撮盐。

根据pyspark 文档:

这个操作导致一个狭窄的依赖,例如如果您从 1000 个分区增加到 100 个分区,则不会发生 shuffle,而是 100 个新分区中的每一个都将占用当前分区中的 10 个。

因此,您的 500 行实际上仍将位于您的 25k 个物理分区中,这些物理分区被 spark 视为 1 个虚拟分区。

在这里使用.repartition(1).cache() 进行随机播放(通常很糟糕)并坚持在火花内存中可能是一个好主意。因为当您write 时,不是让 25k 映射器查看物理分区,而是应该只导致 1 个映射器查看 spark 内存中的内容。然后write 变得容易。你还要处理一个小子集,所以任何洗牌都应该(希望)是可控的。

显然这通常是不好的做法,并且不会改变 spark 在执行原始 sql 查询时可能希望运行 25k 映射器的事实。希望sample 能解决这个问题。

编辑以澄清改组,repartitioncoalesce

您在 4 节点集群上的 16 个分区中有 2 个数据集。您想加入它们并在 16 个分区中写入新数据集。

数据 1 的第 1 行可能在节点 1 上,数据 2 的第 1 行可能在节点 4 上。

为了将这些行连接在一起,spark 必须以物理方式移动其中一个或两个,然后写入新分区。

这是一个 shuffle,在集群中物理移动数据。

一切都被 16 分区并不重要,重要的是数据在集群上的位置。

data.repartition(4) 会将数据从每个节点的每 4 组分区物理移动到每个节点的 1 个分区中。

Spark 可能会将所有 4 个分区从节点 1 移动到其他 3 个节点,在这些节点上的一个新的单个分区中,反之亦然。

我不认为它会这样做,但这是一个证明这一点的极端案例。

coalesce(4) 调用虽然不会移动数据,但它更聪明。相反,它识别“我已经每个节点有 4 个分区和总共 4 个节点......我只是将每个节点的所有 4 个分区称为单个分区,然后我将有 4 个总分区!”

所以它不需要移动任何数据,因为它只是将现有分区组合成一个连接的分区。

【讨论】:

  • 谢谢,这很有帮助!对于您的第一点:我对惰性评估的理解是,直到调用 tiny.count() 之类的东西(如您所说),它才真正执行,但我不明白为什么它必须再次评估 i> 用于后续行。我猜是因为我没有缓存df?这导致您的第二点:我从未听说过有关“虚拟”分区的信息。您能否指出有关此的任何文档/解释。我想,一旦tiny.count() 发生,那么所有 500 行都在同一个分区上。
  • 只是为了在肥皂盒上花一点时间:我已经阅读了几十个关于“重新分区与合并”的解释,但似乎没有一个能充分解释它。 “虚拟”分区就是一个完美的例子(即我从未听说过这个概念)。也许“所有重新分区所做的就是调用合并,并将 shuffle 参数设置为true”(来自stackoverflow.com/questions/31610971)是我所需要知道的,我只需要更好地理解改组的含义。我不知道。任何你能指出我的好的解释/文档将不胜感激。
  • @seth127 也刚刚发现这个.. 可能有用edureka.co/blog/demystifying-partitioning-in-spark
【解决方案2】:

试试这个,根据我的经验,重新分区对这类问题更有效:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.saveAsTable("db.tiny_table")

如果您对镶木地板感兴趣,您无需将其保存为表格,那就更好了:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.parquet(your_hdfs_path+"db.tiny_table")

【讨论】:

  • 嗯,我不确定你所说的“喜欢”是什么意思,尽管我确实找到了另一篇文章说“我认为当前的文档很好地涵盖了这一点:github.com/apache/spark/blob/ ......请记住,所有重新分区所做的只是调用 coalesce 并将 shuffle 参数设置为 true。让我知道这是否有帮助。所以这很有趣,但我不清楚为什么这样做会更快。 (另一帖:stackoverflow.com/questions/31610971/…
  • 你说得对,我的意思是我凭经验发现它工作得更好!
猜你喜欢
  • 1970-01-01
  • 2016-01-18
  • 2019-02-09
  • 1970-01-01
  • 2020-04-02
  • 2021-03-26
  • 2018-10-27
  • 1970-01-01
  • 2020-01-10
相关资源
最近更新 更多