【问题标题】:How can I use Apache Flink to read parquet file in HDFS?如何使用 Apache Flink 读取 HDFS 中的 parquet 文件?
【发布时间】:2018-10-23 09:16:05
【问题描述】:

我只找到 TextInputFormat 和 CsvInputFormat。那么如何使用 Apache Flink 读取 HDFS 中的 parquet 文件呢?

【问题讨论】:

    标签: hdfs apache-flink parquet


    【解决方案1】:

    好的。我已经找到了一种通过 Apache Flink 在 HDFS 中读取 parquet 文件的方法。

    1. 您应该在 pom.xml 中添加以下依赖项

      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>1.6.1</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>1.6.1</version>
      </dependency>
      <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.10.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>3.1.1</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.1</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
      </dependency>
      
    2. 创建一个 avsc 文件来定义架构。经验:

        {"namespace": "com.flinklearn.models",
         "type": "record",
         "name": "AvroTamAlert",
         "fields": [
            {"name": "raw_data", "type": ["string","null"]}
         ]
        }
    
    1. 运行“java -jar D:\avro-tools-1.8.2.jar 编译架构 alert.avsc 。”生成 Java 类并将 AvroTamAlert.java 复制到您的项目中。

    2. 使用 AvroParquetInputFormat 读取 hdfs 中的 parquet 文件:

    class Main {
        def startApp(): Unit ={
            val env = ExecutionEnvironment.getExecutionEnvironment
    
            val job = Job.getInstance()
    
            val dIf = new HadoopInputFormat[Void, AvroTamAlert](new AvroParquetInputFormat(), classOf[Void], classOf[AvroTamAlert], job)
            FileInputFormat.addInputPath(job, new Path("/user/hive/warehouse/testpath"))
    
            val dataset = env.createInput(dIf)
    
            println(dataset.count())
    
            env.execute("start hdfs parquet test")
        }
    }
    
    object Main {
        def main(args:Array[String]):Unit = {
            new Main().startApp()
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-09-14
      • 2019-04-12
      • 2017-08-07
      • 1970-01-01
      • 2020-01-02
      • 1970-01-01
      相关资源
      最近更新 更多