【问题标题】:BZip2 compressed input for Apache FlinkApache Flink 的 BZip2 压缩输入
【发布时间】:2015-06-08 08:50:09
【问题描述】:

我有一个用 bzip2 压缩的维基百科转储(从 http://dumps.wikimedia.org/enwiki/ 下载),但我不想解压缩它:我想在动态解压缩时处理它。

我知道用普通的 Java 可以做到这一点(参见例如Java - Read BZ2 file and uncompress/parse on the fly),但我想知道如何在 Apache Flink 中做到这一点?我可能需要像https://github.com/whym/wikihadoop 这样的东西,但对于 Flink,而不是 Hadoop。

【问题讨论】:

    标签: bzip2 apache-flink


    【解决方案1】:

    在 Apache Flink 中可以读取以下格式的压缩文件:

    org.apache.hadoop.io.compress.BZip2Codec
    org.apache.hadoop.io.compress.DefaultCodec
    org.apache.hadoop.io.compress.DeflateCodec
    org.apache.hadoop.io.compress.GzipCodec
    org.apache.hadoop.io.compress.Lz4Codec
    org.apache.hadoop.io.compress.SnappyCodec
    

    从包名可以看出,Flink 使用 Hadoop 的 InputFormats 来实现这一点。 这是使用 Flink 的 Scala API 读取 gz 文件的示例: (至少需要 Flink 0.8.1)

    def main(args: Array[String]) {
    
      val env = ExecutionEnvironment.getExecutionEnvironment
      val job = new JobConf()
      val hadoopInput = new TextInputFormat()
      FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
      val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
    
      lines.print
    
      env.execute("Read gz files")
    }
    

    Apache Flink 仅内置支持 .deflate 文件。添加对更多压缩编解码器的支持很容易,但还没有完成。

    在 Flink 中使用 HadoopInputFormats 不会导致任何性能损失。 Flink 内置了对 Hadoop 的 Writable 类型的序列化支持。

    【讨论】:

      猜你喜欢
      • 2018-08-13
      • 2012-12-12
      • 2011-01-20
      • 1970-01-01
      • 2019-10-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多