【问题标题】:Spark/Hadoop throws exception for large LZO filesSpark/Hadoop 对大型 LZO 文件抛出异常
【发布时间】:2014-10-04 13:41:15
【问题描述】:

我正在对存储在 S3 中的一些 LZO 压缩日志文件运行 EMR Spark 作业。有多个日志文件存储在同一个文件夹中,例如:

...
s3://mylogfiles/2014-08-11-00111.lzo
s3://mylogfiles/2014-08-11-00112.lzo
...

在 spark-shell 中,我正在运行一项计算文件中行数的作业。如果我为每个文件单独计算行数,则没有问题,例如像这样:

// Works fine
...
sc.textFile("s3://mylogfiles/2014-08-11-00111.lzo").count()
sc.textFile("s3://mylogfiles/2014-08-11-00112.lzo").count()
...

如果我使用通配符以单行方式加载所有文件,则会出现两种异常。

// One-liner throws exceptions
sc.textFile("s3://mylogfiles/*.lzo").count()

例外情况是:

java.lang.InternalError: lzo1x_decompress_safe returned: -6
    at com.hadoop.compression.lzo.LzoDecompressor.decompressBytesDirect(Native Method)

java.io.IOException: Compressed length 1362309683 exceeds max block size 67108864 (probably corrupt file)
    at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:291)

在我看来,解决方案是由最后一个例外给出的文本暗示的,但我不知道如何继续。 LZO 文件的大小是否有限制,或者有什么问题?

我的问题是:我能否运行 Spark 查询,将所有 LZO 压缩文件加载到 S3 文件夹中,而不会出现与 I/O 相关的异常?

有 66 个文件,每个文件大约 200MB。

编辑: 仅在使用 Hadoop2 核心库(ami 3.1.0)运行 Spark 时才会发生异常。使用 Hadoop1 核心库(ami 2.4.5)运行时,一切正常。两种情况均使用 Spark 1.0.1 进行了测试。

【问题讨论】:

  • 您是否仔细检查了所有文件确实没有损坏?
  • 我和 Pimin 坐在这里 :) @samthebest:文件没有损坏; lzop -d 提取它们就好了。

标签: hadoop apache-spark elastic-map-reduce lzo


【解决方案1】:

kgeyti 的回答很好,但是:

LzoTextInputFormat 会影响性能,因为它会检查每个 LZO 文件的 .index 文件。这对于 S3 上的许多 LZO 文件来说尤其痛苦(我经历了长达几分钟的延迟,这是由对 S3 的数千个请求引起的)。

如果您事先知道您的 LZO 文件是不可可拆分的,那么更高效的解决方案是创建自定义的不可拆分输入格式:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

class NonSplittableTextInputFormat extends TextInputFormat {
    override def isSplitable(context: JobContext, file: Path): Boolean = false
}

并像这样读取文件:

context.newAPIHadoopFile("s3://mylogfiles/*.lzo",
  classOf[NonSplittableTextInputFormat],
  classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.io.Text])
.map(_._2.toString)

【讨论】:

  • 我实际上遇到了这个错误:java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found,使用你的建议解决了我的问题,因为我正在尝试使用日志文件而不是压缩数据。
【解决方案2】:

我自己还没有遇到这个具体问题,但看起来.textFile 期望文件是可拆分的,就像 Cedrik 的 Hive 坚持使用 CombineFileInputFormat 的问题

您可以index your lzo files,或尝试使用LzoTextInputFormat - 我很想知道这在 EMR 上是否更有效:

sc.newAPIHadoopFile("s3://mylogfiles/*.lz", 
    classOf[com.hadoop.mapreduce.LzoTextInputFormat],
    classOf[org.apache.hadoop.io.LongWritable],
    classOf[org.apache.hadoop.io.Text])
  .map(_._2.toString) // if you just want a RDD[String] without writing a new InputFormat
  .count

【讨论】:

  • 我们已经测试了您的答案(使用 LzoTextInputFormat)并且它有效。出于性能原因,我们将在某个时候尝试索引方法。感谢您的帮助。
  • 索引的主要性能增益来自创建更多分区。如果您已经有足够的 lzo 文件使集群饱和(每台机器/内核超过 1 个),并且您总是将文件作为一个整体进行处理,那么这并不重要。
  • 是否可以从 maven repo 添加“hadoo-lzo”?谢谢
【解决方案3】:

昨天我们在 EMR 集群上部署了 Hive,并且在 S3 中的一些 LZO 文件遇到了同样的问题,这些文件已被另一个非 EMR 集群毫无问题地采取。在对日志进行了一些挖掘之后,我注意到地图任务以 250MB 块读取 S3 文件,尽管这些文件绝对不可拆分

原来参数ma​​preduce.input.fileinputformat.split.maxsize设置为250000000~250MB。这导致 LZO 从文件中打开流,最终导致 LZO 块损坏。

我将参数 ma​​preduce.input.fileinputformat.split.maxsize=2000000000 设置为我们输入数据的最大文件大小,现在一切正常。

我不确定这与 Spark 有什么关系,但更改 InputFormat 可能会有所帮助,这似乎首先是问题所在,正如 How Amazon EMR Hive Differs from Apache Hive 中提到的那样。

【讨论】:

  • 我们收集到的迹象表明该问题与 Spark 无关,而与 Hadoop 2 有关。如果我们在 EMR Hadoop 1 集群上运行相同的作业,则没有问题。
猜你喜欢
  • 1970-01-01
  • 2020-09-21
  • 1970-01-01
  • 2017-09-05
  • 2018-11-10
  • 2011-04-14
  • 1970-01-01
  • 1970-01-01
  • 2011-12-07
相关资源
最近更新 更多