【发布时间】:2020-05-21 13:20:43
【问题描述】:
我的公司正在尝试将服务从 AWS 迁移到 GCP。我们面临一些问题。 AWS Kinesis 收集的数据是 .gz 文件。我们使用 GCP 的 Cloud Storage 将这些文件传输到 GCP 平台,并使用 Dataproc 处理这些数据。所有这些数据都可以在 AWS 中正确处理,但不能被同一个 Spark 作业正确读取。
查看最后抛出的异常。
我尝试在 GCP Cloud Shell 中解压缩其中一个文件,例如 ABC.gz。解压后的文件仍然以.gz:ABC.gz结尾。我认为这是根本原因,因为 Spark 可能会尝试解压缩解压缩的文件。
如果我们通过删除.gz 后缀来重命名这些文件,那么Spark 可以正常运行。但是重命名过程太耗时,处理一天的数据需要几个小时以上。
非常感谢任何建议。提前致谢。
Caused by: java.io.IOException: incorrect header check
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:151)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:191)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:190)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:6
31)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
【问题讨论】:
-
您在哪里存储您正在处理的
.gz文件?在谷歌云存储上?如果是这样,那么你是如何创建它们的?请分享命令和/或代码 sn-p。
标签: apache-spark google-cloud-platform google-cloud-dataproc