【发布时间】:2019-05-06 18:42:01
【问题描述】:
我有一个在 AWS EMR 中运行的具有高并行度 (400) 的 Flink 应用程序。它使用 BucketingSink 获取 Kafka 并接收到 S3(使用 RocksDb 后端进行检查点)。目的地是使用“s3a://”前缀定义的。 Flink 作业是一个连续运行的流式应用程序。在任何给定时间,所有工作人员组合起来都可能生成/写入 400 个文件(由于 400 个并行度)。几天后,其中一名工人将失败,但例外:
org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Pelase try again. (Service: Amazon S3; Status Code: 200 InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java: 178)
at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java: 1803)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
这似乎是在 BucketingSink 创建新零件文件时随机发生的。奇怪的是,这是随机发生的,当它发生时,它会发生在 1 个并行的 flink 工作人员(不是全部)身上。此外,当发生这种情况时,Flink 作业将转换为 FAILING 状态,但 Flink 作业不会重新启动并从上一个成功的检查点恢复/恢复。这是什么原因,应该如何解决?此外,如何将作业配置为从上次成功的检查点重新启动/恢复,而不是保持在 FAILING 状态?
【问题讨论】:
标签: hadoop amazon-s3 apache-flink amazon-emr