【Title】: java.lang.NoClassDefFoundError (CheckpointedRestoring) when trying to create a Kafka Consumer in Flink
【Posted】: 2018-10-17 07:37:26
【Question】:

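I am trying to create a Kafka consumer in Apache Flink. I have been following the setup instructions and guides in the Apache Flink documentation, but I get a java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring error.

The error is: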

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.agt.examples.KafkaConsumer.main(KafkaConsumer.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 30 more

The KafkaConsumer class is:

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // For Apache Kafka Consumer
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<String>("my-flink-topic", new SimpleStringSchema(), properties));

    stream.map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            return value;
        }
    }).print();
    env.execute();
}

The pom.xml is:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>

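The Flink version is 1.4.2, the Java version is 1.8, and I am using IntelliJ IDEA. I think this must be related to the dependencies not being linked correctly, but I cannot work out what is wrong. I looked for the class mentioned above, but it is not under streaming/api/checkpoint in my libraries, and when I checked online it appears to be a deprecated class. I tried mvn clean install, Invalidate Caches / Restart, and Generate Sources and Update Folders, but the same error still appears. I created the project by following the instructions on the Apache Flink setup page https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/java_api_quickstart.html, but I added some other dependencies from different sources during development. Does this have to do with the version of one of the dependencies? I am completely out of my depth here.

Any help is much appreciated :)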

【Comments】:

    Tags: java maven apache-kafka apache-flink


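    【Answer 1】:

    The versions of your flink-clients_2.10 and flink-connector-kafka-0.8_2.10 dependencies appear to be inconsistent with the other Flink dependencies, which use ${flink.version}. Try using ${flink.version} instead of 1.2.1 in the following section: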

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>
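
For reference, a sketch of how the two dependencies might look once their versions are aligned. The connector artifactId flink-connector-kafka-0.8_2.11 is taken from the asker's follow-up comment. Switching flink-clients to the _2.11 suffix as well is an assumption, as is flink.version being set to 1.4.2 in the pom's properties; check the Maven repository for the artifacts that actually exist for your Flink version.

```xml
<!-- Sketch only: assumes <flink.version>1.4.2</flink.version> is defined under <properties> -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Assumption: _2.11 suffix, chosen to match the connector below -->
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- artifactId as reported in the follow-up comment -->
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```

The point is that every org.apache.flink artifact resolves to the same version, so a connector built against one Flink release is not loaded next to a streaming jar from another.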
    

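    【Comments】:

    • Thanks a lot, Alex. I changed it to ${flink.version} for the clients and then did the same for the connector. The only other change for the connector was that, after checking the Maven repository, I had to change the artifactId to "flink-connector-kafka-0.8_2.11". But now it works like a charm. Thanks again :)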
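
When chasing a NoClassDefFoundError like the one in this thread, it can help to ask the JVM directly whether a class is visible and which jar it came from. The following is a minimal sketch using only the standard library; the class name ClasspathCheck and its two helper methods are hypothetical names made up for illustration, and the two Flink class names are the ones that appear in the question. It needs to be run with the same classpath as the failing program to be meaningful.

```java
import java.security.CodeSource;

public class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    public static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate and load the class, do not run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // NoClassDefFoundError is a LinkageError, so a broken dependency chain also lands here
            return false;
        }
    }

    /** Returns the jar or directory the class was loaded from, or a short note if unavailable. */
    public static String locationOf(String className) {
        try {
            Class<?> c = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader typically have no code source
            return src == null ? "(bootstrap or unknown)" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not found)";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring",
            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOf(name));
        }
    }
}
```

If the first class is reported as not found while the second resolves to a jar, the version in that jar's file name is a quick hint as to which Flink release is actually on the classpath. Running mvn dependency:tree gives the same information from the build side.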