【发布时间】:2021-11-05 14:02:30
【问题描述】:
我有一个自定义的 Flink Sink,它正在写入 HDFS,使用以下代码实例化 FileSystem 对象。
val path = new Path("/path/to/one/hdfs/dir")
val hadoopJob = Job.getInstance
val hadoopConf = hadoopJob.getConfiguration
val fs = FileSystem.get(hadoopConf)
val os = fs.create(path)
我在 flink 配置文件中设置了属性 fs.hdfs.hadoopconf 指向我有 hadoop 配置文件的目录。
在 core-site.xml 我定义了属性 fs.defaultFS 如下所示。
<property>
<name>fs.defaultFS</name>
<value>hdfs://hostname:port</value>
</property>
它失败了,因为它正在实例化一个对象类型 LocalFileSystem,而不是 DistributedFileSystem。以下是我得到的例外。
java.lang.IllegalArgumentException:错误的 FS:hdfs://compute-0-0:9000/esteban.collado/kmers,预期:file:/// 在 org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:665) 在 org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:86) 在 org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:542) 在 org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:528)
谁能给我一些关于可能问题的线索?
谢谢,
【问题讨论】:
-
flink-runtime 和 Hadoop 库是否在类路径中可用?你查看ci.apache.org/projects/flink/flink-docs-master/docs/deployment/… 的文档了吗?
-
是的,我在 FLINK_HOME/lib 文件夹中有 jar 文件,那部分似乎很好,因为如果我使用此代码 "FileSystem.get(new URI("hdfs://compute -0-0:9000")," 它工作文件,它实例化 DistributedFileSystem 类并且工作,但我希望不必这样做,让 Hadoop 和 Flink 进行正确的配置。
标签: hadoop hdfs apache-flink