【问题标题】:Spark parquet uneven blocksize火花拼花不均匀块尺寸
【发布时间】:2018-08-25 16:29:09
【问题描述】:

由于Out of Memory Errors,我检查了一个总是发出喙的 spark 作业的输出 parquet 文件。 我在Cloudera 5.13.1上使用Spark 1.6.0

我注意到镶木地板行组大小不均匀。 第一行和最后一行是巨大的。剩下的真的很小……

parquet-tools RC = row countTS = total size 的输出缩短:

row group 1:                RC:5740100 TS:566954562 OFFSET:4  
row group 2:                RC:33769 TS:2904145 OFFSET:117971092  
row group 3:                RC:31822 TS:2772650 OFFSET:118905225  
row group 4:                RC:29854 TS:2704127 OFFSET:119793188  
row group 5:                RC:28050 TS:2356729 OFFSET:120660675  
row group 6:                RC:26507 TS:2111983 OFFSET:121406541  
row group 7:                RC:25143 TS:1967731 OFFSET:122069351  
row group 8:                RC:23876 TS:1991238 OFFSET:122682160  
row group 9:                RC:22584 TS:2069463 OFFSET:123303246  
row group 10:               RC:21225 TS:1955748 OFFSET:123960700  
row group 11:               RC:19960 TS:1931889 OFFSET:124575333  
row group 12:               RC:18806 TS:1725871 OFFSET:125132862  
row group 13:               RC:17719 TS:1653309 OFFSET:125668057  
row group 14:               RC:1617743 TS:157973949 OFFSET:134217728

这是一个已知的错误吗?如何在 Spark 中设置 parquet 块大小(行组大小)?

编辑:
Spark 应用程序所做的是:它读取一个大的 AVRO 文件,然后通过两个分区键(在选择中使用 distribute by <part_keys>)分配行,然后使用:
DF.write.partitionBy(<part_keys>).parquet(<path>)

【问题讨论】:

  • 我使用了 13 个 Executor。节点本地行是否有可能进入大行组,而来自每个执行程序的远程读取进入单独的行组?
  • 您能否解决您的问题或找到解决方法?
  • 不,我还没有找到解决方法

标签: hadoop apache-spark apache-spark-sql hadoop2 parquet


【解决方案1】:

您的 RDD 可能分区不均匀。每个块中的行数与你的RDD不同分区的大小有关。

创建 RDD 时,每个分区包含大致相同数量的数据(由于 HashPartitioner)。在处理完 Spark 作业之后,一个分区可能包含比另一个分区更多的数据,也许过滤器转换从一个分区中删除的行比从另一个分区中删除的行多。在写入 parquet 文件之前,可以调用 repartition 重新平衡分区。

编辑:如果问题与分区无关,也许减少行组的大小会有所帮助:

sc.hadoopConfiguration.setInt( "parquet.block.size", blockSize ) 

【讨论】:

  • 请看我的编辑。 RDD 在写操作之前被重新分区。
  • 您可以尝试更改块大小(请参阅我的编辑),但我不知道这是否会解决内存不足问题
  • 此设置适用于具有一些示例数据的开发集群。如果我将它与我的实际应用程序一起使用,不幸的是它不会影响任何东西......这个配置是否从某个地方被覆盖?
  • 您能否检查一下如果您在创建 DataFrameWriter (DF.repartition(13).write.partitionBy...)之前调用 repartition 会发生什么?
【解决方案2】:

有一个已知的错误: PARQUET-1337

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-06-19
    • 1970-01-01
    • 2011-01-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-27
    • 1970-01-01
    相关资源
    最近更新 更多