【发布时间】:2018-10-23 09:16:05
【问题描述】:
我只找到 TextInputFormat 和 CsvInputFormat。那么如何使用 Apache Flink 读取 HDFS 中的 parquet 文件呢?
【问题讨论】:
标签: hdfs apache-flink parquet
我只找到 TextInputFormat 和 CsvInputFormat。那么如何使用 Apache Flink 读取 HDFS 中的 parquet 文件呢?
【问题讨论】:
标签: hdfs apache-flink parquet
好的。我已经找到了一种通过 Apache Flink 在 HDFS 中读取 parquet 文件的方法。
您应该在 pom.xml 中添加以下依赖项
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
创建一个 avsc 文件来定义架构。经验:
{"namespace": "com.flinklearn.models",
"type": "record",
"name": "AvroTamAlert",
"fields": [
{"name": "raw_data", "type": ["string","null"]}
]
}
运行“java -jar D:\avro-tools-1.8.2.jar 编译架构 alert.avsc 。”生成 Java 类并将 AvroTamAlert.java 复制到您的项目中。
使用 AvroParquetInputFormat 读取 hdfs 中的 parquet 文件:
class Main {
def startApp(): Unit ={
val env = ExecutionEnvironment.getExecutionEnvironment
val job = Job.getInstance()
val dIf = new HadoopInputFormat[Void, AvroTamAlert](new AvroParquetInputFormat(), classOf[Void], classOf[AvroTamAlert], job)
FileInputFormat.addInputPath(job, new Path("/user/hive/warehouse/testpath"))
val dataset = env.createInput(dIf)
println(dataset.count())
env.execute("start hdfs parquet test")
}
}
object Main {
def main(args:Array[String]):Unit = {
new Main().startApp()
}
}
【讨论】: