【发布时间】:2021-05-16 16:29:24
【问题描述】:
我是 Flink 的新生,我想知道如何从 hdfs 读取数据。任何人都可以给我一些建议或一些简单的例子吗?谢谢大家。
【问题讨论】:
标签: hdfs apache-flink
我是 Flink 的新生,我想知道如何从 hdfs 读取数据。任何人都可以给我一些建议或一些简单的例子吗?谢谢大家。
【问题讨论】:
标签: hdfs apache-flink
如果您的文件被格式化为文本文件格式,您可以使用 'ExecutionEnvironment' 对象中的 'readTextFile' 方法。
以下是各种数据源的示例。 (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#data-sources)
【讨论】:
Flink 可以读取任何格式的 HDFS 数据,例如 text、Json、avro 等。 对 Hadoop 输入/输出格式的支持是编写 flink 作业时需要的 flink-java maven 模块的一部分。
示例 1:读取名为 JsonSeries 的文本文件并在控制台上打印
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> lines = env.readTextFile("hdfs://localhost:9000/user/hadoop/input/JsonSeries.txt")
.name("HDFS File read");
lines.print();
示例 2:使用输入格式
DataSet<Tuple2<LongWritable, Text>> inputHadoop =
env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
LongWritable.class, Text.class, "hdfs://localhost:9000/user/hadoop/input/JsonSeries.txt"));
inputHadoop.print();
【讨论】:
在 Centos7 机器上使用 Flink 1.13、Hadoop 3.1.2、Java 1.8.0,我能够从 HDFS 读取数据。
HADOOP_HOME 和 HADOOP_CLASSPATH 已经导出。我认为从 1.11 版本开始发生了一些变化。我什至找不到一个简单的例子。因此,我分享我的例子。
我添加到 pom.xml 以下依赖项
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.2</version>
</dependency>
我的 Scala 代码:
package com.vbo.datastreamapi
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object ReadWriteHDFS extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("hdfs://localhost:9000/user/train/datasets/Advertising.csv")
stream.print()
env.execute("Read Write HDFS")
}
【讨论】: