【问题标题】:Usage of StreamingFileSink is throwing NoClassDefFoundErrorStreamingFileSink 的使用抛出 NoClassDefFoundError
【发布时间】:2019-09-23 15:54:40
【问题描述】:

我知道这可能是我的问题,但我试图解决一段时间。 我正在尝试在 AWS EMR 集群中运行 flink。

我的设置是: 来自 Kinesis 的时间序列事件 -> flink 作业 -> 将其保存到 S3

    DataStream<Event> kinesis =
                env.addSource(new FlinkKinesisConsumer< (this.streamName, new EventSchema(), kinesisConsumerConfig)).name("source");
    final StreamingFileSink<Event> streamingFileSink =
                StreamingFileSink.<Event>forRowFormat(
                        new org.apache.flink.core.fs.Path("s3a://"+ this.bucketName + "/" + this.objectPrefix),
                        new SimpleStringEncoder<>("UTF-8"))
                        .withBucketAssignerAndPolicy(new OrgIdBucketAssigner(), DefaultRollingPolicy.create().build())
                        .build();

DataStream<Event> eventDataStream =  kinesis
                .rebalance()
                .keyBy(createKeySelectorByChoosingOrgIdFromTheEvent())
                .process(new KeyedProcessFunction<String, Event, Event>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<DeviceEvent> out) throws Exception {
                        out.collect(value);
                    }
                });
eventDataStream.addSink(streamingFileSink).name("streamingFileSink");

从其中一个网站, https://www.mail-archive.com/user@flink.apache.org/msg25039.html

更新: 我开始知道,为了使 StreamingFileSink 工作,必须将一个 jar flink-s3-fs-hadoop-1.7.1.jar 放到 /usr/lib/flink/lib 文件夹中。 我在 EMR 主节点中的 /usr/lib/flink/lib 文件夹如下所示

-rw-r--r-- 1 root root     9924 Mar 20 01:06 slf4j-log4j12-1.7.15.jar
-rw-r--r-- 1 root root 42655628 Mar 20 01:06 flink-shaded-hadoop2-uber-1.7.1.jar
-rw-r--r-- 1 root root   483665 Mar 20 01:06 log4j-1.2.17.jar
-rw-r--r-- 1 root root   140172 Mar 20 01:06 flink-python_2.11-1.7.1.jar
-rw-r--r-- 1 root root 92070994 Mar 20 01:08 flink-dist_2.11-1.7.1.jar
-rw-r--r-- 1 root root 23451686 May  5 23:04 flink-s3-fs-hadoop-1.7.1.jar

当我尝试运行 flink 作业时,它会在 EMR 从站中引发以下异常。

2019-05-06 01:43:49,589 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess -> Sink: streamingFileSink (3/4) (31000a186f6ab11f0066556116c669ba) switched from RUNNING to FAILED.
java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:374)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:553)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:531)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.newAmazonS3Client(DefaultS3ClientFactory.java:80)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.createS3Client(DefaultS3ClientFactory.java:54)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:256)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

你能告诉我我缺少什么基本的东西吗?

【问题讨论】:

    标签: java hadoop-yarn apache-flink amazon-emr


    【解决方案1】:

    首先出现的错误是您将来自不同 flink 版本的 JAR 混合在一起。这些来自 Flink 1.7.1

    -rw-r--r-- 1 root root 42655628 Mar 20 01:06 flink-shaded-hadoop2-uber-1.7.1.jar
    -rw-r--r-- 1 root root   140172 Mar 20 01:06 flink-python_2.11-1.7.1.jar
    -rw-r--r-- 1 root root 92070994 Mar 20 01:08 flink-dist_2.11-1.7.1.jar
    

    虽然这个来自 Flink 1.8.0

    -rw-r--r-- 1 root root 23451686 May  5 23:04 flink-s3-fs-hadoop-1.8.0.jar
    

    这行不通;选择一个版本或另一个。请注意,1.7.2 是 1.7 系列中最新的错误修复版本。

    我还建议您阅读文档的这一部分:Streaming File Sink: Important Considerations for S3

    【讨论】:

    • 我也添加了 1.7.1 版本的 jar。但结果相同。
    • 我也看了你发的flink文档链接。最初在使用流文件接收器时,我使用s3:// 作为路径前缀。但看了文档后,我决定使用s3a://
    • 当我研究 flink 库的内部结构时,要使流式文件接收器工作,它需要可恢复的写入器。当 flink session 启动时,所有的工厂实现都被加载,最终为可恢复的 writer 铺平了道路。我感兴趣的工厂实现是org.apache.flink.fs.s3.common.FlinkS3FileSystem,它可以注入S3RecoverableWriter。我能做到这一切的唯一方法是把这个库放在/usr/lib/flink/lib 文件夹中。我想知道,我将如何将所有这些东西在 CD 世界中发挥作用。但这是一个单独的讨论。
    • 如果有人在 AWS EMR 中使用 StreamingFileSink 取得了成功,请告诉我我缺少什么。 “NoClassDefFoundError”看起来很幼稚。罐子flink-s3-fs-hadoop-1.8.0.jar 确实有org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler。还查看了该类中的所有静态变量,初始化它应该没有问题。
    • 其他细节不多,值得分享。我正在使用最新的 EMR 版本“emr-5.23.0”。我的 EMR 集群设置中有一个主节点和 4 个从节点。
    【解决方案2】:

    终于找到了原因。虽然它非常非常具有欺骗性。 Flink 内部使用 AWS SDK 版本1.11.271。 导致 NoClassDefFoundError 的类 S3ErrorResponseHandler... 有以下静态变量。

    public class S3ErrorResponseHandler implements
            HttpResponseHandler<AmazonServiceException> {
        /** Shared logger for profiling information */
        private static final Log log = LogFactory
                .getLog(S3ErrorResponseHandler.class);
    
        /** Shared factory for creating XML event readers */
        private static final XMLInputFactory xmlInputFactory = XMLInputFactory
                .newInstance();
    
        private static enum S3ErrorTags {
            Error, Message, Code, RequestId, HostId
        };
       ....
       ...
    

    在 AWS SDK 1.11.272 中,已删除 XMLInputFactory 的初始化。 这给了我一个线索。 我通过切换到1.11.272 重建了 flink 库。瞧,它开始工作了。 这仍然给我提出了几个未回答的问题。 我在 EMR 中运行的 Flink JVM 中进行了彻底的调试。 类路径显然有flink-s3-fs-hadoop-1.7.1.jar。我写了一个代码来读取那个 jar 并打印它的所有条目,我确实看到了S3ErrorResponseHandler。 在我的 flink 运算符中,我可以如下初始化 -

    XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance()
    

    类加载器显然引用了 jre 库。 然而,flink 无法初始化这个特定的类。我想知道为什么!!是不是因为 Flink 对待算子的方式!!! Flink 下面将算子序列化并传输到从节点。在从节点中,算子被反序列化、初始化并作为任务运行。 在这些不同的阶段之间,Flink 类加载器不知何故无法从 JRE 访问 XMLInputFactory 的默认实现。太诡异了!!!此外,我希望 JRE 更具体地说明在加载类时它无法初始化的静态变量。 我应该将其称为 Flink 中的错误吗? 没有人在使用 Flink 的时候在 AWS EMR 中报过这个错误?

    还注意到 flink relase 1.7.1 存在 StreamingFlineSink 的错误。如果您的 EMR 集群有 2 个挂载,它会尝试对这两个挂载执行读/写操作,从而导致失败。 它已在 1.8.0 及更高版本中解决。但是,AWS EMR 默认仍使用 1.7.1 版本。所以请确保将 1.8.0 库放在 /usr/lib/flink/lib 下

    【讨论】:

      猜你喜欢
      • 2023-02-07
      • 1970-01-01
      • 2023-04-08
      • 2019-12-31
      • 2010-12-19
      • 1970-01-01
      • 2012-10-06
      • 2016-01-08
      • 2012-12-31
      相关资源
      最近更新 更多