【问题标题】:parquet file size, firehose vs. sparkparquet 文件大小,firehose 与 spark
【发布时间】:2019-11-10 19:08:46
【问题描述】:

我通过两种方法生成 Parquet 文件:Kinesis Firehose 和 Spark 作业。它们都被写入 S3 上的相同分区结构。可以使用相同的 Athena 表定义来查询两组数据。两者都使用 gzip 压缩。

不过,我注意到 Spark 生成的 Parquet 文件大约是 Firehose 生成的 Parquet 文件的 3 倍。有什么理由应该是这种情况?当我使用 Pyarrow 加载它们时,我确实注意到了一些架构和元数据差异:

>>> import pyarrow.parquet as pq
>>> spark = pq.ParquetFile('<spark object name>.gz.parquet')
>>> spark.metadata
<pyarrow._parquet.FileMetaData object at 0x101f2bf98>
  created_by: parquet-mr version 1.8.3 (build aef7230e114214b7cc962a8f3fc5aeed6ce80828)
  num_columns: 4
  num_rows: 11
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1558
>>> spark.schema
<pyarrow._parquet.ParquetSchema object at 0x101f2f438>
uri: BYTE_ARRAY UTF8
dfpts.list.element: BYTE_ARRAY UTF8
udids.list.element: BYTE_ARRAY UTF8
uuids.list.element: BYTE_ARRAY UTF8

>>> firehose = pq.ParquetFile('<firehose object name>.parquet')
>>> firehose.metadata
<pyarrow._parquet.FileMetaData object at 0x10fc63458>
  created_by: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
  num_columns: 4
  num_rows: 156
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1017
>>> firehose.schema
<pyarrow._parquet.ParquetSchema object at 0x10fc5e7b8>
udids.bag.array_element: BYTE_ARRAY UTF8
dfpts.bag.array_element: BYTE_ARRAY UTF8
uuids.bag.array_element: BYTE_ARRAY UTF8
uri: BYTE_ARRAY UTF8

模式差异是否可能是罪魁祸首?还有什么?

这两个特定文件不包含精确相同的数据,但根据我的 Athena 查询,Firehose 文件中所有行的所有列表的总基数大约是 Spark 中的 2.5 倍文件。

编辑添加:

我写了以下内容基本上将每个 parquet 文件的内容转储到 stdout 每行一行:

import sys
import pyarrow.parquet as pq

table = pq.read_table(sys.argv[1])
pydict = table.to_pydict()
for i in range(0, table.num_rows):
    print(f"{pydict['uri'][i]}, {pydict['dfpts'][i]}, {pydict['udids'][i]}, {pydict['uuids'][i]}")

然后我对每个 parquet 文件运行该程序并将输出通过管道传输到一个文件。以下是原始两个文件的大小、将上述 python 代码指向每个文件的输出,以及该输出的 gzip 压缩版本:

-rw-r--r--  1 myuser  staff  1306337 Jun 28 16:19 firehose.parquet
-rw-r--r--  1 myuser  staff  8328156 Jul  2 15:09 firehose.printed
-rw-r--r--  1 myuser  staff  5009543 Jul  2 15:09 firehose.printed.gz
-rw-r--r--  1 myuser  staff  1233761 Jun 28 16:23 spark.parquet
-rw-r--r--  1 myuser  staff  3213528 Jul  2 15:09 spark.printed
-rw-r--r--  1 myuser  staff  1951058 Jul  2 15:09 spark.printed.gz

请注意,两个 parquet 文件的大小大致相同,但 firehose 文件的“打印”内容大约是 spark 文件“打印”内容的 2.5 倍。而且它们的可压缩性差不多。

那么:如果不是原始数据,什么会占用 Spark parquet 文件中的所有空间?

编辑添加:

以下是“parquet-tools meta”的输出。每列的压缩率看起来相似,但 firehose 文件中每个未压缩字节包含更多值。对于“dfpts”列:

消防水带:

SZ:667849/904992/1.36 VC:161475

火花:

SZ:735561/1135861/1.54 VC:62643

parquet-tools 元输出:

file:            file:/Users/jh01792/Downloads/firehose.parquet 
creator:         parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 

file schema:     hive_schema 
--------------------------------------------------------------------------------
udids:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
dfpts:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
uuids:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
uri:             OPTIONAL BINARY L:STRING R:0 D:1

row group 1:     RC:156 TS:1905578 OFFSET:4 
--------------------------------------------------------------------------------
udids:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:4 SZ:421990/662241/1.57 VC:60185 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 58, min/max not defined]
dfpts:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:421994 SZ:667849/904992/1.36 VC:161475 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 53, min/max not defined]
uuids:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:1089843 SZ:210072/308759/1.47 VC:39255 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 32, min/max not defined]
uri:              BINARY GZIP DO:0 FPO:1299915 SZ:5397/29586/5.48 VC:156 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]

file:        file:/Users/jh01792/Downloads/spark.parquet 
creator:     parquet-mr version 1.8.3 (build aef7230e114214b7cc962a8f3fc5aeed6ce80828) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"uri","type":"string","nullable":false,"metadata":{}},{"name":"dfpts","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"udids","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"uuids","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
uri:         REQUIRED BINARY L:STRING R:0 D:0
dfpts:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3
udids:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3
uuids:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3

row group 1: RC:11 TS:1943008 OFFSET:4 
--------------------------------------------------------------------------------
uri:          BINARY GZIP DO:0 FPO:4 SZ:847/2530/2.99 VC:11 ENC:PLAIN,BIT_PACKED ST:[num_nulls: 0, min/max not defined]
dfpts:       
.list:       
..element:    BINARY GZIP DO:0 FPO:851 SZ:735561/1135861/1.54 VC:62643 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
udids:       
.list:       
..element:    BINARY GZIP DO:0 FPO:736412 SZ:335289/555989/1.66 VC:23323 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
uuids:       
.list:       
..element:    BINARY GZIP DO:0 FPO:1071701 SZ:160494/248628/1.55 VC:13305 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]

【问题讨论】:

  • 看起来 parquet-mr v1.8.3 用于为 Spark 生成 parquet 文件,而 parquet-mr v1.8.1 用于为 Firehose 生成 parquet 文件。除此之外,元数据似乎相似。 ??????

标签: apache-spark parquet amazon-kinesis-firehose pyarrow


【解决方案1】:

您可能应该以不同的方式提出您的问题:

为什么 Firehose 数据的压缩比 Spark 数据的压缩效率更高?

Parquet 对此有几种可能的解释:

  • 不同的列值基数

    除了压缩方案之外,Parquet 还尝试对您的值使用最有效的编码。特别是对于 BYTE_ARRAY,它会默认尝试使用字典编码,即将每个不同的 BYTE_ARRAY 值映射到一个 int,然后简单地将 int 存储在列数据中(更多信息 here)。如果字典变得太大,它将回退到简单地存储 BYTE_ARRAY 值。

    如果您的 Firehose 数据集包含的值的多样性比您的 Spark 数据集少得多,则其中一个可能正在使用有效的字典编码,而另一个则没有。

  • 排序后的数据

    已排序的数据通常比未排序的数据压缩得更好,因此如果您的 Firehose 列值自然排序(或至少更频繁地重复),parquet 编码和 gzip 压缩将实现更好的压缩率

  • 不同的行组大小

    Parquet 将值拆分为可调整大小的行组(Spark 中的parquet.block.size 配置)。压缩和编码在行组级别应用,因此行组越大,压缩效果越好,但编码可能更差(例如,您可以从字典编码切换到纯 byte_array 值)并且在读取或写入时需要更高的内存。

如何了解您的情况?

使用parquet-tools 检查列的详细编码数据:

例如在我的一个数据集上:

$ parquet-tools meta part-00015-6a77dcbe-3edd-4199-bff0-efda0f512d61.c000.snappy.parquet

...

row group 1:              RC:63076 TS:41391030 OFFSET:4
--------------------------------------------------------------------------------
options:
.list:
..element:                 BINARY SNAPPY DO:0 FPO:6042924 SZ:189370/341005/1,80 VC:269833 ENC:RLE,PLAIN_DICTIONARY ST:[no stats for this column]

...

row group 2:              RC:28499 TS:14806649 OFFSET:11648146
--------------------------------------------------------------------------------
options:
.list:
..element:                 BINARY SNAPPY DO:0 FPO:13565454 SZ:78631/169832/2,16 VC:144697 ENC:RLE,PLAIN_DICTIONARY ST:[no stats for this column]

列数据上的 ENC 属性为您提供用于列的编码(在本例中为 DICTIONARY),SZ 属性为您提供 compressed size/uncompressed size/compression ratioVC 编码值的数量。

您可以在我的示例中看到,由于数据分布,行组 2 的压缩率略好于行组 1。

更新

查看您提供的统计信息,您可以看到数据集中的 dfpts 列的平均编码值大小为 904992/161475 = 5.6 字节,而 spark 版本的大小为 1135861/62643 = 18.13 字节,尽管两者都是相同的字典编码。这可能意味着 RLE 在您的 firehose 数据集上效率更高,因为您有很多重复值或更少不同的值。 如果您在保存到 parquet 之前在 spark 中对 dfpts 列进行排序,您可能会获得与 firehose 数据相似的编码比率。

【讨论】:

  • 尚未尝试所有建议,但范围似乎相当全面,因此授予赏金。特别是,firehose 数据可能比 spark 数据更连续,这可能是正在发生的事情。我最初的理论是它可能是数据类型:bag.array_element 与 list.element,但也许这并不重要。
  • 我添加了 parquet-tools meta 的输出 + 一些评论。看起来火花文件设法为每个未压缩字节存储更少的值。两个文件的 gzip 压缩(每列)大致相同。还有其他想法吗?
  • 我已经更新了我的回复:看起来运行长度编码效率是主要区别,因此排序应该可以帮助您在 spark 中获得类似的结果。
  • 谢谢!我会试试看。
【解决方案2】:

我能想到的两件事不能归因于这种差异。
1. 实木复合地板特性。
在 Spark 中,您可以使用以下 sn-ps 找到与 Parquet 相关的所有属性。
如果属性是使用 Hadoop 配置设置的,

import scala.collection.JavaConverters._

// spark = SparkSsssion
spark.sparkContext.hadoopConfiguration.asScala.filter {
  x =>
    x.getKey.contains("parquet")
}.foreach(println)

如果属性是使用 Spark 设置的(spark-defaults.conf--conf 等)

spark.sparkContext.getConf.getAll.filter {
  case(key, value) => key.contains("parquet")
}.foreach(println)

如果我们也能够获得 firehose(我不熟悉)配置,我们可以进行比较。否则,配置也应该大致了解可能出现的问题。
2. Spark 和 FireHose 使用的 parquet 版本不同。
Parquet 社区本可以在不同版本之间更改 parquet 配置的默认值。

【讨论】:

    猜你喜欢
    • 2019-01-20
    • 2021-04-19
    • 2023-04-03
    • 2017-12-02
    • 2018-12-05
    • 2018-09-24
    • 2017-06-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多