【问题标题】:Spark partitioning for file write is very slow用于文件写入的 Spark 分区非常慢
【发布时间】:2016-07-21 01:47:57
【问题描述】:

使用 Spark 将文件写入 HDFS 时,在不使用分区的情况下速度非常快。取而代之的是,当我使用分区写入文件时,写入延迟会增加约 24 倍。

对于同一个文件,不分区写入大约需要 600 毫秒。按 ID 写入分区(将生成 1.000 个分区,因为文件中有 1.000 个 id)大约需要 14 秒。

你们中的一些人是否有过编写分区文件需要很长时间的经历?造成这种情况的根本原因是什么,也许 Spark 需要为每个分区创建 1.000 个文件夹和文件? 您知道如何加快速度吗?

val myRdd = streamedRdd.map { case ((id, metric, time), value) => Record(id, metric, getEpoch(time), time, value) }

val df = myRdd.toDF

df.write.mode(SaveMode.Append)
.partitionBy("id")
.parquet(path)

【问题讨论】:

  • 能否包含您使用的代码?

标签: hadoop apache-spark hdfs parquet hadoop-partitioning


【解决方案1】:

Spark 执行器与 HDFS 通信以写入它们拥有的数据,这取决于分区后数据在集群中的分布方式。

与顺序写入整个文件相比,显然对于较小的数据块,建立从多个执行程序节点到 HDFS 的连接和写入的时间会更长。

如何避免这种情况:

默认情况下,spark 使用哈希分区器对数据进行分区(哈希键和具有相同哈希的键到同一个节点)尝试指定范围分区器,请在下面找到示例 sn-ps:

以下sn-p使用Hash partitioner yourRdd.groupByKey().saveAsTextFile("HDFS PATH");

以下 sn-p 使用我们的自定义范围分区器 它创建了RangePartitioner(8, yourRdd) 中提到的 8 个分区,通过 8 个连接写入将是比通过 1000 个连接写入更好的选择。

val tunedPartitioner = new RangePartitioner(8, yourRdd)
val partitioned = yourRdd.partitionBy(tunedPartitioner).saveAsTextFile("HDFS PATH");

这也是要写入的数据和您创建的分区数量之间的权衡。

【讨论】:

  • 这个想法很好,但它不适用于数据帧。你能展示一个重新分区数据并将其保存到 parquet 中的示例吗?
  • @alexeipab 您目前无法使用自定义分区器进行分区。您唯一能做的就是使用重新分区按列分区。作为替代方案,您可以使用 myDF.rdd.partitionBy() 对数据帧底层的 RDD 进行分区。
猜你喜欢
  • 2016-12-13
  • 2018-05-07
  • 2021-05-09
  • 2016-07-25
  • 2016-08-23
  • 2017-11-10
  • 2017-05-13
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多