【问题标题】:Cannot access s3 file from Flink in AWS EMR无法从 AWS EMR 中的 Flink 访问 s3 文件
【发布时间】:2020-04-23 20:08:36
【问题描述】:

我们在 AWS EMR 上有一个长期运行的 Flink 集群。它配置有默认角色 (EMR_EC2_DefaultRole)。 我们尝试运行 Flink 作业,但它无法访问 s3 存储桶来读取文件。 我们创建了最小的 main 方法代码来重现它:

String filePath = "s3://<our-bucket>/<the-file>";
logger.info("Path: " + filePath);
Path path = Paths.get(filePath);
logger.info("Successfully got path");
File file = path.toFile();
logger.info("Successfully got creds file");
logger.info("Exists [{}], isFile [{}] ", file.exists(), file.isFile());
String content = FileUtils.readFileToString(file);
logger.info("Content [{}]", content);

我们通过 Flink Web UI 运行 Flink 作业。 我们得到除了Content 日志之外的所有日志。

存在日志为:Exists [false], isFile [false]

我们还收到以下错误:

Caused by: java.io.FileNotFoundException: File 's3:/<our-bucket>/<the-file>' does not exist
    at org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299)
    at org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1711)
    at org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1748)
    at com.<our-package>.Main.main(Main.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    ... 10 more

当我们 ssh 到主 EC2 实例并运行以下命令时,它会起作用并返回文件内容:

sudo hdfs dfs -cat s3://<our-bucket>/<the-file>

请帮忙:)

【问题讨论】:

  • 我对 FileNotFoundException 中的单个“/”有点困惑。您能否三重检查您提供的网址是否正确?
  • @ArvidHeise 我已经检查过了,这是因为Paths.get(filePath) 行。当 path 对象被记录时,这就是它产生的结果。我无法更改此行,因为它是我正在使用的第 3 方库中的一行。我试图传递一个以s3:/// 开头的网址,但它删除了所有重复的斜杠
  • 如果你使用s3a://&lt;bucket&gt;s3n://&lt;bucket&gt;会发生什么?
  • @kkrugler 同样的事情。

标签: amazon-s3 apache-flink amazon-emr


【解决方案1】:

您似乎正在尝试将 S3 路径传递给 org.apache.commons.io.FileUtils.readFileToString(),我认为这行不通。

您可以从该 S3 路径创建 Flink Path 并使用它来创建输入流,例如

Path = new Path("s3://<our-bucket>/<the-file>");
FileSystem fs = filePath.getFileSystem();
InputStream is = new DataInputStream(fs.open(filePath, readBufferSize));
String s = IOUtils.toString(is, charset);

【讨论】:

  • 是的,这可能会起作用,问题是我无法更改此代码,因为它是我必须使用的第 3 方。 (见我的comment
  • 那你就不走运了,除非你使用像s3fs-fuse这样的东西。请注意,这与 Flink 无关,这是一个纯粹的 AWS/S3 问题。
猜你喜欢
  • 2018-11-02
  • 1970-01-01
  • 2018-05-29
  • 1970-01-01
  • 1970-01-01
  • 2021-12-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多