【问题标题】:Dataproc cannot unzip .gz file zipped by AWS KinesisDataproc 无法解压缩由 AWS Kinesis 压缩的 .gz 文件
【发布时间】:2020-05-21 13:20:43
【问题描述】:

我的公司正在尝试将服务从 AWS 迁移到 GCP。我们面临一些问题。 AWS Kinesis 收集的数据是 .gz 文件。我们使用 GCP 的 Cloud Storage 将这些文件传输到 GCP 平台,并使用 Dataproc 处理这些数据。所有这些数据都可以在 AWS 中正​​确处理,但不能被同一个 Spark 作业正确读取。

查看最后抛出的异常。

我尝试在 GCP Cloud Shell 中解压缩其中一个文件,例如 ABC.gz。解压后的文件仍然以.gz:ABC.gz结尾。我认为这是根本原因,因为 Spark 可能会尝试解压缩解压缩的文件。

如果我们通过删除.gz 后缀来重命名这些文件,那么Spark 可以正常运行。但是重命名过程太耗时,处理一天的数据需要几个小时以上。

非常感谢任何建议。提前致谢。

Caused by: java.io.IOException: incorrect header check
  at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
  at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
  at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
  at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
  at java.io.InputStream.read(InputStream.java:101)
  at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:151)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:191)
  at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
  at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:190)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:6
31)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

【问题讨论】:

  • 您在哪里存储您正在处理的.gz 文件?在谷歌云存储上?如果是这样,那么你是如何创建它们的?请分享命令和/或代码 sn-p。

标签: apache-spark google-cloud-platform google-cloud-dataproc


【解决方案1】:

如果没有更多细节,很难说到底发生了什么,但很可能您存储了未压缩的.gz 文件或使用GCS decompressive transcoding。这意味着 Spark 读取的文件已经解压(如果使用 GCS 解压转码,它们一开始没有被压缩,或者被 HTTP 客户端库解压),这会导致失败,因为 Hadoop/Spark 会自动尝试使用.gz 解压文件扩展名。

如果上述情况属实,除了重命名这些文件以删除 .gz 扩展名之外,您似乎别无选择。另外,请注意在 Spark/Hadoop 中处理 Gzip 压缩文件的效率很低,因为它们不可拆分。

【讨论】:

    【解决方案2】:

    这是由于 Kinesis 随对象写入的标头。 Kinesis 将Content-Encoding=gzip 元数据添加到对象中,该元数据在传输请求对象时添加到响应标头中,这会导致对象在传输时自动解压缩。

    您可以在下载后删除此标头或删除扩展。

    我相信,如果您使用带有 AWS 开发工具包的 PowerShell 来下载文件,那么它不会自动解压缩文件。

    【讨论】: