【问题标题】:Break a large tar.gz file into multiple smaller tar.gz files将一个大的 tar.gz 文件分成多个较小的 tar.gz 文件
【发布时间】:2020-12-31 13:22:48
【问题描述】:

在 spark 中处理大于 1gb 的 tar.gz 文件时,我得到 OutOfMemoryError

为了克服这个错误,我尝试使用“split”命令将 tar.gz 拆分为多个部分,结果发现每个拆分都不是 tar.gz 本身,因此不能这样处理。

dir=/dbfs/mnt/data/temp
b=524288000
for file in /dbfs/mnt/data/*.tar.gz; 
do 
a=$(stat -c%s "$file");
if [[ "$a" -gt "$b" ]] ; then 
split -b 500M -d --additional-suffix=.tar.gz $file "${file%%.*}_part"
mv $file $dir
fi
done

尝试处理拆分文件时出错

Caused by: java.io.EOFException
    at org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream.read(GzipCompressorInputStream.java:281)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.read(TarArchiveInputStream.java:590)
    at org.apache.commons.io.input.ProxyInputStream.read(ProxyInputStream.java:98)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.Reader.read(Reader.java:140)
    at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2001)
    at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1980)
    at org.apache.commons.io.IOUtils.copy(IOUtils.java:1957)
    at org.apache.commons.io.IOUtils.copy(IOUtils.java:1907)
    at org.apache.commons.io.IOUtils.toString(IOUtils.java:778)
    at org.apache.commons.io.IOUtils.toString(IOUtils.java:803)
    at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$$anonfun$apply$1.apply(command-2152765781429277:33)
    at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$$anonfun$apply$1.apply(command-2152765781429277:31)
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
    at scala.collection.immutable.Stream.foreach(Stream.scala:595)
    at scala.collection.TraversableOnce$class.toMap(TraversableOnce.scala:316)
    at scala.collection.AbstractTraversable.toMap(Traversable.scala:104)
    at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$.apply(command-2152765781429277:34)
    at linea3796c25fa964697ba042965141ff28827.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-2152765781429278:3)
    at linea3796c25fa964697ba042965141ff28827.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-2152765781429278:3)

我的 tar.gz 文件大小高达 4gb,每个文件最多可包含 7000 个 json 文档,大小从 1mb 到 50mb 不等。

如果我想将大的 tar.gz 文件分成更小的 tar.gz 文件,我唯一的选择是解压缩,然后以某种方式根据文件大小或文件数重新压缩? - “是这样吗?”

【问题讨论】:

    标签: linux bash scala apache-spark tar


    【解决方案1】:

    普通 gzip 文件不可拆分。 GZip Tar 档案更难处理。 Spark 可以处理 gzip 压缩的 json 文件,但不能处理 gzip 压缩的 tar 文件和 tar 文件。 Spark 可以处理每个最大约 2GB 的二进制文件。 Spark 可以处理连接在一起的 JSON

    我建议使用 Pandas UDF 或 .pipe() 运算符来处理每个 tar gzip 压缩文件(每个工作人员一个)。每个工作人员将以流式方式解压缩、解压缩和处理每个 JSON 文档,从不填满内存。希望您有足够的源文件来并行运行它并看到速度加快。

    您可能想要探索流式传输方法,以将压缩的 JSON 文件增量交付到 ADLS Gen 2 / S3,并使用 Databricks 自动加载器功能在文件到​​达后立即加载和处理。

    这个问题的答案How to load tar.gz files in streaming datasets? 似乎很有希望。

    【讨论】:

    • 我更改了使用 bash 命令解压缩 tar.gz 存档文件的过程,但对于某些存档文件,此过程可能需要数天才能完成。我开始了一种流式处理方法来处理新文件,因为它们在源处被解压缩为 json。我注意到即使启用了“检查点”,流进程也无法处理几个 json 文件。也许您推荐的 pandas udf 可能会加快整个过程。我应该尝试使用您提供的链接中实现的 .tgz 编解码器吗?
    • 嗯...单个 4GB .tar.gz 文件的天数听起来有点过分。我会做更多的测试。 AWS S3 CP 非常快,因为它使用并行线程来加快复制时间。使用 I3/DSv2 实例(它们具有 TB 大小的 NVMe RAM 本地驱动器)将有助于 bash 脚本。我最近遇到了类似的问题,但它没有 tar 文件。我的客户有 50GB gzipped CSV 文件。此编解码器:github.com/nielsbasjes/splittablegzip 通过允许并行执行节省了时间。更多内核,但时间更少。支持 Tar 流的类似编解码器应该对您有所帮助。
    • 实际上,一个 4GB 的 tar.gz 文件最多需要 30 分钟,我有大约 750 个 tar 文件(最小:100MB,最大:4GB),总共 1TB。解压缩总共 1 TB 需要几天时间。我们使用 Azure,集群由 DS4v2 工作程序(min10,max20)和 D32sv3 驱动程序(128gb ram,32 个内核)组成。以前我在基于the link 的scala 中使用UDF。 UDF 返回键(json 文件名)和值(实际 json 内容)对的映射,即每个 tar.gz 文件最多 7000 对。但是它只适用于 500MB 以下的 tar.gz 文件
    • pigz 可能更快 tar -xzf serverfault.com/questions/270814/fastest-way-to-extract-tar-gz。它将在计算优化的实例类型上运行得更快,包括本地 NVMe 文件系统上的文件。 Cluster -> Metrics -> Ganglia 将有助于显示瓶颈在哪里(CPU/IO/网络)。 UDF 看起来很有用,但听起来它会将所有内容都处理到同一个 spark 分区中?原版
    • 解决了拆分 .tar.gz 的原始问题。您必须将大的 .tar.gz 重写为多个较小的 .tar.gz 文件。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多