【Title】: NoClassDefFoundError when trying to integrate Kafka with Flink
【Posted】: 2018-05-26 00:50:09
【Description】:

I am trying to make Kafka and Flink interact: the idea is to consume a Kafka queue and transform the data with Flink. I am following the example mentioned below:

https://github.com/abhishek-ch/evolveML/blob/master/flink/kafka-flink-example/pom.xml

These are my dependencies:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-core</artifactId>
            <version>0.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>

I also unpack the Kafka and Flink classes into the project, as follows:

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>3.0.2</version>
            <executions>
                <execution>
                    <id>unpack</id>
                    <!-- executed just before the package phase -->
                    <phase>prepare-package</phase>
                    <goals>
                        <goal>unpack</goal>
                    </goals>
                    <configuration>
                        <artifactItems>
                            <!-- For Flink connector classes -->
                            <artifactItem>
                                <groupId>org.apache.flink</groupId>
                                <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
                                <version>1.3.2</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                <includes>org/apache/flink/**</includes>
                            </artifactItem>
                            <!-- For Kafka API classes -->
                            <artifactItem>
                                <groupId>org.apache.kafka</groupId>
                                <artifactId>kafka_2.11</artifactId>
                                <version>1.0.0</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                <includes>kafka/**</includes>
                            </artifactItem>
                        </artifactItems>
                    </configuration>
                </execution>
            </executions>
        </plugin>

My Java code that consumes the Kafka queue is:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Map<String, String> map = new HashMap<>();
    map.put("bootstrap.servers", kafka_server);
    map.put("zookeeper.connect", "localhost:40862");
    map.put("group.id", "test");
    map.put("topic", "data");

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromMap(map);

    DataStream<String> messageStream = null;
    try {
        messageStream = env.addSource(new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082<>(
                parameterTool.getRequired("topic"),
                new SimpleStringSchema(),
                parameterTool.getProperties()));
    } catch (Exception e) {
        LOGGER.error("Error", e);
    }

    // print() will write the contents of the stream to the TaskManager's standard out stream
    // the rebalance() call causes a repartitioning of the data so that all machines
    // see the messages (for example in cases when "num kafka partitions" < "num flink operators")
    messageStream.rebalance().map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            LOGGER.info("============================" + value);
            return "Kafka and Flink says: " + value;
        }
    }).print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }

This code sample comes from the GitHub project I mentioned earlier. The code runs inside a war file deployed in Tomcat.

When running this code I get the following error:

      Unrecoverable error java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082 

The class is present in the extracted war. I am trying to figure out how to resolve this. Any help or suggestions are greatly appreciated.

【Discussion】:

    Tags: java apache-kafka maven-3 apache-flink


    【Solution 1】:

    You should change the flink-streaming-core dependency to depend instead on flink-streaming-java_2.11, version 1.3.2. (flink-streaming-core was renamed to flink-streaming-java and flink-streaming-scala a few years ago.)

    Also, flink-connector-kafka-0.8_2.11 is meant for Kafka 0.8.x, yet you are combining it with Kafka 1.0.0. I suggest removing the kafka_2.11 dependency and relying on Maven to transitively pull in the correct Kafka jar.
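    Putting both of those fixes together, the dependency section would look roughly like this (a sketch, assuming you stay on Flink 1.3.2 and the 0.8 connector; the Kafka client jar then comes in transitively):

    ```xml
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <!-- kafka_2.11 removed: the connector pulls in a compatible Kafka client transitively -->
    ```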

    【Discussion】:

      【Solution 2】:

      A NoClassDefFoundError often hints at a version/dependency problem, and your dependencies are indeed a bit mixed up.

      You are importing Flink dependencies from both 1.3.2 (the current release) and 0.9.1 (a rather ancient release). The Flink Kafka connector you chose targets Kafka 0.8, but you pull in a Kafka 1.0.0 dependency.
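      A related point: the class in the stack trace, FlinkKafkaConsumer082, belongs to the old 0.9.x-era connector; in the 1.3.2 connector the 0.8 consumer class is, if I recall correctly, FlinkKafkaConsumer08, and it is typically configured with a plain java.util.Properties object (the 0.8 consumer needs both zookeeper.connect and bootstrap.servers). A minimal sketch of the configuration side, with placeholder addresses, could look like this:

      ```java
      import java.util.Properties;

      public class KafkaConsumerProps {
          // Builds the configuration the Flink 0.8 Kafka consumer expects.
          // The 0.8 connector requires zookeeper.connect in addition to bootstrap.servers.
          static Properties consumerProperties(String brokers, String zookeeper, String groupId) {
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", brokers);   // Kafka broker list
              props.setProperty("zookeeper.connect", zookeeper); // required by the 0.8 consumer
              props.setProperty("group.id", groupId);            // consumer group
              return props;
          }

          public static void main(String[] args) {
              Properties props = consumerProperties("localhost:9092", "localhost:2181", "test");
              // With the 1.3.2 connector on the classpath this would then be wired up as:
              // env.addSource(new FlinkKafkaConsumer08<>("data", new SimpleStringSchema(), props));
              System.out.println(props.getProperty("group.id"));
          }
      }
      ```

      The broker and ZooKeeper addresses above are placeholders; substitute your own.
      
      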

      【Discussion】:
