【问题标题】:Spark: sc.WholeTextFiles takes a long time to executeSpark:sc.WholeTextFiles 需要很长时间才能执行
【发布时间】:2023-11-02 03:41:01
【问题描述】:

我有一个集群,我执行wholeTextFiles,它应该会提取大约一百万个文本文件,总和约为10GB 我有一个 NameNode 和两个 DataNode,每个都有 30GB 的 RAM,每个有 4 个内核。数据存储在HDFS

我没有运行任何特殊参数,而且这项工作需要 5 个小时才能读取数据。这是预期的吗?是否有任何参数可以加快读取速度(spark 配置或分区、执行程序的数量?)

我才刚刚开始,我以前从未需要优化工作

编辑:另外,有人能准确解释一下 wholeTextFiles 函数是如何工作的吗? (不是如何使用它,而是它是如何编程的)。对了解partition参数等很感兴趣。

编辑 2: 基准评估

所以我在wholeTextFile之后尝试了repartition,问题是一样的,因为第一次读取还是使用预定义的分区数,所以没有性能提升。加载数据后,集群的性能非常好......在处理数据(对于 200k 文件)时,我在整个文本文件中收到以下警告消息:

15/01/19 03:52:48 WARN scheduler.TaskSetManager: Stage 0 contains a task of very large size (15795 KB). The maximum recommended task size is 100 KB.

这会是表现不佳的原因吗?我该如何对冲?

此外,在执行 saveAsTextFile 时,根据 Ambari 控制台,我的速度为 19MB/s。使用 wholeTextFiles 进行读取时,我的速度为 300kb/s.....

似乎通过增加wholeTextFile(path,partitions) 中的分区数,我的性能会更好。但仍然只有 8 个任务同时运行(我的 CPU 数量)。我正在进行基准测试以观察极限...

【问题讨论】:

  • 5 小时听起来很高。您是否尝试过使用较小的子集?在达到 100 万之前说 10K 或 100K 文件。其次,如果你不需要(文件名,内容)然后你压缩所有数据并使用.textFile读取。读取数据后,尝试在 RDD 上调用repartition ( numPartitions )。您可以用值 8、16、32 等尝试numPartitions,看看它是否有所作为。你可以在这里查看实现github.com/apache/spark/blob/…
  • 我已经尝试了 200k 个文件,大约需要一个小时,所以估计听起来是线性的......我使用的是 wholeTextFiles,因为然后我会解析它们中的每一个以转换为 xml。我无法读取 textFile,因为它将逐行读取并且我无法再解析...除非我错了?
  • 你试过repartition 吗?我要求尝试 textFile 的原因是查看 IO(read) 是否由于文件数量或 wholeTextFiles 的实现而变慢
  • 首先,设置正确的执行参数而不是默认参数。我推荐--num-executors 4 --executor-memory 12g --executor-cores 4,这会提高你的并行水平。其次,在 HDFS 上以这种方式存储数据确实很糟糕,在 sc.wholeTextFiles 之后您应该做的第一个任务是将它们保存到具有块压缩和 Snappy/gzip 编解码器的单个压缩序列文件中。计算中的瓶颈是您启动的线程数量和您读取的单独文件的数量(加载 NameNode)
  • 这里您可以找到如何保存在压缩序列文件中的示例:0x0fff.com/spark-hdfs-integration。大约 4 - 这只是一个假设,在我提供的配置中,您将有 4 个 JVM 进程,每个进程有 12GB 堆,每个进程将利用 4 个内核(并行运行 4 个 spark 任务),为您提供 16 个并行读取器线程

标签: scala hadoop optimization configuration apache-spark


【解决方案1】:

总结一下我对 cme​​ts 的建议:

  1. HDFS 不适合存储许多小文件。首先,NameNode 将元数据存储在内存中,因此您可能拥有的文件和块的数量是有限的(对于典型的服务器来说,最多 100m 块)。接下来,每次读取文件时,首先向 NameNode 查询块位置,然后连接到存储文件的 DataNode。这种连接和响应的开销确实很大。
  2. 应始终检查默认设置。默认情况下,Spark 在 YARN 上启动,有 2 个执行程序 (--num-executors),每个执行程序有 1 个线程 (--executor-cores) 和 512m 的 RAM (--executor-memory),只给你 2 个线程,每个线程有 512MB RAM,这对于真正的人来说真的很小-世界任务

所以我的建议是:

  1. 使用 --num-executors 4 --executor-memory 12g --executor-cores 4 启动 Spark,这将为您提供更多并行性 - 在这种特殊情况下为 16 个线程,这意味着 16 个任务并行运行
  2. 使用sc.wholeTextFiles 读取文件,然后将它们转储到压缩的序列文件中(例如,使用 Snappy 块级压缩),以下是如何做到这一点的示例:http://0x0fff.com/spark-hdfs-integration/。这将大大减少下一次迭代读取它们所需的时间

【讨论】:

  • 很好的总结。最后一个问题,如果文件最初被压缩在一个gz文件下,然后gz文件包含许多小文件,有没有办法让我从那个gz文件中读取,然后直接在内存中解压缩和wholeTextFile?
  • gzip 不允许在单个存档中压缩多个文件,单个 gz 存档是单个文件。 Gzip 压缩文件会自动解压缩。但如果是 tar.gz 存档恐怕你得自己写 InputFormat
  • 是的,它是 .tar.gz。如果您有关于如何编写我自己的 InputFormat 的链接,我会接受它,否则我会环顾四周。非常感谢您的帮助!
  • 从这个开始:*.com/questions/17875277/… - 一次读取整个文件。这样,您将在内存缓冲区中拥有一个完整的文件,之后您将能够应用 Java 库来对该缓冲区进行gunzip 并解压缩其内容
  • 但是,@0x0FFF,我认为执行者不可能共享核心,所以我不认为为每个执行者分配 4 核心,4 执行者(16 cores total) 将起作用,因为总共只有8 cores。
最近更新 更多